<a href="https://colab.research.google.com/github/jmineroff/Beatle-Basslines/blob/master/BassGeneration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Optional code cell for running in Google Colab
# Links to local git folder in Google Drive and installs modules

from google.colab import drive
drive.mount('/content/drive')

!pip install pypianoroll
!pip install AudioConverter
!apt install fluidsynth
!cp /usr/share/sounds/sf2/FluidR3_GM.sf2 ./font.sf2
!pip install midi2audio

!pip install tables

import os
try:
  os.chdir("drive/My Drive/Beatle-Basslines") # Local git path
except Exception:
  pass

In [0]:
# Initialization

# Seed NumPy's and TensorFlow's global random generators first.
from numpy.random import seed
seed(1)
# NOTE(review): `set_random_seed` is imported from the top-level tensorflow
# package; presumably this targets the TF 1.x API - confirm the installed version.
from tensorflow import set_random_seed
set_random_seed(2)

# Audio rendering / playback, numerics, and MIDI pianoroll handling
from midi2audio import FluidSynth as fs
from IPython.display import display, Audio
import numpy as np
import pandas as pd
from pypianoroll import Multitrack, Track
from matplotlib import pyplot as plt
import os
import sys
from scipy import sparse

import pickle
import tables

# NOTE(review): a __future__ import that is not the first statement is a
# SyntaxError in a plain .py module; presumably it is accepted here because the
# notebook compiles each statement of the cell separately - confirm before
# exporting this notebook to a script.
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Default size for every matplotlib figure in the notebook
plt.rcParams["figure.figsize"] = (20,10)

In [0]:
# Helper functions

def write_soundfile(midifile, output):
  """Render the MIDI file `midifile` to the audio file `output`.

  Uses the soundfont 'font.sf2' from the working directory at a sample rate of 11025.
  """
  synth = fs(sound_font="font.sf2", sample_rate=11025)
  synth.midi_to_audio(midifile, output)


def piano_plot(pianofile):
  """Plot `pianofile` via its own `plot()` method and show the resulting figure."""
  _figure, _axes = pianofile.plot()  # plot() is expected to return a (figure, axes) pair
  plt.show()


def parse_midi_dir(list_songs=True, list_tracks=False):
  """Walk the 'RawMIDI/' directory and print a summary of the MIDI files in it.

  Args:
    list_songs: when True, print one '<folder> - <file>' line per MIDI file.
    list_tracks: when True, load each MIDI file and print the name of every
      track it contains.

  Only files ending in '.mid' are listed and counted. The final line,
  'Track count: N', reports the number of MIDI files found.
  """
  num_midi = 0

  for subdir, dirs, files in os.walk('RawMIDI/'):
    for file in files:
      if not file.endswith(".mid"):
        continue

      # Count after the extension check so stray non-MIDI files are not included
      num_midi += 1

      if list_songs:
        print(subdir.split('/')[-1],'-',file)

      if list_tracks:
        filepath = os.path.join(subdir, file)
        temp = Multitrack(filepath)
        for track in temp.tracks:
          print('...',track.name)

  print('Track count:',num_midi)

def tempo_to_progress(tempo):
  """Convert a tempo array into song progress values that end at 1.

  Each entry contributes 1/tempo; the result is the running total of those
  contributions divided by their overall sum.
  """
  step_durations = 1/tempo
  return np.cumsum(step_durations) / np.sum(step_durations)


