<a href="https://colab.research.google.com/github/rsaxby/NoteRNN/blob/master/noteRNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install a pinned PyTorch wheel (torch 0.4.1) for this Colab runtime.
# See http://pytorch.org/
from os.path import exists
# NOTE(review): wheel.pep425tags looks like an internal module of `wheel`;
# confirm it still exists in the wheel version installed on the runtime.
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
# Build the interpreter/ABI tag that appears in the wheel file name.
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())
# Ask the dynamic linker for libcudart and rewrite its version suffix as "cuXY".
cuda_output = !ldconfig -p|grep cudart.so|sed -e 's/.*\.\([0-9]*\)\.\([0-9]*\)$/cu\1\2/'
# Use the CUDA build only when an NVIDIA device node is present; otherwise the CPU build.
accelerator = cuda_output[0] if exists('/dev/nvidia0') else 'cpu'

# The wheel URL is assembled from the accelerator and platform tags computed above.
!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.1-{platform}-linux_x86_64.whl torchvision
import torch

In [0]:
# Load the Colab Drive helper and mount Google Drive at /content/drive.
# The MIDI folder configured in `data_dir` lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# import libraries
import numpy as np
import pandas as pd 
import torch
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
from torch import nn
import torch.nn.functional as F
!pip install music21
from music21 import *
!pip install pygame
import pygame
from google.colab import files
#configure.run()
import glob
from torch import optim
from torchvision import datasets

# Probe for a usable CUDA device once; later cells read this module-level flag
# to decide whether the network and tensors are moved to the GPU.
train_on_gpu = torch.cuda.is_available()

print('CUDA is available!  Training on GPU ...' if train_on_gpu
      else 'CUDA is not available.  Training on CPU ...')

In [0]:
# Directory (on the mounted Drive) that holds the training MIDI files.
# The trailing slash matters: file names are appended to this string directly
# when songs are saved.
data_dir = '/content/drive/My Drive/Colab Notebooks/data/music_data/music/hozier/'

