In [1]:
import glob
import pickle
import numpy as np
from music21 import *
import torch
import torch.nn as nn
from torch import optim
from torch.autograd import Variable
import sys

In [3]:
notes = []
durs = []
offsets = []
for file in glob.glob("midi_s2/*.mid"):
    notes_file = []
    durs_file = []
    offsets_file = []
    midi = converter.parse(file)
    print("Parsing %s" % file)
    notes_to_parse = None
    #try: # file has instrument parts
    #    s2 = instrument.partitionByInstrument(midi)
    #    notes_to_parse = s2.parts[0].recurse() 
    #except: # file has notes in a flat structure
    notes_to_parse = midi.flat.notes
    print(len(notes_to_parse))
    for element in notes_to_parse:
        if isinstance(element, note.Note):
            notes_file.append(str(element.pitch))
        elif isinstance(element, chord.Chord):
            notes_file.append(str(element.root()))
        durs_file.append(str(element.quarterLength))
        offsets_file.append(str(element.quarterLength))
    notes.append(notes_file)
    durs.append(durs_file)
    offsets.append(offsets_file)

#np.array_equal(offsets[0],durs[0])
ind_notes = list(set([item for sublist in notes for item in sublist]))
ind_durs = list(set([item for sublist in durs for item in sublist]))
print(ind_notes)
print(ind_durs)
print([[item for sublist in notes for item in sublist].count(item) for item in ind_notes])
print([[item for sublist in durs for item in sublist].count(item) for item in ind_durs])

