# **Persian ASR using CTC Loss function**

**Importing Necessary Libraries**

In [2]:
import os
import numpy as np
import pandas as pd
import librosa
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Masking, TimeDistributed, Activation, Bidirectional, Lambda, Dropout, BatchNormalization
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

**Downloading and Unzipping Dataset**

This code downloads and unzips the audio files and the dataset CSV file from Google Drive.
The dataset is made by Hamtech (https://ham-tech.ir/)

In [3]:
import gdown

# Download the zipped WAV archive from Google Drive into the working directory,
# then unpack it quietly (-q) under /content/all_wav via an IPython shell escape.
url = 'https://drive.google.com/uc?id=1jyvhdZHn0s5Owkr21k5Ff-c96sIQLtEu'
output = 'all_wav.zip'
gdown.download(url, output, quiet=False)
!unzip -q 'all_wav.zip' -d '/content/all_wav'

# Download the CSV index of the dataset. `url` and `output` are deliberately
# reused, so after this cell they refer to the CSV, not the archive.
url = 'https://drive.google.com/uc?id=1vqvn0F0YYhEFbzLgP9wJ36vyInUnO5b5'
output = 'dataset.csv'
gdown.download(url, output, quiet=False)

Downloading...
From (original): https://drive.google.com/uc?id=1jyvhdZHn0s5Owkr21k5Ff-c96sIQLtEu
From (redirected): https://drive.google.com/uc?id=1jyvhdZHn0s5Owkr21k5Ff-c96sIQLtEu&confirm=t&uuid=604ee740-bd88-46e4-b72b-63dad2710b03
To: /content/all_wav.zip
100%|██████████| 2.48G/2.48G [00:10<00:00, 228MB/s]
Downloading...
From: https://drive.google.com/uc?id=1vqvn0F0YYhEFbzLgP9wJ36vyInUnO5b5
To: /content/dataset.csv
100%|██████████| 2.87M/2.87M [00:00<00:00, 220MB/s]


'dataset.csv'

**Loading the Dataset CSV File**

This code loads the dataset CSV file, updates the paths to the audio files, and filters out rows whose audio file does not exist on disk. The filter is needed because more than 9 GB of the .wav files have been lost, although they are still listed in dataset.csv. The first few rows of the resulting table are previewed in the next cell.

In [4]:
# Load the dataset index (one row per utterance).
# To preview it, put `df.head()` as the LAST expression of a cell; an
# expression in the middle of a cell is evaluated but never displayed.
df = pd.read_csv('/content/dataset.csv')

# Rewrite the relative WAV paths stored in the CSV so they point at the folder
# the archive was unzipped into. regex=False keeps the leading '.' literal.
df['wav_filename'] = df['wav_filename'].str.replace(
    './all_wav/', '/content/all_wav/all_wav/', regex=False
)

# Keep only the rows whose WAV file actually exists on disk. The original row
# labels are preserved (no reset_index); .copy() detaches the result from the
# unfiltered frame so later column assignments do not warn.
df = df[df['wav_filename'].apply(os.path.isfile)].copy()

In [12]:
# Preview the first rows of `df` (rendered because it is the cell's last expression).
df.head()

Unnamed: 0,wav_filename,wav_filesize,transcript,confidence_level
0,/content/all_wav/all_wav/Tehran_SayeRoshan0_10...,83044,اتفاقاتی که ندیده بودم,0.927557
1,/content/all_wav/all_wav/Tehran_SayeRoshan0_10...,54468,مسجد,0.927557
2,/content/all_wav/all_wav/Tehran_SayeRoshan0_10...,136036,جمع شدن مسلمین برای نمازهای جماعت,0.864152
3,/content/all_wav/all_wav/Tehran_SayeRoshan0_10...,106788,همیشه برای محمدرضا پهلوی,0.927557
4,/content/all_wav/all_wav/Tehran_SayeRoshan0_10...,170020,چه زمانی در کسوت شاه ایران نوکری اجانب را می‌کرد,0.854824


**Creating Character Maps**

Create mappings between Persian characters and their corresponding indices for use in the model. These mappings are used to convert text to a sequence of indices and vice versa.

In [5]:
# Vocabulary table: one "<symbol> <index>" pair per line.
# '<SPACE>' is a placeholder token standing in for the literal space character.
char_map_str = """
' 0
<SPACE> 1
ا 2
ب 3
پ 4
ت 5
ث 6
ج 7
چ 8
ح 9
خ 10
د 11
ذ 12
ر 13
ز 14
ژ 15
س 16
ش 17
ص 18
ض 19
ط 20
ظ 21
ع 22
غ 23
ف 24
ق 25
ک 26
گ 27
ل 28
م 29
ن 30
و 31
ه 32
ی 33
، 34
؟ 35
"""

# Parse the table once into (symbol, index-string) pairs.
_vocab_pairs = [entry.split() for entry in char_map_str.strip().split('\n')]

# symbol -> index (used to encode transcripts)
char_map = {symbol: int(code) for symbol, code in _vocab_pairs}
# index -> symbol (used to decode predictions)
index_map = {int(code): symbol for symbol, code in _vocab_pairs}

# Decoding index 1 must produce a real space, not the '<SPACE>' placeholder.
index_map[1] = ' '

# Encoding a real space must hit the same index as the placeholder.
char_map[' '] = char_map['<SPACE>']

**Defining Audio Processing Functions**

Define functions for loading audio, extracting features, adding noise, and shifting time to augment the dataset. These augmentations help make the model more robust to variations in the audio data.

In [6]:
# Audio processing functions
def load_audio(file_path, sr=16000):
    """Load an audio file with librosa and return only its waveform.

    Args:
        file_path: Path of the audio file to read.
        sr: Sampling rate passed to ``librosa.load`` (default 16000).

    Returns:
        The first element returned by ``librosa.load`` (the sample array);
        the sampling rate it also returns is discarded.
    """
    waveform = librosa.load(file_path, sr=sr)[0]
    return waveform

def extract_features(audio, n_mfcc=20, sr=16000):
    """Compute MFCCs and their deltas and return them as a frame-major matrix.

    Args:
        audio: 1-D waveform.
        n_mfcc: Number of MFCC coefficients per frame.
        sr: Sampling rate of ``audio``.

    Returns:
        Array of shape ``(n_frames, 2 * n_mfcc)``: the MFCCs stacked on top of
        their delta features, transposed so that time is the first axis.
    """
    mfcc_features = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc)

    # librosa.feature.delta defaults to width=9 with mode='interp', which
    # raises when the clip has fewer frames than the window. For such very
    # short clips fall back to edge-replicating 'nearest' padding; longer
    # clips keep exactly the default behaviour.
    delta_width = 9
    delta_mode = 'interp' if mfcc_features.shape[1] >= delta_width else 'nearest'
    delta_mfcc = librosa.feature.delta(mfcc_features, width=delta_width, mode=delta_mode)

    combined = np.vstack((mfcc_features, delta_mfcc)).T
    return combined

