In [1]:
!pip install music21



In [1]:
import glob
import pickle
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from music21 import converter, instrument, note, chord, stream

In [2]:
def one_hot_encode(arr, n_labels):
    """One-hot encode an integer array along a new trailing axis.

    Arguments
    ---------
    arr: NumPy integer array of label indices, of any shape
    n_labels: Number of distinct labels (size of the new last axis)

    Returns
    -------
    float32 array of shape (*arr.shape, n_labels) holding 1.0 at the
    position selected by each label and 0.0 everywhere else.
    """
    # Initialize the encoded array with one row per element of `arr`.
    # `arr.size` works for any rank; np.multiply(*arr.shape) only accepted
    # exactly two dimensions.
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(arr.size), arr.ravel()] = 1.

    # Finally reshape it to get back to the original array layout
    return one_hot.reshape((*arr.shape, n_labels))

In [3]:
def get_notes():
    """ Get all the notes and chords from the midi files in the ./midi_songs directory.

        Single notes are stored as their pitch string; chords are stored as
        the '.'-joined integers of their normal order. The resulting list is
        pickled to data/notes.txt (the directory is created if missing) and
        also returned.
    """
    import os  # local import: only needed to make sure the output directory exists

    notes = []

    for file in glob.glob("midi_songs/*.midi"):
        midi = converter.parse(file)

        print("Parsing %s" % file)

        try:  # file has instrument parts
            s2 = instrument.partitionByInstrument(midi)
            notes_to_parse = s2.parts[0].recurse()
        except Exception:  # file has notes in a flat structure
            # `except Exception` (not a bare except) keeps the fallback while
            # letting KeyboardInterrupt / SystemExit propagate.
            notes_to_parse = midi.flat.notes

        for element in notes_to_parse:
            if isinstance(element, note.Note):
                notes.append(str(element.pitch))
            elif isinstance(element, chord.Chord):
                notes.append('.'.join(str(n) for n in element.normalOrder))

    # Without this, open() raises FileNotFoundError on a fresh checkout
    os.makedirs('data', exist_ok=True)
    with open('data/notes.txt', 'wb') as filepath:
        pickle.dump(notes, filepath)

    return notes

In [4]:
def prepare_sequences(notes, n_vocab, sequence_length=100):
    """ Prepare the sequences used by the Neural Network.

        Arguments
        ---------
        notes: List of note/chord tokens (strings)
        n_vocab: Vocabulary size; unused here, kept so existing callers keep working
        sequence_length: Number of tokens in each input window (default 100)

        Returns
        -------
        (network_input, network_output) as NumPy arrays. Row i of
        network_input is the integer-encoded window
        notes[i:i + sequence_length] and network_output[i] is the encoded
        token that immediately follows that window.
    """
    # get all pitch names
    pitchnames = sorted(set(notes))

    # create a dictionary to map pitches to integers
    note_to_int = {name: number for number, name in enumerate(pitchnames)}

    network_input = []
    network_output = []

    # create input sequences and the corresponding outputs
    for i in range(0, len(notes) - sequence_length, 1):
        sequence_in = notes[i:i + sequence_length]
        sequence_out = notes[i + sequence_length]
        network_input.append([note_to_int[token] for token in sequence_in])
        network_output.append(note_to_int[sequence_out])

    return np.array(network_input), np.array(network_output)

In [5]:
def get_batches(arr, n_seqs, n_steps):
    '''Generate (features, targets) mini-batches from arr.

       arr is truncated to a whole number of batches and reshaped into
       n_seqs rows; consecutive windows of n_steps columns are then
       yielded together with their targets.

       Arguments
       ---------
       arr: Array you want to make batches from
       n_seqs: Batch size, the number of sequences per batch
       n_steps: Number of sequence steps per batch

       Yields
       ------
       x, y: x is a window of arr; y is x shifted one step to the left,
             with its last column taken from the column following the
             window (wrapping to column 0 for the final window).
    '''

    items_per_batch = n_seqs * n_steps
    full_batches = len(arr) // items_per_batch

    # Drop the tail that cannot fill a complete batch, then lay out n_seqs rows
    rows = arr[:full_batches * items_per_batch].reshape((n_seqs, -1))
    total_cols = rows.shape[1]

    for start in range(0, total_cols, n_steps):
        stop = start + n_steps

        # The features
        x = rows[:, start:stop]

        # The targets, shifted by one
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]

        # Last target column: the next column, or the first one at the very end
        if stop < total_cols:
            y[:, -1] = rows[:, stop]
        else:
            y[:, -1] = rows[:, 0]

        yield x, y

