In [62]:
import glob
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
import sys
sys.path.append('..')
from pretty_midi_tokenization import notes_to_midi
import os
import pretty_midi
import collections

# Raise pandas' display limits so large note tables are printed without truncation.
pd.set_option('display.max_rows',500)
pd.set_option('display.max_columns',504)
pd.set_option('display.width',1000)


# MIDI pitch number -> drum instrument name.
# NOTE(review): not referenced by the cells visible in this part of the notebook.
DRUM_MIDI_DICT = {    
    36: 'Kick',
    38: 'Snare',
    42: 'Closed Hi-Hat',
    43: 'Floor Tom',
    44: 'Pedal Hi-Hat',
    46: 'Open Hi-Hat',
    47: 'Tom 2',
    48: 'Tom 1',
    49: 'Crash',
    51: 'Ride'}

# Paths are relative to the parent of the notebook's working directory.
DIRECTORY_PATH = '..'
DATASET_PATH = os.path.join(DIRECTORY_PATH, 'dataset')
CHECKPOINTS_PATH = os.path.join(DIRECTORY_PATH, 'training_checkpoints')

# Model parameters
# BATCH_SIZE is the default batch size of get_dataloader.
# NOTE(review): EPOCHS and LEARNING_RATE are not referenced by the visible training cells,
# which pass their own literal values — confirm which values are intended.
BATCH_SIZE = 4
EPOCHS = 50 
LEARNING_RATE = 0.001

# MIDI parameters
BPM = 120
BEATS_PER_BAR = 4
# Tick resolution (ticks per beat) used when converting note times to ticks.
TICKS_PER_BEAT = 12
# Duration of one bar in seconds: beats per bar times seconds per beat (60 / BPM).
BAR_DURATION = BEATS_PER_BAR * (60 / BPM)

# Tokenization parameters
# Number of ticks (one token per tick) in a bar.
BAR_LENGTH = BEATS_PER_BAR * TICKS_PER_BEAT
SEQ_LENGTH = BAR_LENGTH * 4 # 4 bars
# NOTE(review): VELOCITY_RANGES is not used by the visible tokenization code.
VELOCITY_RANGES = {'p': (0, 64), 'f': (65, 127)}
# Suffix appended to the pitch on the tick where a note starts (e.g. '36S').
NOTE_START_TOKEN = 'S'
# Token used for ticks where no note is sounding.
SILENCE_TOKEN = 'O'
# Token prepended to every input sequence (see midi_to_tokens).
BCI_TOKEN = 'BCI'

In [54]:
def convert_time_to_ticks(time: float):
    """Convert a time value to a tick position on the tokenization grid.

    The conversion is delegated to ``PrettyMIDI.time_to_tick`` on an empty
    PrettyMIDI object created with ``TICKS_PER_BEAT`` as resolution and
    ``BPM`` as initial tempo, so the result follows that fixed grid and not
    the tempo map of any particular MIDI file.

    Args:
        time: time to convert, in the unit expected by pretty_midi
            (seconds, per the pretty_midi documentation).

    Returns:
        Whatever ``PrettyMIDI.time_to_tick`` returns for ``time``.
    """
    # A fresh, file-less PrettyMIDI object is built on every call; it only
    # serves as a carrier for the resolution/tempo used by the conversion.
    tick_grid = pretty_midi.PrettyMIDI(
        midi_file=None,
        initial_tempo=BPM,
        resolution=TICKS_PER_BEAT,
    )
    ticks = tick_grid.time_to_tick(time)
    return ticks

def new_note(pitch, velocity, start, end, bar, convert_to_ticks = True):
    """Build the dictionary describing one note inside a bar.

    Args:
        pitch: value stored under 'pitch'.
        velocity: value stored under 'velocity'.
        start: note start. When ``convert_to_ticks`` is true it is an
            absolute time that is made relative to the beginning of ``bar``
            (by subtracting ``bar * BAR_DURATION``) and converted with
            ``convert_time_to_ticks``; otherwise it is stored unchanged.
        end: note end, handled exactly like ``start``.
        bar: index of the bar the note belongs to.
        convert_to_ticks: whether ``start``/``end`` must be converted.

    Returns:
        dict with the keys 'pitch', 'velocity', 'start', 'end', 'bar'
        (in that order).
    """
    if convert_to_ticks:
        # start and end are expressed relative to the bar they are in
        bar_offset = bar*BAR_DURATION
        start, end = (convert_time_to_ticks(start - bar_offset),
                      convert_time_to_ticks(end - bar_offset))

    return dict(pitch=pitch, velocity=velocity, start=start, end=end, bar=bar)


