In [None]:
# %matplotlib inline

import numpy as np
# from matplotlib import pyplot as plt
import time
import os
# import Levenshtein as L

import torch
import torch.nn as nn
import torch.nn.functional as F
#from torchnlp.nn import WeightDropLSTM
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader

# Device string used for the model and every batch: CUDA when torch reports it
# as available, otherwise the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DEVICE  # bare expression so the notebook displays the selected device

In [None]:
# Hyper parameters

# Number of train/evaluate passes run by the training cell.
NUM_EPOCHS = 15
# Number of parallel token streams (columns) in every batch.
BATCH_SIZE = 32

# All paths are relative to the notebook's working directory.
# Checkpoints are written under EXPERIMENT_PATH/<run_id>/.
EXPERIMENT_PATH = '../greenday_experiments'
# Generated piano-roll .npy files are written here.
GENERATION_PATH = '../greenday_generation'
# Directory holding the token dataset and the chord vocabulary .npy files.
DATA_PATH = '../greenday_dataset'

In [None]:
# Load the token sequences (iterated below one song at a time) and the chord
# vocabulary.
# NOTE(review): if these .npy files store object arrays, recent NumPy versions
# require allow_pickle=True to load them — confirm against the files before
# enabling pickle loading.
dataset = np.load(os.path.join(DATA_PATH, 'midis_array_guitar_greenday.npy'))
chord_vocab = np.load(os.path.join(DATA_PATH, 'chord_vocab_greenday.npy'))

split_ratio = 0.9
# Only needed by the song-level split alternative kept in the comment below.
split = int(split_ratio * len(dataset))


def _as_song_array(songs):
    """Pack a list of per-song sequences into a 1-D object array.

    Calling np.array() on sequences of different lengths raises on current
    NumPy, and on equal lengths it silently builds a 2-D array instead.
    Filling a pre-allocated object array element by element always yields
    shape (n_songs,), with each element being that song's own sequence.
    """
    packed = np.empty(len(songs), dtype=object)
    for i, song_tokens in enumerate(songs):
        packed[i] = song_tokens
    return packed


# Split every song individually: the first 90% of its tokens go to training,
# the remaining tail goes to validation.
train_songs = []
val_songs = []
for song in dataset:
    train_split = int(split_ratio * len(song))
    train_songs.append(song[:train_split])
    val_songs.append(song[train_split:])
train_dataset = _as_song_array(train_songs)
val_dataset = _as_song_array(val_songs)
# Alternative (song-level split instead of within-song split):
# train_dataset = dataset[:split]
# val_dataset = dataset[split:]

print (train_dataset.shape)

