In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import os
import librosa
import json
import keras

In [None]:
from google.colab import drive
# Mount Google Drive inside the Colab VM; force_remount=True asks for a fresh
# mount even when the drive is already mounted.
drive.mount('/content/drive', force_remount=True)
# Root folder of the trailer dataset on Drive. The cells below append
# 'train/audio', 'validation/audio' and 'test/audio' to this path.
data_home = '/content/drive/My Drive/dl4m_final/trailer_dataset/'

In [None]:
# The id -> label mapping is mandatory for loading the data.
# Example entry: id 1 maps to [0,0,1,0,1,0].
with open('genres.json') as genres_file:
  label_mapper = json.load(genres_file)

In [None]:
import utils_audio as u
import models_audio as m

In [None]:
# Clips that make the model crash. The list is handed to the data loaders
# below so these ids can be left out.
corrupt_ids = [
    "gYbW1F_c9eM",
    "VW-F1H-Nonk",
    "j9N0nvBITzk",
    "xNstK5rbzcw",
    "5tGgqyhCIXQ",
    "t2LI5OOifsQ",
    "lcwmDAYt22k",
    "RGyrxamYhUA",
    "vlEwqBrbPPU",
]

In [None]:
# Load the three splits from their audio folders. Per the original note this
# returns file paths, labels and ids (not decoded audio).
# corrupt_ids and label_mapper are forwarded to the loader — presumably to skip
# the bad clips and to build each clip's label vector; confirm in
# utils_audio.load_data_first.

train_data, train_labels, train_ids = u.load_data_first(data_home+"train/audio", corrupt_ids, label_mapper)
val_data, val_labels, val_ids = u.load_data_first(data_home+"validation/audio", corrupt_ids, label_mapper)
test_data, test_labels, test_ids = u.load_data_first(data_home+"test/audio", corrupt_ids, label_mapper)

In [None]:
# Number of ids in each split, printed in train / validation / test order.
for split_ids in (train_ids, val_ids, test_ids):
  print(len(split_ids))

In [None]:
# First five ids of each split, printed in train / validation / test order.
for split_ids in (train_ids, val_ids, test_ids):
  print(split_ids[:5])

In [None]:
# Waveform
sample_rate = 22050  # Hz

# Spectrogram
n_mels = 128  # first dimension of each CNN input window (see input_shape)
hop_length = 512  # divisor used below to turn samples into STFT frames
audio_seg_size = 1 # seconds: how big the input to the CNN will be
segments_overlap = audio_seg_size/2 # seconds: how much overlap between windows
stft_length = int(np.ceil(sample_rate*audio_seg_size/hop_length))  # frames per segment: ceil(22050 * 1 / 512) = 44

# The CNN receives windows of spectrograms, shaped (n_mels, stft_length, 1)
input_shape = (n_mels, stft_length, 1) 

# Augmentation
augment = False
pitch_shift_steps=2  # presumably only used when augment is True — confirm in u.win_generator

# Positional arguments handed to u.create_dataset together with u.win_generator.
# The last element is the shuffle flag: True for train/validation, False for test.
input_args_train = [data_home+'train/audio/', augment, train_ids, sample_rate, pitch_shift_steps, n_mels, hop_length, audio_seg_size, segments_overlap, True] # Last arg is shuffle
input_args_val   = [data_home+'validation/audio/', augment, val_ids, sample_rate, pitch_shift_steps, n_mels, hop_length, audio_seg_size, segments_overlap, True]
input_args_test  = [data_home+'test/audio/', augment, test_ids, sample_rate, pitch_shift_steps, n_mels, hop_length, audio_seg_size, segments_overlap, False]

# create datasets (one per split) from the window generator
dataset_train = u.create_dataset(u.win_generator, input_args_train, input_shape)
dataset_val = u.create_dataset(u.win_generator, input_args_val, input_shape)
dataset_test= u.create_dataset(u.win_generator, input_args_test, input_shape)

In [None]:
# Show the training dataset object as the cell output (quick check of what was built).
dataset_train

In [None]:
# Show the validation dataset object as the cell output.
dataset_val

In [None]:
# Show the test dataset object as the cell output.
dataset_test

In [None]:
# Sanity check: draw the first three windows that will be fed to the model.
# plt comes from the notebook's import cell.
for window, _label in dataset_train.take(3):
  plt.imshow(window)
  plt.show()

In [None]:
# Build the CNN from models_audio for inputs of shape input_shape
# (the spectrogram-window shape defined in the dataset configuration cell).
model = m.cnn_model(input_shape)

# Print model summary
model.summary()