In [6]:
# Parse the MIDI corpus and derive the sorted vocabulary of distinct tokens
notes = get_notes()
pitchnames = sorted(set(notes))
n_vocab = len(pitchnames)
network_input, network_output = prepare_sequences(notes, n_vocab)

# Pull the first mini-batch (10 sequences x 50 steps) for inspection
batches = get_batches(network_input, 10, 50)
x, y = next(batches)

Parsing midi_songs/MIDI-Unprocessed_SMF_22_R1_2004_01-04_ORIG_MID-AUDIO_22_R1_2004_05_Track05_wav.midi


In [7]:
# Show the top-left 10x10 corner of the first feature batch and of its targets
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[61 81 50 72 81 50 61 81 50 72]
 [19 61 80 49 33 45 55 60 67 60]
 [10 80 19 43 60 28 42 45 59 71]
 [48 61 79 19 42 28 42 61 44 70]
 [42 72 37 10 31 10 50 61 81 19]
 [44 68 17 30  5 17 30 39 10 17]
 [71 60 80 65 45 69 60 33 52 43]
 [72 68 34 61 45 44 68  6 43 58]
 [29 59 60 48 70 55 48 59 47 59]
 [55 46 67 45 56 45 71 26 49 62]]

y
 [[81 50 72 81 50 61 81 50 72 81]
 [61 80 49 33 45 55 60 67 60 72]
 [80 19 43 60 28 42 45 59 71 17]
 [61 79 19 42 28 42 61 44 70 17]
 [72 37 10 31 10 50 61 81 19 42]
 [68 17 30  5 17 30 39 10 17 39]
 [60 80 65 45 69 60 33 52 43 44]
 [68 34 61 45 44 68  6 43 58 61]
 [59 60 48 70 55 48 59 47 59 44]
 [46 67 45 56 45 71 26 49 62 47]]