def add_songs_to_df(df, partial_track_match=False, binarize=False):
  """Add every MIDI file found under 'RawMIDI/' to `df`, one row per song.

  Args:
    df: dataframe to extend in place. Each new row holds, in order: song name,
      folder name, the Drums, Bass, Vocals, Rhythm and Lead pianorolls, then the
      tempo, downbeat and progress arrays.
    partial_track_match: when True, a track belongs to an instrument if its name
      starts with the instrument name (e.g. 'Vocals2'), not only on exact match.
      Note that a track named 'VocalsAll' then also matches 'Vocals'.
    binarize: when True, binarize the song before extraction and merge matching
      tracks with a maximum instead of a sum.

  Returns:
    (note_limits, note_counts): dicts keyed by instrument. note_limits[key] is
    [lowest, highest] active pitch index over all songs (initially [127, 0]);
    note_counts[key] is the largest number of simultaneously active pitches.
  """
  instruments = ['Drums', 'Bass', 'Vocals', 'Rhythm', 'Lead']
  note_limits = {key: [127, 0] for key in instruments}
  note_counts = {key: 0 for key in instruments}

  for subdir, dirs, files in os.walk('RawMIDI/'): # Walk through all files in directory
    album = os.path.basename(subdir)

    for file in files:
      if not file.endswith(".mid"): # Only process MIDI files
        continue

      print('Processing', album, '-', file) # Status update

      temp = Multitrack(os.path.join(subdir, file))

      # Map each instrument to the indices of the tracks that belong to it
      track_to_channel = {key: [] for key in instruments + ['VocalsAll']}
      for track_idx, track in enumerate(temp.tracks):
        for key in track_to_channel:
          if ( track.name == key ) or ( partial_track_match and track.name.startswith(key) ):
            track_to_channel[key].append(track_idx)

      if not track_to_channel['Vocals']: # Fall back to 'VocalsAll' when there is no 'Vocals' track
        track_to_channel['Vocals'] = track_to_channel['VocalsAll']

      del track_to_channel['VocalsAll']

      for key in track_to_channel: # Missing instruments point at the silent track appended below
        if not track_to_channel[key]:
          track_to_channel[key].append(-1)

      if binarize:
        temp.binarize()

      full_pianoroll = temp.get_stacked_pianoroll()
      # Extra all-zero track at index -1, used for nonexistent instruments
      silent_track = np.zeros((*full_pianoroll.shape[0:2],1))
      full_pianoroll = np.append(full_pianoroll, silent_track, axis=2)

      tempo = temp.tempo
      downbeat = temp.downbeat
      progress = tempo_to_progress(tempo)

      pianorolls = {}

      # Merge the tracks of each instrument and update the global note statistics
      for key in track_to_channel:
        selected = full_pianoroll[:,:,track_to_channel[key]]
        if binarize:
          pianorolls[key] = np.amax(selected, axis=2)
        else:
          pianorolls[key] = np.sum(selected, axis=2)

        note_locations = np.flatnonzero(np.amax(pianorolls[key], axis=0))

        # Test for emptiness by size: `.any()` inspects the index VALUES and would
        # wrongly treat a part whose only active pitch is index 0 as silent.
        if note_locations.size == 0:
          continue

        note_count = np.amax(np.count_nonzero(pianorolls[key], axis=1))
        lowest_note = np.amin(note_locations)
        highest_note = np.amax(note_locations)

        if lowest_note < note_limits[key][0]:
          note_limits[key][0] = lowest_note
        if highest_note > note_limits[key][1]:
          note_limits[key][1] = highest_note
        if note_count > note_counts[key]:
          note_counts[key] = note_count

      # splitext strips only the extension, so names containing dots survive and
      # '<song name>.mid' can be rebuilt from the stored name later.
      song_name = os.path.splitext(file)[0]

      # Add new entry to dataframe
      df.loc[len(df)] = [song_name, album, pianorolls['Drums'], pianorolls['Bass'], pianorolls['Vocals'], pianorolls['Rhythm'], pianorolls['Lead'], tempo, downbeat, progress]

  return note_limits, note_counts


def manual_song_test(df, song_name):
  """Play a song twice for comparison by ear: the original MIDI file, and the
  version rebuilt from the pianorolls stored in `df`.

  Both versions are written as .mid and .mp3 files under 'temp/' and shown as
  audio widgets. `song_name` must match an entry in the 'Song' column; the file is
  read from 'RawMIDI/<Album>/<song_name>.mid'.
  """
  song_rows = df.loc[df.Song == song_name]

  def first_value(column):
    # Value of `column` for the first row matching the song name
    return getattr(song_rows, column).values[0]

  # Build track from original MIDI file
  filepath = 'RawMIDI/' + first_value('Album') + '/' + song_name + '.mid'

  original = Multitrack(filepath)
  #piano_plot(original)

  original.write('temp/original.mid')
  write_soundfile('temp/original.mid', 'temp/original.mp3')
  print('Original File')
  display(Audio('temp/original.mp3'))

  # Build track from dataframe: drums first, then the melodic parts with their MIDI programs
  melodic_programs = [('Bass', 34), ('Vocals', 73), ('Rhythm', 24), ('Lead', 26)]
  tracks = [Track(first_value('Drums'), is_drum=True, name='Drums')]
  for part_name, program in melodic_programs:
    tracks.append(Track(first_value(part_name), program=program, is_drum=False, name=part_name))
  tempo = first_value('Tempo')
  downbeat = first_value('Downbeat')

  bass_track = tracks[1]
  bass_track.transpose(-12) # Only intermittently needed for proper playback with FluidSynth Soundfont - don't use for exporting raw MIDI files

  recombined = Multitrack(tracks=tracks, tempo=tempo, downbeat=downbeat)

  recombined.write('temp/recombined.mid')
  write_soundfile('temp/recombined.mid', 'temp/recombined.mp3')
  print('Recombined File')
  display(Audio('temp/recombined.mp3'))