In [None]:
class MusicDataLoader(DataLoader):
    def __init__(self, dataset, batch_size, shuffle=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.p = 0.95
        self.seq_len = 70
        self.std = 5
        
    def sample_seq_len_(self):
        rand_p = np.random.random_sample()
        if rand_p < self.p:
            seq_mean = self.seq_len
        else:
            seq_mean = self.seq_len // 2
        return int(np.random.normal(seq_mean, self.std))

    def __iter__(self):
        if self.shuffle:
            rand_idx = np.random.permutation(len(self.dataset))
        else:
            rand_idx = np.arange(len(self.dataset))
        concate_dataset = torch.from_numpy(np.hstack(self.dataset[rand_idx]))
        num_iter = len(concate_dataset) // self.batch_size
        concate_dataset = concate_dataset[:num_iter*self.batch_size].view(self.batch_size, -1)
        concate_dataset.transpose_(0,1)
        index = 0
        while index < len(concate_dataset):
            seq_len = self.sample_seq_len_();
            if index + seq_len > len(concate_dataset):
                break
            yield concate_dataset[index:index+seq_len-1], concate_dataset[index+1:index+seq_len]
            index += seq_len

In [None]:
# model

class MusicModel(nn.Module):
    """Embedding -> multi-layer LSTM -> linear projection back onto the vocabulary.

    The output projection shares its weight matrix with the embedding, which
    is why the LSTM hidden size equals ``embed_size``.

    Args:
        note_size: vocabulary size (number of distinct token ids).
        embed_size: embedding width, also used as the LSTM hidden size.
        nlayers: number of stacked LSTM layers.
    """

    def __init__(self, note_size, embed_size, nlayers):
        super(MusicModel, self).__init__()
        self.embedding = nn.Embedding(note_size, embed_size)
        # nn.LSTM applies dropout only between stacked layers; passing it for
        # a single layer has no effect and merely triggers a warning.
        self.rnn = nn.LSTM(input_size=embed_size, hidden_size=embed_size, num_layers=nlayers,
                           dropout=0.5 if nlayers > 1 else 0.0)
        self.linear = nn.Linear(embed_size, note_size)
        # Weight tying: the projection reuses the embedding matrix.
        self.linear.weight = self.embedding.weight

        self.init_weight()

    def init_weight(self):
        """Initialise the (shared) embedding matrix uniformly in [-0.1, 0.1]."""
        self.embedding.weight.data.uniform_(-0.1, 0.1)

    def forward(self, seq_batch): # L x B
        """Return ``(logits, hidden)`` for a batch of token ids shaped L x B.

        ``logits`` is L x B x note_size; ``hidden`` is the LSTM's final
        ``(h_n, c_n)`` state. The LSTM always starts from a zero state.
        """
        embedded = self.embedding(seq_batch) # L x B x E
        rnn_out, hidden = self.rnn(embedded) # L x B x H
        logits = self.linear(rnn_out) # L x B x V
        return logits, hidden

    def _sample_note(self, rnn_output):
        """Sample one token id per row of ``rnn_output`` (N x H) -> LongTensor (N,)."""
        logits = self.linear(rnn_output) # N x V
        # Argmax over Gumbel-perturbed scores, i.e. a random draw rather than
        # the greedy choice.
        scores = F.gumbel_softmax(logits)
        _, note = torch.max(scores, dim=1)
        return note

    def generate(self, seq, n_notes):
        """Sample ``n_notes`` tokens that continue the seed sequence ``seq``.

        Args:
            seq: 1-D LongTensor of seed token ids; must hold at least one id
                because the last LSTM output of the seed is used.
            n_notes: number of tokens to sample; values <= 0 give an empty
                tensor.

        Returns:
            1-D LongTensor of length ``n_notes`` on the same device as the
            model outputs.

        The train/eval mode is not changed here; call ``eval()`` first to
        disable dropout.
        """
        if n_notes <= 0:
            return torch.empty(0, dtype=torch.long, device=seq.device)

        generated_notes = []
        # Sampling needs no gradients; without no_grad every step would keep
        # extending the autograd graph.
        with torch.no_grad():
            embed = self.embedding(seq).unsqueeze(1) # L x 1 x E
            output_lstm, hidden = self.rnn(embed) # L x 1 x H
            current_note = self._sample_note(output_lstm[-1]) # 1
            generated_notes.append(current_note)
            for _ in range(n_notes - 1):
                # Feed the previous sample back in, carrying the LSTM state.
                embed = self.embedding(current_note).unsqueeze(0) # 1 x 1 x E
                output_lstm, hidden = self.rnn(embed, hidden) # 1 x 1 x H
                current_note = self._sample_note(output_lstm[0]) # 1
                generated_notes.append(current_note)
        return torch.cat(generated_notes, dim=0)

In [None]:
# model trainer
class MusicModelTrainer:
    """Runs training, validation, checkpointing and sampling for a music model.

    Args:
        model: module whose ``forward`` returns ``(logits, hidden)`` and that
            offers ``generate(seq, n_notes)``.
        train_loader: iterable of ``(inputs, targets)`` batches for training.
        val_loader: iterable of ``(inputs, targets)`` batches for validation.
        max_epochs: total number of epochs, used only in the progress print.
        run_id: sub-directory of EXPERIMENT_PATH that receives checkpoints.
    """

    def __init__(self, model, train_loader, val_loader, max_epochs=1, run_id='exp'):
        self.model = model.to(DEVICE)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.train_losses = []   # per-epoch mean training loss
        self.val_losses = []     # per-evaluation mean validation loss
        self.epochs = 0          # number of completed training epochs
        self.max_epochs = max_epochs
        self.run_id = run_id

        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=3e-4, weight_decay=1.2e-6)
        # Summed (not averaged) loss, so per-note means can be formed over
        # batches of different lengths.
        self.criterion = nn.CrossEntropyLoss(reduction="sum")

    @staticmethod
    def _mean_loss(total_loss, n_notes):
        """Per-note mean of a summed loss; NaN when no notes were seen."""
        return total_loss / n_notes if n_notes else float('nan')

    def train(self):
        """Run one training epoch; records and returns its mean per-note loss."""
        self.model.train() # set to training mode
        epoch_loss = 0
        num_batches = 0
        n_notes = 0
        for inputs, targets in self.train_loader:
            num_batches += 1
            batch_loss, n_note = self.train_batch(inputs, targets)
            epoch_loss += batch_loss
            n_notes += n_note
            if (num_batches % 100 == 0):
                print ('[TRAIN]  Iter [%d]   Loss: %.4f'
                          % (num_batches, self._mean_loss(batch_loss, n_note)))
        # An empty loader gives NaN instead of a ZeroDivisionError.
        epoch_loss = self._mean_loss(epoch_loss, n_notes)
        self.epochs += 1
        print('[TRAIN]  Epoch [%d/%d]   Loss: %.4f'
                      % (self.epochs, self.max_epochs, epoch_loss))
        self.train_losses.append(epoch_loss)
        return epoch_loss

    def train_batch(self, inputs, targets):
        """Take one optimisation step; returns ``(summed_loss, n_predictions)``."""
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)
        output, hidden = self.model(inputs)
        # Flatten (L, B, V) logits and (L, B) targets for the criterion.
        loss = self.criterion(output.view(-1, output.size(2)), targets.contiguous().view(-1))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item(), output.size(0) * output.size(1)

    def evaluate(self):
        """Compute the mean per-note validation loss, record it and return it."""
        self.model.eval()
        epoch_loss = 0
        n_notes = 0
        with torch.no_grad():
            for inputs, targets in self.val_loader:
                inputs = inputs.to(DEVICE)
                targets = targets.to(DEVICE)
                output, hidden = self.model(inputs)
                loss = self.criterion(output.view(-1, output.size(2)), targets.contiguous().view(-1))
                epoch_loss += loss.item()
                n_notes += output.size(0) * output.size(1)
        # An empty loader gives NaN instead of a ZeroDivisionError.
        epoch_loss = self._mean_loss(epoch_loss, n_notes)
        print('[VAL] Val Loss: %.4f' % epoch_loss)
        self.val_losses.append(epoch_loss)
        return epoch_loss

    def save(self):
        """Write the model weights to EXPERIMENT_PATH/<run_id>/model-<epochs>.pt."""
        run_dir = os.path.join(EXPERIMENT_PATH, self.run_id)
        # Create the run directory on demand so saving does not depend on a
        # separate set-up cell having been run.
        os.makedirs(run_dir, exist_ok=True)
        model_path = os.path.join(run_dir, 'model-{}.pt'.format(self.epochs))
        torch.save(self.model.state_dict(), model_path)

    def load(self, model_path):
        """Load model weights from ``model_path`` onto DEVICE."""
        # map_location remaps the stored tensors, so a checkpoint written on a
        # GPU also loads on a CPU-only machine.
        self.model.load_state_dict(torch.load(model_path, map_location=DEVICE))
        print ("loaded model")

    def generate(self, seed, n_notes):
        """Sample ``n_notes`` tokens that continue ``seed``.

        Args:
            seed: whitespace-separated token ids, e.g. ``"186 122"``.
            n_notes: number of tokens to sample.

        Returns:
            NumPy array with the sampled token ids.
        """
        self.model.eval()
        # Explicit int64 so the tensor is a LongTensor on every platform.
        seq = np.array(seed.split(), dtype=np.int64)
        seq = torch.from_numpy(seq).to(DEVICE)
        # Use the trainer's own model rather than a notebook-level global.
        output = self.model.generate(seq, n_notes)
        return output.cpu().detach().numpy()

