<a href="https://colab.research.google.com/github/satvik-venkatesh/audio-seg-data-synth/blob/main/train-CRNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# This notebook trains a convolutional recurrent neural network (CRNN) on the synthesised data

In [None]:
# Install third-party packages that the import cell below relies on.
# NOTE(review): dcase_util is imported below but not installed explicitly here -
# presumably it arrives as a dependency of sed_eval; confirm.
!pip install sed_eval
# librosa is pinned to an exact version (0.7.2); the other two installs are unpinned.
!pip install librosa==0.7.2
!pip install soundfile

In [None]:
import numpy as np
import tensorflow as tf
import IPython
import math
import glob
import sed_eval
import dcase_util
import pickle
import os
import soundfile as sf
import librosa

In [None]:
"""
Mount Google Drive into Colab.
"""
# google.colab is provided by the Colab runtime, so this cell is Colab-specific.
from google.colab import drive
# Later cells read the data archives from, and save the trained model under,
# '/content/drive/My Drive/', i.e. below this mount point.
drive.mount('/content/drive')

In [None]:
"""
Extract the .zip files into the 'train data' folder.
"""
from zipfile import ZipFile

# The training set is split across eight archives named
# "Train - d_1.zip" ... "Train - d_8.zip"; all of them are unpacked into the
# same 'train data' folder (relative to the current working directory).
for i in range(0, 8):
  zip_name = "/content/drive/My Drive/Data Synthesis/Train - d_" + str(i + 1) + ".zip"
  # Bind the open archive to `archive`, not `zip`: this is notebook-global
  # scope, so `as zip` would replace the builtin zip() for every later cell.
  with ZipFile(zip_name, 'r') as archive:
    archive.extractall('train data')
    print("Extracted all sound files into the folder {}".format(i + 1))

In [None]:
"""
Extracting BBC+MuSpeak Val data
"""
from zipfile import ZipFile
# Unpack the single validation archive into the 'validation data' folder
# (relative to the current working directory).
zip_name = "/content/drive/My Drive/Data Synthesis/Val - d.zip"
# Bind the open archive to `archive`, not `zip`: this is notebook-global scope,
# so `as zip` would replace the builtin zip() for every later cell.
with ZipFile(zip_name, 'r') as archive:
  archive.extractall('validation data')
  print("Extracted all sound files into the folder")


In [None]:
import tensorflow as tf
import keras