In [None]:
# Train the basic CNN on spectrogram windows.
# keras and tf are imported in the notebook's first cell, so they are not
# re-imported here.

batch_size = 32

# Keep only the model with the lowest validation loss seen so far.
callbacks = [
    keras.callbacks.ModelCheckpoint(
        filepath="audio_convnet.keras",
        save_best_only=True,
        monitor="val_loss",
    )
]

# Pipeline order is batch -> cache -> prefetch. prefetch goes last so that
# ready-made batches are overlapped with training; it used to come first,
# where it only prefetched single un-batched windows. The cached batches
# themselves are unchanged.
# NOTE(review): because batches are cached, every epoch replays the order
# produced during the first pass over the generator.
train_batches = dataset_train.batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)
val_batches = dataset_val.batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)

# Train the model
history = model.fit(
    train_batches,
    validation_data=val_batches,
    epochs=10,
    callbacks=callbacks,
)

In [None]:
# Reload the checkpoint written during training and score it on the test set.
model_reloaded = keras.models.load_model("audio_convnet.keras")

# Evaluate the model on the test set. Batch first and prefetch last, so whole
# batches (not single windows) are prepared ahead of the model.
test_loss, test_acc = model_reloaded.evaluate(
    dataset_test.batch(batch_size).prefetch(tf.data.AUTOTUNE)
)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

In [None]:
# Get the reloaded CNN's predictions for every test window.
# Batch first and prefetch last so whole batches are prepared ahead of the model.
predictions = model_reloaded.predict(dataset_test.batch(batch_size).prefetch(tf.data.AUTOTUNE))

In [None]:
# Save the CNN test predictions to disk (np.save appends the .npy extension).
np.save("cnn_predictions", predictions)

In [None]:
# Walk the test dataset once and join the label of every window into one array.
test_labels_ = np.concatenate([labels for _, labels in dataset_test], axis=0)

In [None]:
# Reshape the concatenated labels to one row of 10 genre flags per window so
# they line up with the predictions.
# -1 lets NumPy infer the number of rows; the count used to be hard-coded
# (32666) and broke whenever the test set or the windowing changed.
test_labels_array = np.reshape(test_labels_, (-1, 10))

In [None]:
# Save the reshaped CNN test labels to disk (np.save appends the .npy extension).
np.save("cnn_test_labels_shaped", test_labels_array)

In [None]:
# Set the decision threshold: a genre counts as predicted when its score is
# above 0.5. The comparison used to be `< .5`, which marked exactly the genres
# the model scored LOW as positives and inverted every metric computed below.
# `> 0.5` is also what the YAMNet and VGGish sections use.
y_pred_binary = (predictions > 0.5)

In [None]:
# One 2x2 confusion matrix per genre for the CNN predictions.
from sklearn.metrics import multilabel_confusion_matrix
confusion = multilabel_confusion_matrix(test_labels_array, y_pred_binary)

In [None]:
# Save the CNN confusion matrices to disk (np.save appends the .npy extension).
np.save("cnn_confusion", confusion)

In [None]:
# Display the CNN confusion matrices as the cell output.
confusion

In [None]:
# Build the classification report for the CNN as a dict and display it.
from sklearn.metrics import classification_report
class_report = classification_report(
    y_true=test_labels_array,
    y_pred=y_pred_binary,
    output_dict=True,
)
class_report

In [None]:
# Pickle the CNN classification report dict to the file "cnn_class_report".
import pickle as pkl
with open("cnn_class_report", "wb") as report_file:
  pkl.dump(class_report, report_file)

YAMNET

In [None]:
# create dataset
# NOTE: this cell rebinds sample_rate, augment, input_shape, input_args_* and
# dataset_* that the CNN section defined earlier. After running it, the CNN
# cells above cannot be re-run without first re-running their own setup cell.

# Waveform
sample_rate = 16000  # 16kHz for Yamnet
augment = False
# One example is a 1-D waveform of 29 * sample_rate samples
# (presumably 29 s clips — confirm in u.wav_generator).
input_shape = (29*sample_rate,)

# Positional arguments handed to u.create_dataset together with u.wav_generator.
input_args_train = [data_home+'train/audio/', augment, train_ids, sample_rate]
input_args_val   = [data_home+'validation/audio/', augment, val_ids, sample_rate]
input_args_test  = [data_home+'test/audio/', augment, test_ids, sample_rate]

dataset_train = u.create_dataset(u.wav_generator, input_args_train, input_shape)
dataset_val = u.create_dataset(u.wav_generator, input_args_val, input_shape)
dataset_test= u.create_dataset(u.wav_generator, input_args_test, input_shape)