def add_noise(audio, noise_factor=0.005):
    """Return a copy of ``audio`` with additive Gaussian noise.

    Args:
        audio: 1-D array-like of samples. May be empty.
        noise_factor: Scale applied to the standard-normal noise.

    Returns:
        A new array with the same length and dtype as ``audio``.
    """
    audio = np.asarray(audio)
    noise = np.random.randn(len(audio))
    augmented_audio = audio + noise_factor * noise
    # Cast back using the array dtype rather than type(audio[0]), which
    # raised IndexError for an empty clip.
    return augmented_audio.astype(audio.dtype)

def shift_time(audio, shift_max=0.2, sr=16000):
    """Randomly shift ``audio`` left or right in time, zero-filling the gap.

    Args:
        audio: 1-D array of samples. It is not modified.
        shift_max: Upper bound of the shift, in seconds (exclusive).
        sr: Sampling rate used to convert ``shift_max`` to samples.

    Returns:
        A new array of the same length. Samples rolled in from the other end
        are replaced by zeros. A drawn shift of 0 returns an unchanged copy.
    """
    max_shift = int(shift_max * sr)
    if max_shift <= 0:
        # Shift window shorter than one sample: nothing to do
        # (np.random.randint(0) would raise).
        return np.array(audio, copy=True)

    # Keep the same draw order as before (magnitude, then direction) so that
    # seeded runs consume the random stream identically.
    shift = np.random.randint(max_shift)
    if np.random.rand() > 0.5:
        shift = -shift

    if shift == 0:
        # Without this guard the negative-shift branch below would execute
        # augmented_audio[0:] = 0 and silence the whole clip.
        return np.array(audio, copy=True)

    augmented_audio = np.roll(audio, shift)
    if shift > 0:
        augmented_audio[:shift] = 0   # shifted right: blank the wrapped head
    else:
        augmented_audio[shift:] = 0   # shifted left: blank the wrapped tail
    return augmented_audio