Parsing midi_s2/chpn_op23_format0.mid
3116
['C#2', 'C#5', 'C5', 'A2', 'F#6', 'C#4', 'F#1', 'B6', 'B-6', 'E-2', 'C#7', 'B4', 'C#3', 'F1', 'E6', 'F5', 'B-5', 'F7', 'G#1', 'E-4', 'D2', 'G#6', 'F6', 'B1', 'C#6', 'F3', 'G6', 'D5', 'G4', 'G#5', 'F#3', 'E5', 'A6', 'B-3', 'E2', 'B5', 'C4', 'A1', 'E-3', 'G5', 'D3', 'G2', 'B-4', 'F4', 'G3', 'B2', 'E1', 'G#2', 'C2', 'F2', 'G1', 'B-1', 'D4', 'A5', 'D6', 'C6', 'E3', 'C7', 'E-7', 'E-6', 'G#3', 'F#4', 'B3', 'E-5', 'G#4', 'D7', 'C3', 'A3', 'F#2', 'B-2', 'D1', 'A4', 'F#5', 'E4', 'E7']
['3.0', '12.0', '2.25', '2.0', '3.75', '2.75', '0.25', '2/3', '4.75', '0.0', '12.25', '3.5', '1.0', '4.0', '8.0', '7/3', '4.5', '0.75', '6.0', '5/3', '1.5', '1.25', '5.0', '2.5', '0.5', '4/3', '7.25', '37/3', '1/3']
[8, 41, 103, 15, 17, 31, 1, 7, 27, 43, 5, 29, 5, 1, 13, 55, 69, 4, 1, 92, 37, 14, 26, 5, 14, 72, 39, 145, 126, 54, 33, 29, 18, 111, 15, 16, 91, 2, 85, 101, 78, 43, 125, 59, 94, 15, 2, 2, 10, 15, 10, 12, 116, 52, 47, 43, 38, 12, 6, 33, 68, 55, 34, 86, 72, 12, 3

In [4]:
seq = 200

pits_dataq = np.empty((0, seq))
durs_dataq = np.empty((0, seq))

for song in range(len(notes)):
    song_notes_i = np.array([ind_notes.index(x) for x in notes[song]])
    song_durs_i = np.array([ind_durs.index(x) for x in durs[song]])
    nseq = int(np.floor(song_notes_i.shape[0] / seq)) # throw away end of song
    pits_dataq = np.concatenate((pits_dataq, np.reshape(song_notes_i[:nseq*seq], (-1, seq))))
    durs_dataq = np.concatenate((durs_dataq, np.reshape(song_durs_i[:nseq*seq], (-1, seq))))
pitch_nq = len(ind_notes)
dur_nq = len(ind_durs)
pitch_scale = pitch_nq - 1
dur_scale = dur_nq - 1
pits_data = 2 * (pits_dataq / pitch_scale - 0.5)
durs_data = 2 * (durs_dataq / dur_scale - 0.5)

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

hidden_size = 300
fr_len = 4

class Sequence(nn.Module):
    def __init__(self):
        super(Sequence, self).__init__()
        self.pWz2 = nn.Linear(hidden_size+fr_len, hidden_size) #pitch tier2 GRU
        self.pWr2 = nn.Linear(hidden_size+fr_len, hidden_size) #pitch tier2 GRU
        self.pWh2 = nn.Linear(hidden_size+fr_len, hidden_size) #pitch tier2 GRU
        self.pWz = nn.Linear(2*hidden_size+1, hidden_size) #pitch GRU
        self.pWr = nn.Linear(2*hidden_size+1, hidden_size) #pitch GRU
        self.pWh = nn.Linear(2*hidden_size+1, hidden_size) #pitch GRU
        self.pO2 = nn.Linear(hidden_size, hidden_size)
        self.pO = nn.Linear(hidden_size, pitch_nq)
        self.dWz = nn.Linear(hidden_size+2, hidden_size) #pitch GRU
        self.dWr = nn.Linear(hidden_size+2, hidden_size) #pitch GRU
        self.dWh = nn.Linear(hidden_size+2, hidden_size) #pitch GRU
        self.dO2 = nn.Linear(hidden_size, hidden_size)
        self.dO = nn.Linear(hidden_size, dur_nq)
        
    def pitch_tier2_model(self, x, h2):
        z2 = torch.sigmoid(self.pWz2(torch.cat([h2, x], dim=1)))
        r2 = torch.sigmoid(self.pWr2(torch.cat([h2, x], dim=1)))       
        hh2 = torch.tanh(self.pWh2(torch.cat([r2 * h2, x], dim=1)))
        h2 = (1 - z2) * h2 + z2 * hh2
        return h2
        
    def pitch_model(self, x, h, h2): # P(pitch cur|pitch hist,pitch tier2 hist)
        z = torch.sigmoid(self.pWz(torch.cat([h, h2, x], dim=1))) 
        r = torch.sigmoid(self.pWr(torch.cat([h, h2, x], dim=1)))       
        hh = torch.tanh(self.pWh(torch.cat([r * h, h2, x], dim=1)))
        h = (1 - z) * h + z * hh
        o = self.pO(torch.relu(self.pO2(h)))
        return o, h
    
    def dur_model(self, x, x2, h): # P(dur cur|dur hist,pitch cur)
        z = torch.sigmoid(self.dWz(torch.cat([h, x, x2], dim=1)))
        r = torch.sigmoid(self.dWr(torch.cat([h, x, x2], dim=1)))       
        hh = torch.tanh(self.dWh(torch.cat([r * h, x, x2], dim=1)))
        h = (1 - z) * h + z * hh
        o = self.dO(torch.relu(self.dO2(h)))
        return o, h

    def forward(self, pitch_input, dur_input, pitch_ref, generate=False, future = 0):
        pitch_outputs = []
        pitch_genputs = np.empty((pitch_input.shape))
        dur_outputs = []
        dur_genputs = np.empty((dur_input.shape))
        ph_t = torch.zeros(pitch_input.size(0), hidden_size, dtype=torch.float32, device=device)
        ph2_t = torch.zeros(pitch_input.size(0), hidden_size, dtype=torch.float32, device=device)
        pitch_input2_t = torch.zeros(pitch_input.size(0), fr_len, dtype=torch.float32, device=device)  
        dh_t = torch.zeros(dur_input.size(0), hidden_size, dtype=torch.float32, device=device)
        for i, (pitch_input_t, dur_input_t, pitch_ref_t) in enumerate(zip(pitch_input.chunk(pitch_input.size(1), dim=1), dur_input.chunk(dur_input.size(1), dim=1), pitch_ref.chunk(pitch_ref.size(1), dim=1))):
            pitch_output, ph_t = self.pitch_model(pitch_input_t, ph_t, ph2_t)
            pitch_outputs += [pitch_output]              
            if (i+1) % fr_len == 0:
                pitch_input2_t = pitch_input[:,i-fr_len+1:i+1]
                ph2_t = self.pitch_tier2_model(pitch_input2_t, ph2_t)
            dur_output, dh_t = self.dur_model(dur_input_t, pitch_ref_t, dh_t)
            dur_outputs += [dur_output]
            if generate:                           
                probabilities = nn.functional.softmax(pitch_output, dim=-1).cpu().data.numpy()
                pitch_genput = np.empty(pitch_output.shape[0])
                for bi in range(pitch_output.shape[0]):
                    pitch_genput[bi] = 2 * (int(np.random.choice(np.arange(pitch_nq), p=probabilities[bi])) / pitch_scale - 0.5)
                pitch_genputs[:,i] = pitch_genput
                probabilities = nn.functional.softmax(dur_output, dim=-1).cpu().data.numpy()
                dur_genput = np.empty(dur_output.shape[0])
                for bi in range(dur_output.shape[0]):
                    dur_genput[bi] = 2 * (int(np.random.choice(np.arange(dur_nq), p=probabilities[bi])) / dur_scale - 0.5)
                dur_genputs[:,i] = dur_genput                                                      
        if generate:
            for fi in range(future):
                pitch_genput_torch = torch.from_numpy(pitch_genput[:,None]).float().to(device)
                if fi % fr_len == 0:
                    pitch_genput2_torch = torch.from_numpy(pitch_genputs[:,-fr_len:]).float().to(device)
                    ph2_t = self.pitch_tier2_model(pitch_genput2_torch, ph2_t)                   
                pitch_output, ph_t = self.pitch_model(pitch_genput_torch, ph_t, ph2_t)
                pitch_outputs += [pitch_output]                                                       
                probabilities = nn.functional.softmax(pitch_output, dim=-1).cpu().data.numpy()
                pitch_genput = np.empty(pitch_output.shape[0])
                for bi in range(pitch_output.shape[0]):
                    pitch_genput[bi] = 2 * (int(np.random.choice(np.arange(pitch_nq), p=probabilities[bi])) / pitch_scale - 0.5)
                pitch_genputs = np.concatenate((pitch_genputs,pitch_genput[:,None]), axis=1)
                pitch_genput_torch = torch.from_numpy(pitch_genput[:,None]).float().to(device) # handle more efficient?
                dur_genput_torch = torch.from_numpy(dur_genput[:,None]).float().to(device)                                                    
                dur_output, dh_t = self.dur_model(dur_genput_torch, pitch_genput_torch, dh_t)
                dur_outputs += [dur_output]  
                probabilities = nn.functional.softmax(dur_output, dim=-1).cpu().data.numpy()
                dur_genput = np.empty(dur_output.shape[0])
                for bi in range(dur_output.shape[0]):
                    dur_genput[bi] = 2 * (int(np.random.choice(np.arange(dur_nq), p=probabilities[bi])) / dur_scale - 0.5)
                dur_genputs = np.concatenate((dur_genputs,dur_genput[:,None]), axis=1)                                                   
        pitch_outputs = torch.stack(pitch_outputs, 1).squeeze(2)
        dur_outputs = torch.stack(dur_outputs, 1).squeeze(2)
        return pitch_outputs, pitch_genputs, dur_outputs, dur_genputs
    
pits_inp = torch.from_numpy(pits_data[:, :-1]).float().to(device)
pits_ref = torch.from_numpy(pits_data[:, 1:]).float().to(device)
pits_target = torch.from_numpy(pits_dataq[:, 1:]).long().to(device)
durs_inp = torch.from_numpy(durs_data[:, :-1]).float().to(device)
durs_target = torch.from_numpy(durs_dataq[:, 1:]).long().to(device)                                                                   
seq = Sequence().to(device)
print('Number of train seqs: ' + str(pits_inp.shape[0]))
print('Model total params: ' + str(sum(p.numel() for p in seq.parameters() if p.requires_grad)))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(seq.parameters(), lr=0.001)#
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=20, verbose=True)
for i in range(6000):
    optimizer.zero_grad()
    pits_out, _, durs_out, _ = seq(pits_inp, durs_inp, pits_ref)
    loss_pitch = criterion(pits_out.permute(2,0,1).view(pitch_nq,-1).permute(1,0), pits_target.view(-1))#view(-1,batch_size*time_steps)
    loss_dur = criterion(durs_out.permute(2,0,1).view(dur_nq,-1).permute(1,0), durs_target.view(-1))
    loss = loss_pitch + loss_dur
    sys.stdout.write('\rStep: %i --- NLL: %.10f  ' % (i, loss))
    loss.backward()
    optimizer.step()
    scheduler.step(loss)

