In [1]:
%matplotlib inline
# from celluloid import Camera # getting the camera
import json
import glob
import os
import random
import math
import librosa
import itertools
import shutil
import random
import warnings
import numpy as np
import scipy as sp
import pandas as pd
from PIL import Image
import librosa.display
import IPython.display as ipd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import load_img 
from tensorflow.keras.preprocessing.image import img_to_array 
from tensorflow.keras.applications.imagenet_utils import decode_predictions 
from tensorflow.keras.applications import VGG16, ResNet50, VGG19, DenseNet121, InceptionV3, Xception
from tensorflow.keras.layers import Dense, Dropout, Flatten, BatchNormalization, Activation
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import utils as np_utils
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras_preprocessing.image.dataframe_iterator import DataFrameIterator
from matplotlib.offsetbox import ( OffsetImage,
                                  AnnotationBbox)
from IPython.display import HTML # to show the animation in Jupyter
warnings.simplefilter(action='ignore', category=FutureWarning)

In [2]:
def load_data(data_path):
    """Read the pre-extracted features, labels and label mapping from a json file.
        :param data_path (str): Path to json file containing data
        :return X (ndarray): float64 array built from the "mfcc" entry
        :return y (ndarray): Array built from the "labels" entry
        :return Z (ndarray): float64 array built from the "melspec" entry
        :return Q (ndarray): float64 array built from the "spect" entry
        :return mp: The "mapping" entry, returned as stored in the file
    """

    with open(data_path, "r") as handle:
        payload = json.load(handle)

    def _as_float_array(key):
        # nested json lists -> float64 numpy array
        return np.array(payload[key], dtype=np.float64)

    mfcc = _as_float_array("mfcc")
    melspec = _as_float_array("melspec")
    spect = _as_float_array("spect")
    labels = np.array(payload["labels"])
    mapping = payload['mapping']
    print("Data succesfully loaded!")

    # NOTE: the order is (mfcc, labels, melspec, spect, mapping)
    return mfcc, labels, melspec, spect, mapping






def plot_history(history):
    """Draw two stacked panels: accuracy on top, loss below, one point per epoch.
        :param history: Training history of model (its ``history`` dict must hold
            "accuracy", "val_accuracy", "loss" and "val_loss")
        :return:
    """

    curves = history.history
    fig, (accuracy_ax, error_ax) = plt.subplots(2)

    # accuracy panel
    accuracy_ax.plot(curves["accuracy"], label="train accuracy")
    accuracy_ax.plot(curves["val_accuracy"], label="test accuracy")
    accuracy_ax.set_title("Accuracy eval")
    accuracy_ax.set_ylabel("Accuracy")
    accuracy_ax.legend(loc="lower right")

    # error panel (the only one carrying the shared x-axis label)
    error_ax.plot(curves["loss"], label="train error")
    error_ax.plot(curves["val_loss"], label="test error")
    error_ax.set_title("Error eval")
    error_ax.set_xlabel("Epoch")
    error_ax.set_ylabel("Error")
    error_ax.legend(loc="upper right")

    plt.show()

In [3]:
# Directory holding the pre-extracted feature file. The default is the original
# scratch location; set the AUDIO_DATA_DIR environment variable to run the
# notebook against a copy of the data stored elsewhere.
paths = os.environ.get(
    "AUDIO_DATA_DIR",
    "/net/projects/scratch/winter/valid_until_31_July_2022/ybrima/data",
)

# path to json file that stores the extracted features (mfcc, melspec, spect),
# the labels and the label mapping for each processed segment
JSON_PATH = f"{paths}/data_13.json"

# Audio hyperparameters
sr = 22050

In [4]:
def plot_sample(x, sr, hop_length=None):
    """Display a 2-D spectrogram-like array with a log-scaled vertical axis and a colorbar.
        :param x (ndarray): 2-D array to display
        :param sr (int): Sampling rate forwarded to librosa.display.specshow
        :param hop_length (int, optional): Hop length used for the time axis. When
            omitted, the notebook-level HOP_LENGTH constant is used if one is
            defined, otherwise 512 (librosa's default hop length).
        :return:
    """
    # The original body read a global HOP_LENGTH that no cell of the notebook
    # defines, which raises NameError on a fresh kernel. Resolve it lazily and
    # fall back to librosa's default instead.
    if hop_length is None:
        hop_length = globals().get("HOP_LENGTH", 512)

    # Put the shorter axis first ...
    if x.shape[0] > x.shape[1]:
        x = x.T
    # ... and plot the transpose, so the longer axis is always the one specshow
    # draws vertically.
    # NOTE(review): this assumes there are more frequency bins than frames - confirm.
    librosa.display.specshow(x.T, sr=sr, x_axis='time', y_axis="log", hop_length=hop_length)
    plt.colorbar(format='%+2.f')