In [8]:
class CharRNN(nn.Module):
    ''' Multi-layer LSTM followed by a linear layer that scores the next token.

        Inputs are one-hot encoded tokens; the output of `forward` is one row
        of unnormalised scores (logits) per input time step.
    '''

    def __init__(self, tokens, n_steps=100, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        ''' Build the network.

            Arguments
            ---------
            tokens: Sequence of distinct tokens; defines the vocabulary and its order
            n_steps: Unused by the model; accepted for interface compatibility
            n_hidden: Size of the LSTM hidden state
            n_layers: Number of stacked LSTM layers
            drop_prob: Dropout probability applied between LSTM layers
            lr: Stored on the instance as `self.lr`; not used by the model itself
        '''
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # Creating token <-> integer dictionaries
        self.chars = tokens
        self.char2int = {token: number for number, token in enumerate(tokens)}
        self.int2char = {number: token for token, number in self.char2int.items()}

        ## Define the LSTM
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)

        ## Define the final, fully-connected output layer
        self.fc = nn.Linear(n_hidden, len(self.chars))

        # Initialize the weights
        self.init_weights()

    def forward(self, x, hc):
        ''' Forward pass through the network.

            `x` is the one-hot input batch and `hc` the (hidden, cell) state
            tuple. Returns the logits, flattened to
            (batch * steps, vocabulary size), and the new (h, c) state.
        '''

        ## Get x, and the new hidden state (h, c) from the lstm
        x, (h, c) = self.lstm(x, hc)

        # Stack up LSTM outputs using view
        x = x.contiguous().view(-1, self.n_hidden)

        ## Put x through the fully-connected layer
        x = self.fc(x)

        # Return x and the hidden state (h, c)
        return x, (h, c)

    def predict(self, char, h=None, cuda=False, top_k=None):
        ''' Given a token, sample the next token.

            Arguments
            ---------
            char: Current token; must be a key of `self.char2int`
            h: (hidden, cell) state tuple, or None to start from zeros
            cuda: Move the model and input to the GPU when True, otherwise to the CPU
            top_k: If given, sample only among the `top_k` most probable tokens

            Returns the sampled token and the new hidden state.
        '''
        if cuda:
            self.cuda()
        else:
            self.cpu()

        if h is None:
            h = self.init_hidden(1)

        x = np.array([[self.char2int[char]]])
        x = one_hot_encode(x, len(self.chars))

        inputs = torch.from_numpy(x)

        if cuda:
            inputs = inputs.cuda()

        # Detach the incoming state from any earlier graph
        h = tuple(each.detach() for each in h)

        # Inference only: no autograd graph is needed
        with torch.no_grad():
            out, h = self(inputs, h)
            p = F.softmax(out, dim=1)

        if cuda:
            p = p.cpu()

        if top_k is None:
            top_ch = np.arange(len(self.chars))
        else:
            p, top_ch = p.topk(top_k)
            # reshape(-1) keeps a 1-D array even for top_k == 1; squeeze()
            # produced a 0-d array there, which np.random.choice rejects.
            top_ch = top_ch.numpy().reshape(-1)

        p = p.numpy().reshape(-1)

        # Renormalise so the probabilities of the candidates sum to one
        char = np.random.choice(top_ch, p=p/p.sum())

        return self.int2char[int(char)], h

    def init_weights(self):
        ''' Initialize weights for fully connected layer '''
        # Set bias tensor to all zeros
        self.fc.bias.data.fill_(0)
        # FC weights as random uniform in [-1, 1]
        self.fc.weight.data.uniform_(-1, 1)

    def init_hidden(self, n_seqs):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x n_seqs x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM.
        # new_zeros gives them the dtype and device of the model parameters.
        weight = next(self.parameters())
        state_shape = (self.n_layers, n_seqs, self.n_hidden)
        return (weight.new_zeros(state_shape),
                weight.new_zeros(state_shape))

In [9]:
def train(net, data, epochs=10, n_seqs=10, n_steps=50, lr=0.001, clip=5, val_frac=0.1, cuda=False, print_every=10):
    ''' Training a network

        Arguments
        ---------

        net: CharRNN network
        data: integer-encoded data to train the network
        epochs: Number of epochs to train
        n_seqs: Number of mini-sequences per mini-batch, aka batch size
        n_steps: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        cuda: Train with CUDA on a GPU
        print_every: Number of steps for printing training and validation loss

        The validation loss is reported as nan when the held-out slice is too
        small to form a single (n_seqs x n_steps) batch.
    '''

    net.train()

    opt = torch.optim.Adam(net.parameters(), lr=lr)

    criterion = nn.CrossEntropyLoss()

    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if cuda:
        net.cuda()

    counter = 0
    n_chars = len(net.chars)

    for e in range(epochs):
        h = net.init_hidden(n_seqs)

        for x, y in get_batches(data, n_seqs, n_steps):

            counter += 1

            # One-hot encode our data and make them Torch tensors.
            # CrossEntropyLoss needs int64 class indices; .long() guards
            # against platforms where the NumPy default integer is 32-bit.
            inputs = torch.from_numpy(one_hot_encode(x, n_chars))
            targets = torch.from_numpy(y).long()

            if cuda:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Detach the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple(each.detach() for each in h)

            net.zero_grad()

            output, h = net(inputs, h)

            loss = criterion(output, targets.view(n_seqs*n_steps))

            loss.backward()

            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)

            opt.step()

            if counter % print_every == 0:

                # Get validation loss
                val_h = net.init_hidden(n_seqs)
                val_losses = []

                # Evaluate with dropout disabled and without building a graph
                net.eval()
                with torch.no_grad():
                    for val_x, val_y in get_batches(val_data, n_seqs, n_steps):

                        val_inputs = torch.from_numpy(one_hot_encode(val_x, n_chars))
                        val_targets = torch.from_numpy(val_y).long()

                        if cuda:
                            val_inputs, val_targets = val_inputs.cuda(), val_targets.cuda()

                        val_h = tuple(each.detach() for each in val_h)

                        val_output, val_h = net(val_inputs, val_h)
                        val_loss = criterion(val_output, val_targets.view(n_seqs*n_steps))

                        val_losses.append(val_loss.item())

                # Back to training mode for the next optimisation step
                net.train()

                # np.mean([]) emits RuntimeWarnings; report nan explicitly instead
                mean_val_loss = np.mean(val_losses) if val_losses else float('nan')

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(mean_val_loss))

