Google Colab notebook used in the final year project Beat It!

Code written by Matthew Dwyer 17330141.

Mount the dataset from Google Drive by running the second code cell, using the credentials provided in README.md on GitLab.

Directory variables are subject to change depending on where the dataset is saved.

In [None]:
#Install necessary packages
!pip install tensorflow torch torchaudio 

In [2]:
#Mount Google Drive at /content/drive so the dataset directories used by
#buildData are reachable; force_remount=True is passed on every run of the cell
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [3]:
#Import necessary packages
import torchaudio
import torch
import torchaudio.transforms as T

import tensorflow as tf
tf.config.run_functions_eagerly(True)

import matplotlib.pyplot as plt
import os
import glob
import numpy as np
from sklearn import model_selection as ms
from tqdm import tqdm

In [4]:
#Setting random seeds to increase reproducibility
#A single constant keeps every library on the same seed
SEED = 1
#NOTE(review): PYTHONHASHSEED is presumably only read when an interpreter
#starts, so this assignment may only affect processes launched later - confirm
os.environ['PYTHONHASHSEED'] = '0'
np.random.seed(SEED)
tf.random.set_seed(SEED)
#torch/torchaudio perform the data preprocessing, so seed torch as well
torch.manual_seed(SEED)

In [None]:
#Connect to gpu
#Use the first CUDA device when torch can see one, otherwise fall back to the
#CPU so this cell does not raise on a runtime without a GPU
gpu_count = torch.cuda.device_count()
print(gpu_count)
if torch.cuda.is_available() and gpu_count > 0:
    print(torch.cuda.get_device_name(0))
    device = torch.device("cuda:0")
else:
    print("No CUDA device available, using CPU")
    device = torch.device("cpu")
device

In [5]:
#Number of mel bands in every spectrogram slice
_N_MELS = 40
#Length of the one-hot BPM label vectors
_N_CLASSES = 256


def _pt_files(directory):
    """Return the sorted names of the ``.pt`` files stored in ``directory``.

    Sorting makes the order independent of the arbitrary order in which
    ``os.listdir`` reports the files.
    """
    return sorted(fname for fname in os.listdir(directory) if fname.endswith('.pt'))


def _bpm_label(bpm):
    """Return the float64 one-hot label tensor for an integer BPM value.

    The hot position is ``(bpm - 30) - 1``, which is the encoding the cached
    label files already use.
    NOTE(review): with this encoding a BPM of 30 wraps round to the last class
    through negative indexing - confirm whether ``bpm - 30`` was intended.
    """
    index = bpm - 30
    label = np.zeros(_N_CLASSES)
    label[index - 1] = 1
    return torch.from_numpy(label)


def _rebuild_cache(audio_directory, annotation_directory, spectrogram_directory,
                   tensor_directory, n_fft, slice_len):
    """Delete the cached ``.pt`` files and recompute them from the wav files.

    For every ``.wav`` file in ``audio_directory`` one tensor of mel
    spectrogram slices and one tensor of matching BPM labels is saved under
    the wav file's name with a ``.pt`` extension.
    """
    #First delete all the files previously created
    for directory in (spectrogram_directory, tensor_directory):
        for fname in _pt_files(directory):
            os.remove(os.path.join(directory, fname))

    #Define values needed for mel spectrogram computation
    hop_length = 512
    fixed_sample_rate = 11025

    #The transform is identical for every file, so it is built only once
    mel_spectrogram = T.MelSpectrogram(
        sample_rate=fixed_sample_rate,
        n_fft=n_fft,
        win_length=None,
        hop_length=hop_length,
        n_mels=_N_MELS,
    )

    #Half overlapping windows: each slice starts half a slice after the last
    step = slice_len // 2

    #Grab all file names from dataset
    fnames = [fname for fname in os.listdir(audio_directory) if fname.endswith('.wav')]

    for fname in tqdm(fnames):
        stem = os.path.splitext(fname)[0]

        #Get bpm value from file; the with-block closes the annotation file
        with open(os.path.join(annotation_directory, stem + ".txt")) as bpm_file:
            bpm = int(float(bpm_file.read()))

        #Load the wav file
        waveform, sample_rate = torchaudio.load(os.path.join(audio_directory, fname))

        #Resample to the fixed sample rate, then average the channels to mono
        resample_transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=fixed_sample_rate)
        audio_mono = torch.mean(resample_transform(waveform), dim=0, keepdim=True)

        #compute mel spectrogram of mono audio
        melspec = mel_spectrogram(audio_mono)

        #Slice the mel spectrogram into windows of slice_len frames
        melspecs = []
        for lbound in range(0, melspec.shape[2], step):
            spec_slice = melspec[:, :, lbound:lbound + slice_len]
            missing = slice_len - spec_slice.shape[2]
            if missing > 0:
                #The last windows run past the end: zero-pad them on the right
                spec_slice = torch.nn.ZeroPad2d((0, missing, 0, 0))(spec_slice)
            melspecs.append(spec_slice)

        #Every slice of a file carries the same label
        label = _bpm_label(bpm)

        #Save the mel spectrogram and bpm tensors
        torch.save(torch.stack(melspecs), os.path.join(spectrogram_directory, stem + ".pt"))
        torch.save(torch.stack([label] * len(melspecs)), os.path.join(tensor_directory, stem + ".pt"))