In [0]:
class Dataset():
  """Extracts notes, chords and rests from a folder of MIDI files and encodes them as ints.

  Typical use: ``ds = Dataset(data_dir); ds.create_dataset()``. Afterwards
  ``notes`` holds the token strings, ``unique_notes`` the sorted vocabulary,
  ``pitch2int`` the token -> index mapping, ``encoded_notes`` the integer
  sequence, and ``chords`` / ``rests`` the music21 objects seen while parsing.
  """

  def __init__(self, data_dir):
    self.chords = {} # str(chord) -> music21 chord object
    self.notes = [] # token strings extracted from all songs, in order
    self.unique_notes = [] # sorted vocabulary, filled by pitch_to_int()
    self.data_dir = data_dir # directory where midi files are stored/saved
    self.midi_files = [] # parsed midi streams, one per successfully opened file
    # Start with an empty mapping. pitch_to_int() updates the attributes in
    # place and returns nothing, so its result must not be assigned here.
    self.pitch2int = {} # token string -> class index
    self.num_classes = 0 # size of the vocabulary
    self.rests = {} # rest fullName -> music21 rest object
    self.encoded_notes = np.array([]) # integer-encoded notes, filled by encode_notes()

  def create_dataset(self):
    """Parse every ``*.mid`` file in ``data_dir`` and build the encoded dataset.

    Files that fail to parse are reported and skipped. The vocabulary and the
    integer encoding are rebuilt from everything collected in ``self.notes``.
    """
    for file in glob.glob(self.data_dir+"/*.mid"):
      try:
        score = self.open_midi(file)
        self.midi_files.append(score)
        self.extract_notes(score)
      except Exception as err:
        # Best effort: one broken file should not abort the whole run, but the
        # reason is reported and Ctrl-C (KeyboardInterrupt) is not swallowed.
        print("Could not process: {} ({})".format(file, err))
    self.pitch_to_int()
    self.encode_notes()

  def extract_notes(self, midi):
    """Append the notes, chords and rests of one parsed midi stream to ``self.notes``.

    Notes are stored as their pitch string, chords as ``str(chord)`` and rests
    as the history-prefixed string produced by ``encode_rest``. Chord and rest
    objects are also kept in ``self.chords`` / ``self.rests`` so a song can be
    rebuilt from generated tokens later.
    """
    parts = instrument.partitionByInstrument(midi)

    if parts: # the file has instrument parts: only the first part is read
      notes_to_parse = parts.parts[0].recurse()
    else: # file has notes in a flat structure
      notes_to_parse = midi.flat.notes

    for nt in notes_to_parse:
      if isinstance(nt, note.Rest):
        # encode the rest together with the notes that preceded it
        self.notes.append(self.encode_rest(str(nt.fullName)))
        # keep the rest object for later song generation
        self.rests[nt.fullName] = nt
      elif isinstance(nt, note.Note):
        # a single note is represented by its pitch string
        self.notes.append(str(nt.pitch))
      elif isinstance(nt, chord.Chord):
        self.notes.append(str(nt))
        self.chords[str(nt)] = nt

  def encode_rest(self, rest):
    """Return ``rest`` prefixed with up to the 10 most recent tokens and a ``$``.

    When nothing has been collected yet the rest name is returned unchanged.
    """
    if len(self.notes) == 0:
      return rest
    # a negative slice start yields the whole list when it is shorter than 10
    return "".join(self.notes[-10:]) + "$" + rest

  def open_midi(self, file_name):
    """Parse ``file_name`` with music21 and return the resulting stream."""
    print("Processing {}...".format(file_name))
    return converter.parse(file_name)

  def list_instruments(self, midi):
    """Print the part name of every part in the given midi stream."""
    partStream = midi.parts.stream()
    print("Instruments on MIDI file:")
    for part in partStream:
      print(part.partName)

  def analyze_song(self, midi):
    """Print the first time signature and the estimated key of a midi stream."""
    timeSig = midi.getTimeSignatures()[0]
    musicAnalysis = midi.analyze('key')
    print("Time signature: {0}/{1}".format(timeSig.beatCount, timeSig.denominator))
    print("Expected music key: {0}".format(musicAnalysis))
    print("Music key confidence: {0}".format(musicAnalysis.correlationCoefficient))

  def play_song(self, midi_file):
    """Play a midi stream through music21's realtime StreamPlayer."""
    print("Playing MIDI...")
    song = midi.realtime.StreamPlayer(midi_file)
    song.play()

  def save_song(self, midi, file_name):
    """Write a midi stream to ``data_dir + file_name`` (plain string concatenation)."""
    midi.write('mid', fp=self.data_dir+file_name)

  def create_note(self, note):
    """Return the MIDI number of the pitch named by the string ``note``."""
    return pitch.Pitch(note).midi

  def pitch_to_int(self):
    """Rebuild ``unique_notes``, ``pitch2int`` and ``num_classes`` from ``self.notes``.

    Updates the attributes in place and returns None.
    """
    self.unique_notes = sorted(set(self.notes))
    # each token's index in the sorted vocabulary is its class id
    self.pitch2int = {token: number for number, token in enumerate(self.unique_notes)}
    self.num_classes = len(self.pitch2int)

  def encode_notes(self):
    """Store ``self.notes`` mapped through ``pitch2int`` as ``self.encoded_notes``."""
    self.encoded_notes = np.array([self.pitch2int[token] for token in self.notes])

  def save_notes(self):
    """Write the extracted tokens to ``notes.csv`` (one per row, no header or index)."""
    df = pd.DataFrame(self.notes)
    df.to_csv("notes.csv", header=None, index=None)


In [0]:
# helper functions 

def to_categorical(x, num_classes):
    """One-hot encode integer class indices.

    Each index in `x` is replaced by the matching row of a float32 identity
    matrix of size `num_classes`, so the result has the shape of `x` plus one
    trailing axis of length `num_classes`.
    """
    identity = np.eye(num_classes, dtype=np.float32)
    return identity[x]
  

