<a href="https://colab.research.google.com/github/vic-commits/Speaker-Recognition/blob/main/project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Speaker Recognition Using FFT Features and a ResNet-Style Architecture

## Import dependencies

In [None]:
from posixpath import join
import os
import shutil
import tensorflow as tf
import keras
from pathlib import Path
import numpy as np
import zipfile
import soundfile as sf
from IPython.display import Audio, display

## Dataset generation

In [None]:
# --- Configuration ----------------------------------------------------------
# This notebook comes with a homemade dataset of 30 samples, stored as one
# sub-directory per speaker under DATASET_AUDIO_PATH.
DATASET_ROOT = "/content/drive/MyDrive/Colab Notebooks/speaker_recognition/dataset/audio"
DATASET_ROOT = Path(DATASET_ROOT)
AUDIO_SUBFOLDER = "audio"
# DATASET_ROOT already ends in AUDIO_SUBFOLDER, so this resolves to the same
# directory as DATASET_ROOT.
DATASET_AUDIO_PATH = (DATASET_ROOT.parent / AUDIO_SUBFOLDER)
VALID_SPLIT = 0.2  # fraction of the files held out for validation
SHUFFLE_SEED = 43
SAMPLING_RATE = 44100  # also the fixed number of samples decoded per clip
SCALE = 0.5  # noise scale used in the demonstration cell
BATCH_SIZE = 128
EPOCHS = 100
MIN_AUDIO_LENGTH = 0.5

# Fail early, with a readable message, when the dataset is not where we expect
# it (for example when Google Drive has not been mounted). This replaces a
# directory loop whose body did nothing.
if not DATASET_AUDIO_PATH.is_dir():
    raise FileNotFoundError(
        "Dataset directory not found: {}".format(DATASET_AUDIO_PATH)
    )
def paths_and_labels_to_dataset(audio_paths, labels):
    """Build a tf.data.Dataset yielding (decoded audio, label) pairs.

    Args:
        audio_paths: Sequence of audio file paths, one per sample.
        labels: Sequence of labels aligned index-by-index with ``audio_paths``.

    Returns:
        A dataset that zips the decoded audio of every path with its label.
    """
    # Decode the files in parallel; path_to_audio does the actual reading.
    decoded_ds = tf.data.Dataset.from_tensor_slices(audio_paths).map(
        path_to_audio, num_parallel_calls=tf.data.AUTOTUNE
    )
    targets_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((decoded_ds, targets_ds))
def path_to_audio(path):
    """Read the WAV file at ``path`` and return its decoded audio tensor.

    The file is decoded to a single channel and to exactly SAMPLING_RATE
    samples; the sample rate reported by the decoder is discarded.
    """
    raw_bytes = tf.io.read_file(path)
    waveform, _sample_rate = tf.audio.decode_wav(
        raw_bytes, desired_channels=1, desired_samples=SAMPLING_RATE
    )
    return waveform

