<a href="https://colab.research.google.com/github/thamsuppp/MusicGenDL/blob/main/Transformer_and_Single_Note_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Imports**

In [None]:
import os
import shutil
import glob
import numpy as np 
import pandas as pd
import pretty_midi
import pypianoroll
import tables
from music21 import *
import music21
import librosa
import librosa.display
import matplotlib.pyplot as plt
from keras.utils import np_utils
import json
import IPython.display
from datetime import datetime
from google.colab import files



from musicautobot.numpy_encode import *
from musicautobot.config import *
from musicautobot.music_transformer import *
from musicautobot.multitask_transformer import *
from musicautobot.utils import midifile

import random
import itertools
root_dir = 'drive/MyDrive/CIS522 Project'
data_dir = root_dir + '/Lakh Piano Dataset/LPD-5/lpd_5/lpd_5_cleansed'
music_dataset_lpd_dir = root_dir + '/Music Dataset/midis/lmd_matched'

!apt-get update -qq && apt-get install -qq libfluidsynth1 fluid-soundfont-gm build-essential libasound2-dev libjack-dev
!pip install -qU pyfluidsynth pretty_midi
!pip install music21
!pip install pypianoroll
!pip install musicautobot
!pip install pebble

from google.colab import drive
drive.mount('/content/drive')

**Getting MIDI and Song Metadata**

In [None]:
# Metadata directory inside the dataset folder; msd_id_to_h5() looks for the
# 'lmd_matched_h5' tree underneath it.
RESULTS_PATH = os.path.join(root_dir, 'Lakh Piano Dataset', 'Metadata')

# Utility functions for retrieving paths
def msd_id_to_dirs(msd_id):
    """Build the nested relative path under which a track is stored.

    The third, fourth and fifth characters of the MSD ID name the three
    directory levels, e.g. ``TRABCD12345678 -> A/B/C/TRABCD12345678``.
    """
    level_1, level_2, level_3 = msd_id[2], msd_id[3], msd_id[4]
    return os.path.join(level_1, level_2, level_3, msd_id)


def msd_id_to_h5(msd_id):
    """Return the path of the ``.h5`` metadata file for an MSD ID.

    The file sits below ``RESULTS_PATH/lmd_matched_h5`` in the nested
    directory layout produced by ``msd_id_to_dirs``.
    """
    relative_h5 = msd_id_to_dirs(msd_id) + '.h5'
    return os.path.join(RESULTS_PATH, 'lmd_matched_h5', relative_h5)

# Load the midi npz file from the LMD cleansed folder
def get_midi_npz_path(msd_id, midi_md5):
    """Return the path of a song's ``.npz`` file in the LPD cleansed folder.

    The file is named after ``midi_md5`` and sits in the directory tree
    derived from ``msd_id`` (see ``msd_id_to_dirs``) below ``data_dir``.
    """
    npz_name = midi_md5 + '.npz'
    return os.path.join(data_dir, msd_id_to_dirs(msd_id), npz_name)
    
# Load the midi file from the Music Dataset folder
def get_midi_path(msd_id, midi_md5):
    """Return the path of a song's ``.mid`` file in the Music Dataset folder.

    The file is named after ``midi_md5`` and sits in the directory tree
    derived from ``msd_id`` (see ``msd_id_to_dirs``) below
    ``music_dataset_lpd_dir``.
    """
    midi_name = midi_md5 + '.mid'
    return os.path.join(music_dataset_lpd_dir, msd_id_to_dirs(msd_id), midi_name)
    
# Open the cleansed ids - cleansed file ids : msd ids
# The 4-space delimiter is longer than one character, so pandas parses it as a
# regular expression with the Python engine. Requesting that engine explicitly
# avoids the ParserWarning about the implicit fallback; the parsed result is
# unchanged.
cleansed_ids = pd.read_csv(
    os.path.join(root_dir, 'Lakh Piano Dataset', 'cleansed_ids.txt'),
    delimiter = '    ', header = None, engine = 'python')

# Column 0 holds the cleansed (LPD) file ids, column 1 the MSD ids.
lpd_to_msd_ids = dict(zip(cleansed_ids[0], cleansed_ids[1]))
msd_to_lpd_ids = dict(zip(cleansed_ids[1], cleansed_ids[0]))

**Reading Genre Annotations**