In [None]:
# import model: load the pretrained YAMNet network from TensorFlow Hub
# (hub.load fetches it from the URL below, so this needs network access
# unless the model is already cached).
import tensorflow_hub as hub
yamnet = hub.load('https://tfhub.dev/google/yamnet/1')

In [None]:
# may need to install keras-tcn if using for the first time
#!pip install keras-tcn

In [None]:
# NOTE(review): TCN is not referenced directly in this cell; it may be needed
# so the keras-tcn layer is available to models_audio / model reloading —
# confirm before removing.
from tcn import TCN
# Shape of one example's YAMNet embeddings as fed to the TCN. The shape-check
# cell below prints the real embedding shape; the two should agree.
input_shape =  (60, 1024)
# Create a tcn model that processes the embeddings
tcn_yamnet = m.tcn_model(input_shape)

# Print model summary
tcn_yamnet.summary()

In [None]:
def extract_yamnet_embedding(wav_data, yamnet_model):
   """Return the YAMNet embeddings for one waveform.

   The name was called below but never defined or imported in this notebook,
   so a fresh kernel raised NameError. It is defined here, next to its only
   caller, mirroring extract_vggish_embedding in the VGGish section.

   Args:
      wav_data: waveform to embed (a 1-D float waveform at 16 kHz is what the
         TF Hub YAMNet model expects).
      yamnet_model: callable returning a (scores, embeddings, spectrogram)
         triple, as the TF Hub YAMNet model does.

   Returns:
      The second element of the model output, i.e. the embeddings.

   NOTE(review): if utils_audio ships its own extract_yamnet_embedding, call
   that one instead and drop this copy.
   """
   _scores, embeddings, _spectrogram = yamnet_model(wav_data)
   return embeddings


# Define a mapping function to extract embeddings
def map_function(audio, label):
   """Map an (audio, label) pair to (YAMNet embeddings, label) for dataset.map."""
   return extract_yamnet_embedding(audio, yamnet), label

# Print the embedding shape of one training example to check the TCN input shape.
for embedding, _label in dataset_train.map(map_function).take(1):
    print(embedding.shape)

In [None]:
# Train the TCN on YAMNet embeddings.
# keras and tf are imported in the notebook's first cell, so they are not
# re-imported here.

batch_size = 32
model_path = 'yamnet_model_BFC.json'
model_weights = "yamnet_weights_BFC.h5"

# Keep only the weights with the lowest validation loss seen so far.
callbacks = [
    keras.callbacks.ModelCheckpoint(
      filepath=model_weights,
      save_best_only=True,
      save_weights_only=True,
      monitor="val_loss")
]

# Pipeline order is map -> batch -> cache -> prefetch. cache stores the
# embedded batches, so YAMNet only runs during the first epoch. prefetch goes
# last so ready-made batches overlap with training; it used to come first,
# where it only prefetched raw waveforms ahead of the expensive map.
train_batches = dataset_train.map(map_function).batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)
val_batches = dataset_val.map(map_function).batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)

history = tcn_yamnet.fit(
    train_batches,
    validation_data=val_batches,
    epochs=20,
    callbacks=callbacks)

# Store the architecture as JSON next to the weights so the model can be rebuilt later.
model_as_json = tcn_yamnet.to_json()
with open(model_path, "w") as json_file:
    json_file.write(model_as_json)

In [None]:
# Load the best checkpoint of the model and evaluate it on the test set.
from tensorflow.keras.models import model_from_json

# set hyperparameters

optimizer = 'adam'
# can use normal BinaryCrossentropy as well
loss = "BinaryFocalCrossentropy"
metrics = ["accuracy"]
# These must be the files written by the YAMNet training cell.
model_path = "yamnet_model_BFC.json"
model_weights = "yamnet_weights_BFC.h5"
batch_size = 32


tcn_yamnet_reloaded = u.reload_tcn(model_path, model_weights, optimizer, loss, metrics)

# Evaluate the model on the test set. Embed, batch, then prefetch last so
# finished batches (not raw waveforms) are prepared ahead of the model.
test_loss_yamnet, test_acc_yamnet = tcn_yamnet_reloaded.evaluate(
    dataset_test.map(map_function).batch(batch_size).prefetch(tf.data.AUTOTUNE)
)
print(f"Test Loss: {test_loss_yamnet:.4f}, Test Accuracy: {test_acc_yamnet:.4f}")

In [None]:
# Get the reloaded YAMNet+TCN predictions for the test set.
# Embed, batch, then prefetch last so finished batches are prepared ahead of the model.
predictions = tcn_yamnet_reloaded.predict(dataset_test.map(map_function).batch(batch_size).prefetch(tf.data.AUTOTUNE))