In [10]:
# Discard a model left over from an earlier run, if there is one
try:
    del net
except NameError:
    pass

In [11]:
# Initialize and print the network
# (vocabulary = pitchnames; 512 hidden units per layer, 2 stacked LSTM layers)
net = CharRNN(pitchnames, n_hidden=512, n_layers=2)

print(net)

CharRNN(
  (lstm): LSTM(82, 512, num_layers=2, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=512, out_features=82, bias=True)
)


In [12]:
# Mini-batch geometry: sequences per batch and steps per sequence
n_seqs = 10
n_steps = 50

In [13]:
# Train for a single epoch, printing training and validation loss after every step
train(net, network_input, epochs=1, n_seqs=n_seqs, n_steps=n_steps, lr=0.001, print_every=1)

  out=out, **kwargs)
  ret = ret.dtype.type(ret / rcount)


Epoch: 1/1... Step: 1... Loss: 4.4070... Val Loss: nan
Epoch: 1/1... Step: 2... Loss: 4.1409... Val Loss: nan
Epoch: 1/1... Step: 3... Loss: 3.9063... Val Loss: nan
Epoch: 1/1... Step: 4... Loss: 3.9040... Val Loss: nan
Epoch: 1/1... Step: 5... Loss: 3.7754... Val Loss: nan
Epoch: 1/1... Step: 6... Loss: 3.7645... Val Loss: nan
Epoch: 1/1... Step: 7... Loss: 3.7172... Val Loss: nan
Epoch: 1/1... Step: 8... Loss: 3.7063... Val Loss: nan
Epoch: 1/1... Step: 9... Loss: 3.6362... Val Loss: nan
Epoch: 1/1... Step: 10... Loss: 3.6450... Val Loss: nan
Epoch: 1/1... Step: 11... Loss: 3.5645... Val Loss: nan
Epoch: 1/1... Step: 12... Loss: 3.5538... Val Loss: nan
Epoch: 1/1... Step: 13... Loss: 3.4712... Val Loss: nan
Epoch: 1/1... Step: 14... Loss: 3.4376... Val Loss: nan
Epoch: 1/1... Step: 15... Loss: 3.3507... Val Loss: nan
Epoch: 1/1... Step: 16... Loss: 3.2881... Val Loss: nan
Epoch: 1/1... Step: 17... Loss: 3.2139... Val Loss: nan
Epoch: 1/1... Step: 18... Loss: 3.2327... Val Loss: nan
E

In [14]:
def sample(net, size, prime=notes[:2], top_k=None, cuda=False):
    ''' Generate a token sequence from the network.

        Arguments
        ---------
        net: Network exposing init_hidden() and predict()
        size: Number of prediction steps to run after the prime has been consumed
        prime: Non-empty sequence of tokens used to warm up the hidden state.
               The default is evaluated once, when this function is defined,
               from the global `notes` list.
        top_k: Passed through to net.predict
        cuda: Run on the GPU when True, otherwise on the CPU

        Returns the prime tokens followed by size + 1 generated tokens.
        The network is moved to the requested device and put in eval mode.

        Raises ValueError if `prime` is empty.
    '''
    # An empty prime used to fail later with an UnboundLocalError on `char`
    if len(prime) == 0:
        raise ValueError("prime must contain at least one token")

    if cuda:
        net.cuda()
    else:
        net.cpu()

    net.eval()

    # First off, run through the prime characters
    chars = list(prime)

    h = net.init_hidden(1)

    for ch in prime:
        print(ch)
        char, h = net.predict(ch, h, cuda=cuda, top_k=top_k)

    # The prediction made after the last prime token is the first new token
    chars.append(char)

    # Now pass in the previous character and get a new one
    for _ in range(size):
        char, h = net.predict(chars[-1], h, cuda=cuda, top_k=top_k)
        chars.append(char)

    return chars

