In [77]:
import os
import random
import string

import matplotlib.pyplot as plt
import mido
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from midi_ndarrays import *

# Project directories; each ends with '/' so paths can be built by plain
# string concatenation further down.
cwd = os.getcwd()+'/'
midi_data_dir  = cwd+'midi_data/'
batch_data_dir = cwd+'batch_data/'

# only download midis if the batch dir doesn't exist and the midi_dir doesn't exist
if not os.path.exists(midi_data_dir) and not os.path.exists(batch_data_dir):
    # Imported here so that this cell works on a fresh kernel instead of
    # relying on a later cell having already imported download_midis.
    from midi_utils import download_midis

    os.makedirs(midi_data_dir)
    download_midis(midi_data_dir)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [27]:
def dir_idx(dir_name, n):
    """
    Return the path of the nth entry in a directory.

    The listing is sorted by name before indexing: os.listdir returns entries
    in arbitrary order, so without sorting "the nth file" is not well defined.

    dir_name: directory to list, with or without a trailing separator
    n:        index into the sorted listing

    Raises IndexError when n is out of range for the listing.
    """
    entries = sorted(os.listdir(dir_name))

    # os.path.join yields the same result as plain concatenation when
    # dir_name already ends with a separator, and inserts one when it doesn't
    return os.path.join(dir_name, entries[n])

# Data Preprocessing

In [93]:
# download music if it hasn't already been downloaded
# The isdir check is needed because the midi directory is only created when
# the batch directory is missing as well; listing a directory that does not
# exist would raise FileNotFoundError.
if os.path.isdir(midi_data_dir) and len(os.listdir(midi_data_dir)) == 0:
    from midi_utils import download_midis
    download_midis(midi_data_dir)

    
def load_array(midi_filename):
    """
    Read a midi file into a binary uint8 ndarray framed by start/end tokens.

    Every non-zero value produced by mid2array becomes 1 (played) and every
    zero stays 0 (not played). The array is then padded by one cell on every
    side, and two opposite corners of the padded array are set to 1:
    [0, 0] marks the start token and [-1, -1] marks the end token.
    """
    song = mid2array(mido.MidiFile(midi_filename, clip=True))

    # binarise: anything non-zero counts as a played note
    played = np.asarray(song != 0).astype('uint8')

    # one cell of padding all the way round makes room for the two tokens
    framed = np.pad(played, 1)
    framed[0, 0] = 1      # start token: first row, first column
    framed[-1, -1] = 1    # end token: last row, last column

    return framed
   
    
def _save_batch(out_dir, batch_num, songs):
    """Stack songs vertically and write them to 'batch-<batch_num>.csv' in out_dir."""
    np.savetxt(os.path.join(out_dir, 'batch-'+str(batch_num)+'.csv'),
               np.vstack(songs), fmt="%d", delimiter=",")