def raw_song_to_ndarray(df, song_idx, note_limits, note_counts, binarize_threshold=None):
  """Turn one song of `df` into a model input array and a model output array.

  Args:
    df: song dataframe; row `song_idx` is read by position.
    song_idx: positional index of the song.
    note_limits: dict mapping instrument name to [lowest, highest] pitch index;
      each pianoroll is cropped to that (inclusive) range.
    note_counts: unused; kept for interface compatibility.
    binarize_threshold: when not None, cropped pianoroll values <= threshold
      become 0 and values > threshold become 1.

  Returns:
    (song_input, song_output). The input columns are the cropped Drums, Vocals,
    Rhythm and Lead pianorolls followed by one column each for Tempo (divided by
    240), Downbeat (as float) and Progress, so Progress is the last column. The
    output columns are the cropped Bass pianoroll. Both are fresh arrays; the
    data stored in `df` is never modified.

  Also prints the song name.
  """
  inputs = ['Drums','Vocals','Rhythm','Lead','Tempo','Downbeat','Progress']  # 'Progress' is last input variable
  outputs = ['Bass']

  song = df.iloc[song_idx]
  print(song['Song'])

  def extract(key):
    # Columns contributed by `key`: a cropped pianoroll or a single scalar column
    if key not in note_limits:
      return song[key][:,np.newaxis]

    lowest, highest = note_limits[key][0], note_limits[key][1]
    cropped = song[key][:,lowest:highest+1]

    if binarize_threshold is not None:
      # The crop is a view into the array stored in the dataframe; copy before
      # thresholding so the raw song data is not overwritten in place.
      cropped = cropped.copy()
      cropped[cropped <= binarize_threshold] = 0
      cropped[cropped > binarize_threshold] = 1

    return cropped

  input_columns = []
  for key in inputs: # Build full inputs array
    new_input = extract(key)

    if key == 'Tempo':
      new_input = new_input/240.0

    if key == 'Downbeat':
      new_input = 1.0*new_input

    input_columns.append(new_input)

  output_columns = [extract(key) for key in outputs] # Build full outputs array

  song_input = np.concatenate(input_columns, axis=1)
  song_output = np.concatenate(output_columns, axis=1)

  return song_input, song_output