class DataGenerator(tf.compat.v2.keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of features and labels from disk.

    Every example is a ``(feature_path, label_path)`` pair of ``.npy`` files.
    A loaded feature array is stored in a ``(802, 80)`` float32 slot and a
    loaded label array in a ``(802, 2)`` int16 slot of the batch.

    Args:
        list_examples: sequence of ``(feature_path, label_path)`` pairs.
        batch_size: number of examples per batch.
        dim: stored on the instance; not used by the generator itself.
        n_classes: stored on the instance; not used by the generator itself.
        shuffle: when truthy, the example order is redrawn at construction and
            every time ``on_epoch_end`` runs.
    """

    # Per-example array shapes expected from the .npy files.
    FEATURE_SHAPE = (802, 80)
    LABEL_SHAPE = (802, 2)

    def __init__(self, list_examples, batch_size=128, dim=(1, ),
                 n_classes=2, shuffle=True):
        """Store the configuration and draw the first example order."""
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.list_examples = list_examples
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch.

        Examples that do not fill a final complete batch are left out.
        """
        return len(self.list_examples) // self.batch_size

    def __getitem__(self, index):
        """Return batch number ``index`` as a tuple ``(X, y)``."""
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Translate positions in the (possibly shuffled) order into examples.
        list_IDs_temp = [self.list_examples[k] for k in indexes]

        return self.__data_generation(list_IDs_temp)

    def on_epoch_end(self):
        """Rebuild the example order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.list_examples))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load the given ``(feature_path, label_path)`` pairs into two arrays.

        Returns:
            ``X`` with shape ``(n, 802, 80)`` (float32) and ``y`` with shape
            ``(n, 802, 2)`` (int16), where ``n == len(list_IDs_temp)``.
        """
        # Size the batch by the examples actually received, so a short batch
        # can never contain uninitialised np.empty rows.
        n_examples = len(list_IDs_temp)
        X = np.empty((n_examples,) + self.FEATURE_SHAPE, dtype=np.float32)
        y = np.empty((n_examples,) + self.LABEL_SHAPE, dtype=np.int16)

        for i, example in enumerate(list_IDs_temp):
            X[i] = np.load(example[0])  # features
            y[i] = np.load(example[1])  # labels

        return X, y

In [None]:
"""
Functions to perform natural sort on the examples and labels.
"""
import re

def tryint(s):
    """Return ``int(s)`` when ``s`` parses as an integer, else ``s`` unchanged."""
    try:
        parsed = int(s)
    except ValueError:
        # Not an integer literal: hand the value back untouched.
        return s
    return parsed
    
def alphanum_key(s):
    """ Turn a string into a list of string and number chunks.
        "z23a" -> ["z", 23, "a"]

        The result always alternates text, number, text, ...: because the
        split pattern has one capturing group, the odd positions hold the
        matched runs of ASCII digits and the even positions hold the text
        between them (possibly empty). Only the digit runs are converted to
        int, so two keys always hold the same type at the same position and
        can be compared without a str/int TypeError.
    """
    chunks = re.split('([0-9]+)', s)
    return [int(chunk) if position % 2 else chunk
            for position, chunk in enumerate(chunks)]

def sort_nicely(l):
    """ Sort the given list in the way that humans expect.

        The list is sorted in place using alphanum_key as the sort key, so
        runs of digits are ordered by numeric value rather than character by
        character. Nothing is returned.
    """
    l.sort(key=alphanum_key)

In [None]:
import glob
import random
"""
Load the individual numpy arrays into partition
"""
# Feature files: the name must have a digit straight after "mel-id-", so the
# "mel-id-label-..." files are not matched by this pattern.
data = glob.glob("/content/train data/content/Mel Files/**/mel-id-[0-9]*.npy", recursive=True)
sort_nicely(data)

# Label files.
labels = glob.glob("/content/train data/content/Mel Files/**/mel-id-label-[0-9]*.npy", recursive=True)
sort_nicely(labels)

# Features and labels are paired purely by their position after the natural
# sort. If the two counts differ, at least one feature would be paired with
# the wrong label (or indexing would fail), so stop with a clear message.
if len(data) != len(labels):
    raise ValueError(
        "Found {} feature files but {} label files in the training data".format(
            len(data), len(labels)))

train_examples = [(data[i], labels[i]) for i in range(len(data))]

# Shuffle the (feature, label) pairs with a fixed seed.
random.seed(4)
random.shuffle(train_examples)

In [None]:
"""
Creating the train partition.
"""
# Number of examples taken from train_examples for the training partition.
m_train = 20480 * 2
# random.seed() without an argument reseeds from a non-fixed source, so this
# reshuffle - and therefore which m_train examples are kept - differs between
# runs.
random.seed()
random.shuffle(train_examples)

# Additional examples stored under the "MuSpeak" folder.
data_MS = glob.glob("/content/train data/MuSpeak/content/Mel Files/**/mel-id-[0-9]*.npy", recursive=True) 
sort_nicely(data_MS)

labels_MS = glob.glob("/content/train data/MuSpeak/content/Mel Files/**/mel-id-label-[0-9]*.npy", recursive=True)
sort_nicely(labels_MS)

# Features and labels are paired purely by their position after the natural
# sort. If the two counts differ, at least one feature would be paired with
# the wrong label (or indexing would fail), so stop with a clear message.
if len(data_MS) != len(labels_MS):
    raise ValueError(
        "Found {} feature files but {} label files in the MuSpeak training data".format(
            len(data_MS), len(labels_MS)))

train_examples_MS = [(data_MS[i], labels_MS[i]) for i in range(len(data_MS))]

# Training partition: the first m_train shuffled examples plus every MuSpeak
# example, shuffled together once more.
partition = {}
partition['train'] = train_examples[0:m_train] + train_examples_MS

random.shuffle(partition['train'])

In [None]:
# Report how many examples ended up in the training partition.
train_size = len(partition['train'])
print("The size of partition['train'] is {}".format(train_size))

In [None]:
"""
This loads data for the validation set.
"""
import glob
import random

# Feature files: the name must have a digit straight after "mel-id-", so the
# "mel-id-label-..." files are not matched by this pattern.
data = glob.glob("/content/validation data/**/mel-id-[0-9]*.npy", recursive=True)
sort_nicely(data)

# Label files.
labels = glob.glob("/content/validation data/**/mel-id-label-[0-9]*.npy", recursive=True)
sort_nicely(labels)

# Features and labels are paired purely by their position after the natural
# sort. If the two counts differ, at least one feature would be paired with
# the wrong label (or indexing would fail), so stop with a clear message.
if len(data) != len(labels):
    raise ValueError(
        "Found {} feature files but {} label files in the validation data".format(
            len(data), len(labels)))

validation_examples = [(data[i], labels[i]) for i in range(len(data))]

# Shuffle with a fixed seed, then show the first pair as a quick sanity check.
random.seed(4)
random.shuffle(validation_examples)
print(validation_examples[0])

partition['validation'] = validation_examples

In [None]:
# """
# This loads data for the test set.
# """
# import glob
# import random

# data = glob.glob("/content/test data/**/mel-id-[0-9]*.npy", recursive=True)
# sort_nicely(data)

# labels = glob.glob("/content/test data/**/mel-id-label-[0-9]*.npy", recursive=True)
# sort_nicely(labels)

# test_examples = [(data[i], labels[i]) for i in range(len(data))]

# random.seed(4)
# random.shuffle(test_examples)
# print(test_examples[0])

# partition['test'] = test_examples

In [None]:
# Keyword arguments shared by every DataGenerator built in this cell.
params = dict(dim=(1, ), batch_size=128, n_classes=2, shuffle=True)

# One generator per partition.
training_generator = DataGenerator(partition['train'], **params)
validation_generator = DataGenerator(partition['validation'], **params)
# test_generator = DataGenerator(partition['test'], **params)

In [None]:
import os
class MyCustomCallback(tf.keras.callbacks.Callback):
  """Early stopping on ``val_loss`` combined with checkpointing to disk.

  After every epoch the model is saved to a temporary file whose path is
  ``model_path`` with ".h5" replaced by "_temp.h5". Whenever ``val_loss``
  improves on the best value seen so far, the weights are remembered and the
  model is also saved to ``model_path``. Once the number of epochs without
  improvement since the last best epoch reaches ``patience``, training is
  stopped and the best weights are put back on the model.

  Args:
    model_path: file the best model is saved to.
    patience: number of non-improving epochs that triggers the stop. With the
      default of 0 the first non-improving epoch already stops training.
  """

  def __init__(self, model_path, patience=0):
    super(MyCustomCallback, self).__init__()
    self.patience = patience
    # best_weights to store the weights at which the minimum loss occurs.
    self.best_weights = None
    self.model_path = model_path

  def _temp_model_path(self):
    """Path of the per-epoch temporary checkpoint derived from model_path."""
    return self.model_path.replace(".h5", "_temp.h5")

  def _restore_best_weights(self):
    """Put the best recorded weights back on the model, if there are any."""
    if self.best_weights is None:
      # No epoch improved on the initial loss (e.g. val_loss was NaN or
      # missing), so there is nothing to restore; set_weights(None) would fail.
      print("No best weights were recorded; leaving the model weights unchanged.")
      return
    print("Restoring model weights from the end of the best epoch.")
    self.model.set_weights(self.best_weights)

  def on_train_begin(self, logs=None):
    # The number of epoch it has waited when loss is no longer minimum.
    self.wait = 0
    # The epoch the training stops at.
    self.stopped_epoch = 0
    # Initialize the best validation loss as infinity, so that the first
    # finite val_loss counts as an improvement.
    self.best_val_loss = np.inf
    self.is_impatient = False
    # Forget weights from a previous run when the callback object is reused.
    self.best_weights = None

  def on_train_end(self, logs=None):
    # When early stopping fired, the weights were already restored in
    # on_epoch_end and the temporary checkpoint is left on disk.
    if not self.is_impatient:
      self._restore_best_weights()
      temp_model_path = self._temp_model_path()
      # The temporary file does not exist if no epoch completed.
      if os.path.exists(temp_model_path):
        os.remove(temp_model_path)

  def on_epoch_end(self, epoch, logs=None):
    current_val_loss = (logs or {}).get("val_loss")
    print("\n current_val_loss: {}".format(current_val_loss))
    # Save the current state after every epoch, whether or not it improved.
    self.model.save(self._temp_model_path())

    if current_val_loss is None:
      # Training without validation data: there is nothing to compare, and
      # `None < float` would raise a TypeError.
      print("val_loss is not available; skipping the early-stopping check.")
      return

    if current_val_loss < self.best_val_loss:
      self.best_val_loss = current_val_loss
      self.wait = 0
      self.best_weights = self.model.get_weights()
      self.model.save(self.model_path)
    else:
      self.wait += 1
      if self.wait >= self.patience:
        self.stopped_epoch = epoch
        self.is_impatient = True
        self.model.stop_training = True
        self._restore_best_weights()
        #os.remove(temp_model_path)

In [None]:
"""
The CRNN developed for audio segmentation.
"""

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Network input: one example has shape (802, 80) - presumably frames x mel bands.
mel_input = keras.Input(shape=(802, 80), name="mel_input")

# Append a trailing channel axis so the 2-D convolutions can consume the input.
X = layers.Reshape((802, 80, 1))(mel_input)
print(X.shape)

# Two convolutional stages (16, then 64 filters). The pooling window is (1, 2),
# so only the second (size-80) axis is halved and the 802-long axis is kept.
for n_filters in (16, 64):
    X = layers.Conv2D(filters=n_filters, kernel_size=7, strides=1, padding='same')(X)
    X = layers.BatchNormalization(momentum=0.0)(X)
    X = layers.Activation('relu')(X)
    X = layers.MaxPool2D(pool_size=(1, 2))(X)
    X = layers.Dropout(rate=0.2)(X)

print(X.shape)
# Merge the last two axes into a single feature vector per position.
_, _, sx, sy = X.shape
X = layers.Reshape((-1, int(sx * sy)))(X)

# Two bidirectional GRU stages, each followed by batch normalisation.
for gru_stage in range(2):
    X = layers.Bidirectional(layers.GRU(80, return_sequences=True))(X)
    X = layers.BatchNormalization(momentum=0.0)(X)

# Two independent sigmoid outputs for every position of the sequence.
pred = layers.TimeDistributed(layers.Dense(2, activation='sigmoid'))(X)

model = keras.Model(inputs=[mel_input], outputs=[pred])

# Write a diagram of the architecture to CRNN.png.
keras.utils.plot_model(model, "CRNN.png", show_shapes=True)

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss=[keras.losses.BinaryCrossentropy()],
    # Other metrics kept for reference:
    # 'categorical_accuracy', tf.keras.metrics.Precision(class_id=0), tf.keras.metrics.Precision(class_id=1), tf.keras.metrics.Recall(class_id=0), tf.keras.metrics.Recall(class_id=1)
    metrics=['binary_accuracy'],
)

model.summary()

In [None]:
# File the best model is saved to during training (passed to MyCustomCallback).
# Ensure the directory for the model path is already created.
# MyCustomCallback derives its temporary checkpoint path from this one by
# replacing ".h5" with "_temp.h5".
model_path = "/content/drive/My Drive/model-1.h5"

In [None]:
# Train for at most 30 epochs, with MyCustomCallback configured with patience=15.
early_stopping = MyCustomCallback(model_path, patience=15)
history = model.fit(
    training_generator,
    validation_data=validation_generator,
    epochs=30,
    callbacks=[early_stopping],
)

In [None]:
# Evaluate the model on the validation generator; as the last expression of
# the cell, the returned values (loss and the compiled 'binary_accuracy'
# metric) are displayed.
model.evaluate(validation_generator)