cuda
Number of train seqs: 15
Model total params: 1300904
Step: 642 --- NLL: 0.1278852522  Epoch   642: reducing learning rate of group 0 to 1.0000e-04.
Step: 4999 --- NLL: 0.0079557840  

In [8]:
init = 10
pits_test_inp = torch.from_numpy(pits_data[init:init+1, :-1]).float().to(device)
pits_test_ref = torch.from_numpy(pits_data[init:init+1, 1:]).float().to(device)
pits_test_target = torch.from_numpy(pits_dataq[init:init+1, 1:]).long().to(device)
durs_test_inp = torch.from_numpy(durs_data[init:init+1, :-1]).float().to(device)
durs_test_target = torch.from_numpy(durs_dataq[init:init+1, 1:]).long().to(device)   
with torch.no_grad():
    _, pitch_pred, _, dur_pred = seq(pits_test_inp, durs_test_inp, pits_test_ref, generate=True, future=500)
print('done')
pits_syn = np.round((pitch_pred / 2.0 + 0.5) * pitch_scale)
durs_syn = np.round((dur_pred / 2.0 + 0.5) * dur_scale)
output_notes = []
offset = 0
for n in range(pits_syn[0].shape[0]):
    d = eval(ind_durs[int(durs_syn[0][n])])
    new_note = note.Note(pitch=ind_notes[int(pits_syn[0][n])], quarterLength=d)
    new_note.offset = offset
    offset += d
    new_note.storedInstrument = instrument.Piano()
    output_notes.append(new_note)

midi_stream = stream.Stream(output_notes)
midi_stream.write('midi', fp='test_output.mid')

done


'test_output.mid'