In [None]:
# Name of this run; checkpoints are stored under EXPERIMENT_PATH/<run_id>.
run_id = "guitar"
# One call creates both directory levels and is a no-op when they already
# exist, replacing the separate exists()/mkdir() checks.
os.makedirs(os.path.join(EXPERIMENT_PATH, run_id), exist_ok=True)
print("Saving models, predictions, and generated words to %s/%s" % (EXPERIMENT_PATH, run_id))

In [None]:
# Model: 512-wide embedding / LSTM state, 3 stacked LSTM layers.
# NOTE(review): the vocabulary is sized len(chord_vocab) + 1, i.e. one id more
# than there are chords — presumably a special token; confirm its meaning.
model = MusicModel(note_size=len(chord_vocab) + 1, embed_size=512, nlayers=3)

# Training batches use the loader's default shuffling; validation keeps the
# songs in their stored order.
train_loader = MusicDataLoader(train_dataset, BATCH_SIZE)
val_loader = MusicDataLoader(val_dataset, BATCH_SIZE, shuffle=False)

trainer = MusicModelTrainer(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    max_epochs=NUM_EPOCHS,
    run_id=run_id,
)
print(model)

In [None]:
best_nll = 1e30  # sentinel: larger than any real validation loss
for epoch in range(NUM_EPOCHS):
    trainer.train()
    trainer.evaluate()
    # evaluate() appends the epoch's validation loss to trainer.val_losses;
    # reading it from there works whether or not evaluate() returns a value.
    nll = trainer.val_losses[-1]
    if nll < best_nll:
        # Checkpoint every improvement (file name carries the epoch number).
        best_nll = nll
        trainer.save()
