# Recognition of the Speaker

---

In [3]:
# importing libraries
import os
import shutil
import numpy as np
import tensorflow as tf
from tensorflow import keras
from pathlib import Path
from IPython.display import display, Audio

In [4]:
# Locate the dataset under the user's home directory.
# Each path component is passed separately so that os.path.join inserts the
# separator of the current platform (a hard-coded "\\" only works on Windows).
DATASET = os.path.join(
    os.path.expanduser("~"), "Speaker Recognition", "16000_pcm_speeches"
)

# Names of the sub-folders holding the audio samples and the noise samples
AUDIO = "audio"
NOISE = "noise"

AUDIO_PATH = os.path.join(DATASET, AUDIO)
NOISE_PATH = os.path.join(DATASET, NOISE)

## Setting up Directories

In [5]:
# Make sure the `audio` and `noise` folders exist. `exist_ok=True` turns the
# call into a no-op when the folder is already there, so no separate
# existence check is needed.
for required_dir in (AUDIO_PATH, NOISE_PATH):
    os.makedirs(required_dir, exist_ok=True)



# Sort every top-level folder of the dataset into `audio` or `noise`
noise_folder_names = ("other", "_background_noise_")
for entry in os.listdir(DATASET):
    source_dir = os.path.join(DATASET, entry)
    if not os.path.isdir(source_dir):
        # Plain files are left where they are
        continue
    if entry in (AUDIO, NOISE):
        # The target folders themselves are never moved
        continue
    # Known noise folders go to `noise`; anything else is treated as a
    # speaker folder and goes to `audio`
    target_root = NOISE_PATH if entry in noise_folder_names else AUDIO_PATH
    shutil.move(source_dir, os.path.join(target_root, entry))


# Collect every .wav file found in the sub-folders of `noise`.
# Listings are sorted because os.listdir returns entries in arbitrary order,
# which would make the order of the noise files differ between machines.
noise_dirs = [
    os.path.join(NOISE_PATH, subdir)
    for subdir in sorted(os.listdir(NOISE_PATH))
    if os.path.isdir(os.path.join(NOISE_PATH, subdir))
]
noise_paths = [
    os.path.join(noise_dir, filename)
    for noise_dir in noise_dirs
    for filename in sorted(os.listdir(noise_dir))
    if filename.endswith(".wav")
]

# Report the number of directories that were actually scanned, not the number
# of entries in `noise` (which may also contain stray files).
print(
    "Found {} files belonging to {} directories".format(
        len(noise_paths), len(noise_dirs)
    )
)


Found 6 files belonging to 2 directories


## Preprocessing

Setting up configurations

In [1]:
# Fraction of the samples held out for validation (10%)
SPLIT = 0.1
# Seed used when shuffling the samples, so that the shuffle is repeatable
SEED = 34
# The sampling rate expected for all the audio samples
SAMPLING_RATE = 16000
# Factor the noise is multiplied by before it is added to the audio
SCALE = 0.5
# Number of samples per training batch
BATCH_SIZE = 128
# Number of epochs to train for
NUM_EPOCHS = 10

In [None]:
# Split noise into chunks of SAMPLING_RATE steps each
def load_noise_sample(path):
    """Load one noise file and cut it into chunks of SAMPLING_RATE samples.

    Args:
        path: Path of the .wav file to read.

    Returns:
        A list of tensors, each holding SAMPLING_RATE consecutive samples of
        the single-channel signal. Trailing samples that do not fill a whole
        chunk are dropped. Returns None (after printing a message) when the
        file's sampling rate differs from SAMPLING_RATE or when the file is
        too short to yield a single chunk.
    """
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if int(sampling_rate) != SAMPLING_RATE:
        print("Sampling rate for {} is incorrect. Ignoring it".format(path))
        return None

    slices = sample.shape[0] // SAMPLING_RATE
    if slices == 0:
        # tf.split cannot split a tensor into zero pieces, so a file shorter
        # than one chunk has to be skipped explicitly.
        print(
            "{} is shorter than {} samples. Ignoring it".format(path, SAMPLING_RATE)
        )
        return None

    return tf.split(sample[: slices * SAMPLING_RATE], slices)

In [None]:
# Load the noises: gather the chunks of every usable noise file
noise_chunks = []
for noise_path in noise_paths:
    chunks = load_noise_sample(noise_path)
    if chunks:
        noise_chunks.extend(chunks)