**Preparing the Dataset**

Process the audio files, extract features, and prepare the input and output sequences for the model. The audio features are padded to ensure uniform input length for the model, and the text labels are converted to sequences of indices.

In [7]:
# Prepare the dataset: per-utterance feature matrices, encoded transcripts,
# and the true (unpadded) length of each.
X, y = [], []
input_lengths, label_lengths = [], []

for index, row in df.iterrows():
    # Load, then augment: noise first, time shift second (this order also
    # fixes the order in which random numbers are consumed).
    audio = shift_time(add_noise(load_audio(row['wav_filename'])))
    features = extract_features(audio)

    # Encode the transcript character by character; any character missing
    # from char_map falls back to the index of the space character.
    label = [char_map.get(c, char_map[' ']) for c in row['transcript']]

    X.append(features)
    y.append(label)
    input_lengths.append(features.shape[0])
    label_lengths.append(len(label))

if not X or not y:
    raise ValueError("No valid audio files were found. Please check the dataset and the paths.")

# Right-pad to rectangular arrays: features with the default 0 value,
# labels with -1.
X = tf.keras.preprocessing.sequence.pad_sequences(X, dtype='float32', padding='post')
y = tf.keras.preprocessing.sequence.pad_sequences(y, value=-1, padding='post')

**Defining the Model**

*   **Input**: Defines the input layer with shape (None, 40), where None indicates variable sequence length and 40 is the number of features.

*   **Masking**: Masks the padded values (0.0) in the input sequences.

*   **Bidirectional LSTM**: Two Bidirectional LSTM layers with 256 units each are used to capture temporal dependencies in both forward and backward directions.

*   **BatchNormalization**: Normalizes the activations of the LSTM layers to improve training stability and performance.

*   **TimeDistributed Dense**: Applies a dense layer to each time step of the sequence independently.

*   **Activation**: Applies a softmax activation to generate a probability distribution over the output characters for each time step.



In [8]:
# Define the model with increased LSTM units to 256
# Input: variable-length sequences of 40 features per frame.
input_data = Input(name='the_input', shape=(None, 40))
# Frames whose features are all 0.0 (the padding value) are masked out.
masking_layer = Masking(mask_value=0.0)(input_data)
bilstm_layer_1 = Bidirectional(LSTM(256, return_sequences=True))(masking_layer)
batch_norm_1 = BatchNormalization()(bilstm_layer_1)
bilstm_layer_2 = Bidirectional(LSTM(256, return_sequences=True))(batch_norm_1)
batch_norm_2 = BatchNormalization()(bilstm_layer_2)
# One output class per distinct label index plus one extra class for the CTC
# blank. The size is taken from index_map, not char_map: char_map holds both
# '<SPACE>' and ' ' for the same index, so len(char_map) over-counts by one
# and would create a class that has no character to decode to.
time_dense = TimeDistributed(Dense(len(index_map) + 1))(batch_norm_2)
# Per-frame probability distribution over the classes.
y_pred = Activation('softmax', name='activation')(time_dense)

**Defining CTC Loss and Compiling the Model**

This code defines the CTC loss function and compiles the model with it.

In [14]:
# Define the CTC loss function
def ctc_lambda_func(args):
    """Compute the per-sample CTC loss inside a Keras ``Lambda`` layer.

    Args:
        args: A 4-element sequence in the order
            ``(y_pred, labels, input_length, label_length)``.

    Returns:
        The result of ``tf.keras.backend.ctc_batch_cost`` for the batch.
    """
    predictions, targets, prediction_lengths, target_lengths = args
    # Note the argument order: ctc_batch_cost takes the labels first.
    batch_loss = tf.keras.backend.ctc_batch_cost(
        targets, predictions, prediction_lengths, target_lengths
    )
    return batch_loss

