In [467]:
import midi
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import copy
import keras
import warnings
import pickle
from keras.models import Sequential
from keras.layers import Dense, TimeDistributed, LSTM, Dropout
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from sklearn.metrics import mean_squared_error
warnings.filterwarnings("ignore")
%matplotlib inline

In [2]:
class MidiTokenizer():
    def __init__(self, filepath, padding=True):
        '''
        filepath: path to MIDI file
        padding: whether or not to pad time series data in case of polyphony
        self.mid: python-midi parsing of MIDI file
        self.format: format of MIDI file (0, 1, 2)
        self.tick_relative: whether event ticks are recorded relative to other
            events or absolutely
        '''
        self.mid = midi.read_midifile(filepath)
        self.format = self.mid.format
        self.tick_relative = self.mid.tick_relative
        self.padding = padding
        
        self.event_data = {i: [] for i in range(17)}
        self.time_series_data = {}
        
        if self.format == 0:         # one midi track
            self.parse_track([mid], self.tick_relative)
        elif self.format == 1:       # multiple midi tracks
            for track in self.mid:
                self.parse_track(track, self.tick_relative)
                
        for i in self.event_data.keys():
            self.event_data[i] = pd.DataFrame(
                self.event_data[i], columns=['start', 'duration', 'pitch', 'velocity'])
            
        self.gen_time_series_data()
                            
    def parse_track(self, track, tick_relative=True):        
        tick = 0
        pitch_data = {}
        
        for event in track:
            if type(event) != midi.NoteOnEvent and type(event) != midi.NoteOffEvent:
                # if event does not correspond to a note, skip event
                continue

            tick = (tick + event.tick) if tick_relative else event.tick
            
            if event.velocity != 0 and type(event) != midi.NoteOffEvent:
                # note on
                datum = [tick, None, event.pitch, event.velocity]
                self.event_data[event.channel].append(datum)
                try:
                    if pitch_data[event.pitch][-1][1] == None:
                        pitch_data[event.pitch][-1][1] = tick
                        
                    pitch_data[event.pitch].append(datum)
                except KeyError:
                    pitch_data[event.pitch] = [datum]
            else:
                # note off
                pitch_data[event.pitch][-1][1] = tick - self.event_data[event.channel][-1][0]
                                
    def gen_time_series_data(self):
        for channel, parsed_track in self.event_data.items():
            if parsed_track.shape[0] == 0:
                continue
            
            ticks = parsed_track[['start', 'duration']].values
            max_tick = max(map(lambda x:np.sum(x), ticks)) + 1
            
            self.time_series_data[channel] = [[] for _ in range(max_tick)]
            
            for _, event in parsed_track.iterrows():
                start, duration, pitch, velocity = event.values
                end = start + duration
                note = (pitch, velocity)
                                
                self.time_series_data[channel][start].append(note)
                self.time_series_data[channel][end].append(note)
                
            activated_notes = set([])
            
            for i, notes in enumerate(self.time_series_data[channel]):
                for note in notes:
                    if note not in activated_notes:
                        activated_notes.add(note)
                    else:
                        activated_notes.remove(note)
                
                self.time_series_data[channel][i] = list(activated_notes)
              
            if self.padding:
                self.time_series_data[channel] = \
                    keras.preprocessing.sequence.pad_sequences(self.time_series_data[channel])

            

In [548]:
mt = MidiTokenizer('data/big_poppa/BigPoppa.mid')

In [549]:
mt.mid[0]

midi.Track(\
  [midi.KeySignatureEvent(tick=0, data=[0, 0]),
   midi.SmpteOffsetEvent(tick=0, data=[96, 0, 0, 0, 0]),
   midi.SetTempoEvent(tick=0, data=[10, 230, 45]),
   midi.TimeSignatureEvent(tick=0, data=[4, 2, 24, 8]),
   midi.EndOfTrackEvent(tick=0, data=[])])

In [4]:
def process_data(data, timestep):
    X, y = [], []
    for i in range(len(data)-timestep-1):
        X.append(np.array([data[i:(i+timestep)]]))
        y.append(np.array([data[(i+timestep)]]))
        
    X, y = np.array(X), np.array(y)
    return X.reshape(*[_ for _ in X.shape if _ != 1]), \
           y.reshape(*[_ for _ in y.shape if _ != 1])

def cantor_pair(k1, k2):
    return int((k1+k2)*(k1+k2+1)/2+k2)

def cantor_unpair(z):
    w = int((np.sqrt(8*z+1)-1)/2)
    t = (w**2+w)/2
    y = int(z-t)
    x = int(w-y)
    return x, y

