# Importing Libraries

In [3]:
# TensorFlow and Keras Imports
import tensorflow as tf
from tensorflow.keras import regularizers as rg
from tensorflow import keras
from tensorflow.keras import layers

# Garbage Collection and Pickle Imports
import gc
import pickle as pkl

# Visualization Imports
import matplotlib.pyplot as plt

# Operating System and Numerical Computation Imports
import os
import numpy as np

# Scikit-learn Import
from sklearn.model_selection import train_test_split

# Time-related Import
import time

# Data Handling Imports
import pandas as pd


# Load Training Data

In [4]:
features_path = "/kaggle/input/timitpreprocessed/features.pkl"
labels_path = "/kaggle/input/timitpreprocessed/labels.pkl"

with open(features_path, 'rb') as pickle_file:
    features = pkl.load(pickle_file)

with open(labels_path, 'rb') as pickle_file:
    labels = pkl.load(pickle_file)

# Load Testing Data

In [5]:
test_features_path = "/kaggle/input/timitpreprocessed/test_features.pkl"
test_labels_path = "/kaggle/input/timitpreprocessed/test_labels.pkl"

with open(test_features_path, 'rb') as pickle_file:
    test_features = pkl.load(pickle_file)

with open(test_labels_path, 'rb') as pickle_file:
    test_labels = pkl.load(pickle_file)

## Create mapping functions for Phonemes

In [6]:
map_phonemes_61_to_39 = {
            'iy':'iy',  'ih':'ih',   'eh':'eh',  'ae':'ae',    'ix':'ih',  'ax':'ah',   'ah':'ah',  'uw':'uw',
            'ux':'uw',  'uh':'uh',   'ao':'aa',  'aa':'aa',    'ey':'ey',  'ay':'ay',   'oy':'oy',  'aw':'aw',
            'ow':'ow',  'l':'l',     'el':'l',  'r':'r',      'y':'y',    'w':'w',     'er':'er',  'axr':'er',
            'm':'m',    'em':'m',     'n':'n',    'nx':'n',     'en':'n',  'ng':'ng',   'eng':'ng', 'ch':'ch',
            'jh':'jh',  'dh':'dh',   'b':'b',    'd':'d',      'dx':'dx',  'g':'g',     'p':'p',    't':'t',
            'k':'k',    'z':'z',     'zh':'sh',  'v':'v',      'f':'f',    'th':'th',   's':'s',    'sh':'sh',
            'hh':'hh',  'hv':'hh',   'pcl':'h#', 'tcl':'h#', 'kcl':'h#', 'qcl':'h#','bcl':'h#','dcl':'h#',
            'gcl':'h#','h#':'h#',  '#h':'h#',  'pau':'h#', 'epi': 'h#','nx':'n',   'ax-h':'ah','q':'h#'
        }

phonemes_list_61 = list(map_phonemes_61_to_39.keys())
phonemes_list_39 = list(set(map_phonemes_61_to_39.values()))

label_to_phoneme39 = {}
phoneme39_to_label = {}
for index,phoneme in enumerate(phonemes_list_39):
    label_to_phoneme39[phoneme] = index + 1
    phoneme39_to_label[index + 1] = phoneme

map_phonemes_39_to_61 = {}

for phoneme61,phoneme39 in map_phonemes_61_to_39.items():
    if not phoneme39 in map_phonemes_39_to_61:
        map_phonemes_39_to_61[phoneme39] = []
    map_phonemes_39_to_61[phoneme39].append(phoneme61)

# Build CNN+RNN Model

In [7]:
from tensorflow.keras.layers import Flatten

