Import all libraries

In [1]:
import os
import re
import pandas as pd
import numpy as np 
import tensorflow as tf 

from keras.layers import Input, LSTM, Dense, Dropout, BatchNormalization, Bidirectional, TimeDistributed, Lambda
from keras.models import Model, Sequential
from keras.optimizers import Adam
from keras import backend as K

import librosa as lb
from librosa.effects import trim
import librosa.display

from sklearn.model_selection import train_test_split

Setting Up the datasets

In [22]:
dataset_path = "dataset/"
metadata = "Datasets.csv"

audio_directory = "dataset/"

# Create a dataframe for the transcript
dataframe = pd.read_csv(metadata)

# Preprocess transcript
def preprocess_text(text):
    text = text.lower()  # Convert text to lowercase
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)  # Remove non-alphanumeric characters (except spaces)
    text = text.replace(" ", "")  # Remove all whitespace
    return text

dataframe['clean_transcript'] = dataframe['Transcription'].apply(preprocess_text)

print(dataframe.head())

  File_Path Speaker         Transcription Session clean_transcript
0     03M_1     03M  1 2 3 4 5 6 7 8 9 10       1      12345678910
1     03M_2     03M                   ata       2              ata
2     03M_3     03M                   ana       3              ana
3     03M_4     03M                   ara       4              ara
4     03M_5     03M                  atha       5             atha


Data Pre-Processing

In [23]:
# Function to preprocess audio and connect to transcripts
def combine_audio_with_transcript(directory, dataframe):
    audio_data = []    
     # Iterate over each row in the DataFrame
    for index, row in dataframe.iterrows():
        file_name = row['File_Path']  # Get the file name from the CSV (without .wav)
        transcript = row['clean_transcript']  # Get the transcript
        
        # Construct the full file path by combining directory and file name with .wav extension
        file_path = os.path.join(directory, f"{file_name}.wav")
        
        # Check if the file exists in the audio directory
        if os.path.exists(file_path):
            try:
                # Append the processed data along with the transcript
                audio_data.append({
                    "file_path": file_path,
                    "transcript": transcript,
                })
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
    
    return audio_data

# Preprocess audio files and connect to transcripts
audio_data_with_transcripts = combine_audio_with_transcript(audio_directory, dataframe)
print(audio_data_with_transcripts)

[{'file_path': 'dataset/01F_1.wav', 'transcript': '12345678910'}, {'file_path': 'dataset/01F_3.wav', 'transcript': 'ana'}, {'file_path': 'dataset/01F_4.wav', 'transcript': 'ara'}]


Prepare dataset

In [34]:
# Get Dataset from the folder

prepared_audio_data = []

# Dataset file path
dataset_file_path = [item['file_path'] for item in audio_data_with_transcripts]

# Transcript
dataset_transcript = [item['transcript'] for item in audio_data_with_transcripts]

max_length = 0
sample_rate = 16000
num_mels = 128

try:
    for file_path in dataset_file_path:
        y, sr = librosa.load(file_path, sr=sample_rate, mono=True)
        temp_length = len(y)
        
        if temp_length > max_length:
            max_length = temp_length
except Exception as e:
    print(f"Error loading {file_path}: {e}")
    
try:
    for file_path in dataset_file_path:
        # Load the audio file
        y, sr = lb.load(file_path, sr=sample_rate, mono=True)
        
        # Trim silent edges of the audio
        y, _ = lb.effects.trim(y)
        
        # Normalize the audio
        y = lb.util.normalize(y)
        
        # Pad the audio to the maximum length
        if len(y) < max_length:
            y = np.pad(y, (0, max_length - len(y)))
        else:
            y = y[:max_length]  # This line can be skipped if no truncation is desired
        
        # Convert to Mel spectrogram
        mel_spec = lb.feature.melspectrogram(y=y, sr=sr, n_mels=num_mels)
        
        # Convert to dB scale (log scale)
        mel_spec_db = lb.power_to_db(mel_spec, ref=np.max)
        
        # Normalize the spectrogram between 0 and 1
        mel_spec_db = (mel_spec_db - np.min(mel_spec_db)) / (np.max(mel_spec_db) - np.min(mel_spec_db))
        
        # Append the processed Mel spectrogram to the list
        prepared_audio_data.append(mel_spec_db.T)  # Transpose to match the expected input shape

        print(f"Loaded {file_path} with shape {y.shape}")

except Exception as e:
    print(f"Error loading {file_path}: {e}")

Loaded dataset/01F_1.wav with shape (148857,)
Loaded dataset/01F_3.wav with shape (148857,)
Loaded dataset/01F_4.wav with shape (148857,)


In [47]:
# Convert to numpy arrays

x_audio = np.array(prepared_audio_data)
y_label = np.array(dataset_transcript)

# Tokenize the transcripts

