<a href="https://colab.research.google.com/github/satvik-venkatesh/train-synth-audio-seg/blob/main/train-CRNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install numba==0.48

In [None]:
!pip install sed_eval
!pip install librosa==0.7.2
!pip install soundfile

In [None]:
!pip install keras-tcn

In [None]:
!pip install -q -U keras-tuner
import kerastuner as kt

In [None]:
import numpy as np
import IPython
import math
import glob
import sed_eval
import dcase_util
import pickle
import os
import shutil
import soundfile as sf
import librosa

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from tensorflow.keras.layers import Dense, Activation
from tensorflow.keras import Input, Model
from tcn import TCN
from kerastuner import HyperModel

In [None]:
"""
Mount Google Drive into Colab.
"""
from google.colab import drive
drive.mount('/content/drive')

In [None]:
"""
Extract artificial data into the 'train data' folder.
"""
from zipfile import ZipFile

for i in range(0, 8):
  zip_name = "/content/drive/My Drive/Data Synthesis/Train - d_" + str(i + 1) + ".zip"
  with ZipFile(zip_name, 'r') as zip:
    zip.extractall('train data')
    print("Extracted all sound files into the folder {}".format(i + 1))

In [None]:
"""
Extracting Train data (if you have annotated real-world data)
"""
from zipfile import ZipFile
zip_name = "/content/drive/My Drive/Data Synthesis/Real-Train.zip"
with ZipFile(zip_name, 'r') as zip:
  zip.extractall('train data')
  print("Extracted all sound files into the folder")


In [None]:
"""
Extracting Real-world Val data
"""
from zipfile import ZipFile
zip_name = "/content/drive/My Drive/Data Synthesis/Val - d.zip"
with ZipFile(zip_name, 'r') as zip:
  zip.extractall('validation data')
  print("Extracted all sound files into the folder")


In [None]:
def to_seg_by_class(events, n_frames = 802):
  """Rasterise a list of events into a frame-level, two-column label matrix.

  Each event is indexed as (onset, offset, class name). Onset and offset are
  converted to frame indices with int(t / 220 * 22050); the frames in
  [onset, offset) are set to 1 in column 0 for 'speech' and in column 1 for
  'music'. Events carrying any other class name leave the matrix untouched.

  Returns a float32 array of shape (n_frames, 2).
  """
  # presumably 22050 is the sample rate, 220 the hop size in samples and the
  # event times are in seconds -- TODO confirm against the feature extraction
  class_columns = (('speech', 0), ('music', 1))
  frame_labels = np.zeros((n_frames, 2), dtype=np.float32)

  for event in events:
    onset = int(float(event[0]) / 220 * 22050)
    offset = int(float(event[1]) / 220 * 22050)

    for class_name, column in class_columns:
      if event[2] == class_name:
        frame_labels[onset:offset, column] = 1
        break

  return frame_labels

In [None]:
"""
Convert the pickle files to npy
"""

labels = glob.glob("/content/train data/**/mel-id-label-[0-9]*.pickle", recursive=True)

for ll in labels:
  with open(ll, 'rb') as f:
    n = pickle.load(f)
  n2 = to_seg_by_class(n)
  np.save(ll.replace(".pickle", ".npy"), n2)


labels = glob.glob("/content/validation data/**/mel-id-label-[0-9]*.pickle", recursive=True)

for ll in labels:
  with open(ll, 'rb') as f:
    n = pickle.load(f)
  n2 = to_seg_by_class(n)
  np.save(ll.replace(".pickle", ".npy"), n2)

In [None]:
import tensorflow as tf
import keras