In [15]:
# Run 2000 prediction steps from sample's default prime, drawing each token from the 5 most likely candidates
generated_notes = sample(net, 2000, top_k=5)

D5
G5


In [16]:
# Display the generated token sequence
# NOTE(review): this echoes the entire list; slicing (e.g. generated_notes[:20]) would keep the output short
generated_notes

['D5',
 'G5',
 'B5',
 'B5',
 'D5',
 'G5',
 'B5',
 'D5',
 'G5',
 'B5',
 'B5',
 'D5',
 'G5',
 'B5',
 'G4',
 'G5',
 'C5',
 'D5',
 'E5',
 'C5',
 'D5',
 'G5',
 'B4',
 'G4',
 'B4',
 'C5',
 'D5',
 'G4',
 'E5',
 'B4',
 'D5',
 'A4',
 'B4',
 'D5',
 'G5',
 'E5',
 'B4',
 'D5',
 'G4',
 'C5',
 'C5',
 'D5',
 'E5',
 'D5',
 'B5',
 'G5',
 'B5',
 'D5',
 'G5',
 'B5',
 'G5',
 'B5',
 'D5',
 'G4',
 'C5',
 'C5',
 'D5',
 'E5',
 'D5',
 'G5',
 'B4',
 'B5',
 'G5',
 'D5',
 'G4',
 'B4',
 'C5',
 'D5',
 'G5',
 'B4',
 'D5',
 'E5',
 'C5',
 'D5',
 'G5',
 'B4',
 'B5',
 'D5',
 'G4',
 'G5',
 'B5',
 'C5',
 'D5',
 'G4',
 'G5',
 'B4',
 'D5',
 'C5',
 'G4',
 'B4',
 'D5',
 'D5',
 'E5',
 'C5',
 'D5',
 'G5',
 'B5',
 'D5',
 'G5',
 'B5',
 'G4',
 'G5',
 'C5',
 'D5',
 'G4',
 'C5',
 'B4',
 'D5',
 'G4',
 'B4',
 'A4',
 '2.7',
 'D5',
 'B4',
 'B4',
 'D5',
 'G4',
 'A4',
 'C5',
 'B4',
 'D5',
 'G4',
 'D5',
 'E5',
 '7.11',
 'B4',
 'A4',
 'D4',
 'B4',
 'F#4',
 'D5',
 'E5',
 'C5',
 'F#5',
 'D5',
 'E5',
 'B5',
 '7.11',
 'D5',
 'G5',
 'B4',
 'B5',

In [17]:
offset = 0
output_notes = []
# create note and chord objects based on the values generated by the model
for pattern in generated_notes:
    # pattern is a chord ('.'-joined integers, or a single integer)
    if ('.' in pattern) or pattern.isdigit():
        notes_in_chord = pattern.split('.')
        # Named `chord_notes` so the global `notes` corpus built earlier in
        # the notebook is not overwritten when this cell runs.
        chord_notes = []
        for current_note in notes_in_chord:
            new_note = note.Note(int(current_note))
            new_note.storedInstrument = instrument.Piano()
            chord_notes.append(new_note)
        new_chord = chord.Chord(chord_notes)
        new_chord.offset = offset
        output_notes.append(new_chord)
    # pattern is a note
    else:
        new_note = note.Note(pattern)
        new_note.offset = offset
        new_note.storedInstrument = instrument.Piano()
        output_notes.append(new_note)
    # increase offset each iteration so that notes do not stack
    offset += 0.5

In [18]:
# Wrap the generated notes/chords in a music21 Stream and write it out as a MIDI file
midi_stream = stream.Stream(output_notes)
midi_stream.write('midi', fp='test_output.mid')

'test_output.mid'

In [150]:
# Save the generated token sequence as plain text, one token per line.
# file.write() only accepts a str, so the list is joined first; passing the
# list itself raised a TypeError.
# NOTE(review): despite the .midi extension this file holds text, not MIDI
# data (the real MIDI output is test_output.mid) - consider renaming it.
with open('test_output.midi', 'w') as f:
    f.write('\n'.join(generated_notes))