In [None]:
# Parse the genre annotation file. Each non-comment line is tab-separated:
# a track id followed by one or two genres; lines starting with '#' are skipped.
genre_file_dir = os.path.join(root_dir, 'Lakh Piano Dataset', 'Genre', 'msd_tagtraum_cd1.cls')
ids = []
genres = []
with open(genre_file_dir) as f:
    for line in f:
        if line.startswith('#'):
            continue
        fields = line.strip().split("\t")
        if len(fields) == 2:
            # One genre: a single (track, genre) row
            ids.append(fields[0])
            genres.append(fields[1])
        elif len(fields) == 3:
            # Two genres: one row per genre, both with the same track id
            ids.extend([fields[0], fields[0]])
            genres.extend([fields[1], fields[2]])

# Long-format table: one row per (track, genre) pair
genre_df = pd.DataFrame(data={"TrackID": ids, "Genre": genres})

# Track id -> list of all genres annotated for that track
genre_dict = genre_df.groupby('TrackID')['Genre'].apply(list).to_dict()

**Retrieving MSD IDs**

In [None]:
# Load the processed metadata (iterated below as a collection of per-song
# records that each carry an 'msd_id' key)
metadata_path = os.path.join(root_dir, 'Lakh Piano Dataset', 'processed_metadata.json')
with open(metadata_path, 'r') as infile:
  lmd_metadata = json.load(infile)

# Change this into a dictionary of MSD_ID: metadata
lmd_metadata = {e['msd_id']:e for e in lmd_metadata}

# Get all song MSD IDs in pop rock genre
pop_song_msd_ids = [k for k, v in lmd_metadata.items() if 'Pop_Rock' in v['genre_tagtraum']]


def song_ids_by_artist(artist_substring):
  """Return the MSD IDs whose 'artist' field contains `artist_substring`."""
  return [k for k, v in lmd_metadata.items() if artist_substring in v['artist']]


# Filter by artist name
carey_song_ids = song_ids_by_artist('Carey')
mj_song_ids = song_ids_by_artist('Michael Jackson')
green_song_ids = song_ids_by_artist('Green Day')
spice_song_ids = song_ids_by_artist('Spice Girls')

# Every MSD ID in the metadata. This list used to be called `all`, which
# shadowed the builtin all() for the rest of the notebook.
all_song_msd_ids = list(lmd_metadata)

# Randomly choose up to 1000 DISTINCT songs. random.choices samples with
# replacement, so the same song could be picked several times; random.sample
# draws without replacement. k is capped at the number of songs available.
train_ids = random.sample(pop_song_msd_ids, k = min(1000, len(pop_song_msd_ids)))

**Transformer Code**

In [None]:
import pebble
from musicautobot.numpy_encode import *
from musicautobot.utils.file_processing import process_all, process_file
from musicautobot.config import *
from musicautobot.music_transformer import *


# Location of your midi files. Built from root_dir so the folder is spelled
# the same way ('MyDrive') as every other Drive path in this notebook.
midi_path = Path(root_dir) / 'spice midis'
data_path = Path('data/numpy')
data_save_name = 'musicitem_data_save.pkl'


# Encode every MIDI file below midi_path and wrap the result in a data bunch
midi_files = get_files(midi_path, '.mid', recurse=True)
data_mj = MusicDataBunch.from_files(midi_files, data_path, processors=[Midi2ItemProcessor()], bs=4, bptt=128, encode_position=False)

# The learner must be built from the data bunch created above (`data_mj`);
# the name `data` used here previously is never defined in this notebook.
learn_mj = music_model_learner(data_mj, arch=TransformerXL, config=default_config())


import warnings
warnings.simplefilter("ignore", UserWarning)

learn_mj.fit_one_cycle(5)

# Seed generation with one of the training files
midi_file = midi_path / '78152aeb43e3e22ccc608e330e5bcb92.mid'
item = MusicItem.from_file(midi_file, data_mj.vocab)
# predict() belongs to the learner; `midi_file` is only a Path to the seed.
# NOTE(review): the lines below index pred[0], so predict() presumably returns
# a sequence whose first element is the generated item - confirm.
pred = learn_mj.predict(item, n_words=500)

pred[0].to_stream().show('text')
play(pred[0].to_stream())
show(pred[0].to_stream())

**LSTM Single-Note Generation - Accessing Audio Data**

In [None]:
# Get all song MSD IDs satisfying the condition
filtered_msd_ids = [k for k, v in lmd_metadata.items() if 'Michael Jackson' in v['artist']]