def get_processed_data(df, note_limits, note_counts, seq_length=200, train_ratio=0.7, validate_ratio=0.25, binarize_threshold=None):
  """Cut every song of `df` into fixed-length sequences and split them by song
  into train / validate / test sets.

  Songs are visited in a random order. The first int(num_songs*train_ratio) songs
  feed the training set, the songs up to int(num_songs*(train_ratio+validate_ratio))
  feed the validation set, and the rest feed the test set, so no song is shared
  between sets. From each song, `len % seq_length` consecutive timesteps are
  removed around a randomly drawn progress value so that the remainder divides
  evenly into sequences. Songs shorter than `seq_length` are skipped.

  Args:
    df: song dataframe passed on to raw_song_to_ndarray.
    note_limits, note_counts, binarize_threshold: passed on to raw_song_to_ndarray.
    seq_length: number of timesteps per sequence.
    train_ratio, validate_ratio: fraction of songs for each split.

  Returns:
    x_train, x_validate, x_test, y_train, y_validate, y_test - arrays of shape
    (num_sequences, seq_length, num_features).

  Raises:
    ValueError: if no song is long enough to yield a single sequence.
  """
  num_songs = df.shape[0]
  train_num = int(num_songs*train_ratio)
  validate_num = int(num_songs*(train_ratio+validate_ratio))
  permutation = np.random.permutation(num_songs)

  input_seqs = []
  output_seqs = []
  num_seqs_total = 0
  train_idx = None
  validate_idx = None

  for song_num, song_idx in enumerate(permutation):
    # Record split boundaries BEFORE adding this song, so exactly `train_num`
    # songs end up in the training set and the next songs in validation.
    if song_num == train_num:
      train_idx = num_seqs_total
    if song_num == validate_num:
      validate_idx = num_seqs_total

    # Use the dataframe that was passed in, not a notebook-level global
    song_input, song_output = raw_song_to_ndarray(df, song_idx=song_idx, note_limits=note_limits, note_counts=note_counts, binarize_threshold=binarize_threshold)

    num_seqs = song_input.shape[0] // seq_length
    if num_seqs == 0: # Too short for even one sequence
      continue

    # Trim excess timesteps around a random point of the song
    progress_at_cut = np.random.normal(loc=0.5, scale=0.15) # 'Progress' is last input variable
    cut_idx = np.abs(song_input[:,-1] - progress_at_cut).argmin()
    cut_size = song_input.shape[0]%seq_length

    if song_input.shape[0] - cut_idx < cut_size: # Keep the removed window inside the song
      cut_idx = song_input.shape[0] - cut_size

    removed = np.arange(cut_idx, cut_idx+cut_size)
    song_input = np.delete(song_input, removed, axis=0)
    song_output = np.delete(song_output, removed, axis=0)

    input_seqs.append(song_input.reshape(num_seqs, seq_length, song_input.shape[1]))
    output_seqs.append(song_output.reshape(num_seqs, seq_length, song_output.shape[1]))
    num_seqs_total += num_seqs

  if not input_seqs:
    raise ValueError('No song is long enough to produce a sequence of length %d' % seq_length)

  # A boundary that was never reached (e.g. ratios summing to 1) means "end of data"
  if train_idx is None:
    train_idx = num_seqs_total
  if validate_idx is None:
    validate_idx = num_seqs_total

  # Concatenate once instead of re-copying the growing dataset for every song
  x = np.concatenate(input_seqs, axis=0)
  y = np.concatenate(output_seqs, axis=0)

  x_train = x[:train_idx,:,:]
  x_validate = x[train_idx:validate_idx,:,:]
  x_test = x[validate_idx:,:,:]

  y_train = y[:train_idx,:,:]
  y_validate = y[train_idx:validate_idx,:,:]
  y_test = y[validate_idx:,:,:]

  return x_train, x_validate, x_test, y_train, y_validate, y_test


def play_sequence_audio(x, y, note_limits, note_counts, volume_scaling=1.0, output_boost=1.5):
  """Rebuild a single sequence as MIDI and play it, first with all parts and then bass only.

  `x` holds the input features of one sequence (Drums, Vocals, Rhythm and Lead
  pianoroll columns followed by Tempo, Downbeat and Progress columns) and `y`
  the Bass pianoroll columns. Each pianoroll is placed back into a 128-pitch
  roll at the position given by `note_limits` and multiplied by
  `volume_scaling` (the bass additionally by `output_boost`). Tempo is
  multiplied by 240. `note_counts` is not used.

  Writes 'temp/sequence.mid/.mp3' and 'temp/sequence_bass.mid/.mp3' and shows
  an audio widget for each.
  """
  num_steps = x.shape[0]

  inputs = ['Drums','Vocals','Rhythm','Lead','Tempo','Downbeat','Progress']  # 'Progress' is last input variable
  outputs = ['Bass']
  song_data = dict.fromkeys(inputs + outputs) # Dictionary with full ndarrays

  def unpack(keys, features, boost=None):
    # Consume the columns of `features` from the left, one group per key
    for key in keys:
      if key in note_limits:
        lowest, highest = note_limits[key][0], note_limits[key][1]
        width = highest - lowest + 1

        full_roll = np.zeros((num_steps, 128)) # Initialize track notes
        full_roll[:,lowest:highest+1] = features[:,:width]
        features = features[:,width:]

        scaled_roll = full_roll*volume_scaling
        if boost is not None:
          scaled_roll = scaled_roll*boost
        song_data[key] = scaled_roll
      else:
        values = features[:,0]
        features = features[:,1:]

        if key == 'Tempo':
          values = values*240.0

        if key == 'Downbeat':
          values = values.astype(bool)

        song_data[key] = values

  unpack(inputs, x)
  unpack(outputs, y, boost=output_boost)

  drum_track = Track(song_data['Drums'], is_drum=True, name='Drums')
  bass_track = Track(song_data['Bass'], program=34, is_drum=False, name='Bass')
  vocal_track = Track(song_data['Vocals'], program=73, is_drum=False, name='Vocals')
  rhythm_track = Track(song_data['Rhythm'], program=24, is_drum=False, name='Rhythm')
  lead_track = Track(song_data['Lead'], program=26, is_drum=False, name='Lead')
  tempo = song_data['Tempo']
  downbeat = song_data['Downbeat'] # Currently not passed to Multitrack (see below)

  bass_track.transpose(-12) # Only intermittently needed for proper playback with FluidSynth Soundfont - don't use for exporting raw MIDI files

  all_tracks = [drum_track, bass_track, vocal_track, rhythm_track, lead_track]
  sequence = Multitrack(tracks=all_tracks, tempo=tempo)#, downbeat=downbeat)

  sequence.check_validity()

  sequence.write('temp/sequence.mid')
  write_soundfile('temp/sequence.mid', 'temp/sequence.mp3')
  print('Sequence')
  display(Audio('temp/sequence.mp3'))

  sequence_bass = Multitrack(tracks=[bass_track], tempo=tempo)#, downbeat=downbeat)

  sequence_bass.write('temp/sequence_bass.mid')
  write_soundfile('temp/sequence_bass.mid', 'temp/sequence_bass.mp3')
  print('Sequence (Bass Only)')
  display(Audio('temp/sequence_bass.mp3'))