def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
    """Model similar to DeepSpeech2."""
    # Model's input

    input_mfcc = layers.Input((None, input_dim), name="input")
    # Convolution layer 1
    x = layers.Conv1D(
        filters=32,
        kernel_size=11,

        padding="same",
        use_bias=False,
        name="conv_1",
    )(input_mfcc)
    x = layers.BatchNormalization(name="conv_1_bn")(x)
    x = layers.ReLU(name="conv_1_relu")(x)

    # Convolution layer 2
    x = layers.Conv1D(
        filters=32,
        kernel_size=11,

        padding="same",
        use_bias=False,
        name="conv_2",
    )(x)
    x = layers.BatchNormalization(name="conv_2_bn")(x)
    x = layers.ReLU(name="conv_2_relu")(x)

    # Model's input
    #input_mfcc = layers.Input((None, input_dim), name="input")  # None represents variable sequence length, input_dim should be defined according to your data

    # RNN layers
    x = layers.LSTM(128, return_sequences=True) (x)
    x = layers.LSTM(64, return_sequences=True) (x)

    #x = layers.GlobalAveragePooling1D()(x)

    # Dense layer
    output = layers.TimeDistributed(layers.Dense(output_dim, activation='softmax'))(x)

    model = keras.Model(input_mfcc, output, name="CNN_RNN")
    # Optimizer
    opt = keras.optimizers.Adam(learning_rate=1e-4)
    # Compile the model and return
    model.compile(optimizer=opt, loss='categorical_crossentropy', run_eagerly=True, metrics=['accuracy'])
    return model

n_mels = 64
fft_length =  384
# Get the model
model = build_model(
    input_dim= n_mels*3,
    output_dim=len(phoneme39_to_label),
    rnn_units=512,
)
model.summary(line_length=110)


def preprocess_features(features, labels):
    # Add your preprocessing steps here if any
    return features, labels

Model: "CNN_RNN"
______________________________________________________________________________________________________________
 Layer (type)                                    Output Shape                                Param #          
 input (InputLayer)                              [(None, None, 192)]                         0                
                                                                                                              
 conv_1 (Conv1D)                                 (None, None, 32)                            67584            
                                                                                                              
 conv_1_bn (BatchNormalization)                  (None, None, 32)                            128              
                                                                                                              
 conv_1_relu (ReLU)                              (None, None, 32)                            0 

## Create callback class to computer Phoneme Error Rate

In [8]:
import numpy as np
from tensorflow.keras.callbacks import Callback
import Levenshtein as lev

performance_cnn_rnn = {}

class PERCallback(Callback):
    def __init__(self, X_val, y_val, phoneme_mapping):
        self.X_val = X_val
        self.X_val = self.X_val.reshape(self.X_val.shape[0], 1, self.X_val.shape[1])
        self.y_val = y_val
        self.phoneme_mapping = phoneme_mapping

    def on_epoch_end(self, epoch, logs=None):
        gc.collect()
        # Get the model predictions
        predictions = self.model.predict(self.X_val)

        # Convert one-hot encoded vectors to phoneme sequences
        reference_phonemes = [self.one_hot_to_phoneme(vec) for vec in self.y_val]
        predicted_phonemes = [self.one_hot_to_phoneme(vec) for vec in predictions]

        # Calculate PER
        per = self.calculate_per(" ".join(reference_phonemes), " ".join(predicted_phonemes))
        performance_cnn_rnn[epoch] = per
        # Print PER
        print(f'\n Phoneme Error Rate after epoch {epoch}: {per}%')

    def one_hot_to_phoneme(self, one_hot_vector):
        index = np.argmax(one_hot_vector)
        return self.phoneme_mapping.get(index-1, "")

    import Levenshtein as lev

    def calculate_per(self, reference, hypothesis):
        distance = lev.distance(reference, hypothesis)
        per = distance / len(reference)
        return per * 100

# Train the model

In [9]:
# Define the number of epochs.
batch_size = 128

# This will split your data so that 70% is used for training and 30% for testing.
X_train, X_val, y_train, y_val = train_test_split(features, labels, test_size=0.3, random_state=42)

X_train_reshaped = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])
X_val_reshaped = X_val.reshape(X_val.shape[0], 1, X_val.shape[1])

y_train_reshaped = y_train.reshape(y_train.shape[0], 1, y_train.shape[1])
y_val_reshaped = y_val.reshape(y_val.shape[0], 1, y_val.shape[1])