In [None]:
# Save the YAMNet test predictions to disk (np.save appends the .npy extension).
np.save("yamnet_predictions", predictions)

In [None]:
# Walk the waveform test dataset once and join every example's label into one array.
test_labels_ = np.concatenate([labels for _, labels in dataset_test], axis=0)

In [None]:
# Reshape the concatenated labels to one row of 10 genre flags per test clip.
# -1 lets NumPy infer the number of rows; the count used to be hard-coded
# (562) and broke whenever the test set changed.
test_labels_array = np.reshape(test_labels_, (-1, 10))

In [None]:
# Save the reshaped YAMNet test labels to disk (np.save appends the .npy extension).
np.save("yamnet_test_labels_shaped", test_labels_array)

In [None]:
# Decision threshold: a genre counts as predicted when its score is above 0.5.
y_pred_binary = predictions > 0.5

In [None]:
# One 2x2 confusion matrix per genre for the YAMNet predictions.
from sklearn.metrics import multilabel_confusion_matrix
confusion = multilabel_confusion_matrix(test_labels_array, y_pred_binary)

In [None]:
# Display the YAMNet confusion matrices as the cell output.
confusion

In [None]:
# Save the YAMNet confusion matrices to disk (np.save appends the .npy extension).
np.save("confusion_yamnet", confusion)

In [None]:
# Build the classification report for the YAMNet model as a dict.
from sklearn.metrics import classification_report
class_report = classification_report(
    y_true=test_labels_array,
    y_pred=y_pred_binary,
    output_dict=True,
)

In [None]:
# Display the YAMNet classification report dict as the cell output.
class_report

In [None]:
# Pickle the YAMNet classification report dict to the file "yamnet_class_report".
import pickle as pkl
with open("yamnet_class_report", "wb") as report_file:
  pkl.dump(class_report, report_file)

VGGISH

In [None]:
import tensorflow_hub as hub
# Load the model: pretrained VGGish from TensorFlow Hub (hub.load fetches it
# from the URL below, so this needs network access unless it is already cached).
vggish = hub.load('https://tfhub.dev/google/vggish/1')

In [None]:
def extract_vggish_embedding(wav_data):
  """Run the module-level `vggish` model on `wav_data` and return its output (the embeddings)."""
  return vggish(wav_data)

In [None]:
# Mapping function used with dataset.map to swap audio for its embeddings
def map_function_vggish(audio, label):
    """Map an (audio, label) pair to (VGGish embeddings, label)."""
    return extract_vggish_embedding(audio), label

# Print the embedding shape of one training example to check the TCN input shape.
for embedding, _label in dataset_train.map(map_function_vggish).take(1):
    print(embedding.shape)

In [None]:
# Shape of one example's VGGish embeddings as fed to the TCN. The shape-check
# cell above prints the real embedding shape; the two should agree.
input_shape =  (30, 128)
# Create a tcn model that processes the embeddings
tcn_vggish = m.tcn_model(input_shape)

# Print model summary
tcn_vggish.summary()

In [None]:
# Train the TCN on VGGish embeddings.
# keras and tf are imported in the notebook's first cell, so they are not
# re-imported here.

batch_size = 32
# Write to the same files the evaluation cell reloads ('vggish_model.json' /
# "vggish_weights.h5"). The '*_class_weights.*' names used before were never
# read back, so evaluation could not find the model this cell trained.
model_path = 'vggish_model.json'
model_weights = "vggish_weights.h5"

# Keep only the weights with the lowest validation loss seen so far.
callbacks = [
    keras.callbacks.ModelCheckpoint(
      filepath=model_weights,
      save_best_only=True,
      save_weights_only=True,
      monitor="val_loss")
]

# Pipeline order is map -> batch -> cache -> prefetch. cache stores the
# embedded batches, so VGGish only runs during the first epoch. prefetch goes
# last so ready-made batches overlap with training.
train_batches = dataset_train.map(map_function_vggish).batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)
val_batches = dataset_val.map(map_function_vggish).batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)

# class_weight=class_weights_wav was removed: that name is only assigned in a
# commented-out "TEST DO NOT USE" cell at the end of the notebook, so this call
# raised NameError on a fresh kernel. Keras also expects class_weight to be a
# dict {class index: weight}, not the per-label array that experiment built.
history = tcn_vggish.fit(
    train_batches,
    validation_data=val_batches,
    epochs=10,
    callbacks=callbacks)