In [0]:
# Initialize dataframe
# One row per song; the column order must match the row written by add_songs_to_df.
col_names = ['Song','Album','Drums','Bass','Vocals','Rhythm','Lead','Tempo','Downbeat','Progress']
raw_songs_df = pd.DataFrame(columns = col_names)
#raw_songs_df

# Populate dataframe
# Fills raw_songs_df in place and returns the per-instrument note statistics.
# partial_track_match=True also accepts track names that merely start with an
# instrument name (e.g. 'Vocals2').
note_limits, note_counts = add_songs_to_df(raw_songs_df, partial_track_match=True)

In [0]:
# Check df
# Sanity check of the populated dataframe and of the note statistics.

raw_songs_df.info()

print('Note limits:', note_limits)
print('Note counts:', note_counts)
raw_songs_df.head()

# Optional listening test comparing the original MIDI file with the rebuilt one:
#manual_song_test(raw_songs_df, 'DearPrudence')

In [0]:
# Process data from df
# Cuts every song into sequences of 200 timesteps and splits them by song into
# train / validate / test sets. Pianoroll values above 0.1 become 1, the rest 0.

x_train, x_validate, x_test, y_train, y_validate, y_test = get_processed_data(df=raw_songs_df, note_limits=note_limits, note_counts=note_counts, seq_length=200, train_ratio=0.7, validate_ratio=0.25, binarize_threshold=0.1)

In [0]:
# Listen to a sequence
# seq_num is an arbitrary index into the training sequences; it must be smaller
# than x_train.shape[0].

seq_num = 26
play_sequence_audio(x=x_train[seq_num,:,:], y=y_train[seq_num,:,:], note_limits=note_limits, note_counts=note_counts, volume_scaling=100.0)

# Largest value among the pianoroll features (the last three columns - tempo,
# downbeat, progress - are excluded)
print(np.amax(x_train[seq_num,:,:-3]))

In [0]:
# Save dataset (huge)
# Stores all six split arrays plus the two note-statistics dicts in one .npz file.
# NOTE(review): note_limits / note_counts are dicts, so NumPy presumably stores
# them as pickled object arrays - the loader needs allow_pickle=True.

np.savez('temp/outfile.npz', x_train=x_train, x_validate=x_validate, x_test=x_test, y_train=y_train, y_validate=y_validate, y_test=y_test, note_limits=note_limits, note_counts=note_counts)

In [0]:
# Load dataset
# Restores the arrays and note statistics written by the save cell.

# allow_pickle=True is required because the two dicts were stored as pickled
# object arrays. Unpickling can execute arbitrary code, so only load cache files
# that were produced by this notebook.
with np.load('temp/outfile.npz', allow_pickle=True) as data_cache: # Closes the file when done
  x_train = data_cache['x_train']
  x_validate = data_cache['x_validate']
  x_test = data_cache['x_test']
  y_train = data_cache['y_train']
  y_validate = data_cache['y_validate']
  y_test = data_cache['y_test']
  # The dicts come back wrapped in 0-d object arrays; .item() unwraps them so
  # that `key in note_limits` and `note_limits[key]` work as before saving.
  note_limits = data_cache['note_limits'].item()
  note_counts = data_cache['note_counts'].item()

