# libraries

In [None]:
import librosa
import random
import soundfile as sf
import numpy as np
from tensorflow import keras
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report

# functions

In [None]:
def get_mfcc(file, n_mfcc=20):
    """
    Compute the MFCC matrix of one audio file.

    file: path of the audio file, passed to librosa.load.
    n_mfcc: number of coefficients requested from librosa (default 20).

    Returns the transposed output of librosa.feature.mfcc, i.e. one row
    per analysis frame (assuming librosa's usual (n_mfcc, frames) layout).
    """
    # sr=None asks librosa not to resample the signal on load.
    signal, sample_rate = librosa.load(file, sr=None)
    coefficients = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=n_mfcc)
    return coefficients.T

def load_data_libritts(file, nmfcc=40):
    """
    Load training data.

    Reads `file` line by line and keeps only the lines that contain '.wav'
    (so a header row, for instance, is skipped). Each kept line is split on
    commas: the second field is the wav path and the last field is the
    speaker label.

    file: path of the comma-separated training list.
    nmfcc: number of MFCCs extracted per frame (default 40, the value that
        used to be hard-coded; mirrors load_test_data_libritts).

    Returns three parallel lists:
        spklist: speaker label of each utterance (as read, i.e. strings),
        mfcclist: MFCC matrix of each utterance,
        libritts_spklist: LibriTTS speaker id, taken from the file name
            (text before the first '_' once the 'train/' prefix is removed).
    """
    mfcclist = []
    spklist = []
    libritts_spklist = []
    with open(file) as f:
        for line in f:
            # Lines without a wav path carry no utterance.
            if '.wav' not in line:
                continue
            info = line.strip().split(',')
            wavfile = info[1]
            spklist.append(info[-1])
            libritts_spklist.append(wavfile.replace('train/', '').split('_')[0])
            mfcclist.append(get_mfcc(wavfile, n_mfcc=nmfcc))
    return spklist, mfcclist, libritts_spklist

def load_test_data_libritts(file, nmfcc=40):
    """
    Load test data.

    Only lines of `file` containing '.wav' are used. Each one is split on
    commas: the first field is the utterance id and the second field is the
    wav path. `nmfcc` is the number of MFCCs extracted per frame.

    Returns three parallel lists, in this order: the LibriTTS speaker ids
    (text before the first '_' of the path once 'test/' is removed), the
    MFCC matrices, and the utterance ids.
    """
    speaker_ids = []
    features = []
    utterance_ids = []
    with open(file) as handle:
        for row in handle:
            if '.wav' not in row:
                continue
            fields = row.strip().split(',')
            utt_id = fields[0]
            wav_path = fields[1]
            speaker_ids.append(wav_path.replace('test/', '').split('_')[0])
            utterance_ids.append(utt_id)
            features.append(get_mfcc(wav_path, n_mfcc=nmfcc))
    return speaker_ids, features, utterance_ids

def preprocess_input(spklist, mfcclist, libritts_spklist, pad_length=None):
    """
    Preprocess training input.

    spklist: speaker label of each utterance.
    mfcclist: MFCC matrix of each utterance.
    libritts_spklist: LibriTTS speaker id of each utterance.
    pad_length: common sequence length passed to pad_sequences as maxlen
        (None lets Keras use the longest sequence).

    Returns (X, y, speakers): the padded float32 feature array, the one-hot
    float32 label matrix, and a dict mapping each LibriTTS speaker id to its
    speaker label.
    """
    # For a repeated LibriTTS id the last label seen wins.
    speakers = dict(zip(libritts_spklist, spklist))
    # pad_sequences defaults to dtype='int32', which would truncate the
    # floating-point MFCCs to integers; request float32 explicitly.
    X = keras.preprocessing.sequence.pad_sequences(mfcclist,
                                                   maxlen=pad_length,
                                                   dtype='float32')
    y = keras.utils.to_categorical(spklist, dtype='float32')
    return X, y, speakers