# Every matching song is parsed. The previous random.choices(..., k = 1000)
# draw was never used by the loop below, and because it samples with
# replacement it could only have introduced duplicate songs.
train_ids = list(filtered_msd_ids)

# Loop that reads each song in train_ids, parses the PIANO notes and saves the
# string representation of each note/chord; `notes` holds one list per song.
notes = []

for song_number, msd_file_name in enumerate(train_ids, start = 1):
  lpd_file_name = msd_to_lpd_ids[msd_file_name]

  # Get the NPZ path and the path of the MIDI file written next to it
  npz_path = get_midi_npz_path(msd_file_name, lpd_file_name)
  new_midi_path = npz_path[:-4] + '.mid'

  # Convert the piano-roll to a MIDI file so that music21 can parse it
  multitrack = pypianoroll.load(npz_path)
  pypianoroll.write(new_midi_path, multitrack)
  midi = converter.parse(new_midi_path)

  s2 = instrument.partitionByInstrument(midi)
  # NOTE(review): some music21 versions appear to return None here when the
  # stream has no instruments - treated as "no parts" rather than crashing.
  parts = s2 if s2 is not None else []

  # Filter for only the piano part (if several match, the last one is kept)
  piano_part = None
  for part in parts:
    if isinstance(part.getInstrument(), instrument.Piano):
      piano_part = part

  notes_song = []
  if piano_part is not None: # Some songs somehow have no piano parts
    for element in piano_part:
      if isinstance(element, note.Note):
        # The pitch of the single note, e.g. 'C4'
        notes_song.append(str(element.pitch))
      elif isinstance(element, chord.Chord):
        # The chord's normal order (a list of integers) joined with '.'
        notes_song.append('.'.join(str(n) for n in element.normalOrder))

  # Songs without a piano part still contribute an (empty) list
  notes.append(notes_song)
  print(song_number)

**Preparing Input and Output Sequences**

In [None]:
# Prepare input and output sequences

def prepare_sequences(notes, note_to_int = None, sequence_length = 32):
  """Cut every song into non-overlapping, integer-encoded windows.

  Arguments:
  notes - list of songs, each a list of note/chord strings
  note_to_int - optional mapping from note string to integer; when missing
                (or empty) it is built from the sorted set of all strings
                found in `notes`
  sequence_length - number of notes per input window

  Returns (network_input, network_output, note_to_int):
  network_input - array of shape (n_windows, sequence_length, 1)
  network_output - list with the encoded note that follows each window
  note_to_int - the mapping that was used

  Strings missing from the mapping are encoded as 0. Windows start every
  `sequence_length` notes, and a window is only emitted when a following
  note exists. The values are neither normalized nor one-hot encoded.
  """
  if not note_to_int:
    # Vocabulary: every distinct note/chord string, in sorted order
    vocabulary = sorted(set(itertools.chain.from_iterable(notes)))
    note_to_int = {name: index for index, name in enumerate(vocabulary)}

  network_input = []
  network_output = []

  for song in notes:
    # The last usable start leaves room for the window plus its target note
    for start in range(0, len(song) - sequence_length, sequence_length):
      stop = start + sequence_length
      window = song[start:stop]
      network_input.append([note_to_int.get(symbol, 0) for symbol in window])
      network_output.append(note_to_int.get(song[stop], 0))

  # Reshape for LSTM input: (windows, time steps, 1 feature)
  window_count = len(network_input)
  network_input = np.reshape(network_input, (window_count, sequence_length, 1))

  return network_input, network_output, note_to_int

# NOTE(review): this call used to pass `notes_train`, which no visible cell
# defines, so the notebook failed on a fresh kernel. The parsed songs live in
# `notes`, so all of them are used for training here. If a train/test split is
# reintroduced, pass its training half instead.
train_input, train_output, note_to_int = prepare_sequences(notes, sequence_length = 64)
#test_input, test_output, _ = prepare_sequences(notes_test, note_to_int = note_to_int, sequence_length = 64)