def append_note_to_notes_dict(notes: dict, note: dict):
    """Append every field of ``note`` to the matching list in ``notes``.

    ``notes`` is a column-oriented accumulator: it maps a field name to the
    list of values collected so far for that field. It is modified in place.

    Args:
        notes: mapping ``field name -> list of values``. Both a
            ``collections.defaultdict(list)`` and a plain ``dict`` are
            accepted; missing fields are created on first use.
        note: mapping ``field name -> value`` for a single note.

    Returns:
        None.
    """
    for key, value in note.items():
        # setdefault creates the list for a new field, so the function does
        # not rely on the caller passing a defaultdict.
        notes.setdefault(key, []).append(value)

In [94]:
def midi_to_tokens(midi_file_path: str, bpm = BPM, beats_per_bar = BEATS_PER_BAR) -> tuple:
    """Tokenize the first instrument of a MIDI file into bar-shifted sequences.

    Steps:
      1. The notes of ``pm.instruments[0]`` are sorted by start time and
         assigned to bars of ``(60 / bpm) * beats_per_bar`` time units. A note
         that ends after its bar is split once, at the bar line, into two
         notes (it is not split again if it also crosses the next bar line).
      2. Every bar containing at least one note is rendered as ``BAR_LENGTH``
         string tokens: ``SILENCE_TOKEN`` by default, ``'<pitch>S'`` on the
         tick where a note starts and ``'<pitch>'`` on the following ticks up
         to its end. Notes are written in start order, so where notes overlap
         the later one overwrites the earlier one. Bars without any note are
         not emitted.
      3. The string vocabulary is the sorted set of emitted tokens (sorted so
         that token ids are reproducible across runs); if the path contains
         'input', ``BCI_TOKEN`` is added as the last entry.
      4. Windows of ``SEQ_LENGTH`` tokens are taken every ``BAR_LENGTH``
         tokens. For 'input' paths each window is shifted right by one token
         with ``BCI_TOKEN`` in front and the ids are divided by the
         vocabulary size.

    NOTE(review): tick conversion (``new_note``) and ``BAR_LENGTH`` /
    ``SEQ_LENGTH`` use the module-level constants, so non-default ``bpm`` /
    ``beats_per_bar`` only affect the bar assignment of step 1.
    NOTE(review): the window loop stops at ``len(tokens) - SEQ_LENGTH``
    (exclusive), so the window ending exactly on the last bar is not
    produced — confirm this is intended.

    Args:
        midi_file_path: path of the MIDI file; a path containing 'input'
            marks the file as a model input.
        bpm: tempo used to compute the bar duration.
        beats_per_bar: beats per bar used to compute the bar duration.

    Returns:
        ``(sequences, tokens_to_string_vocab, notes_df)`` where
        ``sequences`` is a list of 1-D numpy arrays of length ``SEQ_LENGTH``
        (int64 token ids, or float64 normalized ids for 'input' paths),
        ``tokens_to_string_vocab`` maps token id -> string token, and
        ``notes_df`` has the columns pitch, velocity, start, end, bar.
    """
    pm = pretty_midi.PrettyMIDI(midi_file_path)
    instrument = pm.instruments[0]
    bar_duration = (60/bpm) * beats_per_bar
    is_input = 'input' in midi_file_path

    # --- 1. collect the notes, splitting those that cross a bar line ---
    notes = collections.defaultdict(list)  # field name -> list of values
    for midi_note in sorted(instrument.notes, key=lambda n: n.start):
        pitch, velocity = midi_note.pitch, midi_note.velocity
        start, end = midi_note.start, midi_note.end
        bar = int(start // bar_duration)
        bar_end = (bar + 1) * bar_duration

        if end > bar_end:
            # first part stops at the bar line, second part starts there
            append_note_to_notes_dict(notes, new_note(pitch, velocity, start, bar_end, bar))
            append_note_to_notes_dict(notes, new_note(pitch, velocity, bar_end, end, bar + 1))
        else:
            append_note_to_notes_dict(notes, new_note(pitch, velocity, start, end, bar))

    # Explicit columns keep the frame well-formed even when there are no notes.
    notes_df = pd.DataFrame(dict(notes), columns=['pitch', 'velocity', 'start', 'end', 'bar'])

    # --- 2. render every bar as a time series of string tokens ---
    # No filler "empty notes" are needed: each bar starts fully silent and
    # pitch-0 rows are skipped below, so they never changed the result.
    bars_time_series = []
    for bar_id in notes_df['bar'].unique():
        bar_df = notes_df[notes_df['bar'] == bar_id]

        bar_time_serie = np.empty((BAR_LENGTH), dtype=object)
        bar_time_serie[:] = SILENCE_TOKEN
        for pitch, start, end in zip(bar_df['pitch'], bar_df['start'], bar_df['end']):
            if pitch == 0:
                continue
            # A start time just before the bar line can round up to
            # BAR_LENGTH; clamp it to the last tick instead of indexing out
            # of bounds.
            start = min(int(start), BAR_LENGTH - 1)
            bar_time_serie[start] = str(pitch)+NOTE_START_TOKEN
            bar_time_serie[start+1:int(end)] = str(pitch)
        bars_time_series.append(bar_time_serie)

    if bars_time_series:
        flatten_time_series = np.concatenate(bars_time_series)
    else:
        flatten_time_series = np.empty(0, dtype=object)

    # --- 3. vocabulary (sorted for reproducible token ids) ---
    token_list = sorted(set(flatten_time_series))
    if is_input:
        # token related to the BCI classification, only for input sequences
        token_list.append(BCI_TOKEN)

    strings_to_tokens_vocab = {token: idx for idx, token in enumerate(token_list)}
    tokens_to_string_vocab = dict(enumerate(token_list))
    vocab_size = len(token_list)

    # --- 4. sequences of SEQ_LENGTH tokens, shifted by one bar each ---
    sequences = []
    num_sequences = len(flatten_time_series) - SEQ_LENGTH
    for seq_start in range(0, num_sequences, BAR_LENGTH):
        window = flatten_time_series[seq_start:(seq_start+SEQ_LENGTH)]

        if is_input:
            # prepend the BCI token (dropping the last token to keep the
            # length) and normalize the ids by the vocabulary size
            strings = [BCI_TOKEN, *window[:-1]]
            seq = np.array([strings_to_tokens_vocab[s] / vocab_size for s in strings],
                           dtype=np.float64)
        else:
            seq = np.array([strings_to_tokens_vocab[s] for s in window], dtype=np.int64)

        sequences.append(seq)

    return sequences, tokens_to_string_vocab, notes_df

In [95]:
# Assumptions:
# - input files are stored in <DATASET_PATH>/input and output files in
#   <DATASET_PATH>/output (e.g. input_#.MID / output_#.MID);
# - the i-th input file (in sorted filename order) corresponds to the i-th
#   output file.
# NOTE(review): every iteration overwrites input_sequences / output_sequences
# and the vocabularies, so after the loop only the last pair is available.

# glob returns files in arbitrary order: sort so the input/output pairing is deterministic
input_filenames = sorted(glob.glob(os.path.join(DATASET_PATH, 'input/*.MID')))
print('Number of input files:', len(input_filenames))

output_filenames = sorted(glob.glob(os.path.join(DATASET_PATH, 'output/*.MID')))
print('Number of output files:', len(output_filenames))

if len(input_filenames) != len(output_filenames):
    # zip() below silently ignores the unpaired files
    print('WARNING: number of input and output files differ, unpaired files are ignored')

for i, (in_file, out_file) in enumerate(zip(input_filenames, output_filenames)):

    in_file_name = os.path.basename(in_file)
    out_file_name = os.path.basename(out_file)
    print(f'\n\n{i + 1}: {in_file_name} -> {out_file_name}')

    input_sequences, INPUT_TOKENS_TO_STRING_VOCAB, input_notes_df = midi_to_tokens(in_file)
    n_bar = len(input_notes_df['bar'].unique())
    IN_VOCAB_SIZE = len(INPUT_TOKENS_TO_STRING_VOCAB)
    print(f'\nNumber of input bars: {n_bar}')
    print(f'Number of input sequences: {len(input_sequences)}')
    # a file shorter than one sequence yields no sequences at all
    print(f'Input sequence length: {len(input_sequences[0]) if len(input_sequences) else 0}')
    print(f'Input vocabulars size: {IN_VOCAB_SIZE}')

    output_sequences, OUTPUT_TOKENS_TO_STRING_VOCAB, output_notes_df = midi_to_tokens(out_file)
    n_bar = len(output_notes_df['bar'].unique())
    OUT_VOCAB_SIZE = len(OUTPUT_TOKENS_TO_STRING_VOCAB)
    print(f'\nNumber of output bars: {n_bar}')
    print(f'Number of output sequences: {len(output_sequences)}')
    print(f'Output sequence length: {len(output_sequences[0]) if len(output_sequences) else 0}')
    print(f'Output vocabulars size: {OUT_VOCAB_SIZE}')

    # keep the same number of input and output sequences
    min_length = min(len(input_sequences), len(output_sequences))
    input_sequences = input_sequences[:min_length]
    output_sequences = output_sequences[:min_length]
    print(f'\nNumber of sequences after truncation: {len(input_sequences)}, {len(output_sequences)}')


Number of input files: 1
Number of output files: 1


1: drum_excited.MID -> bass_example.MID

Number of input bars: 24
Number of input sequences: 20
Input sequence length: 192
Input vocabulars size: 13

Number of output bars: 11
Number of output sequences: 7
Output sequence length: 192
Output vocabulars size: 30

Number of sequences after truncation: 7, 7


In [92]:
import time
import math
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np

from __future__ import unicode_literals, print_function, division

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

import numpy as np
from torch.utils.data import TensorDataset, DataLoader, RandomSampler

# Use the CUDA device when torch reports one as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def get_dataloader(input_sequences, output_sequences, batch_size = BATCH_SIZE):
    """Wrap paired token sequences in a randomly-sampled DataLoader.

    The sequences are first stacked into 2-D int64 numpy arrays and only then
    converted to tensors: building a tensor directly from a list of numpy
    arrays is slow, and fails for object-dtype arrays.

    NOTE(review): values are cast to int64, so fractional values are
    truncated. midi_to_tokens normalizes *input* ids to fractions of the
    vocabulary size, which would all become 0 here — confirm whether inputs
    should be normalized or kept as integer ids.

    Args:
        input_sequences: sequence of equally long 1-D input sequences.
        output_sequences: sequence of equally long 1-D target sequences, one
            per input sequence.
        batch_size: number of sequence pairs per batch.

    Returns:
        DataLoader yielding ``(input_batch, target_batch)`` LongTensor pairs
        located on ``device``, drawn with a RandomSampler.
    """
    # np.array copies, so the dataset never aliases the caller's arrays
    input_array = np.array(input_sequences, dtype=np.int64)
    output_array = np.array(output_sequences, dtype=np.int64)

    train_data = TensorDataset(torch.from_numpy(input_array).to(device),
                               torch.from_numpy(output_array).to(device))

    train_sampler = RandomSampler(train_data)
    train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
    return train_dataloader

def showPlot(points):
    """Plot ``points`` as a line on a new figure.

    The y axis gets a major tick every 0.2.

    Args:
        points: sequence of values to plot against their index.
    """
    # plt.subplots() already creates the figure; calling plt.figure() first
    # would leave an additional, empty figure open on every call.
    fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    ax.plot(points)



def train_epoch(dataloader, encoder, decoder, encoder_optimizer,
          decoder_optimizer, criterion):
    """Run one optimization pass over ``dataloader``.

    For every ``(input, target)`` batch: both optimizers are reset, the
    encoder is called on the input, the decoder is called with the encoder
    outputs, the encoder hidden state and the target, the loss between the
    flattened decoder outputs and the flattened target is back-propagated,
    and both optimizers take a step (encoder first).

    Args:
        dataloader: sized iterable of ``(input_tensor, target_tensor)`` pairs.
        encoder: callable returning ``(outputs, hidden)``.
        decoder: callable returning a 3-tuple whose first item holds the
            decoder outputs.
        encoder_optimizer: optimizer for the encoder parameters.
        decoder_optimizer: optimizer for the decoder parameters.
        criterion: loss function applied to ``(outputs, targets)``.

    Returns:
        The sum of the batch losses divided by ``len(dataloader)``.
    """
    batch_losses = []

    for input_tensor, target_tensor in dataloader:
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        # forward pass
        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)

        # collapse all leading dimensions so that each row is one prediction
        flat_outputs = decoder_outputs.view(-1, decoder_outputs.size(-1))
        flat_targets = target_tensor.view(-1)
        batch_loss = criterion(flat_outputs, flat_targets)

        # backward pass and parameter update
        batch_loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()

        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(dataloader)