def add_noise(audio, noises=None, scale=0.5):
    """Mix randomly selected noise into a batch of audio.

    Args:
        audio: Batch of audio clips. Axis 0 is treated as the batch axis and
            axis 1 as the axis along which the peak amplitude is measured.
        noises: Optional bank of noise clips indexed along axis 0. When
            ``None`` the audio is returned unchanged.
        scale: Multiplier applied to the amplitude-matched noise before it is
            added to the audio.

    Returns:
        ``audio`` itself when ``noises`` is ``None``; otherwise the audio with
        one randomly chosen noise clip added to every batch element.
    """
    if noises is None:
        # Nothing to mix in. Previously this path fell off the end of the
        # function and returned None instead of the audio.
        return audio

    # Pick one noise clip (by index into the bank) per audio clip in the batch.
    tf_rnd = tf.random.uniform(
        (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
    )
    noise = tf.gather(noises, tf_rnd, axis=0)

    # Rescale each noise clip so its peak matches the peak of its audio clip,
    # then broadcast that ratio along axis 1.
    prop = tf.math.reduce_max(audio, axis=1) / tf.math.reduce_max(noise, axis=1)
    prop = tf.repeat(tf.expand_dims(prop, axis=1), tf.shape(audio)[1], axis=1)

    return audio + noise * prop * scale

# Convert each audio clip into the magnitude of its FFT
def audio_to_fft(audio, min_n_fft=256, max_n_fft=2048):
    """Return the magnitude of the positive-frequency half of the FFT.

    Args:
        audio: Batch of audio whose last axis is squeezed away before the FFT.
        min_n_fft: Accepted for interface compatibility; not used.
        max_n_fft: Accepted for interface compatibility; not used.

    Returns:
        Absolute value of the first half of the FFT along axis 1, with a
        trailing axis of size 1 restored.
    """
    # Drop the trailing axis so the FFT runs over the sample axis.
    waveform = tf.squeeze(audio, axis=-1)
    # tf.signal.fft needs complex input: real part is the signal, imaginary 0.
    as_complex = tf.cast(
        tf.complex(real=waveform, imag=tf.zeros_like(waveform)), tf.complex64
    )
    spectrum = tf.expand_dims(tf.signal.fft(as_complex), axis=-1)
    # Keep only the first half of the bins, which holds the positive frequencies.
    half_length = waveform.shape[1] // 2
    return tf.math.abs(spectrum[:, :half_length, :])
# Every visible sub-directory of DATASET_AUDIO_PATH is one speaker (class).
# Hidden entries such as ".ipynb_checkpoints" and stray files are skipped so
# they do not become spurious classes, and the names are sorted so label ids
# do not depend on the arbitrary order returned by os.listdir.
class_names = sorted(
    entry
    for entry in os.listdir(DATASET_AUDIO_PATH)
    if not entry.startswith(".")
    and os.path.isdir(os.path.join(DATASET_AUDIO_PATH, entry))
)
print("Our class names: {}".format(class_names,))
num_classes = len(class_names)

# Collect one path and one integer label per audio file.
audio_paths = []
labels = []
for label, name in enumerate(class_names):
  print("Processing speaker {}".format(name,))
  dir_path = Path(DATASET_AUDIO_PATH/name)
  # Only regular, non-hidden files are samples; sorted for reproducibility.
  speaker_sample_paths = sorted(
      os.path.join(dir_path, filepath)
      for filepath in os.listdir(dir_path)
      if not filepath.startswith(".")
      and os.path.isfile(os.path.join(dir_path, filepath))
  )
  audio_paths += speaker_sample_paths
  labels += [label]*len(speaker_sample_paths)
print("Audio files count: {}. Classes: {}".format(len(audio_paths), len(class_names)))
if not audio_paths:
  raise ValueError("No audio files found under {}".format(DATASET_AUDIO_PATH))

# Shuffle paths and labels with two identically seeded generators so both
# lists receive the same permutation and stay aligned.
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(labels)

# Split by an explicit index: slicing with [:-num_val_samples] would yield an
# empty training set whenever num_val_samples is 0.
num_val_samples = int(VALID_SPLIT * len(audio_paths))
num_train_samples = len(audio_paths) - num_val_samples
print("Using {} files for training.".format(num_train_samples))
train_audio_paths = audio_paths[:num_train_samples]
train_labels = labels[:num_train_samples]
print("Using {} files for validation.".format(num_val_samples))
valid_audio_paths = audio_paths[num_train_samples:]
valid_labels = labels[num_train_samples:]

# Create 2 datasets, one for training and the other for validation
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE)
valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED).batch(32)

# NOTE: this noise tensor is not used by the training/validation pipelines
# below; it is kept so the module-level name stays defined.
noise=tf.random.normal(shape=[BATCH_SIZE,SAMPLING_RATE])

# Turn raw audio batches into FFT magnitudes and prefetch for throughput.
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)
train_ds = train_ds.prefetch(tf.data.AUTOTUNE)
valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)
valid_ds = valid_ds.prefetch(tf.data.AUTOTUNE)

Our class names: ['Michelle', 'Victoria', '.ipynb_checkpoints']
Processing speaker Michelle
Processing speaker Victoria
Processing speaker .ipynb_checkpoints
Audio files count: 30. Classes: 3
Using 24 files for training.
Using 6 files for validation.


## Model definition

In [None]:
from keras import regularizers
def residual_block(x, filters, conv_num=2, activation='relu'):
  """Apply one residual block: a conv branch added to a 1x1-conv shortcut.

  Args:
    x: Input tensor fed to both branches.
    filters: Number of output filters for both convolutions.
    conv_num: Accepted for interface compatibility; not used.
    activation: Activation applied after the two branches are summed.

  Returns:
    The activated sum passed through a MaxPool1D with pool size 1 and stride 1.
  """
  # Shortcut branch: kernel size 1, so it only changes the channel count.
  shortcut_conv = keras.layers.Conv1D(
      filters, 1, padding='same', kernel_regularizer=regularizers.l2(0.01)
  )
  shortcut = shortcut_conv(x)

  # Main branch: kernel size 2, same padding keeps the length unchanged.
  main_conv = keras.layers.Conv1D(
      filters, 2, padding="same", kernel_regularizer=regularizers.l2(0.01)
  )
  main = main_conv(x)

  merged = keras.layers.Add()([main, shortcut])
  activated = keras.layers.Activation(activation)(merged)
  return keras.layers.MaxPool1D(pool_size=1, strides=1)(activated)
