In [2]:
from loaders import *
from collections import Counter
from torch import nn
from torch.autograd import Variable

import numpy as np
import torch
import torch.nn.functional as F
import json
import numpy as np
import matplotlib.pyplot as plt
import random

In [7]:
""" Some global variables """
# Device flag consulted by every model class below.
use_cuda = torch.cuda.is_available()

# MIDI reader/tokenizer wrapper around the base loader.
# NOTE(review): the meaning of the 500 passed to Loader is not visible here — confirm.
_loader = Loader(500)
loader = MIDILoader(_loader)

# Is the tokenizer 1 indexed?
# Base vocabulary (16*128*2 + 32*16 + 100 + 1 = 4709 ids) plus two sentinel ids.
vocabulary_size = (16*128*2 + 32*16 + 100 + 1) + 2
# The two sentinels occupy the last two ids of the vocabulary.
SOS_TOKEN = vocabulary_size - 2  # 4709
EOS_TOKEN = vocabulary_size - 1  # 4710

# Width of the LSTM hidden state used when the Learner is built.
encoding_size = 500
# Row i is the one-hot (float64) vector for token id i.
one_hot_embeddings = np.eye(vocabulary_size)

In [12]:
import time
import math


def asMinutes(s):
    """Format a duration in seconds as ``'<minutes>m <seconds>s'``.

    Both fields are rendered with ``%d``, so any fractional part of the
    remaining seconds is truncated.
    """
    whole_minutes = math.floor(s / 60)
    leftover_seconds = s - whole_minutes * 60
    return '%dm %ds' % (whole_minutes, leftover_seconds)


def timeSince(since, percent):
    """Return ``'<elapsed> (- <remaining>)'`` for a job started at ``since``.

    Args:
        since: start timestamp as returned by ``time.time()``.
        percent: fraction of the work completed so far; must be non-zero
            because the projected total is ``elapsed / percent``.
    """
    elapsed = time.time() - since
    # Linear extrapolation of the total run time from the progress so far.
    projected_total = elapsed / percent
    remaining = projected_total - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

In [3]:
class EncoderLSTM(nn.Module):
    """Single-layer LSTM encoder that exposes only its recurrent state.

    The LSTM weights are cast to double precision, so inputs and hidden
    states passed to ``forward`` must be double tensors as well.
    """

    def __init__(self, input_size, hidden_size):
        """Build the LSTM.

        Args:
            input_size: number of features of each input step.
            hidden_size: number of features of the LSTM hidden state.
        """
        super(EncoderLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size).double()
        if use_cuda:
            self.lstm = self.lstm.cuda()

    def forward(self, input, hidden_in):
        """Advance the LSTM over ``input`` and return the new ``(h, c)`` state.

        The per-step outputs of the LSTM are discarded; only the state tuple
        is returned.
        """
        _, hidden_out = self.lstm(input, hidden_in)  # encoder only outputs hidden
        return hidden_out

    def initHidden(self, hidden=None):
        """Return ``hidden`` unchanged, or a zero state when it is ``None``.

        Args:
            hidden: an existing state tensor to reuse, or ``None``.

        Returns:
            ``hidden`` itself when given; otherwise a double tensor of zeros
            with shape ``(1, 1, hidden_size)``, moved to the GPU when
            ``use_cuda`` is set.
        """
        # Identity check: ``hidden == None`` would dispatch to the tensor's
        # elementwise ``__eq__`` instead of testing for absence.
        if hidden is not None:
            return hidden

        result = Variable(torch.zeros(1, 1, self.hidden_size)).double()
        if use_cuda:
            result = result.cuda()
        return result

In [4]:

class DecoderLSTM(nn.Module):
    """LSTM decoder followed by a linear layer producing per-class scores.

    All layers are held in double precision. The ``project`` layer is
    constructed (and therefore contributes parameters) but is not used by
    ``forward`` in this class.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create the layers.

        Args:
            input_size: number of features of each input step.
            hidden_size: number of features of the LSTM hidden state.
            output_size: number of scores emitted per step.
        """
        super().__init__()
        self.hidden_size = hidden_size

        # Creation order (lstm, out, project) is kept fixed so that random
        # weight initialisation consumes the RNG in the same sequence.
        self.lstm = nn.LSTM(input_size, hidden_size).double()
        self.out = nn.Linear(hidden_size, output_size).double()
        self.project = nn.Linear(4096, self.hidden_size).double()

        if use_cuda:
            for layer_name in ('lstm', 'out', 'project'):
                setattr(self, layer_name, getattr(self, layer_name).cuda())

    def forward(self, input, hidden):
        """Run one decoding call.

        ``input`` is passed through ReLU, then the LSTM, then the output
        layer. All singleton dimensions of the scores are squeezed away and a
        leading dimension of size one is added back.

        Returns:
            ``(scores, new_hidden)`` where ``new_hidden`` is the LSTM state.
        """
        rectified = F.relu(input)
        lstm_out, next_hidden = self.lstm(rectified, hidden)
        scores = self.out(lstm_out).squeeze()
        return scores.unsqueeze(0), next_hidden

    def initHidden(self):
        """Return a double zero tensor of shape ``(1, 1, hidden_size)``."""
        zeros = Variable(torch.zeros(1, 1, self.hidden_size)).double()
        return zeros.cuda() if use_cuda else zeros

