Includes:

In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import seaborn as sn
import os
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from data_paths import get_file_paths_ordered
from tf_lite_conversion import convert_to_tf_lite, eval_tf_lite_model
from scipy.io import wavfile
from IPython.display import Audio
from pre_process import choose_tot_slice_len, get_data_tensors, compute_mfccs
from models import get_model

Number of speakers and the percentage of the available samples to use to consider:

In [None]:
# Number of speakers and the percentage of the available samples to use to consider:
num_speakers = 15
model_name = 'test_' + str(num_speakers) + '_spk'
dataset_percentage = 1. # 0.85
segmentLength=1024
print("Number of speakers : ", num_speakers)

Save Locations

In [None]:

model_dir_root = './Models/'+ f'{model_name}/'

path_keras_model = model_dir_root + 'KerasModels/'
path_tf_lite_nq = model_dir_root + 'TFLiteModelsNonQuantized/'
path_tf_lite_q = model_dir_root + 'TFLiteModelsQuantized/'
# path_c_model = model_dir_root + 'TFLiteModelsQuantized/'
path_figures = model_dir_root + 'Figures/'

# Create the directories
paths = [path_keras_model, path_tf_lite_nq, path_tf_lite_q, path_figures]
for path in paths:
    if not os.path.exists(path):
        os.makedirs(path)

print('Models will be saved to: ', model_dir_root)


model_name_keras = 'keras_'+ model_name + '.h5'
model_name_tf_lite_nq = 'tflite_nq_'+ model_name + '.tflite'
model_name_tf_lite_q = 'tflite_q_'+ model_name + '.tflite'
# model_name_c = 'c_' + model_name

Read data:

In [None]:
paths_train, paths_test, y_train_l, y_test_l, all_paths_l = \
    get_file_paths_ordered(num_speaker=num_speakers, test_ratio=0.2, balanced_dataset=False, 
                           path_plot_sv=path_figures)

slice_len, durations = choose_tot_slice_len(paths=all_paths_l)


In [None]:
nb_samps = len(all_paths_l)
plt.figure(figsize=(10,9))
plt.title("Sample Lengths")
plt.xlabel("Duration [sec]")
plt.ylabel("Samples")
plt.barh(np.arange(0, nb_samps, 1), durations, height=0.9, label='samples')
plt.grid()
plt.axvline(x=slice_len, ymin=0, ymax=nb_samps, color='r', label='slice_length')
plt.xlim((0, 35))
plt.xticks(np.append(plt.xticks()[0], slice_len))
plt.legend()
plt.savefig(path_figures + 'slice_len_sel.png')
plt.show()

In [None]:
f_s, x_train, y_train, x_test, y_test = get_data_tensors(paths_train=paths_train, paths_test=paths_test, 
                                                         y_train_l=y_train_l, y_test_l=y_test_l,
                                                         tot_slice_len=slice_len,
                                                         used_train_sz_rat=dataset_percentage, 
                                                         used_test_sz_rat=1.,
                                                         segmentLength=segmentLength)

Play a random sample:

In [None]:
i=1
fs_i, audio_data_i = wavfile.read(paths_train[i])
display(Audio(audio_data_i, rate=fs_i))

Pre-Processing:

In [None]:
f_low=80.
f_up=7600.
num_mel_bins=80
num_mfcc=13

with tf.device("/cpu:0"):
    x_train_mfcc = compute_mfccs(x_train, frame_length=segmentLength, sample_rate=f_s, 
                                lower_edge_hertz=f_low, upper_edge_hertz=f_up,
                                num_mel_bins=num_mel_bins, num_mfcc=num_mfcc)
    x_test_mfcc  = compute_mfccs(x_test, frame_length=segmentLength, sample_rate=f_s, 
                                lower_edge_hertz=f_low, upper_edge_hertz=f_up,
                                num_mel_bins=num_mel_bins, num_mfcc=num_mfcc)

Plot an example MFCC