def asMinutes(s):
    """Format a duration given in seconds as ``'<minutes>m <seconds>s'``.

    Args:
        s: duration in seconds (int or float).

    Returns:
        String with the whole minutes and the remaining seconds, both
        formatted as integers.
    """
    minutes = math.floor(s / 60)
    seconds = s - minutes * 60
    return '%dm %ds' % (minutes, seconds)

def timeSince(since, percent):
    """Describe the elapsed time and the estimated remaining time.

    Args:
        since: start timestamp, as returned by ``time.time()``.
        percent: fraction of the work completed so far; must be non-zero
            because the elapsed time is divided by it.

    Returns:
        String ``'<elapsed> (- <remaining>)'`` where both parts are formatted
        by ``asMinutes``.
    """
    elapsed = time.time() - since
    # extrapolate the total duration from the completed fraction
    estimated_total = elapsed / (percent)
    remaining = estimated_total - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

def train(train_dataloader, encoder, decoder, n_epochs, learning_rate=0.001,
               print_every=100, plot_every=100):
    """Train ``encoder`` and ``decoder`` for ``n_epochs`` epochs.

    Each model gets its own Adam optimizer and the loss is ``nn.NLLLoss``.
    Every ``print_every`` epochs the progress (elapsed / remaining time,
    epoch, percentage, average loss since the last report) is printed.
    Every ``plot_every`` epochs the average loss since the last sample is
    recorded.

    Args:
        train_dataloader: batches passed to ``train_epoch``.
        encoder: encoder module.
        decoder: decoder module.
        n_epochs: number of epochs to run.
        learning_rate: learning rate of both Adam optimizers.
        print_every: reporting period, in epochs.
        plot_every: loss-sampling period, in epochs.

    Returns:
        List with one average loss per ``plot_every`` epochs. The list was
        previously built and discarded; returning it lets the caller plot it,
        e.g. with ``showPlot``.
    """
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)
    criterion = nn.NLLLoss()

    for epoch in range(1, n_epochs + 1):
        loss = train_epoch(train_dataloader, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if epoch % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, epoch / n_epochs),
                                        epoch, epoch / n_epochs * 100, print_loss_avg))

        if epoch % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0

    return plot_losses


In [None]:
hidden_size = 128
batch_size = 32

# get_dataloader takes the input/output token sequences and returns only the
# DataLoader (there are no input_lang / output_lang objects in this notebook).
train_dataloader = get_dataloader(input_sequences, output_sequences, batch_size)

# NOTE(review): EncoderRNN and AttnDecoderRNN are not defined in the visible
# cells; they must be defined or imported before this cell runs. The vocabulary
# sizes are assumed to be the sizes these constructors expect — confirm.
encoder = EncoderRNN(IN_VOCAB_SIZE, hidden_size).to(device)
decoder = AttnDecoderRNN(hidden_size, OUT_VOCAB_SIZE).to(device)

train(train_dataloader, encoder, decoder, 80, print_every=5, plot_every=5)