# Store the architecture as JSON next to the weights so the model can be rebuilt later.
model_as_json = tcn_vggish.to_json()
with open(model_path, "w") as json_file:
    json_file.write(model_as_json)

In [None]:
from tensorflow.keras.models import model_from_json

# set hyperparameters

batch_size = 32
# NOTE(review): these must be the files written by the VGGish training cell.
model_path = 'vggish_model.json'
model_weights = "vggish_weights.h5"
optimizer = 'adam'
loss = "BinaryCrossentropy"
metrics = ["accuracy"]

print(model_weights)
# Load the best checkpoint of the model
tcn_vggish_reloaded = u.reload_tcn(model_path, model_weights, optimizer, loss, metrics)

# Evaluate the model on the test set. Embed, batch, then prefetch last so
# finished batches (not raw waveforms) are prepared ahead of the model.
test_loss_vggish, test_acc_vggish = tcn_vggish_reloaded.evaluate(
    dataset_test.map(map_function_vggish).batch(batch_size).prefetch(tf.data.AUTOTUNE)
)
print(f"Test Loss: {test_loss_vggish:.4f}, Test Accuracy: {test_acc_vggish:.4f}")

In [None]:
# Walk the waveform test dataset once and join every example's label into one array.
_y = np.concatenate([labels for _, labels in dataset_test], axis=0)

In [None]:
# Get the reloaded VGGish+TCN predictions for the test set.
# Embed, batch, then prefetch last so finished batches are prepared ahead of the model.
predictions = tcn_vggish_reloaded.predict(dataset_test.map(map_function_vggish).batch(batch_size).prefetch(tf.data.AUTOTUNE))

In [None]:
# Save the VGGish test predictions to disk (np.save appends the .npy extension).
np.save("predictions_vggish", predictions)

In [None]:
# Load the saved VGGish predictions back from disk, so the analysis cells below
# can run without recomputing them.
predictions = np.load("predictions_vggish.npy")

In [None]:
# Reshape the concatenated labels to one row of 10 genre flags per test clip.
# -1 lets NumPy infer the number of rows; the count used to be hard-coded
# (562) and broke whenever the test set changed.
vggish_test_labels_shaped = np.reshape(_y, (-1, 10))

In [None]:
# Save the reshaped VGGish test labels to disk (np.save appends the .npy extension).
np.save("vggish_test_labels_shaped", vggish_test_labels_shaped)

In [None]:
# Decision threshold: a genre counts as predicted when its score is above 0.5.
y_pred_binary = predictions > 0.5

In [None]:
# One 2x2 confusion matrix per genre for the VGGish predictions.
# multilabel_confusion_matrix was imported in an earlier cell.
confusion = multilabel_confusion_matrix(vggish_test_labels_shaped, y_pred_binary)

In [None]:
# Display the VGGish confusion matrices as the cell output.
confusion

In [None]:
# Look at a few VGGish predictions next to their labels.
# This printed test_labels_array, a leftover from the YAMNet section; it now
# uses the labels gathered for this section, so the cell no longer depends on
# state from another experiment.
print(predictions[10:15])
print(vggish_test_labels_shaped[10:15])

In [None]:
# Save the VGGish confusion matrices to disk (np.save appends the .npy extension).
np.save("confusion_vggish", confusion)

In [None]:
# Build the classification report for the VGGish model as a dict.
from sklearn.metrics import classification_report
class_report = classification_report(
    y_true=vggish_test_labels_shaped,
    y_pred=y_pred_binary,
    output_dict=True,
)

In [None]:
# Display the VGGish classification report dict as the cell output.
class_report

In [None]:
# Pickle the VGGish classification report dict to the file "vggish_class_report".
import pickle as pkl
with open("vggish_class_report", "wb") as report_file:
  pkl.dump(class_report, report_file)

In [None]:
# TEST DO NOT USE
# def calculating_class_weights(y_true):
#     from sklearn.utils.class_weight import compute_class_weight
#     number_dim = np.shape(y_true)[1]
#     weights = np.empty([number_dim, 2])
#     for i in range(number_dim):
#         weights[i] = compute_class_weight('balanced', classes = [0.,1.], y = y_true[:, i])
#     return weights

In [None]:
# class_weights_wav = calculating_class_weights(test_labels_array)

In [None]:
# class_weights_wav

In [None]:
# TEST DO NOT USE
# def get_weighted_loss(weights):
#     def weighted_loss(y_true, y_pred):
#         print(type(y_true))
#         print(type(y_pred))
#         return K.mean((weights[:,0]**(1-y_true))*(weights[:,1]**(y_true))*K.binary_crossentropy(y_true, y_pred), axis=-1)
#     return weighted_loss