In [6]:
def normalize(train, test):
    """Standardise both sets with the mean and standard deviation of the training set.
        :param train (ndarray): Training data; its global mean/std define the scaling
        :param test (ndarray): Held-out data, scaled with the *training* statistics
        :return train (ndarray): Standardised training data
        :return test (ndarray): Test data on the same scale as the training data
    """
    mu, sigma = train.mean(), train.std()
    # Constant training data would give sigma == 0 and fill the output with
    # NaN/inf; fall back to centring only.
    if sigma == 0:
        sigma = 1.0

    # The test set must reuse the training statistics. Re-estimating mu/sigma on
    # the test set (as the previous version did) puts the two sets on different
    # scales and lets test-set information leak into preprocessing.
    train = (train - mu) / sigma
    test = (test - mu) / sigma
    return train, test

In [11]:
def prepare_datasets(test_size, validation_size, random_state=None):
    """Loads data and splits it into train, validation and test sets.

    Only the "melspec" features returned by load_data are used as inputs; a
    trailing axis of size 1 is appended to every input split.

    :param test_size (float): Value in [0, 1] indicating percentage of data set to allocate to test split
    :param validation_size (float): Value in [0, 1] indicating percentage of train set to allocate to validation split
    :param random_state (int, optional): Seed forwarded to both train_test_split calls so the
        splits can be reproduced. The default (None) keeps the previous unseeded behaviour.
    :return X_train (ndarray): Input training set
    :return X_validation (ndarray): Input validation set
    :return X_test (ndarray): Input test set
    :return y_train (ndarray): Target training set
    :return y_validation (ndarray): Target validation set
    :return y_test (ndarray): Target test set
    """

    # load data; load_data returns (mfcc, labels, melspec, spect, mapping)
    _, y, melspec, _, _ = load_data(JSON_PATH)

    # carve the test set off first, then split the remainder into train/validation
    X_train, X_test, y_train, y_test = train_test_split(
        melspec, y, test_size=test_size, random_state=random_state
    )
    X_train, X_validation, y_train, y_validation = train_test_split(
        X_train, y_train, test_size=validation_size, random_state=random_state
    )

    # add an axis to input sets
    X_train = X_train[..., np.newaxis]
    X_validation = X_validation[..., np.newaxis]
    X_test = X_test[..., np.newaxis]

    return X_train, X_validation, X_test, y_train, y_validation, y_test

In [8]:
def build_model(input_shape, num_classes=13):
    """Generates CNN model
    :param input_shape (tuple): Shape of input set
    :param num_classes (int): Number of units in the softmax output layer. Defaults
        to 13, the value that used to be hard-coded.
    :return model: CNN model
    """

    layers = keras.layers

    # build network topology
    model = keras.Sequential()

    # three conv -> max-pool -> batch-norm blocks, given as (conv kernel, pool size);
    # every block uses 32 filters and a pooling stride of 2
    conv_blocks = [
        ((3, 3), (3, 3)),
        ((3, 3), (3, 3)),
        ((2, 2), (2, 2)),
    ]
    for block_idx, (kernel_size, pool_size) in enumerate(conv_blocks):
        # only the very first layer declares the input shape
        first_layer_kwargs = {"input_shape": input_shape} if block_idx == 0 else {}
        model.add(layers.Conv2D(32, kernel_size, activation='relu', **first_layer_kwargs))
        model.add(layers.MaxPooling2D(pool_size, strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())

    # flatten output and feed it into dense layer
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dropout(0.3))

    # output layer
    model.add(layers.Dense(num_classes, activation='softmax'))

    return model

In [12]:
def predict(model, X, y):
    """Run the trained model on one sample and print the target next to the predicted class index
    :param model: Trained classifier
    :param X: Input data (a single sample, without the batch axis)
    :param y (int): Target
    """

    # model.predict() works on batches, so wrap the sample in a batch of size one
    single_sample_batch = X[np.newaxis]

    # per-class scores for the one sample in the batch
    class_scores = model.predict(single_sample_batch)

    # index of the highest score along the class axis
    best_class = np.argmax(class_scores, axis=1)

    message = "Target: {}, Predicted label: {}".format(y, best_class)
    print(message)

In [None]:
# Split the data: 25% is held out for testing, then 20% of what remains is used for validation
(X_train, X_validation, X_test,
 y_train, y_validation, y_test) = prepare_datasets(test_size=0.25, validation_size=0.2)

In [None]:
# `mp` (the label mapping returned by load_data) only ever exists as a local
# variable inside functions, so this cell used to fail with NameError on a fresh
# kernel. Load it here unless an earlier cell already put it in the namespace.
if "mp" not in globals():
    *_, mp = load_data(JSON_PATH)

# unique entries of the mapping, in first-seen order
label = list(dict.fromkeys(mp))
used = set(label)  # kept only because the original cell left this set in the namespace



In [None]:
# Build the CNN for single-channel inputs whose height/width match the training data
input_shape = tuple(X_train.shape[axis] for axis in (1, 2)) + (1,)
model = build_model(input_shape)

# SGD settings used for training
optim_params = {
    "learning_rate": 0.0001,
    "momentum": 0.9394867962846013,
    "decay": 0.0001,
}
optimiser = keras.optimizers.SGD(**optim_params)

# Sparse cross-entropy: the targets are passed as class indices, not one-hot vectors
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=optimiser,
    metrics=['accuracy'],
)