def batches_to_csv(batch_size, midi_dir=None, out_dir=None, loader=None):
    """
    Group songs into batches and write every batch to its own csv file.

    Songs are loaded one at a time, in sorted filename order. A batch is the
    vertical concatenation of batch_size successfully loaded songs and is
    saved as 'batch-<k>.csv' with k = 0, 1, 2, ... Files that fail to load are
    reported and skipped without using up a slot in the batch. If the songs
    run out part-way through a batch, the smaller final batch is still saved.

    batch_size: number of songs per batch, must be at least 1
    midi_dir:   directory holding the midi files (default: midi_data_dir)
    out_dir:    directory the csv files are written to (default: batch_data_dir)
    loader:     callable turning a file path into a 2-D array (default: load_array)

    Raises ValueError when batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # fall back to the notebook-level settings when nothing is passed in
    if midi_dir is None:
        midi_dir = midi_data_dir
    if out_dir is None:
        out_dir = batch_data_dir
    if loader is None:
        loader = load_array

    batch_num = 0   # index of the next csv file; counts up across all batches
    songs = []      # arrays collected for the batch currently being built

    # list the directory once, sorted so batch contents are reproducible
    for name in sorted(os.listdir(midi_dir)):
        path = os.path.join(midi_dir, name)

        try:
            songs.append(loader(path))
        except Exception as err:
            # e.g. the key signature can't be read; nothing was added to the
            # batch, so this file simply doesn't count towards batch_size
            print('skipping ' + path + ': ' + repr(err))
            continue

        if len(songs) == batch_size:
            _save_batch(out_dir, batch_num, songs)
            batch_num += 1
            songs = []

    # leftover songs that didn't fill a whole batch
    if songs:
        _save_batch(out_dir, batch_num, songs)

In [6]:
# TODO load_batch
# def load_data(filename):
#     arr = np.genfromtxt(filename, delimiter=',')
    
#     # update all values where the note played isn't 0 to 1, else 0
#     return np.where(arr != 0, 1, 0)

# The Model

In [7]:
class EncoderRNN(nn.Module):
    """
    Encoder half of the seq2seq model: embedding -> dropout -> multi-layer LSTM.

    Only the LSTM's final hidden and cell states are returned; its per-step
    outputs are discarded.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout_p):
        """
        input_size:  number of rows in the embedding table
        hidden_size: embedding width and LSTM hidden width
        num_layers:  number of stacked LSTM layers
        dropout_p:   dropout probability, applied after the embedding and
                     passed to the LSTM as its dropout argument
        """
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers  = num_layers

        self.dropout   = nn.Dropout(p=dropout_p)
        self.embedding = nn.Embedding(num_embeddings=input_size,
                                      embedding_dim=hidden_size)
        self.rnn       = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size,
                                 num_layers=num_layers, dropout=dropout_p)

    def forward(self, x):
        """
        Encode x and return the LSTM's final (hidden, cell) states.

        x is passed straight to nn.Embedding, so it must hold integer indices.
        """
        embedded = self.embedding(x)
        embedded = self.dropout(embedded)

        # the per-step outputs are not needed, only the final state
        _, final_state = self.rnn(embedded)
        hidden, cell = final_state
        return hidden, cell