if noise_chunks:
    noises = tf.stack(noise_chunks)
    print("{} noise files were split into {} noise samples where each is {} sec. long".format(
        len(noise_paths), noises.shape[0], noises.shape[1] // SAMPLING_RATE
        )
    )
else:
    # Without any chunk there is nothing to stack and the report above would
    # fail on the missing second dimension. `add_noise` treats `noises=None`
    # as "do not add noise", so fall back to that.
    noises = None
    print("No usable noise samples were found. No noise will be added to the audio.")

In [None]:
# Constructs the dataset of audio and labels
def paths_and_labels_to_dataset(audio_paths, labels):
    """Build a dataset of (audio, label) pairs.

    Args:
        audio_paths: Paths of the audio files, one per sample.
        labels: Labels, in the same order as `audio_paths`.

    Returns:
        A `tf.data.Dataset` that zips the audio decoded by `path_to_audio`
        with the labels.
    """
    audio_ds = tf.data.Dataset.from_tensor_slices(audio_paths).map(path_to_audio)
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_ds, label_ds))

In [None]:
# Decodes the audio file
def path_to_audio(path):
    """Read a .wav file and decode it into a single-channel audio tensor.

    SAMPLING_RATE is passed as `desired_samples` (the third parameter of
    tf.audio.decode_wav), i.e. it sets the number of samples of the decoded
    audio rather than a sampling rate.

    Args:
        path: Path of the .wav file.

    Returns:
        The decoded audio tensor; the sample rate reported by the decoder is
        discarded.
    """
    raw_bytes = tf.io.read_file(path)
    waveform, _sample_rate = tf.audio.decode_wav(
        raw_bytes, desired_channels=1, desired_samples=SAMPLING_RATE
    )
    return waveform