model.summary()


In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, TensorBoard

# Keep only the best full model (lowest validation loss) on disk.
# save_freq must be 'epoch' here: an integer save_freq counts *batches*, and
# val_loss is not available at the end of a batch, so with save_freq=1 the
# "best only" check never sees the monitored metric.
check_point = ModelCheckpoint(
    # 'model_v2.best.h5',
    'weighted_model_v2.best.h5',
    monitor='val_loss', verbose=1,
    save_best_only=True, save_weights_only=False, save_freq='epoch'
)

# Multiply the learning rate by 0.2 after 5 epochs without val_loss improvement,
# never going below 1e-5.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', factor=0.2, patience=5, verbose=1, min_lr=0.00001
)

# Stop after 11 epochs without a val_loss improvement of at least 0.001 and roll
# the model back to its best weights.
early_stop = EarlyStopping(
    monitor='val_loss',
    min_delta=0.001, patience=11, verbose=1, restore_best_weights=True
)



In [None]:
# Fit the network; validation metrics drive the checkpoint, early-stopping and LR callbacks
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=32,
    epochs=30,
    validation_data=(X_validation, y_validation),
    callbacks=[check_point, early_stop, reduce_lr],
)

# Working with standard CNN architectures

In [None]:
# Show the accuracy and loss curves (training vs. validation) recorded during fitting
plot_history(history=history)

In [13]:
# ---- training parameters ----

# optimisation schedule
EPOCHS = 50
BATCH_SIZE = 32
SEED = 214

# label / image handling
NUM_CLASSES = 13
CLASS_MODE = 'categorical'   # alternative: 'binary'
COLOR_MODE = 'grayscale'
TARGET_SIZE = (87, 87)

# shape handed to the network as input_shape (single channel)
input_shape = (87, 256, 1)

In [15]:
def print_layers(model):
    """Print one line per layer of ``model``: its index, its name and its trainable flag."""
    render = "layer {}: {}, trainable: {}".format
    for position, current in enumerate(model.layers):
        print(render(position, current.name, current.trainable))


# Reset Keras' global state before building a second model in the same kernel.
K.clear_session()

# Convolutional base: VGG19 without its classifier head, randomly initialised
# (weights=None) and sized for `input_shape`.
# Backbones tried earlier (all with input_shape=TARGET_SIZE+(3,)):
#   VGG16(weights='imagenet' or a local weights file), ResNet50(weights='imagenet'),
#   DenseNet121 / InceptionV3 (local weights files).
base_model = VGG19(include_top=False, weights=None, input_shape=input_shape)

# Classification head: flatten the final feature maps and map them straight to
# class probabilities. (Intermediate Dense(512/256/128/64) + Dropout(0.1) stacks
# were tried and are currently disabled.)
x = Flatten()(base_model.output)
x = Dense(NUM_CLASSES, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=x)

# Freeze every layer except the last 14.
# NOTE(review): the base is built with weights=None, so the frozen layers keep
# their random initialisation for the whole training run - confirm this is intended.
for layer in model.layers[:-14]:
    layer.trainable = False

print_layers(model)

layer 0: input_1, trainable: False
layer 1: block1_conv1, trainable: False
layer 2: block1_conv2, trainable: False
layer 3: block1_pool, trainable: False
layer 4: block2_conv1, trainable: False
layer 5: block2_conv2, trainable: False
layer 6: block2_pool, trainable: False
layer 7: block3_conv1, trainable: False
layer 8: block3_conv2, trainable: False
layer 9: block3_conv3, trainable: False
layer 10: block3_conv4, trainable: True
layer 11: block3_pool, trainable: True
layer 12: block4_conv1, trainable: True
layer 13: block4_conv2, trainable: True
layer 14: block4_conv3, trainable: True
layer 15: block4_conv4, trainable: True
layer 16: block4_pool, trainable: True
layer 17: block5_conv1, trainable: True
layer 18: block5_conv2, trainable: True
layer 19: block5_conv3, trainable: True
layer 20: block5_conv4, trainable: True
layer 21: block5_pool, trainable: True
layer 22: flatten, trainable: True
layer 23: dense, trainable: True


In [16]:
# summary() prints the table itself and returns None; wrapping it in print()
# only appended a stray "None" line to the output.
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 87, 256, 1)]      0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 87, 256, 64)       640       
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 87, 256, 64)       36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 43, 128, 64)       0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 43, 128, 128)      73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 43, 128, 128)      147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 21, 64, 128)       0     

In [17]:
from tensorflow.keras.optimizers import SGD

# SGD settings used for the VGG19-based model
optim_params = {
    "learning_rate": 0.0001,
    "momentum": 0.9394867962846013,
    "decay": 0.0001,
}

# NOTE(review): 'categorical_crossentropy' presumably expects one-hot targets -
# confirm the labels fed to this model are encoded that way.
model.compile(
    optimizer=SGD(**optim_params),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 87, 256, 1)]      0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 87, 256, 64)       640       
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 87, 256, 64)       36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 43, 128, 64)       0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 43, 128, 128)      73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 43, 128, 128)      147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 21, 64, 128)       0     