**RNN Generation Code**

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# Use the GPU when one is available, otherwise the CPU. The model and the
# tensors created in the cells below are moved to this device with .to(device).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Take a random observation from the network input, return (input, target), each shifted by 1
# NOT NEEDED ANYMORE - each epoch just using entire dataset
def random_training_set(network_input):
    """Pick one random sequence and return it as an (input, target) pair.

    A row of `network_input` is chosen uniformly at random. The input is that
    row without its last step and the target is the row without its first
    step, i.e. the same sequence shifted by one. Both are returned as long
    tensors with singleton dimensions squeezed away.
    """
    row = random.randint(0, network_input.shape[0] - 1)
    chunk = network_input[row, :, :]
    seq_in = torch.tensor(chunk[:-1], dtype = torch.long)
    seq_target = torch.tensor(chunk[1:], dtype = torch.long)
    return seq_in.squeeze(), seq_target.squeeze()


def grad_clipping(net, theta):  
    """Rescale gradients in place so that their global L2 norm is at most theta.

    Arguments:
    net - module whose trainable parameters are clipped
    theta - maximum allowed L2 norm over all gradients taken together

    Parameters that have no gradient yet (`.grad is None`, e.g. because they
    did not take part in the last backward pass) are skipped. If no gradient
    exists at all, the function returns without doing anything.
    """
    grads = [p.grad for p in net.parameters()
             if p.requires_grad and p.grad is not None]
    if not grads:
        return

    # Global norm: square root of the summed squares over every gradient
    norm = torch.sqrt(sum(torch.sum(g ** 2) for g in grads))

    if norm > theta:
        scale = theta / norm
        for g in grads:
            # mul_ also works for 0-dim gradients, unlike slicing with [:]
            g.mul_(scale)

class GenerationRNN(nn.Module):
    """Note-level language model: embedding -> GRU -> linear decoder.

    Arguments:
    input_size - number of possible pitches (size of the embedding table)
    hidden_size - width of both the pitch embedding and the GRU hidden state
    output_size - number of scores produced per step (one per possible pitch)
    n_layers - number of stacked GRU layers
    """
    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super(GenerationRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)
        # The decoder reads the hidden states of ALL layers, concatenated
        self.decoder = nn.Linear(hidden_size * n_layers, output_size)
    
    def forward(self, input, hidden):
        """Advance the model by one time step.

        `input` is reshaped to (1, -1), so the GRU sees a single time step.
        The returned scores are decoded from the updated hidden state
        flattened to one row; this matches the decoder's input width only for
        a batch of one (the shape produced by `init_hidden`).

        Returns (output, hidden): unnormalized scores of shape
        (1, output_size) and the updated hidden state.
        """
        embedded = self.embedding(input.view(1, -1))
        _, hidden = self.gru(embedded, hidden)
        output = self.decoder(hidden.view(1, -1))
        return output, hidden

    def init_hidden(self):
        """Return an all-zero hidden state of shape (n_layers, 1, hidden_size).

        The state is created on the device that holds the model's parameters,
        so it always matches the model instead of depending on a
        notebook-level `device` variable.
        """
        model_device = next(self.parameters()).device
        return torch.zeros(self.n_layers, 1, self.hidden_size, device = model_device)

# Single training step for ONE sequence
def train_sequence(input, target, model, optimizer, criterion):
    """Run one optimisation step on a single sequence.

    The sequence is fed to the model one element at a time, carrying the
    hidden state forward; the loss of every step against the matching target
    is summed. The summed loss is backpropagated, gradients are clipped to a
    norm of 1 and the optimizer takes one step.

    Returns the summed loss divided by the sequence length.
    """
    # Fresh hidden state and cleared gradients for this sequence
    state = model.init_hidden()
    model.zero_grad()

    total_loss = 0
    for step, token in enumerate(input):
        prediction, state = model(token, state)
        total_loss = total_loss + criterion(prediction, target[step].unsqueeze(0))

    # Backpropagate, clip gradient and optimize
    total_loss.backward()
    grad_clipping(model, 1)
    optimizer.step()

    n_steps = len(input)
    return total_loss.item() / n_steps

def test_sequence(input, target, model, criterion):
    # Initialize hidden state, zero the gradients of model 
    hidden = model.init_hidden()
    model.zero_grad()
    loss = 0
    # For each character in our chunk (except last), compute the hidden and ouput
    # Using each output, compute the loss with the corresponding target 
    for i in range(len(input)):
        output, hidden = model(input[i], hidden)
        loss += criterion(output, target[i].unsqueeze(0))

    # Return average loss for the input sequence
    return loss.data.item() / len(input)