def build_model(input_shape, num_classes, dropout_rate=0.3, l2_regularization=0.01):
  """Assemble the classifier: two residual blocks followed by a dense head.

  Args:
    input_shape: Shape of one input sample (without the batch axis).
    num_classes: Number of units in the softmax output layer.
    dropout_rate: Dropout rate applied after the hidden dense layer.
    l2_regularization: L2 factor for the hidden dense layer's kernel.

  Returns:
    An uncompiled keras Model mapping the input to class probabilities.
  """
  inputs = keras.layers.Input(shape=input_shape, name='input')

  # Stack the residual blocks with 8 and then 16 filters.
  features = inputs
  for block_filters in (8, 16):
    features = residual_block(features, block_filters)

  features = keras.layers.AveragePooling1D(pool_size=1, strides=1)(features)
  features = keras.layers.Flatten()(features)

  hidden_layer = keras.layers.Dense(
      8,
      activation='relu',
      kernel_regularizer=regularizers.l2(l2_regularization),
  )
  hidden = keras.layers.Dropout(dropout_rate)(hidden_layer(features))

  outputs = keras.layers.Dense(num_classes, activation='softmax')(hidden)
  return keras.models.Model(inputs=inputs, outputs=outputs)

# audio_to_fft keeps half of the SAMPLING_RATE samples and a trailing axis of 1.
input_shape = (SAMPLING_RATE // 2, 1)
model = build_model(input_shape, num_classes)
model.summary()

optimizer = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Callbacks shared by the fit calls further down.
model_save_file = "model.keras"
earlystopping_cb = keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=5,
    restore_best_weights=True,
)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath=model_save_file,
    monitor="val_accuracy",
    save_best_only=True,
)
lr_scheduler = keras.callbacks.ReduceLROnPlateau(
    monitor='loss',
    factor=0.5,
    patience=3,
)

## Hyperparameter tuning

In [None]:
from scipy.stats import uniform, randint
import random
# Random search: each iteration samples one configuration, trains a fresh
# model with it and keeps the configuration with the best validation accuracy.
param_distribution = {
    'optimizer': ['Adam','SGD'],
    'learning_rate': uniform(loc=1e-5, scale=1e-3 - 1e-5),
    'batch_size': [32, 64, 128],
    'epochs': randint(low=50, high=151),
    'dropout_rate': uniform(loc=0.2, scale=0.4 - 0.2),
    'l2_regularization': uniform(loc=0.001, scale=0.01 - 0.001),
}

num_iterations = 20
best_accuracy = 0
best_hyperparameters = {}

# Dedicated seeded generators so the sampled configurations are reproducible
# without touching the global random state.
_choice_rng = random.Random(SHUFFLE_SEED)
_sample_rng = np.random.RandomState(SHUFFLE_SEED)


def _build_trial_model(hyperparameters):
    """Build and compile a fresh model for one hyperparameter configuration."""
    trial_model = build_model(
        input_shape,
        num_classes,
        dropout_rate=hyperparameters['dropout_rate'],
        l2_regularization=hyperparameters['l2_regularization'],
    )
    trial_optimizer = getattr(keras.optimizers, hyperparameters['optimizer'])(
        learning_rate=hyperparameters['learning_rate']
    )
    trial_model.compile(
        optimizer=trial_optimizer,
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return trial_model


for _ in range(num_iterations):
    # Cast the scipy/numpy scalars to plain Python numbers so Keras receives
    # native types and the printed result is clean.
    hyperparameters = {
        'optimizer': _choice_rng.choice(param_distribution['optimizer']),
        'learning_rate': float(param_distribution['learning_rate'].rvs(random_state=_sample_rng)),
        'batch_size': _choice_rng.choice(param_distribution['batch_size']),
        'epochs': int(param_distribution['epochs'].rvs(random_state=_sample_rng)),
        'dropout_rate': float(param_distribution['dropout_rate'].rvs(random_state=_sample_rng)),
        'l2_regularization': float(param_distribution['l2_regularization'].rvs(random_state=_sample_rng)),
    }
    model = _build_trial_model(hyperparameters)

    # train_ds is already batched with BATCH_SIZE, so the sampled batch size
    # would otherwise have no effect: re-batch it for this trial.
    trial_train_ds = (
        train_ds.unbatch()
        .batch(hyperparameters['batch_size'])
        .prefetch(tf.data.AUTOTUNE)
    )

    history = model.fit(
        trial_train_ds,
        epochs=hyperparameters['epochs'],
        validation_data=valid_ds,
        callbacks=[earlystopping_cb, mdlcheckpoint_cb],
    )

    _, accuracy = model.evaluate(valid_ds)

    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_hyperparameters = hyperparameters

print("Best Hyperparameters:", best_hyperparameters)
print("Best Validation Accuracy:", best_accuracy)

# Leave `model` configured with the winning hyperparameters (fresh weights)
# rather than whichever trial happened to run last, so the following training
# cell actually benefits from the search. If no trial beat the initial
# accuracy of 0, the last trial's model is kept unchanged.
if best_hyperparameters:
    model = _build_trial_model(best_hyperparameters)

Epoch 1/135
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step - accuracy: 0.4583 - loss: 1.4272 - val_accuracy: 0.6667 - val_loss: 1.6934
Epoch 2/135
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step - accuracy: 0.6250 - loss: 4.8211 - val_accuracy: 0.5000 - val_loss: 2.3266
Epoch 3/135
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step - accuracy: 0.6667 - loss: 4.3078 - val_accuracy: 0.6667 - val_loss: 1.2573
Epoch 4/135
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 944ms/step - accuracy: 0.8750 - loss: 0.9212 - val_accuracy: 0.6667 - val_loss: 1.3924
Epoch 5/135
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step - accuracy: 0.9167 - loss: 0.8434 - val_accuracy: 0.6667 - val_loss: 1.2052
Epoch 6/135
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step - accuracy: 0.7917 - loss: 1.9724 - val_accuracy: 0.6667 - val_loss: 1.0555
Epoch 7/135
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━

## Model training

In [None]:
# Final training run. lr_scheduler (ReduceLROnPlateau) is created next to the
# other callbacks but was never passed to fit; it is included here so the
# learning rate is halved when the training loss stops improving.
history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=valid_ds,
    callbacks=[earlystopping_cb, mdlcheckpoint_cb, lr_scheduler],
)