In [None]:
class MetaLearner(nn.Module):
    """Placeholder module: no layers or behaviour are defined yet."""

    def __init__(self, input_size, hidden_size):
        # Both arguments are accepted but currently unused; only the base
        # nn.Module bookkeeping is initialised.
        super().__init__()
        
    

In [23]:
class Learner(nn.Module):
    """Encoder/decoder LSTM pair trained to reproduce a one-hot control sequence.

    The encoder reads the sequence back-to-front and hands its final
    ``(h, c)`` state to the decoder, which then emits the sequence
    front-to-back. Each sub-network is updated by its own Adam optimizer.

    NOTE(review): ``forward`` feeds position 1 as the first decoder input
    (position 0 is never fed to either network during training), whereas
    ``map_inference`` feeds position 0 first. Behaviour is left as-is;
    confirm which of the two is intended.
    """

    # Probability of feeding the ground-truth token (instead of the model's
    # own prediction) as the next decoder input during training.
    TEACHER_FORCING_RATIO = 0.9

    def __init__(self,
                 input_size,
                 hidden_size,
                 output_size,
                 learning_rate,
                 embeddings=one_hot_embeddings):
        """Build the encoder, decoder, their optimizers and the loss.

        Args:
            input_size: feature size of each sequence step.
            hidden_size: LSTM hidden-state size shared by encoder and decoder.
            output_size: number of classes scored by the decoder.
            learning_rate: learning rate for both Adam optimizers.
            embeddings: array indexed by token id that yields the vector fed
                back to the decoder when teacher forcing is not applied.
        """
        super(Learner, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        self.encoder = EncoderLSTM(input_size, hidden_size)
        self.decoder = DecoderLSTM(input_size, hidden_size, output_size)
        self.encoder_optimizer = torch.optim.Adam(self.encoder.parameters(), lr=learning_rate)
        self.decoder_optimizer = torch.optim.Adam(self.decoder.parameters(), lr=learning_rate)

        self.embeddings = embeddings
        self.criterion = nn.CrossEntropyLoss()

    def _encode(self, sequence, hidden):
        """Feed ``sequence`` to the encoder in reverse and return its final state.

        Positions ``length - 1`` down to ``1`` are visited; position 0 is not
        fed. ``hidden`` (or a zero state when it is ``None``) is used for both
        elements of the initial ``(h, c)`` tuple.
        """
        state = self.encoder.initHidden(hidden)
        state = (state, state)  # the LSTM needs an (h, c) tuple
        for position in range(sequence.size()[1] - 1, 0, -1):
            step_input = sequence[0][position].view(1, 1, -1)
            state = self.encoder(step_input, state)
        return state

    def forward(self, sequence, hidden):
        """Compute the mean cross-entropy of reconstructing ``sequence``.

        Args:
            sequence: tensor indexed as ``sequence[0][i]`` whose second
                dimension is the sequence length; each step is the one-hot
                row whose arg-max is used as the target class.
            hidden: initial encoder state, or ``None`` for zeros.

        Returns:
            The summed per-step loss divided by the number of decoder steps
            (``length - 2``).

        Raises:
            ValueError: if the sequence has fewer than three positions, so
                that no decoder step (and hence no loss term) exists.
        """
        sequence_length = sequence.size()[1]
        num_steps = sequence_length - 2
        if num_steps < 1:
            raise ValueError(
                'sequence must contain at least 3 positions, got %d' % sequence_length)

        decoder_hidden = self._encode(sequence, hidden)
        decoder_input = sequence[0][1]  # One after SOS

        loss = 0
        for index_control in range(2, sequence_length):
            decoder_input = decoder_input.view(1, 1, -1)
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)

            # CrossEntropyLoss takes input1: (N, C) and input2: (N).
            _, actual_control_index = sequence[0][index_control].topk(1)
            if use_cuda:
                actual_control_index = actual_control_index.cuda()
            loss += self.criterion(decoder_output, actual_control_index)

            if random.random() <= self.TEACHER_FORCING_RATIO:
                # Teacher forcing: next input is the ground-truth step.
                decoder_input = sequence[0][index_control]
            else:
                # Otherwise the next input is the embedding of the prediction.
                _, topi = decoder_output.data.topk(1)
                predicted_control_index = int(topi)
                decoder_input = torch.from_numpy(self.embeddings[predicted_control_index])
                decoder_input = Variable(decoder_input).double()
                if use_cuda:
                    decoder_input = decoder_input.cuda()

        # Average over the number of loss terms actually accumulated.
        return loss / num_steps

    def map_inference(self, sequence, hidden, embeddings=one_hot_embeddings, max_length=500):
        """Greedily decode a token sequence conditioned on ``sequence``.

        The encoder state obtained from ``sequence`` seeds the decoder, whose
        first input is ``sequence[0][0]``. At each step the highest-scoring
        class is taken; decoding stops when that class is ``EOS_TOKEN`` (not
        included in the output) or once ``max_length`` tokens were emitted.

        Args:
            sequence: conditioning sequence, same layout as in ``forward``.
            hidden: initial encoder state, or ``None`` for zeros.
            embeddings: array indexed by token id giving the next decoder
                input (this argument is used, not ``self.embeddings``).
            max_length: upper bound on the number of emitted tokens.

        Returns:
            List of predicted token ids.
        """
        output_control_sequence = []

        # This point we have last encoder_hidden, feed into decoder
        decoder_hidden = self._encode(sequence, hidden)
        decoder_input = sequence[0][0]

        cur_length = 0
        while True:
            decoder_input = decoder_input.view(1, 1, -1)
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)

            # MAP inference
            _, topi = decoder_output.data.topk(1)
            predicted_control_index = int(topi)
            if predicted_control_index == EOS_TOKEN:
                break
            output_control_sequence.append(predicted_control_index)

            # This is the next input
            decoder_input = torch.from_numpy(embeddings[predicted_control_index])
            decoder_input = Variable(decoder_input).double()
            if use_cuda:
                decoder_input = decoder_input.cuda()

            cur_length += 1
            if cur_length >= max_length:
                break

        return output_control_sequence

    def train_step(self, sequence, hidden=None):
        """Run one optimisation step on ``sequence`` and return its loss.

        Gradients of both optimizers are cleared, the loss from ``forward``
        is back-propagated, and both optimizers take a step.
        """
        self.encoder_optimizer.zero_grad()
        self.decoder_optimizer.zero_grad()

        loss = self.forward(sequence, hidden)

        loss.backward()
        self.encoder_optimizer.step()
        self.decoder_optimizer.step()
        return loss

    def train(self, sequence=True, hidden=None):
        """Train on ``sequence``, or toggle training mode when given a bool.

        This method shadows ``nn.Module.train(mode)``, which ``eval()`` calls
        with a boolean. A boolean first argument is therefore forwarded to the
        base implementation so that ``learner.eval()`` / ``learner.train()``
        keep working; any other argument is treated as a training sequence
        and handled by ``train_step``.
        """
        if isinstance(sequence, bool):
            return super(Learner, self).train(sequence)
        return self.train_step(sequence, hidden)