def get_batches(arr, batch_size, seq_length):
  '''
  Generator yielding (x, y) batches of shape (batch_size, seq_length).

  arr: 1-D numpy array of encoded notes
  batch_size: number of sequences (rows) per batch
  seq_length: number of notes per sequence

  Trailing notes that do not fill a complete batch are discarded. y is x
  shifted left by one step; its last column is the note that follows the
  window, or the first column of the reshaped data for the final window.
  '''
  notes_per_batch = batch_size * seq_length
  usable = (len(arr) // notes_per_batch) * notes_per_batch
  # one row per sequence in the batch
  grid = arr[:usable].reshape((batch_size, -1))
  n_cols = grid.shape[1]

  for start in range(0, n_cols, seq_length):
      stop = start + seq_length
      x = grid[:, start:stop]
      y = np.zeros_like(x)
      y[:, :-1] = x[:, 1:]
      # past the right edge there is no "next" note, so wrap to column 0
      y[:, -1] = grid[:, stop] if stop < n_cols else grid[:, 0]
      yield x, y

# save the model
def save_model(best=False):
  """Write a checkpoint of the current network to disk.

  Reads the module-level `net`, `criterion`, `optimizer` and `num_epochs`.
  The checkpoint goes to 'best_noteRNN.net' when `best` is True and to
  'noteRNN.net' otherwise. Saving is best effort: a failure is reported on
  stdout together with its cause instead of being raised.
  """
  if best:
    model_name = 'best_noteRNN.net'
    print("Saving Best Perf Model....")
  else:
    model_name = 'noteRNN.net'

  checkpoint = {'state_dict': net.state_dict(),
                'input_size': net.num_classes,
                'output_size': net.num_classes,
                'criterion_state': criterion.state_dict(),
                'optimizer_state': optimizer.state_dict(),
                'epochs': num_epochs}
  try:
    torch.save(checkpoint, model_name)
  except Exception as err:
    # Catch Exception rather than everything so Ctrl-C still interrupts,
    # and include the cause so a failed save can be diagnosed.
    print("Unable to save.. ({})".format(err))

# Default checkpoint file name; same name save_model() uses when best=False.
model_name = 'noteRNN.net'
# Uncomment to write a checkpoint and download it from the Colab runtime:
#save_model()
#files.download(model_name)

In [0]:
class NoteRNN(nn.Module):
    """Multi-layer LSTM that predicts the next note from one-hot encoded notes.

    unique_notes: vocabulary of note/chord/rest tokens; its length fixes both
        the input width and the number of output classes
    num_hidden: LSTM hidden size
    num_layers: number of stacked LSTM layers
    dropout: dropout probability, used between LSTM layers and before the
        output layer
    lr: stored on the instance as `self.lr`; nothing in this class reads it
    """

    def __init__(self, unique_notes, num_hidden=256, num_layers=3,
                               dropout=0.5, lr=0.003):
        super().__init__()
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.lr = lr
        self.num_classes = len(unique_notes)

        # note dictionaries: token -> index and index -> token
        self.note2int = {token: number for number, token in enumerate(unique_notes)}
        self.int2note = dict(enumerate(self.note2int))

        # LSTM over one-hot inputs shaped (batch, seq, num_classes)
        self.lstm = nn.LSTM(self.num_classes, num_hidden, num_layers,
                            dropout=dropout, batch_first=True)

        # dropout applied to the LSTM outputs
        self.dropout = nn.Dropout(dropout)

        # fully connected output layer producing one score per class
        self.fc = nn.Linear(num_hidden, self.num_classes)

    def forward(self, x, hidden):
        ''' Forward pass through the network.

            x: one-hot inputs; hidden: (hidden state, cell state) tuple.
            Returns the class scores flattened to (batch * seq, num_classes)
            and the new hidden tuple. '''
        r_output, hidden = self.lstm(x, hidden)
        out = self.dropout(r_output)

        # stack all time steps so the linear layer sees one row per step
        out = out.contiguous().view(-1, self.num_hidden)
        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        ''' Return zeroed (hidden state, cell state) tensors, each of shape
            (num_layers, batch_size, num_hidden).

            The tensors take their dtype and device from the model's own
            parameters, so they always match wherever the model currently
            lives instead of depending on a global CUDA flag. '''
        weight = next(self.parameters()).data
        shape = (self.num_layers, batch_size, self.num_hidden)
        return (weight.new_zeros(shape), weight.new_zeros(shape))
        

In [0]:
def _validation_losses(net, val_data, batch_size, seq_length, num_notes):
    '''Return the list of per-batch losses of `net` on `val_data`.

    Runs in eval mode without gradient tracking and switches the network back
    to train mode before returning. The list is empty when `val_data` is too
    short to form a single batch.
    '''
    losses = []
    val_h = net.init_hidden(batch_size)
    # turn dropout off
    net.eval()
    with torch.no_grad():
        for x, y in get_batches(val_data, batch_size, seq_length):
            inputs = torch.from_numpy(to_categorical(x, num_notes))
            targets = torch.from_numpy(y)
            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()
            output, val_h = net(inputs, val_h)
            batch_loss = criterion(output, targets.view(batch_size*seq_length))
            losses.append(batch_loss.item())
    # turn dropout back on for training
    net.train()
    return losses


def train(net, data, epochs=100, batch_size=10, seq_length=60, lr=0.001, clip=5, val_split=0.1, print_every=20):
    '''
        Train `net` on `data` using the module-level `optimizer` and `criterion`.

        net: a NoteRNN network
        data: encoded notes from which to train the network
        epochs: num of epochs to train
        batch_size: batch size
        seq_length: num of notes per sequence
        lr: accepted for compatibility but not used; the learning rate is the
            one the module-level `optimizer` was built with
        clip: max gradient norm for gradient clipping
        val_split: fraction of data reserved (from the end) for validation
        print_every: num of steps between validation passes / loss reports

        Whenever the mean validation loss does not exceed the best seen so
        far, the model is checkpointed via save_model(best=True).
    '''
    net.train()

    # create training and val set
    split = int(len(data)*(1-val_split))
    data, val_data = data[:split], data[split:]
    if(train_on_gpu):
        net.cuda()

    counter = 0
    num_notes = net.num_classes
    val_loss_min = np.inf # best (lowest) mean validation loss so far
    for epoch in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # One-hot encode and make torch tensors
            inputs = torch.from_numpy(to_categorical(x, num_notes))
            targets = torch.from_numpy(y)
            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()

            # Detach the hidden state, otherwise we'd backprop through the
            # entire training history
            h = tuple([each.data for each in h])

            net.zero_grad()
            output, h = net(inputs, h)

            loss = criterion(output, targets.view(batch_size*seq_length))
            loss.backward()

            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)

            optimizer.step()

            if counter % print_every != 0:
                continue

            print("Calculating loss...")
            val_losses = _validation_losses(net, val_data, batch_size, seq_length, num_notes)
            # an empty validation set has no meaningful loss; report NaN
            mean_val_loss = float(np.mean(val_losses)) if val_losses else float('nan')

            print("Epoch: {}/{}...".format(epoch+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.4f}...".format(loss.item()),
                  "Val Loss: {:.4f}".format(mean_val_loss))

            # Checkpoint on the mean over all validation batches rather than
            # on whichever batch happened to come last.
            if val_losses and mean_val_loss <= val_loss_min:
                print('Saving best performing model...')
                save_model(best=True)
                val_loss_min = mean_val_loss