class DataGenerator(tf.compat.v2.keras.utils.Sequence):
    """Keras Sequence that builds (features, labels) batches from .npy files.

    Each entry of `list_examples` is indexed as (features path, labels path).
    Both files are read with np.load and written into an (802, 80) feature
    slot and an (802, 2) label slot of the batch arrays.

    `dim` and `n_classes` are stored on the instance but are not used when the
    batches are assembled.
    """

    def __init__(self, list_examples, batch_size=128, dim=(1, ),
                 n_classes=2, shuffle=True):
        """Store the configuration and draw the index order of the first epoch."""
        # Harmless on a base class without its own __init__; newer Keras
        # releases expect the base initialiser to be called.
        super().__init__()
        print("Constructor called!!!")
        self.dim = dim
        self.batch_size = batch_size
        self.list_examples = list_examples
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches per epoch; a trailing partial batch is dropped."""
        return int(np.floor(len(self.list_examples) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number `index` as a tuple (X, y)."""
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Translate the (possibly shuffled) positions into example entries.
        list_IDs_temp = [self.list_examples[k] for k in indexes]

        X, y = self.__data_generation(list_IDs_temp)

        return X, y

    def on_epoch_end(self):
        """Rebuild the index order, reshuffling it when `shuffle` is set."""
        self.indexes = np.arange(len(self.list_examples))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load the files of the given examples into freshly allocated arrays."""
        # The buffers are sized from the examples actually supplied, so a short
        # batch never hands back uninitialised np.empty rows.
        n_samples = len(list_IDs_temp)
        X = np.empty([n_samples, 802, 80], dtype=np.float32)
        y = np.empty([n_samples, 802, 2], dtype=np.float32)

        for i, ID in enumerate(list_IDs_temp):
            X[i, :, :] = np.load(ID[0])   # features
            y[i, :, :] = np.load(ID[1])   # frame-level labels

        return X, y

In [None]:
import re

def tryint(s):
    """Return int(s) when `s` parses as an integer, otherwise `s` unchanged."""
    try:
        value = int(s)
    except ValueError:
        value = s
    return value
    
def alphanum_key(s):
    """Break a string into text and number chunks, usable as a sort key.

        "z23a" -> ["z", 23, "a"]
    """
    # The capturing group makes re.split keep the digit runs in the result.
    chunks = re.split('([0-9]+)', s)
    return list(map(tryint, chunks))

def sort_nicely(l):
    """Sort the list in place using alphanum_key, so embedded numbers compare
    numerically ("a2" before "a10").
    """
    # Slice assignment keeps this an in-place sort of the caller's list.
    l[:] = sorted(l, key=alphanum_key)

In [None]:
import glob
import random
"""
Load the individual numpy arrays into partition
"""
# Feature files. The "[0-9]" right after "mel-id-" keeps the "mel-id-label-*"
# files out of this list.
data = glob.glob("/content/train data/**/mel-id-[0-9]*.npy", recursive=True) # + glob.glob("/content/train data/MuSpeak/content/Mel Files/**/mel-id-[0-9]*.npy", recursive=True)
#data = glob.glob("/content/train data/MuSpeak/content/Mel Files/**/mel-id-[0-9]*.npy", recursive=True)
sort_nicely(data)

labels = glob.glob("/content/train data/**/mel-id-label-[0-9]*.npy", recursive=True) #+ glob.glob("/content/train data/MuSpeak/content/Mel Files/**/mel-id-label-[0-9]*.npy", recursive=True)
#labels = glob.glob("/content/train data/MuSpeak/content/Mel Files/**/mel-id-label-[0-9]*.npy", recursive=True)
sort_nicely(labels)

# Features and labels are matched purely by position in the sorted lists, so a
# missing or extra file would silently misalign every pair after it.
if len(data) != len(labels):
  raise ValueError(
      "Found {} feature files but {} label files under '/content/train data'".format(
          len(data), len(labels)))

train_examples = [(data[i], labels[i]) for i in range(len(data))]

# Fixed seed so the example order is the same on every run.
random.seed(4)
random.shuffle(train_examples)

partition = {}
partition['train'] = train_examples

# NOTE(review): this reshuffles the same list a second time; it is redundant
# but kept so the resulting order is unchanged.
random.shuffle(partition['train'])

In [None]:
"""
This loads data for the validation set.
"""
import glob
import random

data = glob.glob("/content/validation data/**/mel-id-[0-9]*.npy", recursive=True)
sort_nicely(data)

labels = glob.glob("/content/validation data/**/mel-id-label-[0-9]*.npy", recursive=True)
sort_nicely(labels)

validation_examples = [(data[i], labels[i]) for i in range(len(data))]

random.seed(4)
random.shuffle(validation_examples)

partition['validation'] = validation_examples

In [None]:
# Display the number of (features, labels) training pairs as a quick sanity check.
len(partition['train'])

In [None]:
# Keyword arguments shared by both generators
params = dict(dim=(1, ), batch_size=32, n_classes=2, shuffle=True)

# One generator per split, built from the same settings (training first)
training_generator = DataGenerator(list_examples=partition['train'], **params)
validation_generator = DataGenerator(list_examples=partition['validation'], **params)

In [None]:
class SpeechF1(tf.keras.metrics.Metric):
  """Streaming F1 score computed on channel 0 of the targets and predictions.

  Both tensors are indexed as [:, :, 0] and binarised with a 0.5 threshold.
  True/false positive/negative counts accumulate over batches until the state
  is reset, and result() evaluates tp / (tp + 0.5 * (fp + fn)).
  """

  def __init__(self, name='speech_f1', **kwargs):
    super(SpeechF1, self).__init__(name=name, **kwargs)
    self.tp = self.add_weight(name='true_positive', initializer='zeros')
    self.fp = self.add_weight(name='false_positive', initializer='zeros')
    # tn is accumulated for completeness; result() does not read it.
    self.tn = self.add_weight(name='true_negative', initializer='zeros')
    self.fn = self.add_weight(name='false_negative', initializer='zeros')

  def update_state(self, y_true, y_pred, sample_weight=None):
    """Add the confusion counts of one batch; `sample_weight` is accepted but ignored."""
    threshold = tf.constant([0.5])

    binary_true = tf.greater_equal(y_true[:, :, 0], threshold)
    binary_pred = tf.greater_equal(y_pred[:, :, 0], threshold)
    not_true = tf.logical_not(binary_true)
    not_pred = tf.logical_not(binary_pred)

    tp = tf.cast(tf.logical_and(binary_true, binary_pred), dtype = np.float32)
    fp = tf.cast(tf.logical_and(not_true, binary_pred), dtype = np.float32)
    tn = tf.cast(tf.logical_and(not_true, not_pred), dtype = np.float32)
    fn = tf.cast(tf.logical_and(binary_true, not_pred), dtype = np.float32)

    self.tp.assign_add(tf.reduce_sum(tp))
    self.fp.assign_add(tf.reduce_sum(fp))
    self.tn.assign_add(tf.reduce_sum(tn))
    self.fn.assign_add(tf.reduce_sum(fn))

  def result(self):
    """Return the F1 score of everything accumulated so far."""
    # divide_no_nan yields 0 instead of NaN when tp, fp and fn are all zero
    # (no positive targets and no positive predictions seen yet).
    return tf.math.divide_no_nan(self.tp, self.tp + 0.5 * (self.fp + self.fn))

  def reset_state(self):
    """Zero all four counters."""
    self.tp.assign(0.0)
    self.fp.assign(0.0)
    self.tn.assign(0.0)
    self.fn.assign(0.0)

  def reset_states(self):
    """Legacy spelling of reset_state, kept for older Keras releases that call it."""
    self.reset_state()

In [None]:
class MusicF1(tf.keras.metrics.Metric):
  """Streaming F1 score computed on channel 1 of the targets and predictions.

  Both tensors are indexed as [:, :, 1] and binarised with a 0.5 threshold.
  True/false positive/negative counts accumulate over batches until the state
  is reset, and result() evaluates tp / (tp + 0.5 * (fp + fn)).
  """

  def __init__(self, name='music_f1', **kwargs):
    super(MusicF1, self).__init__(name=name, **kwargs)
    self.tp = self.add_weight(name='true_positive', initializer='zeros')
    self.fp = self.add_weight(name='false_positive', initializer='zeros')
    # tn is accumulated for completeness; result() does not read it.
    self.tn = self.add_weight(name='true_negative', initializer='zeros')
    self.fn = self.add_weight(name='false_negative', initializer='zeros')

  def update_state(self, y_true, y_pred, sample_weight=None):
    """Add the confusion counts of one batch; `sample_weight` is accepted but ignored."""
    threshold = tf.constant([0.5])

    binary_true = tf.greater_equal(y_true[:, :, 1], threshold)
    binary_pred = tf.greater_equal(y_pred[:, :, 1], threshold)
    not_true = tf.logical_not(binary_true)
    not_pred = tf.logical_not(binary_pred)

    tp = tf.cast(tf.logical_and(binary_true, binary_pred), dtype = np.float32)
    fp = tf.cast(tf.logical_and(not_true, binary_pred), dtype = np.float32)
    tn = tf.cast(tf.logical_and(not_true, not_pred), dtype = np.float32)
    fn = tf.cast(tf.logical_and(binary_true, not_pred), dtype = np.float32)

    self.tp.assign_add(tf.reduce_sum(tp))
    self.fp.assign_add(tf.reduce_sum(fp))
    self.tn.assign_add(tf.reduce_sum(tn))
    self.fn.assign_add(tf.reduce_sum(fn))

  def result(self):
    """Return the F1 score of everything accumulated so far."""
    # divide_no_nan yields 0 instead of NaN when tp, fp and fn are all zero
    # (no positive targets and no positive predictions seen yet).
    return tf.math.divide_no_nan(self.tp, self.tp + 0.5 * (self.fp + self.fn))

  def reset_state(self):
    """Zero all four counters."""
    self.tp.assign(0.0)
    self.fp.assign(0.0)
    self.tn.assign(0.0)
    self.fn.assign(0.0)

  def reset_states(self):
    """Legacy spelling of reset_state, kept for older Keras releases that call it."""
    self.reset_state()

In [None]:
# Learning-rate schedule handed to the optimizer: exponential decay from 0.001
# with rate 0.84 over 2500-step intervals, applied in discrete steps
# (staircase=True).
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=2500,
    decay_rate=0.84,
    staircase=True,
)

In [None]:

import os
class MyCustomCallback_3(tf.keras.callbacks.Callback):
  """Checkpointing, resumable bookkeeping and early stopping in one callback.

  After every epoch the model is saved to 'model-last-epoch.h5' in
  `model_dir`. Whenever `val_binary_accuracy` beats the best value seen so
  far, the weights are remembered and the model is also saved to
  'model-best.h5'. The best loss, best accuracy and the number of completed
  epochs are pickled to 'custom_params.pickle' so an interrupted run can be
  resumed. Once `patience` consecutive epochs bring no improvement, training
  is stopped and the best weights are restored.

  Args:
    model_dir: Folder holding the two .h5 files and the pickle.
    patience: Number of consecutive non-improving epochs that triggers the stop.
  """

  def __init__(self, model_dir, patience=0):
    super(MyCustomCallback_3, self).__init__()
    self.patience = patience
    # Weights of the epoch with the highest validation binary accuracy so far.
    self.best_weights = None
    self.model_best_path = os.path.join(model_dir, 'model-best.h5')
    self.model_last_path = os.path.join(model_dir, 'model-last-epoch.h5')
    self.custom_params = {"best_loss":np.inf, "last_epoch":0, "best_binary_accuracy":0}

    self.custom_params_path = os.path.join(model_dir, 'custom_params.pickle')
    if os.path.isfile(self.custom_params_path):
      # Resuming: pick up the bookkeeping of the previous run.
      with open(self.custom_params_path, 'rb') as f:
        self.custom_params = pickle.load(f)
      # The best model only exists if some epoch improved on the initial accuracy.
      if os.path.isfile(self.model_best_path):
        # custom_objects maps names to the classes themselves. The previous
        # 'binary_acc' entry referred to a name this notebook never defines.
        best_model = tf.keras.models.load_model(self.model_best_path, custom_objects={
                    'TCN':TCN, 'SpeechF1':SpeechF1, 'MusicF1':MusicF1})
        self.best_weights = best_model.get_weights()

  def _restore_best_weights(self):
    """Put the best recorded weights back into the model, if any were recorded."""
    if self.best_weights is None:
      return
    print("Restoring model weights from the end of the best epoch.")
    self.model.set_weights(self.best_weights)

  def on_train_begin(self, logs=None):
    # Number of consecutive epochs without an improvement.
    self.wait = 0
    # The epoch the training stops at.
    self.stopped_epoch = 0
    # Becomes True once patience has run out and training was stopped early.
    self.is_impatient = False

  def on_train_end(self, logs=None):
    # On an early stop the weights were already restored in on_epoch_end.
    if not self.is_impatient:
      self._restore_best_weights()

  def on_epoch_end(self, epoch, logs=None):
    current_val_loss = logs.get("val_loss")
    current_binary_accuracy = logs.get("val_binary_accuracy")
    # The last-epoch checkpoint is written unconditionally so a run can resume.
    self.model.save(self.model_last_path)
    self.custom_params["last_epoch"] = self.custom_params["last_epoch"] + 1

    if current_binary_accuracy > self.custom_params['best_binary_accuracy']:
      self.custom_params['best_binary_accuracy'] = current_binary_accuracy
      self.custom_params['best_loss'] = current_val_loss
      self.wait = 0
      self.best_weights = self.model.get_weights()
      self.model.save(self.model_best_path)

    else:
      self.wait += 1
      if self.wait >= self.patience:
        self.stopped_epoch = epoch
        self.is_impatient = True
        self.model.stop_training = True
        self._restore_best_weights()

    # Persist the bookkeeping after every epoch, improved or not.
    with open(self.custom_params_path, 'wb') as f:
      pickle.dump(self.custom_params, f, pickle.HIGHEST_PROTOCOL)


In [None]:
"""
HP optimised CRNN
"""
mel_input = keras.Input(shape=(802, 80), name="mel_input")
X = mel_input

X = tf.keras.layers.Reshape((802, 80, 1))(X)

X = tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=1, padding='same')(X)
X = tf.keras.layers.Activation('relu')(X)
X = tf.keras.layers.MaxPool2D(pool_size=(1, 2))(X)
X = layers.LayerNormalization(axis = [-2, -1])(X)

X = tf.keras.layers.Conv2D(filters=64, kernel_size=11, strides=1, padding='same')(X)
X = tf.keras.layers.Activation('relu')(X)
X = tf.keras.layers.MaxPool2D(pool_size=(1, 2))(X)
X = layers.LayerNormalization(axis = [-2, -1])(X)


X = tf.keras.layers.Conv2D(filters=16, kernel_size=11, strides=1, padding='same')(X)
X = tf.keras.layers.Activation('relu')(X)
X = tf.keras.layers.MaxPool2D(pool_size=(1, 2))(X)
X = layers.LayerNormalization(axis = [-2, -1])(X)
_, _, sx, sy = X.shape
X = tf.keras.layers.Reshape((-1, int(sx * sy)))(X)

X = layers.Bidirectional(layers.GRU(80, return_sequences = True))(X)
X = layers.LayerNormalization()(X)


X = layers.Bidirectional(layers.GRU(40, return_sequences = True))(X)
X = layers.LayerNormalization()(X)

pred = layers.Dense(2, name="speech_and_music", activation='sigmoid')(X)

model = keras.Model(inputs = [mel_input], outputs = [pred])

keras.utils.plot_model(model, "multi_input_and_output_model.png", show_shapes=True)

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
    loss=[tf.keras.losses.BinaryCrossentropy()], metrics=[tf.keras.metrics.BinaryAccuracy(), SpeechF1(), MusicF1()]
)

In [None]:
"""
Create a directory to train the model.
"""
os.mkdir("/content/drive/MyDrive/Models-2")

In [None]:
# Load the TensorBoard notebook extension
root_dir = "/content/drive/MyDrive/Models-2"
model_name = 'CRNN-2'
model_dir = os.path.join(root_dir, model_name)

try: 
    os.mkdir(model_dir) 
except OSError as error: 
    pass  

%load_ext tensorboard
import datetime, os
logdir = os.path.join(root_dir)
tensorboard_callback = tf.keras.callbacks.TensorBoard(os.path.join(root_dir, model_name), histogram_freq=1)

%tensorboard --logdir "{logdir}"

In [None]:
# Start from scratch, or resume when a previous run left its bookkeeping file.
initial_epoch = 0
p = os.path.join(model_dir, 'custom_params.pickle')
if os.path.isfile(p):
  print("Entered if!!!")
  with open(p, 'rb') as f:
    custom_params = pickle.load(f)
  # The callback increments 'last_epoch' once per finished epoch.
  last_epoch = custom_params['last_epoch']
  initial_epoch = last_epoch
  model_path = os.path.join(model_dir, 'model-last-epoch.h5')
  print(model_path)
  # custom_objects maps names to the classes themselves. The previous
  # 'binary_acc' entry referred to a name this notebook never defines and
  # raised NameError on every resume.
  model = tf.keras.models.load_model(model_path, custom_objects={
                  'TCN':TCN, 'SpeechF1':SpeechF1, 'MusicF1':MusicF1})
else:
  print("Entered else!!!")

# initial_epoch stays 0 on a fresh run, which is also the default of fit().
model.fit(training_generator, validation_data=validation_generator, epochs=300, initial_epoch = initial_epoch,
          callbacks=[MyCustomCallback_3(model_dir, patience=20), tensorboard_callback], verbose=2)

In [None]:
print("Training is complete!!!")
import datetime
now = datetime.datetime.now()
print ("Current date and time : ")
print (now.strftime("%Y-%m-%d %H:%M:%S"))