def _load_cache(spectrogram_directory, tensor_directory, slice_len):
    """Load every cached spectrogram file together with its label file.

    The label file is always looked up under the spectrogram file's own name,
    so examples and labels cannot be paired with the wrong file.

    Returns:
        Tuple ``(X, y)`` of torch tensors concatenated along the first axis.

    Raises:
        FileNotFoundError: if the spectrogram directory holds no ``.pt`` file
            or a label file is missing.
    """
    spec_fnames = _pt_files(spectrogram_directory)
    if not spec_fnames:
        raise FileNotFoundError(
            "No cached spectrograms found in " + spectrogram_directory
            + "; call buildData with rebuild=True first")

    specs = []
    labels = []
    for fname in tqdm(spec_fnames):
        spec = torch.load(os.path.join(spectrogram_directory, fname)).view(-1, 1, _N_MELS, slice_len)
        label = torch.load(os.path.join(tensor_directory, fname))

        #Report files whose number of slices and labels disagree
        if spec.shape[0] != label.shape[0]:
            print(fname)

        specs.append(spec)
        labels.append(label)

    #Concatenate once instead of re-copying the growing tensors in the loop
    X = torch.cat(specs, 0)
    y = torch.cat(labels, 0)
    return X, y


def buildData(n_fft, slice_len,rebuild=False):
    """Build the training, validation and testing tensors for the network.

    Args:
        n_fft: FFT size handed to the mel spectrogram transform (only used
            when ``rebuild`` is true).
        slice_len: number of spectrogram frames per example; consecutive
            examples overlap by half of this length.
        rebuild: when true, delete the cached ``.pt`` files and recompute
            them from the wav and BPM annotation files before loading.

    Returns:
        ``X_train, y_train, X_val, y_val, X_test, y_test`` as TensorFlow
        tensors. Examples are reshaped to ``(-1, 40, slice_len, 1)`` and
        labels to ``(-1, 256)``. The data is split 60/20/20 into
        training/validation/testing with a fixed ``random_state``.

    Raises:
        FileNotFoundError: if no cached spectrogram files are available.
    """
    #Define directoies needed for data preprocessing
    audio_directory = "/content/drive/My Drive/wavs/"
    annotation_directory = "/content/drive/My Drive/BPMs/"
    spectrogram_directory = "/content/drive/My Drive/data/spectrogram_pts/"
    tensor_directory = "/content/drive/My Drive/data/tensor_pts/"

    if rebuild:
        _rebuild_cache(audio_directory, annotation_directory, spectrogram_directory,
                       tensor_directory, n_fft, slice_len)

    X, y = _load_cache(spectrogram_directory, tensor_directory, slice_len)

    #20% for testing, then 25% of the remaining 80% (20% overall) for validation
    X_train, X_test, y_train, y_test = ms.train_test_split(X.numpy(), y.numpy(), test_size=0.2, random_state=1)
    X_train, X_val, y_train, y_val = ms.train_test_split(X_train, y_train, test_size=0.25, random_state=1)
    del X
    del y

    def to_tf(features, labels):
        #Convert one split to TensorFlow tensors in the layout the network expects
        return (tf.reshape(tf.convert_to_tensor(features), (-1, _N_MELS, slice_len, 1)),
                tf.reshape(tf.convert_to_tensor(labels), (-1, _N_CLASSES)))

    X_train, y_train = to_tf(X_train, y_train)
    X_val, y_val = to_tf(X_val, y_val)
    X_test, y_test = to_tf(X_test, y_test)

    print("Training set has "+str(X_train.shape[0])+" samples and labels")
    print("Testing set has "+str(X_test.shape[0])+" samples and labels")
    print("Validation set has "+str(X_val.shape[0])+" samples and labels")

    return X_train,y_train,X_val,y_val,X_test,y_test