# Overall training loop
def training_loop(model, optimizer, scheduler, criterion, train_input, test_input, num_epochs = None):
  """Train the model for several epochs over the whole training set.

  Arguments:
  model, optimizer, criterion - passed on to `train_sequence`
  scheduler - learning-rate scheduler, stepped once per epoch
  train_input - array of sequences indexed as train_input[i, :, :]; each
                sequence is split into an input (all but the last step) and a
                target (all but the first step)
  test_input - currently unused: the evaluation pass is disabled
  num_epochs - number of epochs; when None, the notebook-level `n_epochs`
               is used

  Returns (train_losses, test_losses): the mean training loss per epoch and
  an empty list (no test loss is computed while evaluation is disabled).
  """
  if num_epochs is None:
    num_epochs = n_epochs  # fall back to the notebook-level setting

  train_losses = []
  test_losses = []
  n_sequences = train_input.shape[0]

  for epoch in range(1, num_epochs + 1):
    running_loss = 0
    model.train()

    # Training - every sequence is visited once per epoch
    print(scheduler.get_last_lr())
    for i in range(n_sequences):
      sequence = train_input[i, : , :]
      input = torch.tensor(sequence[:-1], dtype = torch.long).squeeze().to(device)
      target = torch.tensor(sequence[1:], dtype = torch.long).squeeze().to(device)
      loss = train_sequence(input, target, model, optimizer, criterion)
      running_loss += loss

    # Mean over the sequences actually trained on. This used to be divided by
    # a hard-coded 2000 regardless of the dataset size. max(..., 1) keeps an
    # empty training set at a loss of 0 instead of dividing by zero.
    train_epoch_loss = running_loss / max(n_sequences, 1)
    train_losses.append(train_epoch_loss)
    scheduler.step()

    # Evaluation on test_input is disabled, so 0 is reported as a placeholder
    test_epoch_loss = 0

    print('Epoch {}, Train Loss: {}, Test Loss: {}, Time: {}'.format(epoch, train_epoch_loss, test_epoch_loss, datetime.now()))

  return train_losses, test_losses

In [None]:
# Hyperparameters for a single training run
n_pitches = len(note_to_int)  # vocabulary size: one id per distinct note/chord string
hidden_size = 96  # used by GenerationRNN for both the embedding and the GRU hidden state
n_layers = 2
n_epochs = 40  # read as a global inside training_loop
lr = 0.002
lr_lambda = 0.99  # base of the per-epoch learning-rate factor lr_lambda ** epoch

model = GenerationRNN(input_size = n_pitches, hidden_size = hidden_size, output_size = n_pitches, n_layers = n_layers).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda = lambda epoch: lr_lambda ** epoch)
criterion = nn.CrossEntropyLoss()
# train_input is passed for both data arguments; the test pass inside
# training_loop is commented out, so the second one is not used.
train_losses, test_losses = training_loop(model, optimizer, scheduler, criterion, train_input, train_input)

In [None]:
# One dict per experiment: hidden size, GRU layers, initial learning rate,
# per-epoch learning-rate decay base and optimizer name
experiment_params_list = [
                          {'HIDDEN': 128, 'N_LAYERS': 2, 'LR': 0.001, 'LR_LAMBDA': 1, 'OPTIM': 'Adam'},
                          {'HIDDEN': 64, 'N_LAYERS': 4, 'LR': 0.001, 'LR_LAMBDA': 1, 'OPTIM': 'Adam'},
                          {'HIDDEN': 96, 'N_LAYERS': 2, 'LR': 0.05, 'LR_LAMBDA': 0.95, 'OPTIM': 'SGD'},
                          {'HIDDEN': 96, 'N_LAYERS': 2, 'LR': 0.025, 'LR_LAMBDA': 0.95, 'OPTIM': 'SGD'},
                          {'HIDDEN': 96, 'N_LAYERS': 2, 'LR': 0.001, 'LR_LAMBDA': 0.975, 'OPTIM': 'Adam'},
                          {'HIDDEN': 96, 'N_LAYERS': 2, 'LR': 0.0025, 'LR_LAMBDA': 0.95, 'OPTIM': 'Adam'},
                    ]

# experiment number -> {'train_losses': [...], 'test_losses': [...]}
experiment_losses = {}
experiment_num = 0