train_dataset = tf.data.Dataset.from_tensor_slices(
    (X_train_reshaped, y_train_reshaped)
)
train_dataset = (
    train_dataset.map(preprocess_features, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

validation_dataset = tf.data.Dataset.from_tensor_slices(
     (X_val_reshaped, y_val_reshaped)
)
validation_dataset = (
    validation_dataset.map(preprocess_features, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

epochs = 10

# Train the model
start_time = time.time()

history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[PERCallback(X_val, y_val, phoneme39_to_label)]
)

end_time = time.time()

# Calculate the training time
training_time = end_time - start_time
print(f"Training time: {training_time:.2f} seconds")

Epoch 1/10

 Phoneme Error Rate after epoch 0: 37.07162163568799%
Epoch 2/10

 Phoneme Error Rate after epoch 1: 34.565974691977004%
Epoch 3/10

 Phoneme Error Rate after epoch 2: 33.38679569340305%
Epoch 4/10

 Phoneme Error Rate after epoch 3: 33.92859001627291%
Epoch 5/10

 Phoneme Error Rate after epoch 4: 32.95395008483427%
Epoch 6/10

 Phoneme Error Rate after epoch 5: 32.493173356834795%
Epoch 7/10

 Phoneme Error Rate after epoch 6: 33.54648883275678%
Epoch 8/10

 Phoneme Error Rate after epoch 7: 32.33608249568889%
Epoch 9/10

 Phoneme Error Rate after epoch 8: 31.737818735709155%
Epoch 10/10

 Phoneme Error Rate after epoch 9: 31.736430853998314%
Training time: 10868.34 seconds


# Save training performance metrics

In [10]:
import json
# Collect training history metrics
training_metrics = {
    'epochs': list(range(1, len(history.history['accuracy']) + 1)),
    'accuracy': history.history['accuracy'],
    'loss': history.history['loss'],
    'val_acc': history.history['val_accuracy'],
    'val_loss': history.history['val_loss']
}

# Save the training metrics to a JSON file
with open('/kaggle/working/history_cnn_rnn.json', 'w') as file:
    json.dump(training_metrics, file, indent=4)

# Save the PER metrics to a JSON file
with open('/kaggle/working/per_cnn_rnn.json', 'w') as file:
    json.dump(performance_cnn_rnn, file, indent=4)

## Save the model

In [11]:
# Save the model as an HDF5 file
model.save("/kaggle/working/cnn_rnn_model.h5")

# Testing block

In [12]:
def compute_per(X, y, model, phoneme_mapping):
    X = X.reshape(X.shape[0], 1, X.shape[1])
    # Get the model predictions
    predictions = model.predict(X)

    # Convert one-hot encoded vectors to phoneme sequences
    reference_phonemes = [one_hot_to_phoneme(vec, phoneme_mapping) for vec in y]
    predicted_phonemes = [one_hot_to_phoneme(vec, phoneme_mapping) for vec in predictions]

    # Calculate PER
    per = calculate_per(" ".join(reference_phonemes), " ".join(predicted_phonemes))

    return per

def one_hot_to_phoneme(one_hot_vector, phoneme_mapping):
    index = np.argmax(one_hot_vector)
    return phoneme_mapping.get(index-1, "")

def calculate_per(reference, hypothesis):
    distance = lev.distance(reference, hypothesis)
    per = distance / len(reference)
    return per * 100

# Testing

In [13]:
X_test = test_features
y_test = test_labels
X_test = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])
y_test = y_test.reshape(y_test.shape[0], 1, y_test.shape[1])

test_dataset = tf.data.Dataset.from_tensor_slices(
    (X_test, y_test)
)

test_dataset = (
    test_dataset.map(preprocess_features, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Compute testing performance metrics

In [14]:
# Evaluate the model
loss, accuracy = model.evaluate(test_dataset)



In [15]:
# Compute PER on the test data
per = compute_per(test_features, test_labels, model, phoneme39_to_label)

print(f'Loss: {loss}, Accuracy: {accuracy}, Phoneme Error Rate: {per}%')

Loss: 1.6500742435455322, Accuracy: 0.5115134119987488, Phoneme Error Rate: 30.411115028866735%