In [8]:
class DecoderRNN(nn.Module):
    """
    Decoder half of the seq2seq model. It advances one time step per call:
    embedding -> dropout -> multi-layer LSTM -> linear projection.
    """

    def __init__(self, hidden_size, output_size, 
                num_layers, dropout_p):
        """
        hidden_size: embedding width and LSTM hidden width
        output_size: number of rows in the embedding table and width of each
                     prediction
        num_layers:  number of stacked LSTM layers
        dropout_p:   dropout probability, applied after the embedding and
                     passed to the LSTM as its dropout argument
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers  = num_layers

        # Seq2Seq.forward reads decoder.output_dim to size its output buffer,
        # so the prediction width is exposed under that name.
        self.output_dim  = output_size

        self.dropout   = nn.Dropout(dropout_p)
        self.embedding = nn.Embedding(output_size, hidden_size)
        
        self.rnn = nn.LSTM(hidden_size, hidden_size, num_layers, dropout=dropout_p)
        
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x, hidden, cell):
        """
        Run a single decoding step.

        x:      integer indices for the current step; Seq2Seq.forward passes a
                1-D tensor with one entry per batch element
        hidden: LSTM hidden state from the previous step (or the encoder)
        cell:   LSTM cell state from the previous step (or the encoder)

        Returns (predictions, hidden, cell). predictions is the raw output of
        the linear layer (no activation applied) with the step axis removed.
        """
        # the LSTM expects a leading sequence axis; add one of length 1
        x = x.unsqueeze(0)
        
        embedding = self.dropout(self.embedding(x))
        
        outputs, (hidden, cell) = self.rnn(embedding, (hidden, cell))
        
        predictions = self.fc(outputs)
         
        # drop the length-1 sequence axis again
        predictions = predictions.squeeze(0)
        
        return predictions, hidden, cell

In [9]:
class Seq2Seq(nn.Module):
    """
    Tie an encoder and a decoder together into a sequence-to-sequence model.

    The encoder's final hidden and cell states seed the decoder, which is then
    unrolled one step at a time over the target sequence.
    """

    def __init__(self, encoder, decoder):
        """
        encoder: module whose forward(src) returns (hidden, cell)
        decoder: module whose forward(input, hidden, cell) returns
                 (predictions, hidden, cell)
        """
        super().__init__()
        
        self.encoder = encoder
        self.decoder = decoder
  
    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        """
        Decode trg step by step, conditioned on the encoding of src.

        src: source sequence, passed unchanged to the encoder
        trg: target sequence indexed as (trg_len, batch_size); trg[0] is used
             as the first decoder input
        teacher_forcing_ratio: probability, per step, of feeding the true
             target trg[t] as the next input instead of the model's own
             highest-scoring prediction

        Returns a tensor of shape (trg_len, batch_size, output width) on the
        same device as trg. Row 0 is left as zeros because decoding starts at
        t = 1.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]

        # width of one decoder prediction: prefer an explicit output_dim
        # attribute, otherwise read it from the decoder's final linear layer
        trg_out = getattr(self.decoder, 'output_dim', None)
        if trg_out is None:
            trg_out = self.decoder.fc.out_features
        
        #tensor to store decoder outputs; allocated next to trg so the module
        #does not depend on a notebook-level device variable
        outputs = torch.zeros(trg_len, batch_size, trg_out, device=trg.device)
        
        #last hidden state of the encoder is used as the initial hidden state of the decoder
        hidden, cell = self.encoder(src)
        
        #first input to the decoder is the <sos> tokens
        decoder_input = trg[0,:]
        
        for t in range(1, trg_len):
            
            #insert input token embedding, previous hidden and previous cell states
            #receive output tensor (predictions) and new hidden and cell states
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            
            #place predictions in a tensor holding predictions for each token
            outputs[t] = output
            
            #decide if we are going to use teacher forcing or not
            teacher_force = random.random() < teacher_forcing_ratio
            
            #get the highest predicted token from our predictions
            top1 = output.argmax(1) 
            
            #if teacher forcing, use actual next token as next input
            #if not, use predicted token
            decoder_input = trg[t] if teacher_force else top1
        
        return outputs

# Train

In [None]:
# model hyperparameters
num_layers = 2
drop_p = 0.5

# data has 88 columns for each piano note, plus two for start and end tokens
midi_dim = 90

hidden_size = 256

# number of songs to pass at a time
batch_size = 10

# make sure we have our batch data
# Rebuild when the directory is missing OR empty: an earlier run that created
# the directory and then failed would otherwise block regeneration forever.
if not os.path.isdir(batch_data_dir) or not os.listdir(batch_data_dir):
    os.makedirs(batch_data_dir, exist_ok=True)
    batches_to_csv(batch_size)

In [22]:
drop_p  = 0.5

encoder = EncoderRNN(midi_dim, hidden_size, num_layers, drop_p)
decoder = DecoderRNN(hidden_size, midi_dim, num_layers, drop_p)
model = Seq2Seq(encoder, decoder).to(device)

optimizer = optim.Adam(model.parameters())

# The decoder ends in a plain nn.Linear, so the model emits raw logits rather
# than probabilities. BCEWithLogitsLoss applies the sigmoid itself, whereas
# nn.BCELoss requires inputs that already lie in [0, 1].
# NOTE(review): do not add a separate sigmoid before this loss in the training loop.
criterion = nn.BCEWithLogitsLoss()

# TODO
# for epoch in epochs
    # for song in epoch
        # pass to seq to seq, compute loss, optimize

In [None]:
## Training Loop
def train(model, iterator, optimizer):
    """
    Placeholder for the training loop; not implemented yet.

    Accepts model, iterator and optimizer, does nothing with them and
    returns None.
    """
    return None