In [None]:
# Adds noise
def add_noise(audio, noises=None, scale=0.5):
    """Mix a randomly chosen noise sample into every entry of `audio`.

    Args:
        audio: Batch of audio; one noise sample is drawn per entry along the
            first axis.
        noises: Tensor of noise samples stacked along the first axis, or None
            to leave the audio untouched.
        scale: Extra factor applied to the rescaled noise.

    Returns:
        `audio` with the rescaled noise added, or `audio` unchanged when
        `noises` is None.
    """
    if noises is None:
        return audio

    # One random index into `noises` for every entry of the batch
    noise_idx = tf.random.uniform(
        (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
    )
    noise = tf.gather(noises, noise_idx, axis=0)

    # Amplitude proportion between the audio and the noise. divide_no_nan
    # yields 0 where the noise peak is 0, so such a noise sample contributes
    # nothing instead of turning the audio into inf/NaN.
    prop = tf.math.divide_no_nan(
        tf.math.reduce_max(audio, axis=1), tf.math.reduce_max(noise, axis=1)
    )
    prop = tf.repeat(tf.expand_dims(prop, axis=1), tf.shape(audio)[1], axis=1)

    # Adding the rescaled noise to audio
    return audio + noise * prop * scale


In [None]:
# Fourier Transformation
def audio_ff_transformation(audio):
    """Return the magnitude of the first half of the FFT of `audio`.

    Args:
        audio: Audio tensor whose last axis has size 1 (it is squeezed away
            before the transform and added back afterwards).

    Returns:
        The absolute value of the first `n // 2` FFT coefficients, where `n`
        is the size of axis 1 of the squeezed input, with a trailing axis of
        size 1.
    """
    # tf.signal.fft works on the innermost dimension, so drop the trailing
    # axis before the transform and restore it afterwards
    signal = tf.squeeze(audio, axis=-1)
    complex_signal = tf.cast(
        tf.complex(real=signal, imag=tf.zeros_like(signal)), tf.complex64
    )
    spectrum = tf.expand_dims(tf.signal.fft(complex_signal), axis=-1)

    # Keep only the first half of the FFT, which represents the positive
    # frequencies, and return its absolute value
    half = signal.shape[1] // 2
    return tf.math.abs(spectrum[:, :half, :])


In [None]:
# Get the list of audio file paths along with their corresponding labels.
# Only directories are treated as speakers, and they are sorted because
# os.listdir returns entries in arbitrary order: without sorting, the label
# index given to each speaker could differ from one machine to another.
class_names = sorted(
    name
    for name in os.listdir(AUDIO_PATH)
    if os.path.isdir(os.path.join(AUDIO_PATH, name))
)
print("Our class names: {}".format(class_names,))

audio_paths = []
labels = []
for label, name in enumerate(class_names):
    print("Processing speaker {}".format(name,))
    dir_path = os.path.join(AUDIO_PATH, name)
    # Sorted as well, so that the seeded shuffle that follows always starts
    # from the same order
    speaker_sample_paths = sorted(
        os.path.join(dir_path, filepath)
        for filepath in os.listdir(dir_path)
        if filepath.endswith(".wav")
    )
    audio_paths += speaker_sample_paths
    labels += [label] * len(speaker_sample_paths)

print(
    "Found {} files belonging to {} classes.".format(len(audio_paths), len(class_names))
)

In [None]:
# Shuffle the paths and the labels in place. Each list gets its own generator
# created from the same seed, so both are shuffled the same way and the
# path/label pairs stay aligned.
for sequence in (audio_paths, labels):
    np.random.RandomState(SEED).shuffle(sequence)

## Splitting the Dataset

In [None]:
# Split into training and validation
num_val_samples = int(SPLIT * len(audio_paths))
if num_val_samples == 0:
    raise ValueError(
        "Not enough audio files ({}) to hold out a validation set with SPLIT={}".format(
            len(audio_paths), SPLIT
        )
    )

# Index of the first validation sample. An explicit index is used instead of
# a negative offset because `x[:-n]` / `x[-n:]` silently give the wrong halves
# when n is 0.
split_index = len(audio_paths) - num_val_samples

print("Using {} files for training.".format(split_index))
train_audio_paths = audio_paths[:split_index]
train_labels = labels[:split_index]

print("Using {} files for validation.".format(num_val_samples))
valid_audio_paths = audio_paths[split_index:]
valid_labels = labels[split_index:]

# Create 2 datasets, one for training and the other for validation
train_set = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_set = train_set.shuffle(buffer_size=BATCH_SIZE * 8, seed=SEED).batch(
    BATCH_SIZE
)

valid_set = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_set = valid_set.shuffle(buffer_size=32 * 8, seed=SEED).batch(32)


In [None]:
# Training pipeline: mix noise into every batch, then transform the audio wave
# to the frequency domain using audio_ff_transformation (Fast Fourier
# Transformation), and prefetch the result
train_set = (
    train_set.map(
        lambda x, y: (add_noise(x, noises, scale=SCALE), y),
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    .map(
        lambda x, y: (audio_ff_transformation(x), y),
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    .prefetch(tf.data.AUTOTUNE)
)

# Validation pipeline: same frequency-domain transform, but no noise is added
valid_set = valid_set.map(
    lambda x, y: (audio_ff_transformation(x), y),
    num_parallel_calls=tf.data.AUTOTUNE,
).prefetch(tf.data.AUTOTUNE)

## Defining the model

In [None]:
def resnet_block(x, filters, conv_num=3, activation="relu"):
    """Build a residual block of 1-D convolutions followed by max-pooling.

    Args:
        x: Input tensor of the block.
        filters: Number of filters used by every convolution of the block.
        conv_num: Total number of kernel-size-3 convolutions on the main
            branch.
        activation: Activation applied after each convolution except the
            last one, and after the residual addition.

    Returns:
        The output of the block after a max-pooling with pool size 2 and
        stride 2.
    """
    # Shortcut branch: a kernel-size-1 convolution with `filters` filters,
    # so that its output can be added to the main branch
    shortcut = keras.layers.Conv1D(filters, 1, padding="same")(x)

    # Main branch: (conv_num - 1) convolution + activation pairs, then a
    # final convolution whose activation is applied after the addition
    out = x
    for _ in range(conv_num - 1):
        out = keras.layers.Conv1D(filters, 3, padding="same")(out)
        out = keras.layers.Activation(activation)(out)
    out = keras.layers.Conv1D(filters, 3, padding="same")(out)

    merged = keras.layers.Add()([out, shortcut])
    merged = keras.layers.Activation(activation)(merged)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(merged)

In [None]:
def build_model(input_shape, num_classes):
    """Assemble the classifier.

    The network stacks three residual blocks (16, 32 and 64 filters), an
    average pooling, a flatten, two ReLU dense layers (128 and 64 units) and
    a softmax output layer.

    Args:
        input_shape: Shape of one input sample (without the batch axis).
        num_classes: Number of units of the softmax output layer.

    Returns:
        The (uncompiled) `keras.models.Model`.
    """
    inputs = keras.layers.Input(shape=input_shape, name="input")

    # (filters, conv_num) of each residual block, in order
    block_configs = ((16, 2), (32, 2), (64, 3))
    features = inputs
    for block_filters, block_convs in block_configs:
        features = resnet_block(features, block_filters, block_convs)

    # Pooling, then flatten for the dense head
    features = keras.layers.AveragePooling1D(pool_size=3, strides=3)(features)
    features = keras.layers.Flatten()(features)

    # Two dense layers with ReLU
    for units in (128, 64):
        features = keras.layers.Dense(units, activation="relu")(features)

    # Output with Softmax activation
    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(
        features
    )
    return keras.models.Model(inputs=inputs, outputs=outputs)

In [None]:
# One input sample has SAMPLING_RATE // 2 steps with a single channel, and
# there is one output class per speaker folder
model = build_model(
    input_shape=(SAMPLING_RATE // 2, 1), num_classes=len(class_names)
)

In [None]:
# Print the layers of the model
model.summary()

# Compile the model using Adam's default learning rate.
# The closing parenthesis of the call was missing, which made the cell a
# syntax error.
model.compile(
    optimizer="Adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

In [None]:
# File the checkpoint callback writes the model to
model_save_filename = "model.h5"

# Early Stopping to stop training if the model doesn't improve for `patience`
# epochs, restoring the best weights.
# NOTE(review): patience (10) is not smaller than NUM_EPOCHS (10), so this
# callback looks unlikely to ever stop training early — confirm the values.
earlystopping_cb = keras.callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
)

# Model checkpoint that only saves when the validation accuracy improves
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath=model_save_filename,
    monitor="val_accuracy",
    save_best_only=True,
)

## Training the model

In [None]:
# Fit the model on the training set, validating on the validation set, with
# the early-stopping and checkpoint callbacks attached
training_callbacks = [earlystopping_cb, mdlcheckpoint_cb]
history = model.fit(
    x=train_set,
    validation_data=valid_set,
    epochs=NUM_EPOCHS,
    callbacks=training_callbacks,
)

In [None]:
# Evaluate the model on the validation set and print what evaluate() returns
evaluation_scores = model.evaluate(valid_set)
print(evaluation_scores)

## Testing the performance

In [None]:
# Number of predictions shown in the demo
SAMPLES_TO_DISPLAY = 10

# Build a noisy test set from the validation files: shuffle, batch, then add
# noise to every batch
test_set = (
    paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
    .shuffle(buffer_size=BATCH_SIZE * 8, seed=SEED)
    .batch(BATCH_SIZE)
    .map(lambda x, y: (add_noise(x, noises, scale=SCALE), y))
)

In [None]:
# Show the true and predicted speaker for a few random samples of one batch.
# The loop variables are named `batch_*` so that the notebook-level `labels`
# list built earlier is not overwritten by this cell.
for batch_audios, batch_labels in test_set.take(1):
    # Get the signal FFT
    ffts = audio_ff_transformation(batch_audios)
    # Predict
    y_pred = model.predict(ffts)
    # Take random samples. The upper bound is the actual size of this batch:
    # a batch can hold fewer than BATCH_SIZE samples, in which case indices
    # drawn from [0, BATCH_SIZE) would be out of range.
    rnd = np.random.randint(0, batch_audios.shape[0], SAMPLES_TO_DISPLAY)
    # Keep only the selected audios, labels and predictions
    picked_audios = batch_audios.numpy()[rnd, :, :]
    picked_labels = batch_labels.numpy()[rnd]
    y_pred = np.argmax(y_pred, axis=-1)[rnd]

    for index in range(SAMPLES_TO_DISPLAY):
        # For every sample, print the true and predicted label
        # as well as run the voice with the noise.
        # One escape code is used when the prediction matches the label and
        # another one when it does not.
        color = "[92m" if picked_labels[index] == y_pred[index] else "[91m"
        print(
            "Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
                color,
                class_names[picked_labels[index]],
                color,
                class_names[y_pred[index]],
            )
        )
        display(Audio(picked_audios[index, :, :].squeeze(), rate=SAMPLING_RATE))