def preprocess_test(test_libritts_spklist, test_mfcclist, speakers, pad_length=None,
                    num_classes=90):
    """
    Preprocess test input.

    test_libritts_spklist: LibriTTS speaker id of each test utterance.
    test_mfcclist: MFCC matrix of each test utterance.
    speakers: dict mapping LibriTTS speaker id to speaker label, as returned
        by preprocess_input.
    pad_length: common sequence length passed to pad_sequences as maxlen.
    num_classes: width of the one-hot label matrix (default 90, the value
        that used to be hard-coded).

    Returns (X_test, y_test, test_spklist): the padded float32 feature array,
    the one-hot float32 label matrix, and the integer label of each utterance.

    Raises KeyError if a test speaker is absent from `speakers`.
    """
    test_spklist = []
    for spk in test_libritts_spklist:
        if spk not in speakers:
            # Fail with the offending id instead of an opaque int(None) error.
            raise KeyError(f'Unknown LibriTTS speaker in test data: {spk!r}')
        test_spklist.append(int(speakers[spk]))
    # Same float32 padding as the training features (the int32 default would
    # truncate the MFCC values).
    X_test = keras.preprocessing.sequence.pad_sequences(test_mfcclist,
                                                        maxlen=pad_length,
                                                        dtype='float32')
    y_test = keras.utils.to_categorical(test_spklist, num_classes=num_classes, dtype='float32')
    return X_test, y_test, test_spklist

# load training data and preprocess it

In [None]:
# Read every training utterance listed in the CSV into memory at once.
# NOTE: this is not a good practice
train_csv = 'train.csv'
spklist, mfcclist, libritts_spk = load_data_libritts(train_csv)

In [None]:
# Turn the loaded lists into the feature array, the label matrix and the
# LibriTTS-id -> label dictionary.
X, y, speakers = preprocess_input(spklist, mfcclist, libritts_spk)

# Sizes reused below when the network is built.
num_mfcc = X.shape[2]
num_speakers = y.shape[1]

# Short report of what was loaded.
for message in (
    f'Shape of the input matrix: {X.shape}',
    f'Shape of the label matrix: {y.shape}',
    f'LibriTTS speaker ids and indices: {speakers}',
    f'Number of speakers: {num_speakers}',
    f'Number of effective speakers: {len(speakers)}\n',
):
    print(message)

# build the model

In [None]:
# Sequential model: three convolutional stages that differ only in their
# number of filters, followed by dropout, global max pooling and a softmax
# layer with one output per speaker.
conv_filters = (80, 120, 160)

stack = []
for stage, n_filters in enumerate(conv_filters):
    # Only the very first layer declares the input shape
    # (sequences of any length with num_mfcc features per step).
    first_layer_args = {'input_shape': (None, num_mfcc)} if stage == 0 else {}
    stack.append(keras.layers.Conv1D(n_filters, 7, padding='same', **first_layer_args))
    stack.append(keras.layers.BatchNormalization())
    stack.append(keras.layers.MaxPooling1D(7))
    stack.append(keras.layers.LeakyReLU(0.2))

stack.append(keras.layers.Dropout(0.2))
stack.append(keras.layers.GlobalMaxPooling1D())
stack.append(keras.layers.Dense(num_speakers, activation='softmax'))

model = keras.Sequential(stack)

model.summary()

In [None]:
# Training-loop settings
BATCH_SIZE = 32
NUM_EPOCHS = 30

# Adam optimizer settings
LR = 1e-3
BETA1, BETA2 = 0.9, 0.999
EPSILON = 1e-8
DECAY = 0.0

In [None]:
# optimizer
# NOTE(review): some Keras releases no longer accept the `decay` argument;
# confirm against the installed version.
opt = keras.optimizers.Adam(learning_rate=LR,
                            beta_1=BETA1,
                            beta_2=BETA2,
                            epsilon=EPSILON,
                            decay=DECAY)

# loss
loss = keras.losses.categorical_crossentropy

# metrics to be considered during training
met = [keras.metrics.categorical_accuracy]

# callbacks: early stop and model checkpoint (save best model)
# Both callbacks watch the validation accuracy: stopping on the training
# accuracy would ignore overfitting and disagree with the criterion used to
# pick the saved checkpoint.
callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor='val_categorical_accuracy',
        patience=10),
    keras.callbacks.ModelCheckpoint(
        filepath='best_model.hdf5',
        monitor='val_categorical_accuracy',
        save_best_only=True),
]

In [None]:
# Attach the optimizer, the loss and the tracked metrics to the model.
model.compile(
    optimizer=opt,
    loss=loss,
    metrics=met,
)

# training the model