Epoch 1/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 929ms/step - accuracy: 0.5833 - loss: 657.6513 - val_accuracy: 0.3333 - val_loss: 2.7683
Epoch 2/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step - accuracy: 0.4583 - loss: 5.4440 - val_accuracy: 0.6667 - val_loss: 1.6219
Epoch 3/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 875ms/step - accuracy: 0.4583 - loss: 1.6221 - val_accuracy: 0.6667 - val_loss: 1.6218
Epoch 4/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 915ms/step - accuracy: 0.4583 - loss: 1.6219 - val_accuracy: 0.6667 - val_loss: 1.6216
Epoch 5/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step - accuracy: 0.4583 - loss: 1.6217 - val_accuracy: 0.6667 - val_loss: 1.6214


## Model evaluation

In [None]:
# Report performance on the validation set. The model is compiled with
# metrics=['accuracy'], so evaluate() yields the loss followed by the accuracy.
print(model.evaluate(valid_ds))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 161ms/step - accuracy: 0.3333 - loss: 2.7683
[2.768285036087036, 0.3333333432674408]


## Model demonstration

In [None]:
SAMPLES_TO_DISPLAY = 10  # upper bound on the number of clips shown
test_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

# Peek at one batch to size the noise bank like the batch itself. The loop
# variables are named batch_* so the module-level `labels` list is not
# overwritten.
for batch_audios, _ in test_ds.take(1):
  actual_batch_size = int(tf.shape(batch_audios)[0])
  noise = tf.random.normal(shape=[actual_batch_size, SAMPLING_RATE, 1])
  break

test_ds = test_ds.map(
    lambda x, y: (add_noise(x, noise, scale=SCALE), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)

for batch_audios, batch_labels in test_ds.take(1):
    ffts = audio_to_fft(batch_audios)
    y_pred = np.argmax(model.predict(ffts), axis=-1)

    # Never show more clips than the batch holds, and sample without
    # replacement so the same clip is not displayed twice.
    actual_batch_size = int(tf.shape(batch_audios)[0])
    num_to_display = min(SAMPLES_TO_DISPLAY, actual_batch_size)
    rnd = np.random.choice(actual_batch_size, size=num_to_display, replace=False)

    shown_audios = batch_audios.numpy()[rnd, :]
    shown_labels = batch_labels.numpy()[rnd]
    shown_preds = y_pred[rnd]

    for index in range(num_to_display):
      is_correct = shown_labels[index] == shown_preds[index]
      # ANSI escape codes: ESC[92m = bright green, ESC[91m = bright red,
      # ESC[0m = reset. The previous version emitted the literal words
      # "green"/"red" and an invalid "[0M" reset.
      color_code = "[92m" if is_correct else "[91m"
      print(
        'Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m'.format(
            color_code,
            class_names[shown_labels[index]],
            color_code,
            class_names[shown_preds[index]],
          )
      )
      display(Audio(shown_audios[index, :, :].squeeze(), rate=SAMPLING_RATE))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 199ms/step
Speaker:red Victoria[0M	Predicted:red Michelle[0m


Speaker:green Michelle[0M	Predicted:green Michelle[0m
Speaker:red Victoria[0M	Predicted:red Michelle[0m
Speaker:green Michelle[0M	Predicted:green Michelle[0m
Speaker:red Victoria[0M	Predicted:red Michelle[0m


Speaker:green Michelle[0M	Predicted:green Michelle[0m