# Compile the model with CTC loss
# Extra inputs needed only for training: the padded label sequence and the
# true lengths of each input / label sequence (one value per sample).
labels = Input(name='the_labels', shape=[None], dtype='float32')
input_length = Input(name='input_length', shape=[1], dtype='int64')
label_length = Input(name='label_length', shape=[1], dtype='int64')
# The CTC loss is computed inside the graph by a Lambda layer named 'ctc',
# so the training model's only output is the loss value itself.
loss_out = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')([y_pred, labels, input_length, label_length])

# Training model: takes all four inputs and outputs the CTC loss.
model = Model(inputs=[input_data, labels, input_length, label_length], outputs=loss_out)
# The compiled "loss" just passes the 'ctc' output through (y_true is ignored),
# because the real loss has already been computed by the Lambda layer.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss={'ctc': lambda y_true, y_pred: y_pred})

**Splitting Data and Defining Data Generator**

This code splits the dataset into training and validation sets and defines a data generator for batch processing.

In [10]:
# Split the data into training and validation sets (80% / 20%, fixed seed).
# All four parallel collections are split with the same shuffling so that
# features, labels and their lengths stay aligned.
(
    X_train, X_val,
    y_train, y_val,
    input_length_train, input_length_val,
    label_length_train, label_length_val,
) = train_test_split(
    X, y, input_lengths, label_lengths,
    test_size=0.2,
    random_state=42,
)

def data_generator(X, y, input_lengths, label_lengths, batch_size=16):
    """Endlessly yield ``(inputs, targets)`` batches for the CTC training model.

    The data is walked in order, ``batch_size`` samples at a time; the last
    batch of a pass may be smaller. After the final batch the generator wraps
    around to the start, so one pass is ``ceil(len(X) / batch_size)`` batches.

    Args:
        X: Padded feature sequences, indexable/sliceable along the first axis.
        y: Padded label sequences aligned with ``X``.
        input_lengths: True number of frames of each sample in ``X``.
        label_lengths: True number of labels of each sample in ``y``.
        batch_size: Maximum number of samples per batch (must be >= 1).

    Yields:
        A tuple ``(inputs, targets)`` where ``inputs`` is a dict keyed by the
        model's input names and ``targets`` is ``{'ctc': zeros}`` (a dummy
        target; the loss is produced by the model itself). The two length
        arrays have shape ``(batch, 1)`` to match ``Input(shape=[1])``.

    Raises:
        ValueError: On the first ``next()`` if the dataset is empty or
            ``batch_size`` is not positive. Previously an empty dataset made
            the generator spin forever without yielding.
    """
    n_samples = len(X)
    if n_samples == 0:
        raise ValueError("data_generator received an empty dataset.")
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer.")

    while True:
        for i in range(0, n_samples, batch_size):
            X_batch = X[i:i+batch_size]
            y_batch = y[i:i+batch_size]
            input_lengths_batch = input_lengths[i:i+batch_size]
            label_lengths_batch = label_lengths[i:i+batch_size]
            yield (
                {
                    'the_input': np.array(X_batch),
                    'the_labels': np.array(y_batch),
                    # Column vectors: one length per sample, shape (batch, 1).
                    'input_length': np.array(input_lengths_batch).reshape(-1, 1),
                    'label_length': np.array(label_lengths_batch).reshape(-1, 1)
                },
                {'ctc': np.zeros([len(X_batch)])}
            )

# Infinite batch generators over the training and validation splits.
# The batch size used here (16) is repeated in the steps_per_epoch /
# validation_steps arithmetic of the training cell and must stay in sync with it.
train_gen = data_generator(X_train, y_train, input_length_train, label_length_train, batch_size=16)
val_gen = data_generator(X_val, y_val, input_length_val, label_length_val, batch_size=16)

**Defining Callbacks and Training the Model**

This code defines the callbacks for early stopping and model checkpointing, and trains the model.

In [15]:
# Define callbacks
# Stop when val_loss has not improved for 5 epochs, and keep the checkpoint
# with the lowest val_loss on disk.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('/content/asr_best_model.keras', monitor='val_loss', save_best_only=True)