In [0]:
# Write all song/track names to a text file
# parse_midi_dir only prints, so its output is captured by redirecting stdout
# into 'TrackList.txt'.

from contextlib import redirect_stdout

with open('TrackList.txt', 'w') as f:
  with redirect_stdout(f):
    parse_midi_dir(list_tracks=True)

In [0]:
# Basic model (for testing)
# Input: sequences of 200 timesteps with 276 features; output: 51 values per timestep.

model = tf.keras.Sequential()
model.add(keras.Input(shape=(200,276)))
# The recurrent layer is also the output layer. Its default tanh activation
# produces values in [-1, 1], which are not valid probabilities for binary
# cross-entropy, so use a sigmoid to keep every output in [0, 1].
model.add(layers.SimpleRNN(51, activation='sigmoid', return_sequences=True))

# Configure the model for multi-label classification.
model.compile(optimizer='adam',
              loss='binary_crossentropy', # Each note is an independent 'class'
              metrics=['accuracy'])

model.summary()

In [0]:
# Real model
# Two stacked bidirectional LSTM layers with dropout, followed by a sigmoid
# layer that outputs 51 independent values per timestep.
# NOTE: this rebinds `model`, replacing the basic test model if that cell was run.

model = tf.keras.Sequential()

# Sequences of 200 timesteps with 276 features each
model.add(keras.Input(shape=(200,276)))

model.add(layers.Bidirectional(layers.LSTM(128, return_sequences=True)))
model.add(layers.Dropout(0.2))
model.add(layers.Bidirectional(layers.LSTM(128, return_sequences=True)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(51, activation='sigmoid'))

model.compile(optimizer='adam',
              loss='binary_crossentropy', # Each note is an independent 'class'
              metrics=['accuracy'])

# Save model checkpoints
# Keeps only the weights with the highest validation accuracy in 'best.h5'.
# NOTE(review): 'val_acc' has to match the metric name Keras logs; newer Keras
# versions presumably log 'val_accuracy' instead - confirm for the installed version.
checkpoint = keras.callbacks.ModelCheckpoint('best.h5', monitor='val_acc', verbose=1, save_best_only=True, mode='max')
callbacks_list = [checkpoint]

model.summary()

In [0]:
# Run the model
# Trains for 100 epochs; the checkpoint callback writes 'best.h5' along the way.

history = model.fit(x_train, y_train, epochs=100, batch_size=32, callbacks=callbacks_list,
          validation_data=(x_validate, y_validate))

# Save the model as it is after the final epoch (not necessarily the best one)
model.save('bass_model.h5')

In [0]:
# Plot loss, then accuracy: one figure per metric with the training and validation curves
for metric_key, metric_label in (('loss', 'Loss'), ('acc', 'Accuracy')):
  plt.plot(history.history[metric_key])
  plt.plot(history.history['val_' + metric_key])
  plt.title('Model ' + metric_label)
  plt.ylabel(metric_label)
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Validate'], loc='upper left')
  plt.show()

In [0]:
# Load model
# 'best.h5' is the checkpoint with the highest validation accuracy;
# 'bass_model.h5' (commented out) is the model after the final epoch.
#model = tf.keras.models.load_model('bass_model.h5')
model = tf.keras.models.load_model('best.h5')

In [0]:
# Predict test set
y_predicted = model.predict(x_test, batch_size=32)

# Binarize
# Outputs above the threshold count as "note on". The value equals the
# binarize_threshold used when processing the data, but here it is applied to
# the model outputs, in place.
threshold = 0.1
y_predicted[y_predicted <= threshold] = 0
y_predicted[y_predicted > threshold] = 1

#Sanity check
# Number of predicted notes per test sequence (displayed as the cell output)
np.count_nonzero(y_predicted,axis=(1,2))

In [0]:
# Listen to a test set sequence
# Plays the same test sequence twice: with the original bassline and with the
# predicted one. seq_num must be smaller than x_test.shape[0].

seq_num = 85
print('ORIGINAL')
play_sequence_audio(x=x_test[seq_num,:,:], y=y_test[seq_num,:,:], note_limits=note_limits, note_counts=note_counts, volume_scaling=100.0)

print('PREDICTED')
play_sequence_audio(x=x_test[seq_num,:,:], y=y_predicted[seq_num,:,:], note_limits=note_limits, note_counts=note_counts, volume_scaling=100.0)