In [None]:
class TFInception(tf.keras.layers.Layer):
    """Inception-style block: six parallel convolutions followed by a bottleneck.

    The input is passed through six ``Conv2D`` layers with kernels
    ``(1, kernel_sizes[i])``, "same" padding and ELU activation. Their outputs
    are concatenated along axis 3 and reduced by a ``(1, 1)`` convolution with
    ``bottleneck_channels`` filters.

    Args:
        n_filters: number of filters of each of the six parallel convolutions.
        kernel_sizes: sequence whose first six entries are the kernel widths
            of the parallel convolutions.
        bottleneck_channels: number of filters of the final 1x1 convolution.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, n_filters=24, kernel_sizes=(32, 64, 96, 128, 192, 256), bottleneck_channels=36, **kwargs):
        super().__init__(**kwargs)
        #Keep the constructor arguments so get_config can report them
        self.n_filters = n_filters
        self.kernel_sizes = tuple(kernel_sizes)
        self.bottleneck_channels = bottleneck_channels

        def branch(width):
            #One parallel convolution of the given kernel width
            return tf.keras.layers.Conv2D(filters=n_filters, kernel_size=(1, width), padding="same", activation='elu')

        #The attribute names and creation order are unchanged so that weights
        #saved from earlier versions of this layer still line up
        self.conv1 = branch(self.kernel_sizes[0])
        self.conv2 = branch(self.kernel_sizes[1])
        self.conv3 = branch(self.kernel_sizes[2])
        self.conv4 = branch(self.kernel_sizes[3])
        self.conv5 = branch(self.kernel_sizes[4])
        self.conv6 = branch(self.kernel_sizes[5])
        self.bottleneck = tf.keras.layers.Conv2D(filters=bottleneck_channels, kernel_size=(1,1), activation='elu')

    def call(self, x):
        """Apply the six convolutions to ``x``, concatenate them and apply the bottleneck."""
        branches = (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5, self.conv6)
        a = tf.concat([conv(x) for conv in branches], axis=3)
        return self.bottleneck(a)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "n_filters": self.n_filters,
            "kernel_sizes": list(self.kernel_sizes),
            "bottleneck_channels": self.bottleneck_channels,
        })
        return config

In [None]:
#Build the network: three short convolutions, four pooled inception blocks
#and a dense classifier with one softmax output per BPM class
with tf.device('/CPU:0'): 
  TFModel = tf.keras.Sequential(
  [
      #Input layer
      #The width must equal the slice_len passed to buildData (256 in this
      #notebook); the previous width of 128 did not match the data it produces
      tf.keras.layers.InputLayer(input_shape=(40,256,1)),
   
      #Short Conv Layers
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Conv2D(filters=16, kernel_size=(1,5), padding="same", activation = "elu"),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Conv2D(filters=16, kernel_size=(1,5), padding="same", activation = "elu"),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Conv2D(filters=16, kernel_size=(1,5), padding="same", activation = "elu"),

      #Inception Layers
      #Each pooling layer only shrinks the first (40-band) axis: 40 -> 8 -> 4 -> 2 -> 1
      tf.keras.layers.AveragePooling2D(pool_size=(5,1), strides=(5,1)),
      tf.keras.layers.BatchNormalization(),
      TFInception(),
      tf.keras.layers.AveragePooling2D(pool_size=(2,1), strides=(2,1)),
      tf.keras.layers.BatchNormalization(),
      TFInception(),
      tf.keras.layers.AveragePooling2D(pool_size=(2,1), strides=(2,1)),
      tf.keras.layers.BatchNormalization(),
      TFInception(),
      tf.keras.layers.AveragePooling2D(pool_size=(2,1), strides=(2,1)),
      tf.keras.layers.BatchNormalization(),
      TFInception(),
      
      #Linear Layers
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dropout(rate=0.5),
      tf.keras.layers.Dense(128),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Dense(64),
      tf.keras.layers.BatchNormalization(),
      #One output per entry of the 256-long one-hot BPM labels
      tf.keras.layers.Dense(256,activation="softmax"),
  ]
  )

In [None]:
#Print the layer-by-layer summary of TFModel to check that the architecture
#of the model is correct
TFModel.summary()

In [None]:
#Define accuracy 1, accuracy within 4% margin of error
def accuracy1(y_true, y_pred):
  """Fraction of predictions whose BPM lies within 4% of the true BPM.

  Args:
    y_true: batch of one-hot BPM labels, one row per example.
    y_pred: batch of predicted class scores with the same layout.

  Returns:
    Scalar float32 tensor with the fraction of examples whose predicted BPM
    satisfies ``true - 4% <= predicted <= true + 4%``; 0 for an empty batch.
  """
  #buildData stores a BPM of b at class index (b - 30) - 1, so the BPM is
  #recovered by adding 31 to the index of the hot entry
  bpm_offset = 31
  #float64 avoids mixing integer tensors with the fractional margin
  true_bpm = tf.cast(tf.math.argmax(y_true, axis=-1), tf.float64) + bpm_offset
  pred_bpm = tf.cast(tf.math.argmax(y_pred, axis=-1), tf.float64) + bpm_offset
  margin_of_error = true_bpm * 0.04
  within_margin = tf.logical_and(pred_bpm >= true_bpm - margin_of_error,
                                 pred_bpm <= true_bpm + margin_of_error)
  correct = tf.reduce_sum(tf.cast(within_margin, tf.float32))
  total = tf.cast(tf.size(within_margin), tf.float32)
  #divide_no_nan returns 0 instead of failing when the batch is empty
  return tf.math.divide_no_nan(correct, total)

In [6]:
#Build the training, validation and testing tensors from spectrogram slices
#of 256 frames. rebuild=True deletes the cached .pt files and recomputes them
#from the wav files before loading.
X_train,y_train,X_val,y_val,X_test,y_test = buildData(n_fft=1024,slice_len=256,rebuild=True)

100%|██████████| 882/882 [19:34<00:00,  1.33s/it]
100%|██████████| 881/881 [05:08<00:00,  2.85it/s]


Training set has 9591 samples and labels
Testing set has 3197 samples and labels
Validation set has 3197 samples and labels


In [None]:
#Compile the network with the Adam optimizer, categorical cross-entropy loss
#and both accuracy metrics
opt = tf.keras.optimizers.Adam(epsilon=1e-8, learning_rate=0.0001)
TFModel.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(),
    optimizer=opt,
    metrics=['accuracy', accuracy1],
)

#File the weights with the lowest validation loss are written to
checkpoint_directory = "/content/drive/My Drive/cnn_best_val.h5"

#Callback that saves the weights whenever the validation loss reaches a new minimum
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_directory,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
)

#Callback that stops training once the validation loss
#has not improved for 10 epochs
early_stop_callback = tf.keras.callbacks.EarlyStopping(patience=10, monitor='val_loss')

In [None]:
#Train the network on the training split, evaluating on the validation split
#and running the early-stopping and checkpoint callbacks during training
history = TFModel.fit(
    x=X_train,
    y=y_train,
    batch_size=32,
    epochs=100,
    validation_data=(X_val, y_val),
    validation_batch_size=32,
    callbacks=[early_stop_callback, model_checkpoint_callback],
)

In [None]:
#Create graph for training and validation accuracy0 scores
#The checkpoint holds the first epoch with the lowest validation loss; read it
#from the history rather than assuming early stopping fired 10 epochs later
val_losses = history.history['val_loss']
best_epoch = val_losses.index(min(val_losses))

fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], label='Training Accuracy')
ax.plot(history.history['val_accuracy'], label='Validation Accuracy')
ax.axvline(x=best_epoch, label="Lowest Validation Checkpoint", linestyle='--', color="red")
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.set_ylim([0, 1])
ax.legend(loc='lower right')
ax.set_title('Training and Validation Accuracy')
plt.show()

In [None]:
#Create graph for training and validation accuracy1 scores
#The checkpoint holds the first epoch with the lowest validation loss; read it
#from the history rather than assuming early stopping fired 10 epochs later
val_losses = history.history['val_loss']
best_epoch = val_losses.index(min(val_losses))

fig, ax = plt.subplots()
ax.plot(history.history['accuracy1'], label='Training Accuracy 1')
ax.plot(history.history['val_accuracy1'], label='Validation Accuracy 1')
ax.axvline(x=best_epoch, label="Lowest Validation Checkpoint", linestyle='--', color="red")
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy 1')
ax.set_ylim([0.4, 1])
ax.legend(loc='lower right')
ax.set_title('Training and Validation Accuracy 1')
plt.show()

In [None]:
#Create graph for training and validation loss
#The checkpoint holds the first epoch with the lowest validation loss; read it
#from the history rather than assuming early stopping fired 10 epochs later
val_losses = history.history['val_loss']
best_epoch = val_losses.index(min(val_losses))

fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='Training Loss')
ax.plot(val_losses, label='Validation Loss')
ax.axvline(x=best_epoch, label="Lowest Validation Checkpoint", linestyle='--', color="red")
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend(loc='upper right')
ax.set_title('Training and Validation Loss')
plt.show()

In [None]:
#Restore the checkpointed weights (lowest validation loss) and measure the
#loss, accuracy and accuracy1 of the model on the testing set
TFModel.load_weights(checkpoint_directory)
test_loss, test_acc, test_acc1 = TFModel.evaluate(x=X_test, y=y_test)