for params in experiment_params_list:

  n_pitches = len(note_to_int)
  hidden_size = params['HIDDEN']
  n_layers = params['N_LAYERS']
  n_epochs = 50  # read as a global inside training_loop
  lr = params['LR']
  lr_lambda = params['LR_LAMBDA']

  print(experiment_num, params)

  # Create model, optimizer and loss function
  model = GenerationRNN(input_size = n_pitches, hidden_size = hidden_size, output_size = n_pitches, n_layers = n_layers).to(device)
  if params['OPTIM'] == 'Adam':
    optimizer = torch.optim.Adam(model.parameters(), lr = lr)
  else:
    optimizer = torch.optim.SGD(model.parameters(), lr = lr)

  # Bind this experiment's decay as a default argument so the schedule cannot
  # be changed by a later reassignment of the global lr_lambda
  scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda = lambda epoch, decay = lr_lambda: decay ** epoch)
  criterion = nn.CrossEntropyLoss()
  # `test_input` is not defined in any visible cell (its creation is commented
  # out) and training_loop's test pass is disabled, so train_input is passed
  # as a placeholder, exactly as the single-run cell does.
  train_losses, test_losses = training_loop(model, optimizer, scheduler, criterion, train_input, train_input)

  # Save Model - create the target folder first so that a missing directory
  # cannot discard the result of a finished training run
  model_name = 'RNN 21 Apr Overnight Old exp{}'.format(experiment_num)
  save_dir = os.path.join(root_dir, 'Saved Models')
  os.makedirs(save_dir, exist_ok = True)
  save_path = os.path.join(save_dir, model_name)
  torch.save(model.state_dict(), save_path)

  # Save experiment losses
  experiment_losses.update({experiment_num: {'train_losses': train_losses, 'test_losses': test_losses}})

  # Plot the losses over epochs
  plt.figure()
  plt.plot(train_losses, label = 'Train Loss')
  plt.plot(test_losses, label = 'Test Loss')
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.legend()
  plt.title(experiment_num)
  plt.show()

  # Best-effort dump of all losses so far; a failure must not stop the sweep,
  # but the reason is reported and KeyboardInterrupt is no longer swallowed
  try:
    with open(os.path.join(root_dir, 'experiment_losses 20 Apr.json'), 'w') as outfile:
      json.dump(experiment_losses, outfile)
  except Exception as err:
    print('failed to save', err)

  experiment_num += 1