# Always write the final-epoch checkpoint as well, since later cells load the
# model by its last epoch number.
trainer.save()

In [None]:
# Load the final checkpoint of this run. The path is derived from run_id and
# NUM_EPOCHS (the same pieces trainer.save() uses) so it does not go stale when
# either of them is changed.
trainer.load(os.path.join(EXPERIMENT_PATH, run_id, 'model-{}.pt'.format(NUM_EPOCHS)))
# from collections import Counter
# Candidate seeds: the first token of every song in the dataset.
start = [song[0] for song in dataset]
# Counter(start).most_common

# 186, 122, 182
# Pick a random opening token, then sample 800 further tokens from the model.
start_note = np.random.choice(start)
gen = np.array([start_note] + list(trainer.generate(str(start_note), 800)))
print (gen)

In [None]:
def gen_one_hot_bass(notes=None, out_dir=None):
    """Convert a token sequence into a 128-row roll and save it as try_bass.npy.

    Each kept token becomes one column of a (128, n_columns) float array:
      * token 128 is skipped entirely (no column is emitted);
      * token 0 gives an all-zero column;
      * any other token ``t`` sets row ``t`` to 1 when it differs from the
        previous token (or is the first one) and to 0.5 when it repeats it.
    NOTE(review): presumably 128 is a separator, 0 a rest and 0.5 a sustained
    note — confirm against whatever consumes the file.

    Args:
        notes: sequence of integer tokens; defaults to the notebook-level
            ``gen``.
        out_dir: output directory; defaults to ``GENERATION_PATH``. It is
            created when missing.
    """
    if notes is None:
        notes = gen
    if out_dir is None:
        out_dir = GENERATION_PATH

    columns = []
    for i in range(len(notes)):
        if notes[i] == 128:
            continue
        one_hot = np.zeros((128, 1))
        if notes[i] != 0:
            is_new = i == 0 or notes[i] != notes[i - 1]
            one_hot[notes[i]] = 1 if is_new else 0.5
        columns.append(one_hot)

    # np.hstack rejects an empty list; emit an empty roll instead.
    roll = np.hstack(columns) if columns else np.zeros((128, 0))
    os.makedirs(out_dir, exist_ok=True)
    np.save(os.path.join(out_dir, 'try_bass.npy'), roll)

def gen_one_hot_guitar(ind, notes=None, vocab=None, out_dir=None):
    """Convert a chord-token sequence into a 128-row roll and save it.

    Each kept token ``t`` is looked up in ``vocab`` to obtain the row indices
    it activates; those rows are set to 1 when ``t`` differs from the previous
    token (or is the first one) and to 0.5 when it repeats it. A token equal to
    ``len(vocab)`` is skipped without emitting a column.
    NOTE(review): presumably ``len(vocab)`` is a separator token and 0.5 marks
    a sustained chord — confirm against whatever consumes the file.

    The result, shaped (128, n_columns), is written to
    ``<out_dir>/try_guitar_<ind>.npy``.

    Args:
        ind: value inserted into the output file name.
        notes: sequence of integer tokens; defaults to the notebook-level
            ``gen``.
        vocab: token id -> iterable of row indices; defaults to the
            notebook-level ``chord_vocab``.
        out_dir: output directory; defaults to ``GENERATION_PATH``. It is
            created when missing.
    """
    if notes is None:
        notes = gen
    if vocab is None:
        vocab = chord_vocab
    if out_dir is None:
        out_dir = GENERATION_PATH

    skip_token = len(vocab)
    columns = []
    for i in range(len(notes)):
        if notes[i] == skip_token:
            continue
        one_hot = np.zeros((128, 1))
        is_new = i == 0 or notes[i] != notes[i - 1]
        level = 1 if is_new else 0.5
        for c in vocab[notes[i]]:
            one_hot[c] = level
        columns.append(one_hot)

    # np.hstack rejects an empty list; emit an empty roll instead.
    roll = np.hstack(columns) if columns else np.zeros((128, 0))
    os.makedirs(out_dir, exist_ok=True)
    np.save(os.path.join(out_dir, 'try_guitar_{}.npy'.format(ind)), roll)

# Export the generated sequence; with ind=1 the file is try_guitar_1.npy under GENERATION_PATH.
gen_one_hot_guitar(1)