In [2]:
import numpy as np
import os
import random
import torch 

RANDOM_SEED = 42

random.seed(RANDOM_SEED)
os.environ['PYTHONHASHSEED'] = str(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

## Load dataset

In [3]:
def load_corpus(path):
  corpus = list()

  with open(path, "r") as f:
    corpus = [line.replace(" ", "").replace("\n", "").split("|") for line in f.readlines()]
    corpus = [ [chord for chord in seq if len(chord) > 0] for seq in corpus if len(seq) > 15 ]
  
  return corpus

In [4]:
train = load_corpus("corpus.txt")

# Training

In [13]:
# Training params

t = 1e-5 #@param {type: "number"}
c = 2 #@param {type: "number"}
k = 20 #@param {type: "number"}
alpha = 1 #@param {type: "number"}
epochs = 100 #@param {type: "number"}

batch_size = 2048 #@param {type: "number"}
embedding_dim = 300 #@param {type: "number"}

## word2vec training

In [5]:
from gensim.models import Word2Vec

model = Word2Vec(sentences=train, 
                 vector_size=embedding_dim, 
                 window=c, 
                 min_count=0, 
                 workers=4,
                 hs=0,
                 negative=k,
                 ns_exponent=alpha,
                 sample=t,
                 epochs=epochs,
                 seed=RANDOM_SEED)

model.save("word2vec.gensim")

## fasttext training

In [6]:
from gensim.models import FastText

model = FastText(sentences=train, 
                 vector_size=embedding_dim, 
                 window=c, 
                 min_count=0, 
                 workers=4,
                 hs=0,
                 negative=k,
                 ns_exponent=alpha,
                 sample=t,
                 epochs=epochs,
                 seed=RANDOM_SEED)

model.save("fasttext.gensim")

## harte2vec training

In [6]:
from harte2vec.harte import HarteToIntervals
from collections import Counter
from itertools import chain
import numpy as np

def onehot_intervals(intervals):
  if len(intervals) == 0:
    chord_encoding = np.array([0])
  else:
    intervals_encoding = np.stack([np.eye(12)[i, :].sum(axis=0).clip(0, 1) for i in intervals])
    whole_chord_encoding = intervals_encoding.sum(axis=0).clip(0, 1)
    chord_encoding = np.vstack((intervals_encoding, whole_chord_encoding))
    # convert chord encoding to indexes
    chord_encoding = chord_encoding.dot(2**np.arange(12)[::-1]).astype(int)
  
  return chord_encoding

Subsample data according to 

$$ p(c) = \frac{f(c) - t}{f(c)} - \sqrt{\frac{t}{f(c)}} $$

where

$$ P(c) = \frac{f(c)^\alpha}{\sum_{c'} f(c')^\alpha} $$.

We will set $\alpha = -0.5$, $t = 10^{-5}$, $100$ epochs and a context size of $3$.


[1] https://arxiv.org/pdf/1804.04212.pdf

In [7]:
from typing import List
import numpy as np

def subsample_corpus(data: List[List[str]], freq: np.ndarray, vocab: List[str], alpha: float = -0.5, t: float = 1e-5) -> List[List[str]]:
  # subsample data according to distribution
  P_c = freq**alpha / sum(freq**alpha)
  p_c = ((freq - t) / freq) - np.sqrt(t / freq)
  p_c = dict(zip(vocab, p_c))
  # subsample corpus
  subsampled_data = [[sample for sample in doc if np.random.random() < p_c[sample]] for doc in data]
  return subsampled_data

During training each sample will be augmented with positive and negative samples to ease the training phase. Positive samples are all those items that are in the context window. Negative samples are selected randomly from the dataset based on their probability.

In [8]:
def negative_sampling(data: List[List[str]], 
                      vocab: List[str], 
                      c: int = 3, 
                      k: int = 15):
    # implementation based on 
    # https://tech.hbc.com/2018-03-23-negative-sampling-in-numpy.html
    context_size = 2 * c + 1

    # needs to remove initial and final element without contigous context
    num_samples = len(data) - (2*c)
    X = np.zeros(num_samples, dtype=int)
    y_pos = np.zeros((num_samples, context_size), dtype=int)
    y_neg = np.zeros((num_samples, k), dtype=int)

    data_idxs = np.searchsorted(vocab, data)
    window_idxs = np.lib.stride_tricks.sliding_window_view(data_idxs, context_size)

    for i, positives_index in enumerate(window_idxs):
        # extract positives and central chord idx
        element_idx = positives_index[context_size // 2]

        raw_samp = np.random.randint(0, len(vocab) - context_size, size=k)
        pos_idxs_adj = positives_index - np.arange(len(positives_index))
        ss = np.searchsorted(pos_idxs_adj, raw_samp, side='right')
        negatives_index = raw_samp + ss

        X[i] = element_idx
        y_pos[i] = positives_index
        y_neg[i] = negatives_index

    return X, y_pos, y_neg

Create torch Dataset for easier training and batch creation.

In [9]:
from typing import List
from torch.utils.data import Dataset
from more_itertools import windowed, collapse
from collections import Counter
from itertools import chain
from torch.nn.utils.rnn import pad_sequence

class ChocoHarte2VecDataset(Dataset):
  def __init__(self, data: List[str], t: float = 1e-5, c: int = 3, alpha: float = -0.5, k: int = 20):
    super().__init__()
    self._data = data
    self.t = t
    self.c = c
    self.alpha = alpha
    self.k = k

    # build vocab and sort to enable faster numpy searching
    counter = dict(sorted(Counter(chain(*self._data)).items(), key=lambda item: item[0]))

    # turn into numpy array for faster searching
    self.vocab = np.array(list(counter.keys()))
    self.freq = np.array([counter[chord] for chord in self.vocab])
    
    # compute one hot encoding of whole vocabulary
    # use numpy array for faster searching
    self.h2i = HarteToIntervals()
    self.intervals_encoding = np.array([onehot_intervals(self.h2i.convert(chord)) for chord in self.vocab])
    self._data = subsample_corpus(self._data, self.freq, self.vocab, alpha=self.alpha, t=self.t)

  def __getitem__(self, index: int):
    data_item = self._data[index]
    # perform negative sampling
    X, pos, neg = negative_sampling(data_item, self.vocab, c=self.c, k=self.k)
    
    X = np.take(self.intervals_encoding, X)
    pos = np.take(self.intervals_encoding, pos)
    neg = np.take(self.intervals_encoding, neg)
    
    X_pos = [np.tile(xi, (len(pos_i), 1)) for xi, pos_i in zip(X, pos)]
    X_neg = [np.tile(xi, (len(neg_i), 1)) for xi, neg_i in zip(X, neg)]
    
    return X_pos, pos, X_neg, neg

  def __len__(self):
    return len(self._data)

  def _collate_seq(self, seq):
    return pad_sequence(list(map(torch.tensor, (collapse(seq, levels=2)))),
                        batch_first=True,
                        padding_value=0)

  def collate_fn(self, samples):
    X_pos, y_pos, X_neg, y_neg = zip(*samples)

    X_pos = self._collate_seq(X_pos)
    y_pos = self._collate_seq(y_pos)
    X_neg = self._collate_seq(X_neg)
    y_neg = self._collate_seq(y_neg)
    
    return X_pos, y_pos, X_neg, y_neg

Define the model as in the original fasttext paper [1]

[1] https://arxiv.org/abs/1607.04606

In [17]:
import torch.nn as nn
import pytorch_lightning as pl

class Harte2VecModel(pl.LightningModule):
    def __init__(self, 
                 embedding_dim: int = 10):
        super().__init__()
        self.save_hyperparameters()
        self.embedding_dim = embedding_dim
        self.full_vocab_size = 2**12 + 1
        self.embedding = nn.EmbeddingBag(self.full_vocab_size, self.embedding_dim, mode="sum")
    
    def _predict(self, source, target):
        source = self.embedding(source)
        target = self.embedding(target)
        y = torch.einsum("ij,ik->i", source, target)
        return y
    
    def training_step(self, batch, batch_idx):
        x_pos, y_pos, x_neg, y_neg = batch
        
        pos_pred = self._predict(x_pos, y_pos)
        neg_pred = self._predict(x_neg, y_neg)
        pred = torch.cat([pos_pred, neg_pred])
        target = torch.cat([
            torch.ones_like(pos_pred, device=pos_pred.device),
            torch.zeros_like(neg_pred, device=neg_pred.device)
        ])
    
        loss = nn.functional.binary_cross_entropy_with_logits(pred, target)
        self.log("train_loss", loss)
        return loss
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.025)
        return optimizer

In [23]:
from torch.utils.data import DataLoader

class DataModule(pl.LightningDataModule):
    def __init__(self, train, batch_size: int = 512):
        super().__init__()
        self.train = train
        self.batch_size = batch_size
        
    def train_dataloader(self):
        return DataLoader(self.train, batch_size=self.batch_size, shuffle=True, collate_fn=self.train.collate_fn, num_workers=16)

Train the model.

In [24]:
batch_size = 512 #@param {type: "number"}
embedding_dim = 10 #@param {type: "number"}

In [25]:
dataset = ChocoHarte2VecDataset(train, t=t, alpha=alpha, c=c, k=k)
data = DataModule(dataset, batch_size=batch_size)

  self.intervals_encoding = np.array([onehot_intervals(self.h2i.convert(chord)) for chord in self.vocab])


In [26]:
from pytorch_lightning.callbacks import EarlyStopping, StochasticWeightAveraging
import logging

model = Harte2VecModel(embedding_dim=embedding_dim)
    
trainer = pl.Trainer(max_epochs=epochs, accelerator="gpu", devices=1,
                     callbacks=[
                         EarlyStopping(monitor="train_loss", min_delta=0.00, patience=2),
                         StochasticWeightAveraging(swa_lrs=1e-2)
                     ])
trainer.fit(model, datamodule=data)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1]

  | Name      | Type         | Params
-------------------------------------------
0 | embedding | EmbeddingBag | 41.0 K
-------------------------------------------
41.0 K    Trainable params
0         Non-trainable params
41.0 K    Total params
0.164     Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

In [27]:
torch.save({
  "embedding": model.embedding.weight,
  "vocab": dataset.vocab
}, "harte2vec.pt")