In [0]:
# create dataset: parse every MIDI file in data_dir and encode the notes
hozier = Dataset(data_dir)
hozier.create_dataset()
chords_ = hozier.chords # retrieve chords for song generation
rests_ = hozier.rests # retrieve rests for song generation
print("Vocab size: {}".format(hozier.num_classes))
encoded_notes = hozier.encoded_notes # integer-encoded notes, already built by create_dataset()

In [0]:
# define and print the network
# The vocabulary comes from the dataset, so input/output width equals the
# number of unique tokens found in the MIDI files.
num_hidden=512
num_layers=3

net = NoteRNN(hozier.unique_notes, num_hidden, num_layers)
print(net)

In [0]:
# parameters
batch_size = 10 # increase with larger dataset
seq_length = 10 # seq length really matters- it's the amount of context the net receives!
num_epochs = 5
# optimizer and criterion
# train() and save_model() read `optimizer`, `criterion` and `num_epochs` as
# module-level names, so this cell must run before either of them is called.
optimizer = torch.optim.RMSprop(net.parameters(), lr=0.01) #RMSprop good for RNNs
criterion = nn.CrossEntropyLoss()

In [0]:
# train the model
# NOTE: train() never reads its `lr` argument; the step size actually used is
# the one the module-level `optimizer` was created with.
train(net, encoded_notes, epochs=num_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.01, print_every=10)