In [None]:
# Plot the losses over epochs, using the explicit figure/axes interface
fig, ax = plt.subplots()
ax.plot(train_losses, label = 'Train Loss')
ax.plot(test_losses, label = 'Test Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
ax.set_title(experiment_num)
plt.show()

**RNN Single-Note Generation Evaluation**

In [None]:
# Code to evaluate the language model i.e. generate new music

def evaluate(net, prime_seq, predict_len):
    '''
    Greedily generate a continuation of a priming sequence.

    Arguments:
    net - model providing init_hidden() and being callable as
          net(input, hidden) -> (scores, hidden)
    prime_seq - priming sequence, given as a sequence of integer note ids
    predict_len - number of notes to predict for after prime sequence

    Returns a new list: the priming ids followed by `predict_len` predicted
    ids. `prime_seq` itself is not modified.

    Raises IndexError if `prime_seq` is empty.

    Generation runs under torch.no_grad(), so no autograd graph accumulates
    across the generated steps. The input tensor is created on the device of
    the hidden state returned by net.init_hidden().
    '''
    predicted = list(prime_seq)

    with torch.no_grad():
        hidden = net.init_hidden()
        prime = torch.tensor(prime_seq, dtype = torch.long, device = hidden.device)

        # "Building up" the hidden state using the prime sequence
        for p in range(len(prime) - 1):
            _, hidden = net(prime[p], hidden)

        # Last note of the prime sequence is the first real input
        input = prime[-1]

        for _ in range(predict_len):
            # output holds one score per possible note
            output, hidden = net(input, hidden)

            # Pick the highest-scoring note. A softmax would not change which
            # entry is largest, so the raw scores are compared directly.
            predicted_id = torch.argmax(output)

            # Add predicted index to the list and use as next input
            predicted.append(predicted_id.item())
            input = predicted_id

    return predicted

def evaluateMultinomial(net, prime_seq, predict_len, temperature=0.8):
    '''
    Generate a continuation of a priming sequence by sampling.

    Arguments:
    net - model providing init_hidden() and being callable as
          net(input, hidden) -> (scores, hidden)
    prime_seq - priming sequence, given as a sequence of integer note ids
    predict_len - number of notes to predict for after prime sequence
    temperature - positive number dividing the scores before sampling; values
                  below 1 sharpen the distribution, values above 1 flatten it

    Returns a new list: the priming ids followed by `predict_len` sampled
    ids. `prime_seq` itself is not modified.

    Raises ValueError if `temperature` is not positive and IndexError if
    `prime_seq` is empty.

    Generation runs under torch.no_grad(), so no autograd graph accumulates
    across the generated steps. The input tensor is created on the device of
    the hidden state returned by net.init_hidden().
    '''
    if temperature <= 0:
        raise ValueError('temperature must be positive')

    predicted = list(prime_seq)

    with torch.no_grad():
        hidden = net.init_hidden()
        prime = torch.tensor(prime_seq, dtype = torch.long, device = hidden.device)

        # "Building up" the hidden state using the prime sequence
        for p in range(len(prime) - 1):
            _, hidden = net(prime[p], hidden)

        # Last note of the prime sequence is the first real input
        input = prime[-1]

        for _ in range(predict_len):
            # output holds one score per possible note
            output, hidden = net(input, hidden)

            # Sample from the temperature-scaled distribution. softmax gives
            # the same probabilities as normalizing exp(scores / temperature),
            # but does not overflow to inf when the scores are large.
            probabilities = torch.softmax(output.view(-1) / temperature, dim = 0)
            predicted_id = torch.multinomial(probabilities, 1)

            # Add predicted index to the list and use as next input
            predicted.append(predicted_id.item())
            input = predicted_id

    return predicted

In [None]:
# Generate two continuations of the same priming sequence: a greedy one and a
# sampled one. The priming sequence is given as integer note ids; they are fed
# to the model's embedding, which was built with n_pitches entries, so every
# id must be smaller than n_pitches.
generated_seq = evaluate(model, [100, 101, 102, 101, 100], predict_len = 100)
generated_seq_multinomial = evaluateMultinomial(model, [100, 101, 102, 101, 100], predict_len = 500, temperature = 1.2)
print(generated_seq)
print(generated_seq_multinomial)

**Converting Generated Sequences Into MIDI/Audio**

In [None]:
# Convert the generated ints into notes
# int_to_note is the inverse of the note_to_int vocabulary returned by
# prepare_sequences; it was not defined anywhere in the notebook before.
int_to_note = {number: name for name, number in note_to_int.items()}
# NOTE: these assignments replace the integer sequences with note strings, so
# the generation cell must be re-run before this cell can be run again.
generated_seq = [int_to_note[e] for e in generated_seq]
generated_seq_multinomial = [int_to_note[e] for e in generated_seq_multinomial]

In [None]:
def create_midi(prediction_output):
    """Turn a sequence of note/chord strings into a music21 stream.

    Each pattern is either a chord - integers joined with '.', or a single
    integer - or a note name accepted by `note.Note`. Elements are placed 0.5
    offset units apart, in order, so that they do not stack.

    Returns a `stream.Stream` holding the created notes and chords.
    """
    offset = 0
    output_notes = []

    for pattern in prediction_output:
        if ('.' in pattern) or pattern.isdigit():
            # Chord: build one piano note per integer, then combine them
            chord_members = []
            for pitch_number in pattern.split('.'):
                member = note.Note(int(pitch_number))
                member.storedInstrument = instrument.Piano()
                chord_members.append(member)
            element = chord.Chord(chord_members)
        else:
            # Single note given by name
            element = note.Note(pattern)
            element.storedInstrument = instrument.Piano()

        element.offset = offset
        output_notes.append(element)

        # Advance so the next element starts after this one
        offset += 0.5

    return stream.Stream(output_notes)

In [None]:
# Write the sampled sequence to a MIDI file, then load it back for playback.
# NOTE(review): this cell used to read `generated_path`, which no visible cell
# defines, and create_midi() was never called. The file name and the choice of
# the sampled (multinomial) sequence are assumptions - adjust as needed.
SAMPLE_RATE = 44100
generated_path = 'generated_multinomial.mid'
create_midi(generated_seq_multinomial).write('midi', fp = generated_path)

# Load the generated MIDI
generated_multitrack = pypianoroll.read(generated_path)
generated_pm = pypianoroll.to_pretty_midi(generated_multitrack)
# Synthesize at the same sample rate that is handed to the audio widget
generated_midi_audio = generated_pm.fluidsynth(fs = SAMPLE_RATE)
IPython.display.Audio(generated_midi_audio, rate = SAMPLE_RATE)