In [None]:
plt.figure(figsize=(10, 4))
extent = [0, x_train_mfcc[i].shape[0]*segmentLength/fs_i, 0, num_mfcc]
plt.imshow(tf.transpose(x_train_mfcc[i], [1, 0, 2]), interpolation="nearest", origin="lower", 
           aspect="auto", extent=extent, cmap='nipy_spectral')
plt.colorbar()
plt.xlabel("Time [s]")
plt.ylabel("MFCC index")
plt.grid()
plt.savefig(path_figures + 'mfcc_example.png')
plt.show()

# Model:

In [None]:
train_set = x_train_mfcc
test_set = x_test_mfcc

In [None]:
from models import get_model
model_idx = 1
input_shape = train_set.shape
model = get_model(input_shape=input_shape, nb_classes=num_speakers, model_idx=model_idx)
model.build(input_shape=input_shape)
model.summary()

Save the model summary:

In [None]:
from contextlib import redirect_stdout

with open(model_dir_root + 'modelsummary.txt', 'w') as f:
    with redirect_stdout(f):
        model.summary()

Define learning parameters:

In [None]:
batchSize = 32 #8 # nb of togetherly processed segments(of 1024 samples each) 
epochs = 150 #40 # nb of back propagations
loss_fct = 'sparse_categorical_crossentropy'

Compile the model and Fit the Data:

In [None]:
model.compile(
    loss=loss_fct, 
    optimizer=tf.keras.optimizers.Adam(), 
    metrics=['accuracy']
)

callbacks = [
    keras.callbacks.ModelCheckpoint(path_keras_model + model_name_keras, save_best_only=True),
    keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.2, patience=15, min_lr=1.0e-5,verbose=1),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=30, verbose=1),       
]

history_keras = model.fit(x=train_set, 
                          y=y_train, 
                          batch_size=batchSize, 
                          epochs=epochs, 
                          validation_split=0.2,
                          callbacks = callbacks)

Evaluate the performance on the test set:

In [None]:
y_pred = model.predict(x_test_mfcc)

test_loss, test_acc = model.evaluate(x_test_mfcc,  y_test, verbose=2)
print('Test accuracy:', test_acc)
print('Test loss:', test_loss)

In [None]:
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

# plt.figure()
ax1.plot(history_keras.history['accuracy'], label='Keras-Training')
ax1.plot(history_keras.history['val_accuracy'], label='Keras-Validation')
ax1.axhline(y=test_acc, xmin=0, xmax=epochs, linestyle='-.', color='g', label='Keras-Test')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Accuracy')
ax1.set_ylim([1/num_speakers, 1])
ax2.plot(history_keras.history['lr'], color='y', linestyle='--')
ax2.set_ylabel('Learning rate', color='y')
ax1.legend(loc='right')
plt.grid()
ax1.grid()
ax2.grid()
plt.title("Accuracy")
plt.savefig(path_figures + 'keras_acc_epochs.png')
plt.show()

In [None]:
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

# plt.figure()
ax1.plot(history_keras.history['loss'], label='Keras-Training')
ax1.plot(history_keras.history['val_loss'], label='Keras-Validation')
ax1.axhline(y=test_loss, xmin=0, xmax=epochs, linestyle='-.', color='g', label='Keras-Test')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.grid()
ax2.plot(history_keras.history['lr'], color='y', linestyle='--')
ax2.set_ylabel('Learning rate', color='y')
ax2.grid()
ax1.legend(loc='upper right')
plt.grid()
plt.title(loss_fct)
plt.savefig(path_figures + 'keras_loss_epochs.png')
plt.show()

In [None]:
y_pred_hard = np.argmax(y_pred, axis=1)
fig = plt.figure(figsize=(10,9))
cm = confusion_matrix(y_test, y_pred_hard, normalize='true')
sn.heatmap(cm, annot=True)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.savefig(path_figures + 'keras_cm.png')
plt.show()

Information about the saved Kerras Model on disk:

In [None]:
h5_in_kb = os.path.getsize(path_keras_model+model_name_keras) / 1024
print("Keras model file : ", model_name_keras)
print("HDF5 Keras Model size: %d KB" % h5_in_kb)

# TF Lite Conversion and Quantization:

In [None]:
from tf_lite_conversion import convert_to_tf_lite
tflite_model_nq, tflite_model_q, sizes_on_disk, comp_rats, legends = \
    convert_to_tf_lite(keras_model=model,
                       path_keras_model=path_keras_model + model_name_keras, 
                       path_tf_lite_nq_model=path_tf_lite_nq + model_name_tf_lite_nq,
                       train_set=train_set, 
                       path_tf_lite_q_model=path_tf_lite_q + model_name_tf_lite_q)

Plot the disk size comparisons:

In [None]:
# Plots for comparison
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
width=0.25
x_ax_tmp = np.arange(len(legends))

ax1.bar(x_ax_tmp, sizes_on_disk, color='b', width=width, edgecolor='k', label='Size')
ax1.set_ylabel("Sizes of the models [KB]", color='b')
ax1.legend(loc='upper center')

ax2.bar(x_ax_tmp+width, comp_rats, color='g', width=width, edgecolor='k', label='Compression')
ax2.set_ylabel("Compression Ratios", color='g')
ax2.legend(loc='center', bbox_to_anchor=(0.52, 0.85))

plt.grid()
plt.title("Disk Size Comparisons")
plt.xticks(x_ax_tmp+width/2, legends)
plt.savefig(path_figures + 'size_comps.png')
plt.show()

# Performance evaluation Non-Quantized Model

In [None]:
pred_nq = eval_tf_lite_model(path_tf_lite_model=path_tf_lite_nq + model_name_tf_lite_nq,
                             test_set=test_set, 
                             quantized=False)

In [None]:
tflite_nq_score = accuracy_score(y_test, pred_nq)
print("Accuracy of non-quantized tflite model is {}%".format(tflite_nq_score*100))
print("Compared to float32 accuracy of {}%".format(test_acc*100))
print("We have a change of {}%".format((tflite_nq_score-test_acc)*100))

In [None]:
plt.figure(figsize=(10,9))
cm = confusion_matrix(y_test, pred_nq, 
                      normalize='true')
sn.heatmap(cm, annot=True)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.savefig(path_figures + 'tflite_nq_cm.png')
plt.show()

# Performance evaluation Quantized Model

In [None]:
pred_q = eval_tf_lite_model(path_tf_lite_model=path_tf_lite_q + model_name_tf_lite_q,
                            quantized=True, 
                            test_set=test_set)

In [None]:
tflite_q_score = accuracy_score(y_test, pred_q)
print("Accuracy of quantized to int8 model is {}%".format(tflite_q_score*100))
print("Compared to float32 accuracy of {}%".format(test_acc*100))
print("We have a change of {}%".format((tflite_q_score-test_acc)*100))

In [None]:
plt.figure(figsize=(10,9))
cm = confusion_matrix(y_test, pred_q, 
                      normalize='true')
sn.heatmap(cm, annot=True)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.savefig(path_figures + 'tflite_q_cm.png')
plt.show()

# Accuracy comparison

In [None]:
# Plots for comparison
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
width=0.25
x_ax_tmp = np.arange(len(legends))

ax1.bar(x_ax_tmp, sizes_on_disk, color='b', width=width, edgecolor='k', label='Size')
ax1.set_ylabel("Sizes of the models [KB]", color='b')
ax1.legend(loc='upper right',  bbox_to_anchor=(0.77, 0.73))

ax2.bar(x_ax_tmp+width, [test_acc, tflite_nq_score, tflite_q_score], 
        color='g', width=width, edgecolor='k', label='Accuracy')
ax2.set_ylabel("Accuracy", color='g')
ax2.set_ylim([0., 1.])
ax2.axhline(y=test_acc, xmin=0., xmax=1., linestyle='-.', color='r', label='KerasAcc')
ax2.legend(loc='upper right',  bbox_to_anchor=(0.85, 0.85))

plt.grid()
plt.title("Accuracy vs Size Comparisons")
plt.xticks(x_ax_tmp+width/2, legends)
plt.savefig(path_figures + 'acc_comps.png')
plt.show()