# Installing required libraries

In [None]:
!pip install jiwer

# Importing required libraries

The required libraries are:
- *Pandas*     :       for handling the dataframes
- *Numpy*      :       for handling the arrays
- *Tensorflow* :       for making the neural network model
- *Matplotlib* :       for visualizing the features extracted from audio data
- *IPython*    :       for handling audio files
- *Jiwer*      :       to compute the word error rate of the model

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers #type:ignore
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer

# Downloading the dataset

In [None]:
# Download the LJSpeech-1.1 archive and extract it; `data_path` is whatever
# location `get_file` reports for the result.
data_url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"
data_path = keras.utils.get_file(
    fname="LJSpeech-1.1",
    origin=data_url,
    untar=True,
)

In [None]:
# Directory holding the audio clips and the file listing their transcriptions.
wavs_path = f"{data_path}/wavs/"
metadata_path = f"{data_path}/metadata.csv"

In [None]:
# The metadata file is read as pipe-separated text without a header row.
# quoting=3 is csv.QUOTE_NONE: quote characters are kept as literal text.
metadata_df = pd.read_csv(
    metadata_path,
    sep="|",
    header=None,
    quoting=3,
)

In [None]:
# Name the three columns, then keep only the clip id and the normalized text.
metadata_df.columns = ["file_name", "transcription", "normalized_trancription"]
metadata_df = metadata_df.loc[:, ["file_name", "normalized_trancription"]]

# Shuffle all rows (no seed is set, so the order differs between runs) and
# renumber the index from zero.
metadata_df = (
    metadata_df
    .sample(frac=1)
    .reset_index(drop=True)
)

## *NOTE: The model is trained on only 5,000 data points.*

In [None]:
# Keep the first 5000 rows of the shuffled metadata.
metadata_df = metadata_df.head(5000)

# Train Test Split

The data is split into training and validation sets; the training fraction is *0.9*.

In [None]:
# Row position separating the training rows (first 90%) from the validation rows.
split_index = int(len(metadata_df) * 0.9)
df_train = metadata_df.iloc[:split_index]
df_val = metadata_df.iloc[split_index:]

# Preprocessing

### Making the vocabulary to be used

In [None]:
# Accepted output symbols: lowercase letters, apostrophe, '?', '!' and space.
characters = list("abcdefghijklmnopqrstuvwxyz'?! ")

# Character -> integer id lookup; the out-of-vocabulary token is the empty string.
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")

# Inverse lookup (integer id -> character) built from the same vocabulary.
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

In [None]:
# STFT parameters passed to tf.signal.stft in encode_single_sample:
#   frame_length - window length, frame_step - hop between windows,
#   fft_length   - FFT size (the model input width is fft_length // 2 + 1).
frame_length, frame_step, fft_length = 256, 160, 384

In [None]:
def encode_single_sample(wav_file, label):
    """Convert one (clip name, transcription) pair into model-ready tensors.

    Args:
        wav_file: Clip name without the ".wav" extension; it is looked up
            under the module-level `wavs_path`.
        label: Transcription text of the clip.

    Returns:
        A `(spectrogram, label)` tuple: the normalized magnitude spectrogram
        of the clip and the transcription encoded with `char_to_num`.
    """
    # ---- audio -> spectrogram ----
    raw_bytes = tf.io.read_file(wavs_path + wav_file + ".wav")
    waveform, _sample_rate = tf.audio.decode_wav(raw_bytes)
    # Drop the trailing axis (assumes single-channel audio) and force float32.
    waveform = tf.cast(tf.squeeze(waveform, axis=-1), tf.float32)

    stft = tf.signal.stft(
        waveform,
        frame_length=frame_length,
        frame_step=frame_step,
        fft_length=fft_length,
    )
    # Square root of the magnitude of the complex STFT.
    magnitude = tf.math.pow(tf.abs(stft), 0.5)

    # Standardize along axis 1; the epsilon guards against division by zero.
    mean = tf.math.reduce_mean(magnitude, 1, keepdims=True)
    std = tf.math.reduce_std(magnitude, 1, keepdims=True)
    features = (magnitude - mean) / (std + 1e-10)

    # ---- text -> integer ids ----
    chars = tf.strings.unicode_split(
        tf.strings.lower(label), input_encoding="UTF-8"
    )
    encoded_label = char_to_num(chars)

    return features, encoded_label