# Autoencoder over the token vocabulary: inputs and outputs share the
# vocabulary width, with an encoding_size-wide hidden state in between.
learner = Learner(
    input_size=vocabulary_size,
    hidden_size=encoding_size,
    output_size=vocabulary_size,
    learning_rate=0.01,
)

In [19]:
# MIDI files (relative to ../data/) to convert into one-hot training tensors.
input_files = ['bach_846.mid']
input_variables = []

for input_file in input_files:
    orig_seq = loader.read('../data/' + input_file)
    orig_seq = loader.tokenize(orig_seq)

    # Keep at most the first 500 tokens and frame them with SOS/EOS.
    trunc_seq = orig_seq[0:500]
    trunc_seq = [SOS_TOKEN] + trunc_seq + [EOS_TOKEN]
    seq_length = len(trunc_seq)

    # Indexing with a list of ids already materialises a new
    # (seq_length, vocabulary_size) array, so no additional np.array copy is
    # needed before handing the buffer to torch.
    trunc_seq = torch.from_numpy(one_hot_embeddings[trunc_seq])
    trunc_seq = trunc_seq.view(1, seq_length, vocabulary_size)
    trunc_seq = Variable(trunc_seq)
    if use_cuda:
        trunc_seq = trunc_seq.cuda()
    input_variables.append(trunc_seq)

In [24]:
""" Testing Learner """
print_every = 10      # report the running average every this many epochs
total_epochs = 200
print_loss_total = 0.0
steps_since_print = 0  # training steps accumulated into print_loss_total
start = time.time()
for epoch in range(1, total_epochs + 1):
    for sequence in input_variables:
        loss = learner.train(sequence, hidden=None)
        # Accumulate a plain Python float: adding the loss tensor itself
        # would keep extending an autograd history across iterations.
        print_loss_total += float(loss)
        steps_since_print += 1

    if epoch % print_every == 0:
        # Mean loss per training step since the last report (guarded so an
        # empty input list does not divide by zero).
        print_loss_avg = print_loss_total / max(steps_since_print, 1)
        print_loss_total = 0.0
        steps_since_print = 0
        print('%s (%d %d%%) %.4f' % (timeSince(start, epoch / total_epochs),
                                     epoch, epoch / total_epochs * 100, print_loss_avg))

1m 53s (- 35m 53s) (10 5%) 4.5626
3m 46s (- 33m 58s) (20 10%) 3.4049
5m 38s (- 32m 0s) (30 15%) 3.2659


KeyboardInterrupt: 