In [None]:
# Fit the model, holding out 20% of the data for validation and running the
# callbacks defined above.
fit_options = {
    'epochs': NUM_EPOCHS,
    'batch_size': BATCH_SIZE,
    'shuffle': True,
    'validation_split': 0.2,
    'callbacks': callbacks_list,
    'verbose': 1,
}
history = model.fit(X, y, **fit_options)

# check if the training went well: learning curves

In [None]:
# learning curves
# history.history maps each tracked quantity to its per-epoch values.
history_dict = history.history

# losses
loss_values = history_dict['loss']
val_loss_values = history_dict['val_loss']

# accuracies
acc_values = history_dict['categorical_accuracy']
val_acc_values = history_dict['val_categorical_accuracy']

# x axis: one point per completed epoch (0-based)
epochs = range(len(loss_values))

# top panel: losses, bottom panel: accuracies
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10))

ax1.plot(epochs, loss_values, 'bo', label="Training Loss")
ax1.plot(epochs, val_loss_values, 'b', label="Validation Loss")
ax1.set_title('Training and Validation Loss')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss Value')
ax1.legend()

ax2.plot(epochs, acc_values, 'ro', label="Training Accuracy")
ax2.plot(epochs, val_acc_values, 'r', label="Validation Accuracy")
ax2.set_title('Training and Validation Accuracy')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('Accuracy')
ax2.legend()

plt.show()

# test the model

In [None]:
# Restore the weights stored in the checkpoint file written during training.
best_weights_path = 'best_model.hdf5'
model.load_weights(best_weights_path)

In [None]:
# Read the test utterances: LibriTTS speaker ids, MFCC matrices and utterance ids.
test_csv = 'test.csv'
test_libritts_spklist, test_mfcclist, test_idlist = load_test_data_libritts(test_csv)

In [None]:
# preprocess test data
X_test, y_test, test_spklist = preprocess_test(test_libritts_spklist, test_mfcclist, speakers)
# Report the shapes of the test arrays (not those of the training arrays).
print(f'Shape of the input matrix: {X_test.shape}')
print(f'Shape of the label matrix: {y_test.shape}')
# Width of the one-hot matrix versus the number of distinct labels present.
num_test_speakers = y_test.shape[1]
print(f'Number of test speakers: {num_test_speakers}')
print(f'Number of effective test speakers: {len(set(test_spklist))}\n')

In [None]:
# Given that we have the speaker ids from the sentence names, evaluate the model in the test set
# The result is named test_scores so that the builtin eval() is not shadowed.
test_scores = model.evaluate(X_test, y_test, batch_size=BATCH_SIZE, verbose=1)
# Index 0 is the loss; index 1 is the first compiled metric.
print(f'Accuracy in the test set: {test_scores[1]}')

In [None]:
# Run the model over the test inputs; one row of outputs per utterance.
y_pred = model.predict(
    X_test,
    batch_size=BATCH_SIZE,
    verbose=1,
)

In [None]:
# Collapse the one-hot targets and the predicted scores to class indices,
# then print the per-class classification report.
labels_test = y_test.argmax(axis=1)
labels_pred = y_pred.argmax(axis=1)
report = classification_report(labels_test, labels_pred)
print(report)

In [None]:
# plot confusion matrix
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sn
import pandas as pd
# Label the axes with the classes that actually occur in the targets or the
# predictions, instead of assuming a fixed number of classes: a hard-coded
# size breaks the DataFrame construction whenever the matrix size differs.
cm_labels = sorted(set(labels_test) | set(labels_pred))
cm = confusion_matrix(labels_test, labels_pred, labels=cm_labels)
df_cm = pd.DataFrame(cm, index=cm_labels, columns=cm_labels)
plt.figure(figsize=(15,8))
sn.set(font_scale=0.8) # for label size
sn.heatmap(df_cm, annot=True, annot_kws={"size": 1}) # font size
plt.show()

# write the submission file

In [None]:
# Submission rows: a header, then one "<utterance id>,<predicted class>" line
# per test utterance.
pred_labels = np.argmax(y_pred, axis=-1)
output_info = ['id,speaker']
output_info.extend(
    f'{idspk},{pred_label}' for idspk, pred_label in zip(test_idlist, pred_labels)
)

submission_file = 'submission_v003.csv'

# Write every row followed by a newline.
with open(submission_file, 'w') as f:
    f.writelines(f'{row}\n' for row in output_info)