# The generators yield ceil(n / batch_size) batches per pass over the data
# (the trailing partial batch included) and keep their position between
# epochs. Rounding the step counts up makes one epoch -- and one validation
# run -- exactly one full pass. With floor division the window slid by one
# batch every epoch, so val_loss was measured on a different subset each time.
BATCH_SIZE = 16  # must match the batch_size used to build train_gen / val_gen
train_steps = int(np.ceil(len(X_train) / BATCH_SIZE))
val_steps = int(np.ceil(len(X_val) / BATCH_SIZE))

# Train the model
model.fit(train_gen, steps_per_epoch=train_steps, epochs=30, validation_data=val_gen, validation_steps=val_steps, callbacks=[early_stopping, model_checkpoint])

Epoch 1/30
Epoch 2/30



  return {key: serialize_keras_object(value) for key, value in obj.items()}


Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30


<keras.src.callbacks.History at 0x79f90ad01690>

**Saving the Model and Displaying Summary**

In [16]:
# Save the final model
# This writes the training model (the one that outputs the CTC loss) as it is
# after fit() returns. It is a separate file from the best-val_loss checkpoint
# that ModelCheckpoint writes to /content/asr_best_model.keras.
model.save('/content/asr_model.keras')

# Model summary
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 the_input (InputLayer)      [(None, None, 40)]           0         []                            
                                                                                                  
 masking (Masking)           (None, None, 40)             0         ['the_input[0][0]']           
                                                                                                  
 bidirectional (Bidirection  (None, None, 512)            608256    ['masking[0][0]']             
 al)                                                                                              
                                                                                                  
 batch_normalization (Batch  (None, None, 512)            2048      ['bidirectional[0][0]'] 

**Redefining Inference Model and Loading Weights**

This code redefines the inference model and loads the best model weights.

In [17]:
# Redefine the inference model
# Built from the same `input_data` and `y_pred` tensors as the training model,
# so it reuses the already-created layers and drops only the label/length
# inputs and the CTC Lambda layer.
inference_model = Model(inputs=input_data, outputs=y_pred)

# Load the weights from the best saved model
# NOTE(review): the checkpoint was written from the training model, which has
# extra inputs and a Lambda layer -- confirm load_weights maps it onto this
# smaller graph as intended for the installed Keras version.
inference_model.load_weights('/content/asr_best_model.keras')

**Predicting on a New Sample**

This code defines a function to predict on a new sample and tests the model on a specific sample index. Adjust the sample_index to test on different samples.

In [26]:
# Function to predict on a new sample
def predict_sample(sample_index):
    """Decode one dataset sample with beam search and print it next to the reference.

    Args:
        sample_index: Position of the sample in ``X`` / ``input_lengths`` and
            in ``df`` (positional, via ``df.iloc``).

    Prints:
        The predicted text and the transcript stored in ``df``.
    """
    # X is right-padded to the longest utterance. Use the true frame count
    # recorded when the features were extracted; decoding over the padded
    # length lets the padding frames contribute spurious characters.
    true_length = int(input_lengths[sample_index])
    sample_features = np.expand_dims(X[sample_index][:true_length], axis=0)
    sample_input_length = np.array([true_length], dtype=np.int32)

    # Predict with beam search decoding
    preds = inference_model.predict(sample_features)
    decoded_pred = tf.keras.backend.ctc_decode(preds, input_length=sample_input_length, greedy=False, beam_width=10, top_paths=1)[0][0]
    decoded_pred = tf.keras.backend.get_value(decoded_pred)

    # Ensure decoded_pred is a 1D array
    decoded_pred = decoded_pred.flatten()

    # Convert the decoded prediction to text. Skip the -1 padding emitted by
    # ctc_decode and any class index that has no character in index_map
    # (a bare index_map[i] would raise KeyError for such an index).
    predicted_text = ''.join(index_map[i] for i in decoded_pred if i in index_map)

    # Actual text from the dataset
    actual_text = df.iloc[sample_index]['transcript']

    print(f"Predicted text: {predicted_text}")
    print(f"Actual text: {actual_text}")

# Run the model on dataset row 25.
# NOTE(review): predict_sample indexes the full feature array X, which was
# built before the train/validation split, so this row is not guaranteed to be
# unseen data -- pick an index from the validation split for a fair check.
predict_sample(25)

Predicted text: دس بردار ازیم کارا 
Actual text: دست بردار از این کارها