In [416]:
class RNN:
    def __init__(self, X, train_test_split=0.8, epochs=100, batch_size=10, lstm_units=128, timestep=32):
        self.X = X
        self.split = int(self.X.shape[0]*train_test_split)
        self.X_train, self.X_test = self.X[:self.split], self.X[self.split:]
        self.epochs = epochs
        self.batch_size = batch_size
        self.lstm_units = lstm_units
        self.timestep = timestep
        self.scaler = StandardScaler()
        self.scale_data()
        
        self.model = Sequential()
        self.model.add(
            LSTM(self.lstm_units, input_shape=(
                self.timestep, self.X_train_processed.shape[-1]
            ), activation='relu')
        )
        self.model.add(Dense(self.X_train_processed.shape[-1]))
        self.model.compile(optimizer='adamax', loss='mse', metrics=['accuracy'])
        
    def flatten_data(self):
        self.X_train_flattened = np.apply_along_axis(lambda x:x[1], 2, self.X_train)
        self.X_test_flattened = np.apply_along_axis(lambda x:x[1], 2, self.X_test)
        
    def scale_data(self):
        self.flatten_data()
        self.scaler.fit(self.X_train_flattened)
        self.X_train_scaled = self.scaler.transform(self.X_train_flattened)
        self.X_test_scaled = self.scaler.transform(self.X_test_flattened)
        
        self.X_train_processed, self.y_train_processed = \
            process_data(self.X_train_scaled, self.timestep)
        self.X_test_processed, self.y_test_processed = \
            process_data(self.X_test_scaled, self.timestep)
        
    def train(self, verbose=0):
        history = self.model.fit(self.X_train_processed, self.y_train_processed,
                                 batch_size=self.batch_size, epochs=self.epochs)
        
        return history
    
    def evaluate(self):
        keras.evaluate(x=self.y_test_processed,
                       y=self.model.predict())

In [417]:
rnn = RNN(mt.time_series_data[0], epochs=5)

In [418]:
rnn.train(verbose=1)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x14124d710>

In [154]:
mean_squared_error(rnn.scaler.inverse_transform(rnn.y_test_processed),
                   rnn.scaler.inverse_transform(
                       rnn.model.predict(rnn.X_test_processed)
                   ))

3.1058648028537323

In [155]:
rnn.model.evaluate(rnn.X_test_processed, rnn.y_test_processed)



[0.010141453635063314, 0.9648351648351648]

In [355]:
pred = rnn.scaler.inverse_transform(rnn.model.predict(rnn.X_train_processed))

In [356]:
def note_map(x):
    if x < 0:
        return 0
    elif x > 127:
        return 0
    else:
        return x

In [369]:
note_map = np.vectorize(note_map)
pred = note_map(pred)

In [545]:
pattern = midi.Pattern()
track = midi.Track()
pattern.append(track)
track.append(midi.PortEvent(tick=0, data=[0]))
track.append(midi.TrackNameEvent(tick=0, text='Thank you Kanye, very cool!'))
track.append(midi.ProgramChangeEvent(tick=0, channel=8))

In [546]:
track2 = midi.Track()

pred = rnn.scaler.inverse_transform(rnn.y_train_processed).astype(int)
prev = []

for tick, note_arr in enumerate(pred[:-1]):
    for note in note_arr:
        if note == 0: continue
        
        if note in prev:
            if note not in pred[tick+1]:
                track2.append(midi.NoteOnEvent(tick=tick, channel=10, data=[note, 0]))
        else:
            track2.append(midi.NoteOnEvent(tick=tick, channel=10, data=[note, 60]))
            
    prev = note_arr
    
for i, event in reversed(list(enumerate(track2))):
    if i == 0:
        continue
        
    event.tick = (event.tick - track2[i-1].tick)
    
pattern.append(track2)

In [547]:
midi.write_midifile("test.mid", pattern)

In [459]:
pred = rnn.scaler.inverse_transform(rnn.model.predict(rnn.X_train_processed)).astype(int)

In [481]:
def one_d_convert(datum):
    return datum.reshape(1, *datum.shape)

In [511]:
def babl(model, length, seed):
    X, Y = copy.deepcopy(seed), []
    init = model.predict(one_d_convert(X))
    
    for i in range(length):
        y = rnn.model.predict(one_d_convert(X))
        Y.append(y[0])
        X = np.concatenate((X[1:], y), axis=0)

    return np.array(Y)

In [512]:
Y = babl(rnn.model, 32, rnn.X_test_processed[0])

In [516]:
with open('pred', 'rb') as f:
    pred = pickle.load(f)