In [0]:
# predict takes in a trained network and returns
# hidden state and the predicted next note
def predict(net, note, h=None, top_k=None):
    """Sample the next note from a trained network.

    net: trained NoteRNN
    note: current note token; must be a key of `net.note2int`
    h: (hidden state, cell state) tuple from the previous step, or None to
        start from a zeroed state for a batch of one
    top_k: if given, sample only among the `top_k` most likely notes
        (capped at the vocabulary size); otherwise sample from all notes

    Returns the sampled note token and the updated hidden tuple.
    """
    if h is None:
        h = net.init_hidden(1)

    # encode the note as a (1, 1, num_classes) one-hot tensor
    x = np.array([[net.note2int[note]]])
    inputs = torch.from_numpy(to_categorical(x, net.num_classes))
    if(train_on_gpu):
        inputs = inputs.cuda()

    # detach hidden state from history
    h = tuple([each.data for each in h])

    # inference only: no autograd graph is needed
    with torch.no_grad():
        out, h = net(inputs, h)

    # class probabilities for the single time step
    p = F.softmax(out, dim=1).data
    if(train_on_gpu):
        p = p.cpu() # move to cpu

    if top_k is None:
        top_ch = np.arange(net.num_classes)
    else:
        p, top_ch = p.topk(min(top_k, net.num_classes))
        # reshape(-1) keeps a 1-D array even for a single candidate, which
        # squeeze() would collapse to 0-d and break np.random.choice
        top_ch = top_ch.numpy().reshape(-1)

    # renormalise in float64 so the probabilities sum to 1 within
    # np.random.choice's tolerance
    p = p.numpy().reshape(-1).astype(np.float64)
    choice = np.random.choice(top_ch, p=p/p.sum())

    # return the sampled note token and the hidden state
    return net.int2note[int(choice)], h

In [0]:
# generate a sample of notes given a list of notes (prime)
def sample(net, size, prime=['C'], top_k=None):
    """Generate a note sequence from a trained network.

    net: trained NoteRNN
    size: number of notes to generate after the priming step
    prime: non-empty list of note tokens used to warm up the hidden state;
        every token must be in the network's vocabulary
    top_k: forwarded to predict() to restrict sampling to the k best notes

    Returns a new list: the prime notes, one note predicted after priming,
    then `size` further notes (len(prime) + 1 + size in total).

    Raises ValueError for an empty prime and KeyError when a prime note is
    not in the network's vocabulary.
    """
    notes = list(prime)
    if not notes:
        raise ValueError("prime must contain at least one note")
    unknown = [nt for nt in notes if nt not in net.note2int]
    if unknown:
        # fail up front with the offending tokens instead of mid-way through priming
        raise KeyError("prime notes missing from the network vocabulary: {}".format(unknown))

    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()
    # eval mode (don't want dropout on)
    net.eval()

    # feed the prime through the network to build up the hidden state;
    # only the prediction after the last prime note is kept
    h = net.init_hidden(1)
    for prime_note in notes:
        nt, h = predict(net, prime_note, h, top_k=top_k)
    notes.append(nt)

    # use the last predicted note to generate each new prediction
    for _ in range(size):
        nt, h = predict(net, notes[-1], h, top_k=top_k)
        notes.append(nt)

    return notes

