In [1]:
# Mount Google Drive so the "/content/drive/..." paths used by later cells resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Python dependencies for the notebook. No versions are pinned, so a fresh
# runtime may resolve different releases than the ones in the captured log.
# NOTE(review): `ffmpeg` here is the PyPI package of that name (the log shows
# ffmpeg-1.4.tar.gz), not the system ffmpeg binary - confirm it is needed.
!pip install mltu librosa soundfile onnx tf2onnx ffmpeg

# System library installed via apt (libsndfile1).
!apt-get update
!apt-get install -y libsndfile1

Collecting mltu
  Downloading mltu-1.2.5-py3-none-any.whl.metadata (3.4 kB)
Collecting onnx
  Downloading onnx-1.18.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting tf2onnx
  Downloading tf2onnx-1.16.1-py3-none-any.whl.metadata (1.3 kB)
Collecting ffmpeg
  Downloading ffmpeg-1.4.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting qqdm==0.0.7 (from mltu)
  Downloading qqdm-0.0.7.tar.gz (5.3 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting onnxruntime>=1.15.0 (from mltu)
  Downloading onnxruntime-1.22.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting addict (from qqdm==0.0.7->mltu)
  Downloading addict-2.4.0-py3-none-any.whl.metadata (1.0 kB)
Collecting jupyter (from qqdm==0.0.7->mltu)
  Downloading jupyter-1.1.1-py2.py3-none-any.whl.metadata (2.0 kB)
INFO: pip is looking at multiple versions of tf2onnx to determine which version is compatible with other requir

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torchaudio
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard, ReduceLROnPlateau
from mltu.dataProvider import DataProvider
from mltu.transformers import SpectrogramPadding, LabelIndexer, LabelPadding
from mltu.tensorflow.model_utils import residual_block, activation_layer
from mltu.tensorflow.metrics import CERMetric, WERMetric
from mltu.tensorflow.losses import CTCloss
from sklearn.model_selection import train_test_split
import warnings
from timeit import default_timer as timer
from mltu.tensorflow.callbacks import Model2onnx, TrainLogger

# Suppress known-noisy warnings. Each filter is scoped by category and by the
# module that emits it, so warnings from other modules remain visible.
warnings.filterwarnings("ignore", category=UserWarning, module="keras.src.trainers.data_adapters.py_dataset_adapter")
warnings.filterwarnings("ignore", category=FutureWarning, module="torchaudio")
warnings.filterwarnings("ignore", category=UserWarning, module="keras.src.layers.activations.leaky_relu")

# Configuration class
class Configs:
    """Training-time settings, grouped by concern.

    All values are class attributes; the module-level ``configs`` instance
    created below is what the rest of the cell reads (and may override
    per-instance at runtime).
    """

    # --- Audio front-end (STFT / mel) ---
    target_sr = 44_100      # sample rate every clip is resampled to
    frame_length = 512      # analysis window length, in samples
    frame_step = 256        # hop between windows, in samples
    fft_length = 512        # FFT size
    n_mels = 2              # number of mel bands per frame

    # --- Sequence limits ---
    max_spectrogram_length = 1000  # placeholder; meant to be overwritten at runtime
    max_text_length = 757          # labels are padded to this many tokens

    # --- Vocabulary (one entry per character; the final entry is a space) ---
    vocab = list("ءأؤإئابةتثجحخدذرزسشصضطظعغفقكلمنهوىيًٌٍَُِّْٰٕٖٜٓٔٗٞٱۜ۠ۡۢۤۥۦۭۧۨ۬ ")

    # --- Model / optimisation ---
    input_shape = [None, n_mels]  # (time_steps, n_mels); time axis left variable
    batch_size = 4
    train_epochs = 15
    learning_rate = 3e-4

    # --- Locations on the mounted Drive ---
    data_path = "/content/drive/MyDrive/task1_ test mariam thesis model/data/csv/"
    spectrogram_path = "/content/drive/MyDrive/spectrograms/"
    model_path = "/content/drive/MyDrive/task1_ test mariam thesis model/model"


# Shared settings object used by the functions and script code in this cell.
configs = Configs()

# Custom SpectrogramPadding
class TruncatedSpectrogramPadding(SpectrogramPadding):
    """Transpose a spectrogram to time-major layout and force a fixed frame count.

    Reads ``self.max_spectrogram_length`` and ``self.padding_value``, which are
    expected to be set by the ``SpectrogramPadding`` base class.
    """

    def __call__(self, spectrogram, label):
        """Return ``(frames, label)`` with ``frames`` cut or padded along axis 0.

        Args:
            spectrogram: 2-D array laid out as (n_mels, time_steps).
            label: passed through untouched.
        """
        frames = spectrogram.T  # -> (time_steps, n_mels)
        target = self.max_spectrogram_length
        missing = target - frames.shape[0]

        if missing < 0:
            # Too long: keep only the first `target` frames.
            frames = frames[:target, :]
        elif missing > 0:
            # Too short: append constant-valued frames at the end.
            frames = np.pad(
                frames,
                ((0, missing), (0, 0)),
                mode="constant",
                constant_values=self.padding_value,
            )
        return frames, label

# Precompute spectrograms
def precompute_spectrograms(dataset, output_dir, frame_length, frame_step, fft_length, target_sr, n_mels,
                            audio_dir="/content/drive/MyDrive/task1_ test mariam thesis model/data/audios"):
    """Convert each audio file in ``dataset`` to a dB-scaled mel spectrogram on disk.

    Each output file is named after the audio file's stem
    (``spectrogram_<stem>.npy``). Naming by stem rather than by position in
    ``dataset`` means several calls that share one ``output_dir`` (for example
    one call per train/validation split) cannot overwrite each other's files.

    Args:
        dataset: iterable of ``(wav_path, text)`` pairs. Only the basename of
            ``wav_path`` is used; it is looked up inside ``audio_dir``.
        output_dir: directory for the ``.npy`` files (created if missing).
        frame_length: STFT window length in samples.
        frame_step: STFT hop length in samples.
        fft_length: FFT size.
        target_sr: sample rate audio is resampled to before the transform.
        n_mels: number of mel bands; also validated on the result.
        audio_dir: directory containing the audio files.

    Returns:
        ``(spectrogram_paths, max_length)``: a list of ``(npy_path, text)`` for
        every item that was processed successfully, and the largest number of
        time steps seen (0 if nothing was processed).

    Per-item failures (missing file, decode error, unexpected shape, failed
    save) are printed and the item is skipped; they do not abort the run.
    """
    os.makedirs(output_dir, exist_ok=True)
    spectrogram_paths = []
    max_length = 0
    mel_transform = torchaudio.transforms.MelSpectrogram(
        sample_rate=target_sr,
        n_fft=fft_length,
        hop_length=frame_step,
        win_length=frame_length,
        n_mels=n_mels
    ).to("cpu")
    db_transform = torchaudio.transforms.AmplitudeToDB()
    # One resampler per distinct source rate instead of rebuilding it per file.
    resamplers = {}

    for wav_path, txt in dataset:
        try:
            wav_path = os.path.join(audio_dir, os.path.basename(wav_path))
            if not os.path.exists(wav_path):
                print(f"Audio file not found: {wav_path}")
                continue
            audio, sr = torchaudio.load(wav_path)
            # Down-mix multi-channel audio to mono, then drop the channel axis.
            if audio.dim() > 1 and audio.shape[0] > 1:
                audio = torch.mean(audio, dim=0, keepdim=True)  # Shape: (1, samples)
            audio = audio.squeeze(0)  # Shape: (samples,)
            if sr != target_sr:
                if sr not in resamplers:
                    resamplers[sr] = torchaudio.transforms.Resample(sr, target_sr)
                audio = resamplers[sr](audio)
            spectrogram = mel_transform(audio)
            spectrogram = db_transform(spectrogram).numpy()  # Shape: (n_mels, time_steps)
            if spectrogram.shape[0] != n_mels:
                raise ValueError(f"Spectrogram {wav_path} has n_mels={spectrogram.shape[0]}, expected {n_mels}")
            max_length = max(max_length, spectrogram.shape[1])
            # Key the file on the audio stem so names are unique across calls.
            stem = os.path.splitext(os.path.basename(wav_path))[0]
            spec_path = os.path.join(output_dir, f"spectrogram_{stem}.npy")
            np.save(spec_path, spectrogram)
            if not os.path.exists(spec_path):
                raise IOError(f"Failed to save spectrogram: {spec_path}")
            spectrogram_paths.append((spec_path, txt))
            print(f"Processed {wav_path}: shape {spectrogram.shape}, saved to {spec_path}")
        except Exception as e:
            # Best-effort batch job: report and move on to the next item.
            print(f"Error processing {wav_path}: {e}")
    return spectrogram_paths, max_length

# Load precomputed spectrograms
def load_precomputed_spectrogram(spec_path, txt):
    """Load one saved spectrogram and hand it back together with its label.

    Args:
        spec_path: path to a ``.npy`` file holding an (n_mels, time_steps) array.
        txt: label paired with the spectrogram; returned unchanged.

    Returns:
        ``(spectrogram, txt)``.

    Raises:
        FileNotFoundError: ``spec_path`` does not exist.
        ValueError: the array's first dimension differs from ``configs.n_mels``.
    """
    if os.path.exists(spec_path):
        spectrogram = np.load(spec_path)
        if spectrogram.shape[0] == configs.n_mels:
            return spectrogram, txt
        raise ValueError(f"Spectrogram {spec_path} has shape {spectrogram.shape}, expected n_mels={configs.n_mels}")
    raise FileNotFoundError(f"Spectrogram file not found: {spec_path}")

# DataProvider wrapper
def data_provider_generator(data_provider):
    """Yield ``(inputs, targets)`` batches from ``data_provider`` indefinitely.

    ``model.fit`` with ``steps_per_epoch`` over several epochs keeps pulling
    batches from the same generator object, so a single pass over the provider
    would run dry after the first epoch. This generator therefore restarts the
    provider each time a pass completes.

    Args:
        data_provider: re-iterable object whose items are ``(inputs, targets)``
            pairs.

    Yields:
        ``(inputs, targets)`` as ``float32`` and ``int32`` NumPy arrays.

    The generator stops only if a full pass yields no batch at all (empty or
    already-exhausted provider), which avoids spinning forever on no data.
    """
    while True:
        produced_any = False
        for inputs, targets in data_provider:
            produced_any = True
            yield np.array(inputs, dtype=np.float32), np.array(targets, dtype=np.int32)
        if not produced_any:
            return

# DataProvider
def convert_todata_provider(dataset, steps_per_epoch=None):
    """Build a batch generator over the precomputed spectrograms in ``dataset``.

    Args:
        dataset: iterable of ``(spec_path, text)`` pairs.
        steps_per_epoch: returned unchanged alongside the generator.

    Returns:
        ``(batch_generator, steps_per_epoch)``.

    Raises:
        FileNotFoundError: none of the spectrogram paths exist on disk.
    """
    # Keep only entries whose spectrogram file is actually present.
    valid_dataset = []
    for spec_path, txt in dataset:
        if os.path.exists(spec_path):
            valid_dataset.append((spec_path, txt))
    if len(valid_dataset) == 0:
        raise FileNotFoundError("No valid spectrogram files found in dataset.")

    # Show a handful of the surviving paths as a sanity check.
    for checked_path, _ in valid_dataset[:5]:
        print(f"Validated spectrogram path: {checked_path}")

    # Per-sample pipeline: fix the frame count, map characters to indices, pad labels.
    pipeline = [
        TruncatedSpectrogramPadding(max_spectrogram_length=configs.max_spectrogram_length, padding_value=0),
        LabelIndexer(configs.vocab),
        LabelPadding(max_word_length=configs.max_text_length, padding_value=len(configs.vocab)),
    ]
    provider = DataProvider(
        dataset=valid_dataset,
        skip_validation=False,
        batch_size=configs.batch_size,
        data_preprocessors=[load_precomputed_spectrogram],
        transformers=pipeline,
    )
    return data_provider_generator(provider), steps_per_epoch

# Model architecture
def train_model(input_dim, output_dim, activation="leaky_relu", dropout=0.2):
    """Assemble the Conv2D -> BiLSTM -> Dense acoustic model.

    Args:
        input_dim: shape of one input sample (without the batch axis).
        output_dim: number of vocabulary entries; the classifier emits
            ``output_dim + 1`` classes.
        activation: accepted for interface compatibility; not referenced in
            the body (every activation is a LeakyReLU with slope 0.1).
        dropout: dropout rate used after each recurrent layer and after the
            dense layer.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    layers = tf.keras.layers

    inputs = layers.Input(shape=input_dim, name="input", dtype=tf.float32)
    # Append a trailing channel axis so the 2-D convolutions can consume the input.
    x = layers.Lambda(
        lambda x: tf.expand_dims(x, axis=-1),
        output_shape=lambda s: (s[0], s[1], s[2], 1),
    )(inputs)

    # Two identical convolution blocks: conv -> batch norm -> LeakyReLU.
    for _ in range(2):
        x = layers.Conv2D(filters=32, kernel_size=[3, 3], strides=[1, 1], padding="same", use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU(negative_slope=0.1)(x)

    # Merge the last two axes into one feature axis for the recurrent stack.
    merged_features = x.shape[-2] * x.shape[-1]
    x = layers.Reshape((-1, merged_features))(x)

    # Five bidirectional LSTM layers, each followed by dropout.
    for _ in range(5):
        x = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(x)
        x = layers.Dropout(dropout)(x)

    # Dense projection.
    x = layers.Dense(256)(x)
    x = layers.LeakyReLU(negative_slope=0.1)(x)
    x = layers.Dropout(dropout)(x)

    # Per-time-step class distribution (one extra class beyond the vocabulary).
    output = layers.Dense(output_dim + 1, activation="softmax", dtype=tf.float32)(x)

    return tf.keras.Model(inputs=inputs, outputs=output)

# Load dataset: read the CSV from configs.data_path and strip the
# 'EqraTechCompany/tasks/' prefix from the 'audio' column (literal match, not regex).
csv_verses = "someverse.csv"  # Replace with your CSV file name
df = pd.read_csv(os.path.join(configs.data_path, csv_verses))
df['audio'] = df['audio'].str.replace('EqraTechCompany/tasks/', '', regex=False)

# Split dataset: 80/20 train/validation, shuffled with a fixed seed.
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42, shuffle=True)
print(f"Train size: {len(train_df)}")
print(f"Validation size: {len(val_df)}")

# Prepare metadata: rename 'audio'/'text' and keep only those two columns, in that order.
metadata_train_df = train_df.rename(columns={'audio': 'file_name', 'text': 'normalized_transcription'})[["file_name", "normalized_transcription"]]
metadata_val_df = val_df.rename(columns={'audio': 'file_name', 'text': 'normalized_transcription'})[["file_name", "normalized_transcription"]]

# Create dataset: lists of [file_name, lower-cased transcription] pairs.
dataset_train = [[f"{file}", label.lower()] for file, label in metadata_train_df.values.tolist()]
dataset_val = [[f"{file}", label.lower()] for file, label in metadata_val_df.values.tolist()]

# Clear old spectrograms: make sure the output directory exists, then delete
# the regular files left in it by a previous run. Sub-directories (for example
# a notebook checkpoint folder) are skipped, because os.remove() raises when
# given a directory and would abort the cell.
output_dir = configs.spectrogram_path
os.makedirs(output_dir, exist_ok=True)
for stale_name in os.listdir(output_dir):
    stale_path = os.path.join(output_dir, stale_name)
    if os.path.isfile(stale_path):
        os.remove(stale_path)

# Precompute spectrograms for both splits. Both calls write into the same
# output_dir, so the file names produced for the two splits must not collide.
precomputed_train, train_max_length = precompute_spectrograms(
    dataset_train, output_dir, configs.frame_length, configs.frame_step,
    configs.fft_length, configs.target_sr, configs.n_mels
)
precomputed_val, val_max_length = precompute_spectrograms(
    dataset_val, output_dir, configs.frame_length, configs.frame_step,
    configs.fft_length, configs.target_sr, configs.n_mels
)

# Update max_spectrogram_length (limit to reduce memory usage): the longest
# spectrogram seen in either split, capped at 10000 frames. This sets an
# attribute on the `configs` instance, overriding the class-level default.
configs.max_spectrogram_length = min(max(train_max_length, val_max_length), 10000)  # Cap at 10000
print(f"Updated max_spectrogram_length: {configs.max_spectrogram_length}")

# Debug spectrogram shapes: reload the first few training spectrograms from disk.
for spec_path, _ in precomputed_train[:5]:
    spec = np.load(spec_path)
    print(f"Spectrogram {spec_path}: shape {spec.shape}")

# Create data providers. Steps per epoch use floor division, so a trailing
# partial batch is not counted (and a split smaller than batch_size gives 0 steps).
train_data_generator, train_steps = convert_todata_provider(precomputed_train, steps_per_epoch=len(precomputed_train)//configs.batch_size)
val_data_generator, val_steps = convert_todata_provider(precomputed_val, steps_per_epoch=len(precomputed_val)//configs.batch_size)

# Debug data provider output
def debug_data_provider(generator):
    """Print the input/target shapes of the first batch of ``generator``.

    Exactly one item is pulled from the iterable; nothing is printed when it
    is empty. Any exception raised while fetching or unpacking the batch is
    reported with ``print`` instead of being propagated.
    """
    nothing = object()  # sentinel distinguishing "no batch" from any real value
    try:
        first_batch = next(iter(generator), nothing)
        if first_batch is not nothing:
            inputs, targets = first_batch
            print(f"Input shape: {inputs.shape}, Target shape: {targets.shape}")
    except Exception as e:
        print(f"Error in data provider: {e}")
# Peek at one training batch. This pulls a batch from the same generator
# object that is later handed to model.fit.
debug_data_provider(train_data_generator)

# Initialize and compile model. dropout=0.5 overrides train_model's default
# of 0.2; output_dim is the vocabulary size (train_model adds one extra class).
model = train_model(
    input_dim=configs.input_shape,
    output_dim=len(configs.vocab),
    dropout=0.5
)
# Adam with the configured learning rate, mltu's CTC loss, and character/word
# error-rate metrics built from the same vocabulary.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=configs.learning_rate),
    loss=CTCloss(),
    metrics=[
        CERMetric(vocabulary=configs.vocab),
        WERMetric(vocabulary=configs.vocab)
    ],
    run_eagerly=False
)
model.summary(line_length=110)

# Define callbacks. Early stopping, checkpointing and LR reduction all monitor
# "val_CER"; the first two use mode="min" explicitly, the LR scheduler uses "auto".
# Stop after 10 epochs without improvement.
earlystopper = EarlyStopping(monitor="val_CER", patience=10, verbose=1, mode="min")
# Keep only the best model, written under configs.model_path as model.h5.
checkpoint = ModelCheckpoint(f"{configs.model_path}/model.h5", monitor="val_CER", verbose=1, save_best_only=True, mode="min")
# mltu logger given the model directory - presumably writes training logs there; confirm against mltu docs.
trainLogger = TrainLogger(configs.model_path)
tb_callback = TensorBoard(f"{configs.model_path}/logs", update_freq=1)
# Multiply the learning rate by 0.8 after 5 epochs without improvement.
reduceLROnPlat = ReduceLROnPlateau(monitor="val_CER", factor=0.8, min_delta=1e-10, patience=5, verbose=1, mode="auto")
# mltu ONNX export callback pointed at the checkpoint file - presumably converts it when training ends; confirm against mltu docs.
model2onnx = Model2onnx(f"{configs.model_path}/model.h5")

# Train the model. Both generators are consumed step by step: each epoch draws
# train_steps training batches and val_steps validation batches, so the
# generators must be able to supply batches for every epoch.
start = timer()
model.fit(
    train_data_generator,
    validation_data=val_data_generator,
    steps_per_epoch=train_steps,
    validation_steps=val_steps,
    epochs=configs.train_epochs,
    callbacks=[earlystopper, checkpoint, trainLogger, reduceLROnPlat, tb_callback, model2onnx]
)
# Wall-clock duration of the fit call, converted from seconds to hours.
elapsed_time_hours = (timer() - start) / 3600
print(f"Total time consumed: {elapsed_time_hours:.2f} hours.")
# Reports the configured epoch count; early stopping may have ended training sooner.
print(f"Total time for {configs.train_epochs} epochs")

In [3]:
# Save the model as it stands after training (last-epoch weights), next to the
# best-only checkpoint. Requires `model` and `configs` from the training cell.
model.save(f"{configs.model_path}/final_model.h5")

In [None]:
import os
import numpy as np
import torch
import torchaudio
import tensorflow as tf
from mltu.transformers import SpectrogramPadding
from mltu.tensorflow.losses import CTCloss
import warnings

# Suppress warnings. The first filter has no module restriction, so it hides
# every UserWarning for the rest of the session; the second is limited to torchaudio.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning, module="torchaudio")

# Configuration class (matched to training)
class Configs:
    """Inference-time settings; the audio parameters must match those used for training.

    Note: this definition and the ``configs`` instance below replace any
    earlier ``Configs`` / ``configs`` in the same kernel. Here ``model_path``
    points at a model *file*.
    """

    # --- Audio front-end (STFT / mel) ---
    target_sr = 44_100
    frame_length = 512
    frame_step = 256
    fft_length = 512
    n_mels = 2

    # Every spectrogram is cut or padded to this many frames before prediction.
    max_spectrogram_length = 10_000

    # --- Vocabulary and CTC blank ---
    vocab = list("ءأؤإئابةتثجحخدذرزسشصضطظعغفقكلمنهوىيًٌٍَُِّْٰٕٖٜٓٔٗٞٱۜ۠ۡۢۤۥۦۭۧۨ۬ ")
    blank_index = len(vocab)  # the blank is the class right after the last character

    # --- Model file to load (final_model.h5 in the same folder is an alternative) ---
    model_path = "/content/drive/MyDrive/task1_ test mariam thesis model/model/model.h5"


# Shared settings object read by the inference helpers below.
configs = Configs()

# Custom SpectrogramPadding (matched to training)
class TruncatedSpectrogramPadding(SpectrogramPadding):
    """Label-free variant for inference: transpose, then cut or pad to a fixed length.

    Reads ``self.max_spectrogram_length`` and ``self.padding_value``, which are
    expected to be set by the ``SpectrogramPadding`` base class.
    """

    def __call__(self, spectrogram):
        """Return the (time_steps, n_mels) array with exactly ``max_spectrogram_length`` rows.

        Args:
            spectrogram: 2-D array laid out as (n_mels, time_steps).
        """
        time_major = spectrogram.T  # -> (time_steps, n_mels)
        limit = self.max_spectrogram_length
        n_frames = time_major.shape[0]

        if n_frames > limit:
            # Too long: keep the leading `limit` frames.
            return time_major[:limit, :]
        if n_frames < limit:
            # Too short: append constant-valued frames at the end.
            return np.pad(
                time_major,
                ((0, limit - n_frames), (0, 0)),
                mode="constant",
                constant_values=self.padding_value,
            )
        return time_major

# Custom CTC Greedy Decoder
def ctc_greedy_decoder(logits, vocab, blank_index):
    """Best-path (greedy) CTC decoding.

    For every sequence the highest-scoring class is taken at each time step,
    consecutive duplicates are merged, and blank tokens are dropped.

    Args:
        logits: array-like of shape (batch, time_steps, vocab_size + 1)
            holding per-class scores.
        vocab: list of characters, indexed by class id.
        blank_index: class id of the CTC blank token.

    Returns:
        A list with one decoded string per batch entry.
    """
    best_paths = np.argmax(logits, axis=-1)  # (batch, time_steps)

    transcripts = []
    for path in best_paths:
        # True wherever a token differs from its predecessor; the first step
        # always starts a run.
        starts_run = np.ones(path.shape, dtype=bool)
        starts_run[1:] = path[1:] != path[:-1]
        # Keep run starts that are not the blank symbol.
        emitted = path[starts_run & (path != blank_index)]
        transcripts.append(''.join(vocab[token] for token in emitted))

    return transcripts

# Process WAV file to spectrogram
def process_wav_to_spectrogram(wav_path):
    """Turn one WAV file into a model-ready, batched spectrogram.

    Steps: load, down-mix to mono, resample to ``configs.target_sr`` if needed,
    compute a dB-scaled mel spectrogram, cut/pad it to
    ``configs.max_spectrogram_length`` frames, and add a leading batch axis.

    Args:
        wav_path: path of the audio file.

    Returns:
        Array of shape (1, max_spectrogram_length, n_mels), or ``None`` when
        anything fails (the error is printed, not raised).
    """
    try:
        if not os.path.exists(wav_path):
            raise FileNotFoundError(f"WAV file not found: {wav_path}")

        waveform, source_sr = torchaudio.load(wav_path)

        # Average the channels of multi-channel audio, then drop the channel axis.
        is_multichannel = waveform.dim() > 1 and waveform.shape[0] > 1
        if is_multichannel:
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        waveform = waveform.squeeze(0)  # (samples,)

        if source_sr != configs.target_sr:
            waveform = torchaudio.transforms.Resample(source_sr, configs.target_sr)(waveform)

        to_mel = torchaudio.transforms.MelSpectrogram(
            sample_rate=configs.target_sr,
            n_fft=configs.fft_length,
            hop_length=configs.frame_step,
            win_length=configs.frame_length,
            n_mels=configs.n_mels
        ).to("cpu")
        to_db = torchaudio.transforms.AmplitudeToDB()

        spectrogram = to_db(to_mel(waveform)).numpy()  # (n_mels, time_steps)

        if spectrogram.shape[0] != configs.n_mels:
            raise ValueError(f"Spectrogram has n_mels={spectrogram.shape[0]}, expected {configs.n_mels}")

        print(f"Processed {wav_path}: spectrogram shape {spectrogram.shape}")

        # Fix the number of frames, then prepend the batch axis.
        fit_length = TruncatedSpectrogramPadding(
            max_spectrogram_length=configs.max_spectrogram_length,
            padding_value=0,
        )
        return fit_length(spectrogram)[np.newaxis, ...]  # (1, max_spectrogram_length, n_mels)
    except Exception as e:
        print(f"Error processing {wav_path}: {e}")
        return None

# Load model and predict
def predict_transcription(model, spectrogram):
    """Run the model on one batched spectrogram and greedily decode the result.

    Args:
        model: object exposing ``predict(array, verbose=0)``.
        spectrogram: batched input for the model.

    Returns:
        The decoded string for the first batch entry, or ``None`` if
        prediction or decoding raised (the error is printed).
    """
    try:
        # Per-time-step class scores: (1, time_steps, vocab_size + 1).
        class_scores = model.predict(spectrogram, verbose=0)
        transcripts = ctc_greedy_decoder(class_scores, configs.vocab, configs.blank_index)
        # Only one sample is expected in the batch.
        return transcripts[0]
    except Exception as e:
        print(f"Error during prediction: {e}")
        return None

# Main test function
def test_model(wav_path):
    """Load the trained model, transcribe one WAV file and print the result.

    The model file is ``configs.model_path``; if it does not exist,
    ``final_model.h5`` in the same directory is tried instead. When the
    fallback is used successfully, ``configs.model_path`` is updated to it.

    Args:
        wav_path: path of the audio file to transcribe.

    Raises:
        FileNotFoundError: neither the configured model file nor the fallback exists.

    Returns:
        None. Failures while processing the audio or predicting are reported
        by the helpers and end the function early.
    """
    model_path = configs.model_path
    if not os.path.exists(model_path):
        print(f"Model file not found: {model_path}")
        print("Trying final_model.h5...")
        # Build the sibling path explicitly: a substring replace would also
        # rewrite other occurrences of "model.h5" (e.g. inside "final_model.h5").
        model_path = os.path.join(os.path.dirname(model_path), "final_model.h5")
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Final model file not found: {model_path}")
        # Remember the working path only once it is known to exist.
        configs.model_path = model_path

    # compile=False: only the forward pass is needed for inference, so the
    # saved optimizer/loss/metric configuration is not restored and the custom
    # metric classes used at training time do not have to be registered here.
    model = tf.keras.models.load_model(
        model_path,
        custom_objects={"CTCloss": CTCloss},
        compile=False,
    )
    print(f"Loaded model from {model_path}")

    # Audio -> batched spectrogram; the helper prints its own error on failure.
    spectrogram = process_wav_to_spectrogram(wav_path)
    if spectrogram is None:
        return

    # Spectrogram -> text.
    transcription = predict_transcription(model, spectrogram)
    if transcription is None:
        return

    print(f"\nPredicted Transcription: {transcription}")

# Example usage: transcribe a single file from the audio folder on the mounted
# Drive. Runs at cell execution and prints the predicted transcription.
wav_file = "/content/drive/MyDrive/task1_ test mariam thesis model/data/audios/Mosab_Mohammad_067024.wav"  # Replace with your WAV file path
test_model(wav_file)