# Creating dataset object

In [None]:
batch_size = 32


def _make_dataset(frame):
    """Build the batched, prefetched tf.data pipeline for one metadata frame."""
    pairs = (list(frame["file_name"]), list(frame["normalized_trancription"]))
    dataset = tf.data.Dataset.from_tensor_slices(pairs)
    dataset = dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    # padded_batch pads the variable-length spectrograms and labels in a batch.
    dataset = dataset.padded_batch(batch_size)
    return dataset.prefetch(buffer_size=tf.data.AUTOTUNE)


# Training and validation pipelines share exactly the same preprocessing.
train_dataset = _make_dataset(df_train)
val_dataset = _make_dataset(df_val)

# Defining loss function

The loss function used here is the CTC (Connectionist Temporal Classification) loss, which is widely used in speech recognition.

In [None]:
def CTCLoss(y_true, y_pred):
    """Compute the CTC cost for a batch via `keras.backend.ctc_batch_cost`.

    Every sample is given the full (padded) time dimension of `y_pred` as its
    input length and the full (padded) width of `y_true` as its label length.

    Args:
        y_true: Batch of integer-encoded labels, indexed (sample, position).
        y_pred: Batch of model outputs, indexed (sample, time step, class).

    Returns:
        The tensor returned by `keras.backend.ctc_batch_cost`.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # One row per sample, each holding the same length value.
    input_length = time_steps * tf.ones(shape=(n_samples, 1), dtype="int64")
    label_length = label_steps * tf.ones(shape=(n_samples, 1), dtype="int64")

    return keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

# Making the model

In [None]:
from tensorflow.keras import layers, models, optimizers

def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
    """Build and compile the convolutional + bidirectional-GRU speech model.

    The network is: two Conv2D/BatchNorm/ReLU stages, a stack of
    bidirectional GRU layers, one dense ReLU layer and a softmax classifier.

    Args:
        input_dim: Number of features per spectrogram frame.
        output_dim: Vocabulary size; the classifier emits `output_dim + 1`
            classes (the extra class is presumably the CTC blank).
        rnn_layers: Number of bidirectional GRU layers to stack. This value is
            now honored; previously the stack was hard-coded to three layers.
        rnn_units: Units per GRU direction and in the dense layer.

    Returns:
        A Keras model compiled with Adam (learning rate 1e-4) and the
        module-level `CTCLoss` function.
    """
    # Input layer
    input_spectrogram = layers.Input((None, input_dim), name="input")
    # Add a trailing channel axis so the spectrogram can be fed to Conv2D.
    x = layers.Reshape((-1, input_dim, 1), name="expand_dim")(input_spectrogram)

    # Conv layer 1
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 41],
        strides=[2, 2],
        padding="same",
        use_bias=False,
        name="conv_1",
    )(x)
    x = layers.BatchNormalization(name="conv_1_bn")(x)
    x = layers.ReLU(name="conv_1_relu")(x)

    # Conv layer 2
    x = layers.Conv2D(
        filters=16,
        kernel_size=[11, 21],
        strides=[1, 2],
        padding="same",
        use_bias=False,
        name="conv_2",
    )(x)
    x = layers.BatchNormalization(name="conv_2_bn")(x)
    x = layers.ReLU(name="conv_2_relu")(x)

    # Merge the last two axes so the recurrent layers see one feature vector per step.
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]), name="reshape_1")(x)

    # RNN layers: the depth follows `rnn_layers` instead of a fixed count.
    for i in range(rnn_layers):
        recurrent = layers.GRU(
            units=rnn_units,
            activation="tanh",
            recurrent_activation="sigmoid",
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f"gru_{i}",
        )
        x = layers.Bidirectional(
            recurrent, name=f"bidirectional_{i}", merge_mode="concat"
        )(x)
        # Dropout only between recurrent layers, not after the last one
        # (the dense block below applies its own dropout).
        if i < rnn_layers - 1:
            x = layers.Dropout(rate=0.5, name=f"dropout_{i}")(x)

    # Dense layers
    x = layers.Dense(units=rnn_units, name="dense_1")(x)
    x = layers.ReLU(name="dense_1_relu")(x)
    x = layers.Dropout(rate=0.5, name="dropout_final")(x)

    # Classification layer
    output = layers.Dense(units=output_dim + 1, activation="softmax", name="output")(x)

    # Model
    model = models.Model(input_spectrogram, output, name="DeepSpeech_2")

    # Optimizer
    opt = optimizers.Adam(learning_rate=1e-4)

    # Compile with the CTCLoss function defined in this file; the string "CTC"
    # bypassed it and is not a registered loss name under tf.keras 2.x.
    model.compile(optimizer=opt, loss=CTCLoss)

    return model


In [None]:
# The input width is the number of STFT frequency bins (fft_length // 2 + 1).
# rnn_layers is passed explicitly so the three-layer recurrent stack this
# notebook uses does not depend on build_model's internals or its default.
model = build_model(
    input_dim=fft_length // 2 + 1,
    output_dim=char_to_num.vocabulary_size(),
    rnn_layers=3,
    rnn_units=512,
)

In [None]:
# Print a summary of the model's layers.
model.summary()

# Training the model

In [None]:
def decode_batch_predictions(pred):
    """Greedy CTC-decode a batch of model outputs into text.

    Args:
        pred: Array of model outputs indexed (sample, time step, class).

    Returns:
        A list with one decoded string per sample.
    """
    n_samples, n_steps = pred.shape[0], pred.shape[1]
    # Every sample is decoded over the full time dimension.
    sequence_lengths = np.ones(n_samples) * n_steps

    decoded, _log_probs = keras.backend.ctc_decode(
        pred, input_length=sequence_lengths, greedy=True
    )

    return [
        tf.strings.reduce_join(num_to_char(ids)).numpy().decode("utf-8")
        for ids in decoded[0]
    ]

In [None]:
class CustomCallback(keras.callbacks.Callback):
    """Report the word error rate on a dataset at the end of every epoch.

    After each epoch the callback decodes predictions for every batch of
    `dataset`, prints the word error rate against the reference
    transcriptions and shows two randomly chosen target/prediction pairs.
    """

    def __init__(self, dataset):
        """Store the dataset of (spectrogram, label) batches to evaluate on."""
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch, logs=None):
        """Decode the whole dataset and print WER plus two sample outputs."""
        predictions = []
        targets = []
        for X, y in self.dataset:
            # Use the model this callback is attached to rather than a global,
            # and silence the per-batch progress bar.
            batch_predictions = self.model.predict(X, verbose=0)
            predictions.extend(decode_batch_predictions(batch_predictions))
            targets.extend(
                tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                for label in y
            )

        # Nothing to score or sample from when the dataset yields no batches.
        if not predictions:
            return

        wer_score = wer(targets, predictions)
        print("-" * 100)
        print(f"Word Error Rate: {wer_score:.4f}")
        print("-" * 100)
        for i in np.random.randint(0, len(predictions), 2):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)

In [None]:
# Number of passes over the training data.
epochs = 1

# Prints the word error rate on the validation data after every epoch.
validation_callback = CustomCallback(val_dataset)

history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=val_dataset,
    callbacks=[validation_callback],
)