In [0]:
class Song:
  """Builds a single-track MIDI file from a sequence of generated tokens.

  chords: dict mapping chord strings to music21 chord objects
  notes: generated tokens (pitch names, chord strings or encoded rests)
  rests: dict mapping rest names to music21 rest objects
  data_dir, file_name: concatenated as-is to form the output path
  """

  TICKS_PER_QUARTER = 1024 # MIDI resolution written to the file
  NOTE_TICKS = 1024 # length of every single note, in ticks
  NOTE_VELOCITY = 70 # velocity of single notes (0-127)
  CHORD_VELOCITY = 90 # velocity of chords (0-127)

  def __init__(self, chords, notes, rests, data_dir, file_name):
    self.chords = chords
    self.notes = notes
    self.rests = rests
    self.data_dir = data_dir
    self.file_name = file_name

  def create_rest(self, nt):
    """Return ``(velocity, rest)`` for an encoded rest token.

    An encoded rest is ``<history>$<rest name>``, or just the rest name when
    there was no history, so the rest this token stands for is always the
    text after the last ``$``. Earlier ``$`` pieces are rests that merely
    appear in the history prefix. The velocity is always 0 and ``rest`` is a
    new music21 Rest carrying the stored rest's duration.

    Raises KeyError when the rest name is not in ``self.rests``.
    """
    rest_name = nt.rsplit('$', 1)[-1]
    if rest_name not in self.rests:
      raise KeyError("Unknown rest in generated token: {!r}".format(rest_name))
    new_rest = note.Rest()
    new_rest.duration = self.rests[rest_name].duration
    return 0, new_rest

  def create_chord(self, nt):
    """Return the list of MIDI events for the chord stored under ``nt``."""
    c = self.chords[nt]
    c.volume = volume.Volume(velocity=self.CHORD_VELOCITY)
    c.volume.velocityIsRelative = False
    eventList = midi.translate.chordToMidiEvents(c)
    return eventList

  def _append_note(self, mt, midi_pitch, wait_ticks):
    """Append delta/NOTE_ON/delta/NOTE_OFF events for one note to track ``mt``."""
    # delta time is how long to wait from the last event
    dt = midi.DeltaTime(mt)
    dt.time = wait_ticks
    mt.events.append(dt)

    note_on = midi.MidiEvent(mt)
    note_on.type = "NOTE_ON"
    note_on.channel = 1 # specify channel
    note_on.time = None
    note_on.pitch = midi_pitch
    note_on.velocity = self.NOTE_VELOCITY
    mt.events.append(note_on)

    # hold the note for its full length
    dt = midi.DeltaTime(mt)
    dt.time = self.NOTE_TICKS
    mt.events.append(dt)

    note_off = midi.MidiEvent(mt)
    note_off.type = "NOTE_OFF"
    note_off.channel = 1 # specify channel
    note_off.time = None
    note_off.pitch = midi_pitch
    note_off.velocity = 0 # equivalent to NOTE_OFF
    mt.events.append(note_off)

  def create_song(self):
    '''
    Write ``self.notes`` as a MIDI file at ``data_dir + file_name``.

    Each token is emitted exactly once, in order: chords through
    create_chord, single notes as a NOTE_ON/NOTE_OFF pair, and rests as extra
    waiting time in front of the next sounding event.
    '''
    file_path = self.data_dir+self.file_name
    mt = midi.MidiTrack(1)
    note_gap = 0 # ticks between consecutive single notes (0 before the first)
    rest_ticks = 0 # silence accumulated from rests since the last sounding event

    for nt in self.notes:
      if "Chord" in nt and "Rest" not in nt:
        eventList = self.create_chord(nt)
        # push the chord back by any pending rest when its first event is a delta time
        if rest_ticks and eventList and isinstance(eventList[0], midi.DeltaTime):
          eventList[0].time = (eventList[0].time or 0) + rest_ticks
          rest_ticks = 0
        for event in eventList:
          mt.events.append(event)
      elif "Rest" in nt:
        _, rest = self.create_rest(nt)
        rest_ticks += int(round(rest.duration.quarterLength * self.TICKS_PER_QUARTER))
      else:
        self._append_note(mt, pitch.Pitch(nt).midi, note_gap + rest_ticks)
        note_gap = 1
        rest_ticks = 0

    # end of track: a zero delta followed by the END_OF_TRACK event
    dt = midi.DeltaTime(mt)
    dt.time = 0
    mt.events.append(dt)
    me = midi.MidiEvent(mt)
    me.type = "END_OF_TRACK"
    me.channel = 1 # specify channel
    me.data = ''  # must set data to empty string
    mt.events.append(me)

    mf = midi.MidiFile()
    mf.ticksPerQuarterNote = self.TICKS_PER_QUARTER
    mf.tracks.append(mt)

    # write midi file; always close the handle, even if writing fails
    print("Writing MIDI track")
    mf.open(file_path, 'wb')
    try:
      mf.write()
    finally:
      mf.close()


In [0]:
# generate notes: prime the network with one chord, then sample 20 more notes
# from the 5 most likely candidates at each step. The returned list holds the
# prime, one note predicted after priming, and the 20 sampled notes.
generated_notes = sample(net, 20, prime=['<music21.chord.Chord D4 B3 G1 G3 B3 G3 D4>'], top_k = 5)


In [0]:
# create song and save it as a MIDI file next to the training data
# (the output path is data_dir + file name)
song = Song(chords_,generated_notes, rests_, data_dir, "generated_hozier_1.mid")
song.create_song()