tokenizer = tf.keras.layers.TextVectorization(output_mode='int', split='whitespace', standardize=None)
tokenizer.adapt(y_label)

# Encode transcripts
y_encoded = tokenizer(y_label)

# Vocabulary size (needed for the model output layer)
vocab_size = tokenizer.vocabulary_size()

# print(f'Input Shape:  {x_audio}')
print(f"Tokenize Transcripts: {y_encoded}")

Tokenize Transcripts: [[4]
 [3]
 [2]]


Set up the model

In [None]:
# Define input shape
input_shape = (None, x_audio.shape[2])  # Variable time steps, fixed features
inputs = Input(shape=input_shape, name="audio_input")

# BiLSTM Layers
x = Bidirectional(LSTM(128, return_sequences=True, name="bilstm_1"))(inputs)
x = BatchNormalization(name="batch_norm_1")(x)
x = Dropout(0.4, name="dropout_1")(x)

x = Bidirectional(LSTM(128, return_sequences=True, name="bilstm_2"))(x)
x = BatchNormalization(name="batch_norm_2")(x)
x = Dropout(0.4, name="dropout_2")(x)

x = Bidirectional(LSTM(128, return_sequences=True, name="bilstm_3"))(x)
x = BatchNormalization(name="batch_norm_3")(x)
x = Dropout(0.4, name="dropout_3")(x)

x = Bidirectional(LSTM(64, return_sequences=True, name="bilstm_4"))(x)
x = BatchNormalization(name="batch_norm_4")(x)
x = Dropout(0.4, name="dropout_4")(x)


# Output layer with vocabulary size + 1 (for CTC blank token)
output = TimeDistributed(Dense(vocab_size + 1, activation="softmax"), name="output")(x)

# Define standalone ASR model
model = Model(inputs=inputs, outputs=output, name="ASR_BiLSTM_Model")

# CTC Loss Function
def ctc_lambda_func(args):
    y_pred, labels, input_length, label_length = args

    # Ensure input_length and label_length are 1D tensors
    input_length = tf.squeeze(input_length, axis=-1)
    label_length = tf.squeeze(label_length, axis=-1)

    return tf.nn.ctc_loss(
        labels=labels,
        logits=y_pred,
        label_length=tf.cast(label_length, dtype=tf.int32),
        logit_length=tf.cast(input_length, dtype=tf.int32),
        logits_time_major=False
    )

# Additional inputs for CTC loss
labels = Input(name="labels", shape=(None,), dtype="int32")  # Target transcripts
input_length = Input(name="input_length", shape=(1,), dtype="int32")  # Length of input sequences
label_length = Input(name="label_length", shape=(1,), dtype="int32")  # Length of label sequences

# Compute CTC loss using a Lambda layer
ctc_loss = Lambda(ctc_lambda_func, output_shape=(1,), name="ctc_loss")(
    [output, labels, input_length, label_length]
)

# Define the full CTC Model
ctc_model = Model(inputs=[inputs, labels, input_length, label_length], outputs=ctc_loss)

learning_rate = 1e-4

optimizer = Adam(learning_rate = learning_rate)

# Compile the model with a dummy loss (CTC loss is computed manually)
ctc_model.compile(optimizer=optimizer, loss=lambda y_true, y_pred: y_pred)

# Print model summary
ctc_model.summary()

Train Model

In [71]:

# Compute input lengths (number of time steps per audio before padding)
input_lengths = np.array([audio.shape[0] for audio in prepared_audio_data])  # Time steps per sample

# Compute label lengths (number of tokens in each transcript)
label_lengths = np.array([len(label) for label in y_label])  # Tokens per transcript

print(f"x_audio shape: {x_audio.shape}")
print(f"y_encoded shape: {y_encoded.shape}")
print(f"input_lengths shape: {input_lengths.shape}")
print(f"label_lengths shape: {label_lengths.shape}")

x_audio shape: (3, 291, 128)
y_encoded shape: (3, 1)
input_lengths shape: (3,)
label_lengths shape: (3,)


In [72]:
history = ctc_model.fit(
    x=[x_audio, y_encoded, input_lengths, label_lengths], 
    y=np.zeros(len(x_audio)),  # Dummy labels for loss
    batch_size=64, 
    epochs=100
)

Epoch 1/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 23s/step - loss: 1217.6240
Epoch 2/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 669ms/step - loss: 1160.5482
Epoch 3/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 556ms/step - loss: 1067.0018
Epoch 4/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 658ms/step - loss: 1068.3885
Epoch 5/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 683ms/step - loss: 1089.9294
Epoch 6/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 555ms/step - loss: 1071.6106
Epoch 7/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 573ms/step - loss: 1098.1395
Epoch 8/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 586ms/step - loss: 1093.8693
Epoch 9/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 551ms/step - loss: 1099.7899
Epoch 10/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m