<a href="https://colab.research.google.com/github/jehad-halahla/Arabic-English-Langauge-Diarizer/blob/main/final_code_LD_graduation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Organized WhisperX-Based Model Notebook
This notebook processes audio data, identifies language (Arabic/English), and creates annotation packages for multiple annotators.
It uses separation of concerns with modular functions, clear comments, and cells ordered by execution dependency.
The functionality is identical to the original notebook, with no changes to the language identification logic.
Unused or potentially unused elements are flagged with comments.

## 1. Imports and Setup
This cell imports all required libraries and defines global configuration variables.
Note: Some imports (e.g., zipfile, shutil) are used only in potentially unused cells.

## 1.1. Install packages and setup environment

**run this**

In [None]:
# Install virtualenv

# !pip install virtualenv --quiet

# # Create a virtual environment
# !virtualenv whisperx_env --quiet

# !source whisperx_env/bin/activate

!pip install whisperx --quiet

!pip install datasets --quiet

!pip install ipykernel --quiet

!pip install pydub --quiet

!pip install transformers torch torchaudio speechbrain --quiet

# !pip install numpy==1.23.5 --force-reinstall


don't run this

In [None]:
# !pip install huggingface_hub[hf_xet] --quiet

# # Activate and install WhisperX with compatible pandas
!pip install whisperx pandas==2.2.3 --quiet

# !pip install datasets --quiet

# !pip install ipykernel

# !pip install pydub --quiet

# !pip install transformers torch torchaudio speechbrain --quiet

## 1.2. Import libs

In [None]:
import os
import json
import zipfile  # Potentially unused; used only in alternative annotator package creation
import numpy as np
import pandas as pd
import torch
import torchaudio
from google.colab import drive, files # files potentially unused; used only in alternative package creation
from IPython.display import Audio, clear_output  # Audio potentially unused; used only in alternative package creation
from tqdm import tqdm
import librosa
from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2Processor
import shutil  # Potentially unused; used only in alternative directory creation
from datasets import load_from_disk
import whisperx
from speechbrain.pretrained import EncoderClassifier

# !whisperx_env/bin/python -c "import whisperx"

# Configuration
TARGET_SR = 16000  # Target sample rate for audio
DRIVE_BASE = "/content/drive/MyDrive/audio_annotation_project"
ANNOTATORS = 3
SEED = 42



## 1.3. Load and store dataset

In [None]:
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
# Reload the saved dataset subset from Drive (works in a new session too)
ds = load_from_disk("/content/drive/MyDrive/subset-ds/subset")

df = pd.DataFrame(ds)  # The saved object is a single Dataset, so no split name is needed
print(df.columns)



Index(['audio', 'sentence'], dtype='object')


## 1.4. Filter short audio samples

In [None]:
min_duration = 5

# Add duration information to dataframe
df['duration'] = df['audio'].apply(lambda x: len(x['array']) / x['sampling_rate'])

filtered_df = df[df['duration'] >= min_duration].copy()
print(f"Original samples: {len(df)}")
print(f"Remaining after filtering: {len(filtered_df)}")
print(f"Shortest remaining sample: {filtered_df['duration'].min():.2f}s")

num_samples = len(filtered_df)

print(num_samples)

Original samples: 1000
Remaining after filtering: 285
Shortest remaining sample: 5.00s
285
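The duration used by the filter is simply the number of samples divided by the sampling rate. A minimal pure-Python sketch of the same idea on made-up records follows; the `records` list and the `clip_duration` helper are hypothetical and not part of the notebook.

```python
def clip_duration(audio):
    """Duration in seconds = number of samples / sampling rate."""
    return len(audio["array"]) / audio["sampling_rate"]

# Hypothetical stand-ins for dataset rows: silent clips of 3 s, 5 s and 8 s at 16 kHz.
records = [
    {"array": [0.0] * (16000 * seconds), "sampling_rate": 16000}
    for seconds in (3, 5, 8)
]

MIN_DURATION = 5  # seconds, the same threshold as in the cell above
kept = [r for r in records if clip_duration(r) >= MIN_DURATION]
print(f"kept {len(kept)} of {len(records)} clips")
```

Because the comparison is `>=`, a clip of exactly 5 seconds is kept, which matches the "Shortest remaining sample: 5.00s" line in the output.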


## 2.1. Load the WhisperX model

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"  # Dynamic device selection
model = whisperx.load_model("medium", device, compute_type="float32")


No language specified, language will be first be detected for each audio file (increases inference time).
>>Performing voice activity detection using Pyannote...




Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.6.0+cu124. Bad things might happen unless you revert torch to 1.x.


## 2.2. Audio Preprocessing

In [None]:
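def get_audio_tensor_from_audio_array(audio_array, sampling_rate=TARGET_SR):
  """Return a mono, 16 kHz, peak-normalized float32 tensor on `device`.

  `sampling_rate` is the rate of `audio_array`; it defaults to TARGET_SR,
  so pass the real rate whenever the input is not already 16 kHz.
  """
  # 1. Convert to mono if stereo (assumes channels-first layout: (channels, samples))
  if len(audio_array.shape) > 1:
      audio_array = np.mean(audio_array, axis=0)

  # 2. Resample to 16kHz if needed
  if sampling_rate != TARGET_SR:
      audio_array = librosa.resample(
          audio_array.astype(np.float32),
          orig_sr=sampling_rate,
          target_sr=TARGET_SR
      )

  # 3. Convert to float32 and peak-normalize (silent clips are left as-is to avoid dividing by zero)
  audio_array = audio_array.astype(np.float32)
  peak = np.max(np.abs(audio_array))
  if peak > 0:
      audio_array /= peak

  # 4. Convert to torch tensor and ensure float32
  audio_tensor = torch.from_numpy(audio_array).to(device)
  audio_tensor = audio_tensor.float()  # Force float32

  return audio_tensor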
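Peak normalization divides every sample by the largest absolute value, which is zero for a completely silent clip. The sketch below illustrates the arithmetic in plain Python; `peak_normalize` is a hypothetical helper written for this illustration, not a function from the notebook or from any library.

```python
def peak_normalize(samples):
    """Scale samples so the largest magnitude is 1.0; leave all-zero input unchanged."""
    peak = max((abs(s) for s in samples), default=0.0)
    if peak == 0.0:
        # Silent (or empty) input: dividing by the peak would be a division by zero.
        return list(samples)
    return [s / peak for s in samples]

print(peak_normalize([0.1, -0.5, 0.25]))
print(peak_normalize([0.0, 0.0]))
```

The sign of each sample is preserved; only the scale changes.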

## Transcribing audio with WhisperX

In [None]:
def transcribe_audio_whisperx(audio_tensor):
    batch_size = 8
    try:
        # Move tensor to CPU, then convert to NumPy
        audio_np = audio_tensor.cpu().numpy()  # Fix: Explicit CPU conversion
        result = model.transcribe(audio_np, batch_size=batch_size)
        return result
    except Exception as e:
        print(f"Transcription error: {e}")
        # Fallback to CPU (model and data)
        model.to("cpu")  # Move model to CPU
        audio_np = audio_tensor.cpu().numpy()
        result = model.transcribe(audio_np, batch_size=batch_size)
        return result

## Aligning the transcription with the WhisperX alignment model

In [None]:
def run_whisperx_alignment(audio_array):
  audio_tensor = get_audio_tensor_from_audio_array(audio_array)
  result = transcribe_audio_whisperx(audio_tensor)

  # Load the alignment model for the language WhisperX detected
  model_a, metadata = whisperx.load_align_model(
      language_code=result["language"],
      device=device
  )

  # Convert back to numpy for alignment
  audio_np = audio_tensor.cpu().numpy()
  result_aligned = whisperx.align(
      result["segments"],
      model_a,
      metadata,
      audio_np,
      device,
      return_char_alignments=False
  )
  return result_aligned

The function below generates aligned results for samples from the dataset and saves each one to Drive.

In [None]:
base_aligned_output_folder = "/content/drive/MyDrive/alignment_results"


import pickle  # Saves each result as a binary Python file (switch to JSON if you prefer)

def save_all_aligned_results(df, base_output_folder="aligned_results"):
    os.makedirs(base_output_folder, exist_ok=True)

    for idx in range(len(df)):
        sample = df.iloc[idx]["audio"]
        audio_array = sample["array"]
        sampling_rate = sample["sampling_rate"]
        path = sample["path"]

        # Take only the file name from path (no directories)
        filename = os.path.basename(path)
        # Swap the extension, e.g. from .wav to .pkl
        filename = os.path.splitext(filename)[0] + ".pkl"

        output_path = os.path.join(base_output_folder, filename)
        print(output_path)
        # Skip this sample if its .pkl file already exists
        if os.path.exists(output_path):
            print(f"Skipping {filename} — already processed.")
            continue
        # Run the alignment
        aligned_result = run_whisperx_alignment(audio_array)

        # Save the result in the output folder
        with open(output_path, "wb") as f:
            pickle.dump(aligned_result, f)

        print(f"Saved aligned result for {filename} at {output_path}")
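The skip-if-already-saved check is what makes the long alignment run resumable after a Colab disconnect. A minimal sketch of that pattern, using a temporary directory in place of Drive; `process_once` and the `clip_a` key are hypothetical names chosen for this illustration.

```python
import os
import pickle
import tempfile

def process_once(key, compute, cache_dir):
    """Return (result, was_cached); compute and pickle only if no cache file exists."""
    path = os.path.join(cache_dir, f"{key}.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f), True
    result = compute()
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result, False

with tempfile.TemporaryDirectory() as cache_dir:
    # First call computes and saves; second call finds the file and skips the computation.
    first = process_once("clip_a", lambda: {"segments": []}, cache_dir)
    second = process_once("clip_a", lambda: {"segments": ["recomputed"]}, cache_dir)

print(first, second)
```

One consequence of this design is that a stale or corrupted `.pkl` file is never regenerated automatically; it has to be deleted by hand.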


In [None]:
print(len(filtered_df))

285


In [None]:
save_all_aligned_results(filtered_df,base_aligned_output_folder)

/content/drive/MyDrive/alignment_results/qMqPJIozZvc_214.862.pkl
Skipping qMqPJIozZvc_214.862.pkl — already processed.
/content/drive/MyDrive/alignment_results/S10-C03-R10_015378-016145.pkl
Skipping S10-C03-R10_015378-016145.pkl — already processed.
/content/drive/MyDrive/alignment_results/I02-C03-R26_025800-026343.pkl
Skipping I02-C03-R26_025800-026343.pkl — already processed.
/content/drive/MyDrive/alignment_results/S33-C03-R33_047134-047965.pkl
Skipping S33-C03-R33_047134-047965.pkl — already processed.
/content/drive/MyDrive/alignment_results/S31-C03-R31_073080-074926.pkl
Skipping S31-C03-R31_073080-074926.pkl — already processed.
/content/drive/MyDrive/alignment_results/S17-C03-R17_102762-104296.pkl
Skipping S17-C03-R17_102762-104296.pkl — already processed.
/content/drive/MyDrive/alignment_results/I01-C03-R13_099097-101147.pkl
Skipping I01-C03-R13_099097-101147.pkl — already processed.
/content/drive/MyDrive/alignment_results/S27-C03-R27_120195-121019.pkl
Skipping S27-C03-R27_120

In [None]:
import os
import pickle


def load_all_aligned_results(base_input_folder=base_aligned_output_folder):
    results = {}

    # If the folder does not exist, return an empty dictionary
    if not os.path.exists(base_input_folder):
        print(f"Folder {base_input_folder} does not exist.")
        return results

    for filename in os.listdir(base_input_folder):
        if filename.endswith(".pkl"):
            file_path = os.path.join(base_input_folder, filename)
            with open(file_path, "rb") as f:
                data = pickle.load(f)
            key = os.path.splitext(filename)[0]
            results[key] = data
            print(f"Loaded {filename}")

    return results


In [None]:
l_results = load_all_aligned_results()
# Now l_results['file_name_without_extension'] holds the loaded content

Loaded qMqPJIozZvc_214.862.pkl
Loaded S10-C03-R10_015378-016145.pkl
Loaded I02-C03-R26_025800-026343.pkl
Loaded S33-C03-R33_047134-047965.pkl
Loaded S31-C03-R31_073080-074926.pkl
Loaded S17-C03-R17_102762-104296.pkl
Loaded I01-C03-R13_099097-101147.pkl
Loaded S27-C03-R27_120195-121019.pkl
Loaded S37-C03-R37_034007-034931.pkl
Loaded S21-C03-R21_105541-106131.pkl
Loaded S12-C03-R12_077999-078651.pkl
Loaded S24-C03-R24_060848-061947.pkl
Loaded I02-C03-R03_027523-028241.pkl
Loaded S25-C03-R25_019165-021569.pkl
Loaded S38-C03-R38_049724-050594.pkl
Loaded S25-C03-R25_127028-128282.pkl
Loaded S27-C03-R27_150531-152226.pkl
Loaded S17-C03-R17_100881-102687.pkl
Loaded 1l5GgRc_PVo_1329.68.pkl
Loaded S27-C03-R27_018079-019184.pkl
Loaded I01-C03-R17_096172-097385.pkl
Loaded S16-C03-R16_001686-002731.pkl
Loaded S22-C03-R22_035601-036145.pkl
Loaded I01-C03-R26_093740-094748.pkl
Loaded S09-C03-R09_108708-109527.pkl
Loaded S29-C03-R29_075659-076208.pkl
Loaded S33-C03-R33_059024-059800.pkl
Loaded S38-C0

In [None]:
print(len(l_results))

285


In [None]:
for file_id, result in l_results.items():
    print(f"\nFile: {file_id}")
    for seg in result["segments"]:
        for word in seg["words"]:
            print(f"{word['start']:.2f}s - {word['end']:.2f}s: {word['word']} (score: {word['score']:.2f})")
    break


File: qMqPJIozZvc_214.862
0.03s - 0.31s: وهي (score: 0.63)
0.38s - 0.70s: أنها (score: 0.60)
0.78s - 1.28s: بتطلع (score: 0.62)
1.39s - 1.63s: لير (score: 0.66)
1.69s - 1.81s: أو (score: 0.58)
1.87s - 2.11s: طبقة (score: 0.22)
2.13s - 2.42s: جديدة (score: 0.63)
2.52s - 2.98s: لجزع (score: 0.65)
3.02s - 3.39s: الشجرة (score: 0.68)
3.43s - 3.53s: من (score: 0.63)
3.59s - 4.01s: بره (score: 0.79)
4.06s - 4.18s: كل (score: 0.81)
4.20s - 4.44s: سنة (score: 0.69)
4.50s - 4.64s: أو (score: 0.53)
4.72s - 5.03s: كل (score: 0.57)
5.05s - 5.35s: موسم (score: 0.49)


In [None]:
def extract_word_segments(result):
    """Flatten nested 'words' lists from WhisperX alignment into a flat word_segments list."""
    word_segments = []
    for seg in result["segments"]:
        word_segments.extend(seg.get("words", []))
    return word_segments
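As far as the WhisperX README describes, words made only of characters outside the alignment model's dictionary (numerals, for example) are not given a timing, so a word entry may lack `start` and `end`. A hedged sketch of filtering such entries before printing or slicing audio; `timed_words` and the sample list are hypothetical.

```python
def timed_words(word_segments):
    """Keep only words that carry both 'start' and 'end' timestamps."""
    return [w for w in word_segments if "start" in w and "end" in w]

# Hypothetical alignment output: the middle word has no timing information.
words = [
    {"word": "سنة", "start": 4.20, "end": 4.44, "score": 0.69},
    {"word": "2014"},
    {"word": "موسم", "start": 5.05, "end": 5.35, "score": 0.49},
]

for w in timed_words(words):
    print(f"{w['start']:.2f}s - {w['end']:.2f}s: {w['word']}")
```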


In [None]:
#extract the word segments for the first key in l_results
word_segments_test = extract_word_segments(l_results[list(l_results.keys())[0]])
print(word_segments_test)

[{'word': 'وهي', 'start': 0.031, 'end': 0.314, 'score': np.float64(0.633)}, {'word': 'أنها', 'start': 0.375, 'end': 0.698, 'score': np.float64(0.601)}, {'word': 'بتطلع', 'start': 0.779, 'end': 1.285, 'score': np.float64(0.625)}, {'word': 'لير', 'start': 1.386, 'end': 1.629, 'score': np.float64(0.657)}, {'word': 'أو', 'start': 1.689, 'end': 1.811, 'score': np.float64(0.582)}, {'word': 'طبقة', 'start': 1.871, 'end': 2.114, 'score': np.float64(0.222)}, {'word': 'جديدة', 'start': 2.134, 'end': 2.418, 'score': np.float64(0.633)}, {'word': 'لجزع', 'start': 2.519, 'end': 2.984, 'score': np.float64(0.653)}, {'word': 'الشجرة', 'start': 3.024, 'end': 3.388, 'score': np.float64(0.676)}, {'word': 'من', 'start': 3.429, 'end': 3.53, 'score': np.float64(0.633)}, {'word': 'بره', 'start': 3.591, 'end': 4.015, 'score': np.float64(0.786)}, {'word': 'كل', 'start': 4.056, 'end': 4.177, 'score': np.float64(0.809)}, {'word': 'سنة', 'start': 4.197, 'end': 4.44, 'score': np.float64(0.694)}, {'word': 'أو', 'sta

In [None]:
# Load SpeechBrain LID model once
model_lid = EncoderClassifier.from_hparams(
    source="speechbrain/lang-id-voxlingua107-ecapa",
    savedir="/tmp/speechbrain"
)


In [None]:
def preprocess_audio(audio_array, sample_rate=16000):
    """Convert NumPy array to 16kHz mono PyTorch tensor."""
    waveform = torch.tensor(np.asarray(audio_array, dtype=np.float32)).float()

    if waveform.ndim == 1:
        waveform = waveform.unsqueeze(0)
    elif waveform.ndim == 2 and waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)

    return waveform

In [None]:
def identify_language(model_lid, audio_array, sample_rate=16000):
    label_encoder = model_lid.hparams.label_encoder
    lang_codes = label_encoder.ind2lab

    # Preprocess
    waveform = preprocess_audio(audio_array, sample_rate)

    # Handle edge case: empty audio
    if waveform.numel() == 0:
        print("⚠️ Audio waveform is empty. Skipping...")
        return None

    segment_len_sec = waveform.shape[-1] / sample_rate

    # ⚠️ If audio is too short (<0.1s), pad to 0.5s
    if segment_len_sec < 0.1:
        print(f"🔁 Padding short segment ({segment_len_sec:.3f}s) to 0.5s")
        min_len = int(0.5 * sample_rate)

        # Strategy: tile the signal until it covers min_len, then truncate to min_len
        repeats = (min_len // waveform.shape[-1]) + 1
        waveform = waveform.repeat(1, repeats)[:, :min_len]

    elif waveform.abs().max() < 1e-6:
        print("⚠️ Audio is nearly silent.")

    # LID inference
    with torch.no_grad():
        output = model_lid.classify_batch(waveform)

    logits = output[0]
    probabilities = torch.softmax(logits, dim=-1).squeeze(0)

    # Pick out the Arabic and English probabilities (each match overwrites,
    # so one matching label per language is assumed)
    prob_dict = {'Arabic': 0.0, 'English': 0.0}
    for idx, lang_code in sorted(lang_codes.items()):
        prob = probabilities[idx].item()
        if 'Arabic' in lang_code:
            prob_dict['Arabic'] = prob
        elif 'English' in lang_code:
            prob_dict['English'] = prob

    return prob_dict
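The two returned probabilities cover only two of the model's classes, so they generally do not sum to 1. One way to turn them into a two-way confidence is to renormalize over just that pair. The sketch below uses made-up scores for three classes; `softmax` and `two_way_score` are hypothetical helpers written in plain Python rather than with torch.

```python
import math

def softmax(values):
    """Numerically stable softmax over a list of floats."""
    peak = max(values)
    exps = [math.exp(v - peak) for v in values]
    total = sum(exps)
    return [e / total for e in exps]

def two_way_score(prob_dict):
    """Pick the likelier of Arabic/English and renormalize over just those two."""
    label = "Arabic" if prob_dict["Arabic"] > prob_dict["English"] else "English"
    total = prob_dict["Arabic"] + prob_dict["English"]
    # Fall back to 0.5 (no preference) when both probabilities are zero.
    score = prob_dict[label] / total if total > 0 else 0.5
    return label, score

# Hypothetical scores for three classes: Arabic, English, and one other language.
probs = softmax([2.0, 1.0, 0.5])
label, score = two_way_score({"Arabic": probs[0], "English": probs[1]})
print(label, round(score, 3))
```

Note that a tie goes to English, because the comparison is a strict `>`.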


In [None]:
def generate_rttm(
    model_lid,
    word_segments,
    full_audio_array,
    sampling_rate,
    file_id,
    play_audio_seg=False,
    silence_threshold=0.2,
):
    """
    Generate a structured RTTM from word segments:
    - Inserts non-speech for long gaps
    - Merges same-language segments
    - Ensures no overlaps and complete time coverage
    - Ensures no unlabeled time (e.g., [0, first word])
    """
    from IPython.display import Audio, display

    rttm_lines = []
    waveform = preprocess_audio(full_audio_array)
    adjusted_segments = []

    # === ✅ Handle time before the first word ===
    if word_segments:
        first_start = word_segments[0]["start"]
        if first_start >= silence_threshold*2: # the threshold here is exceptionally higher
            adjusted_segments.append({
                "start": 0.0,
                "end": first_start,
                "lang": "non-speech",
                "score": 1.0
            })
        elif first_start > 0:
            word_segments[0]["start"] = 0.0  # Shift first segment to begin at 0

    # === 1. Adjust word segments for continuity ===
    for i in range(len(word_segments)):
        seg = word_segments[i]

        if "start" not in seg or "end" not in seg:
            continue

        start = seg["start"]
        end = seg["end"]

        if (end - start) < 0.025:
            continue  # skip short segments

        if i < len(word_segments) - 1:
            # A following word without timestamps is treated as starting right at `end`
            next_start = word_segments[i + 1].get("start", end)
            gap = next_start - end

            if gap >= silence_threshold:
                adjusted_segments.append({"start": start, "end": end})
                adjusted_segments.append({
                    "start": end,
                    "end": next_start,
                    "lang": "non-speech",
                    "score": 1.0
                })
            else:
                adjusted_segments.append({"start": start, "end": next_start})
        else:
            adjusted_segments.append({"start": start, "end": end})

    # === 2. Language ID and labeling ===
    labeled_segments = []

    for seg in adjusted_segments:
        start = seg["start"]
        end = seg["end"]
        start_sample = int(start * sampling_rate)
        end_sample = int(end * sampling_rate)

        if end_sample <= start_sample:
            continue

        segment_wave = waveform[:, start_sample:end_sample]

        if seg.get("lang") == "non-speech":
            label = "non-speech"
            score = 1.0
        else:
            if play_audio_seg:
                display(Audio(segment_wave, rate=sampling_rate))

            prob_dict = identify_language(model_lid, segment_wave)
            if prob_dict is None:
                continue

            label = "Arabic" if prob_dict["Arabic"] > prob_dict["English"] else "English"
            total = prob_dict["Arabic"] + prob_dict["English"]
            score = prob_dict[label] / total if total > 0 else 0.5

        labeled_segments.append({
            "start": start,
            "end": end,
            "lang": label,
            "score": score
        })

    # === 3. Merge consecutive same-label segments ===
    merged_segments = []
    last = None

    for seg in labeled_segments:
        start = seg["start"]
        end = seg["end"]
        lang = seg["lang"]
        score = seg["score"]

        if last is None:
            last = {"start": start, "end": end, "lang": lang, "score": score}
        else:
            if lang == last["lang"]:
                last["end"] = end
                last["score"] += score  # running sum, not an average: merged scores can exceed 1.0
            else:
                duration = last["end"] - last["start"]
                rttm_lines.append(
                    f"SPEAKER {file_id} 1 {last['start']:.3f} {duration:.3f} <NA> <NA> {last['lang']} <NA> {last['score']:.3f}"
                )
                last = {"start": start, "end": end, "lang": lang, "score": score}

    if last:
        duration = last["end"] - last["start"]
        rttm_lines.append(
            f"SPEAKER {file_id} 1 {last['start']:.3f} {duration:.3f} <NA> <NA> {last['lang']} <NA> {last['score']:.3f}"
        )

    return rttm_lines
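The continuity step is the part of `generate_rttm` that decides whether a pause becomes a `non-speech` span or is absorbed into the preceding word. A simplified, stand-alone sketch of just that rule is shown here; `fill_gaps` is a hypothetical name, and it omits the minimum-duration check and the handling of time before the first word.

```python
def fill_gaps(words, silence_threshold=0.2):
    """Turn timed words into contiguous spans, inserting non-speech for long gaps."""
    spans = []
    for i, word in enumerate(words):
        start, end = word["start"], word["end"]
        if i + 1 < len(words):
            next_start = words[i + 1]["start"]
            if next_start - end >= silence_threshold:
                # Long pause: keep the word as-is and label the pause explicitly.
                spans.append({"start": start, "end": end})
                spans.append({"start": end, "end": next_start, "lang": "non-speech"})
            else:
                # Short pause: stretch this word up to the start of the next one.
                spans.append({"start": start, "end": next_start})
        else:
            spans.append({"start": start, "end": end})
    return spans

words = [
    {"start": 0.0, "end": 1.0},
    {"start": 1.1, "end": 2.0},  # 0.1 s pause before this word: absorbed
    {"start": 2.5, "end": 3.0},  # 0.5 s pause before this word: non-speech
]
spans = fill_gaps(words)
for span in spans:
    print(span)
```

Every span ends exactly where the next one begins, which is what gives the RTTM complete time coverage between the first and last word.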


single audio file

In [None]:
# Extract the word segments for a single audio file, looked up by its file ID
sample = filtered_df.iloc[11]["audio"]
audio_array = sample["array"]
sampling_rate = sample["sampling_rate"]
path = sample["path"]

print(path)
file_id = path.split("/")[-1].replace(".wav", "")
print(file_id)
word_segments_test = extract_word_segments(l_results[file_id])
print(word_segments_test)


S24-C03-R24_060848-061947.wav
S24-C03-R24_060848-061947
[{'word': 'مش', 'start': 0.031, 'end': 1.515, 'score': np.float64(0.472)}, {'word': 'حاولة', 'start': 1.535, 'end': 1.876, 'score': np.float64(0.659)}, {'word': 'يعني', 'start': 1.937, 'end': 2.659, 'score': np.float64(0.957)}, {'word': 'أحبطت', 'start': 2.719, 'end': 3.22, 'score': np.float64(0.961)}, {'word': 'وأحلامي', 'start': 3.24, 'end': 3.742, 'score': np.float64(0.872)}, {'word': 'تدمرت', 'start': 3.842, 'end': 4.103, 'score': np.float64(0.527)}, {'word': 'بس', 'start': 4.223, 'end': 4.604, 'score': np.float64(0.898)}, {'word': 'أنا', 'start': 4.624, 'end': 4.685, 'score': np.float64(0.229)}, {'word': 'أكيد', 'start': 4.765, 'end': 5.126, 'score': np.float64(0.82)}, {'word': 'I', 'start': 5.146, 'end': 5.166, 'score': np.float64(0.511)}, {'word': 'got', 'start': 5.186, 'end': 5.627, 'score': np.float64(0.816)}, {'word': 'down', 'start': 5.647, 'end': 5.768, 'score': np.float64(0.78)}, {'word': 'لما', 'start': 5.788, 'end':

In [None]:
#now we generate the RTTM
rttm_lines = generate_rttm(model_lid, word_segments_test, audio_array, sampling_rate, file_id, play_audio_seg=False)

🔁 Padding short segment (0.060s) to 0.5s


In [None]:
print(rttm_lines)

['SPEAKER S24-C03-R24_060848-061947 1 0.031 1.504 <NA> <NA> English <NA> 0.634', 'SPEAKER S24-C03-R24_060848-061947 1 1.535 3.089 <NA> <NA> Arabic <NA> 5.756', 'SPEAKER S24-C03-R24_060848-061947 1 4.624 0.141 <NA> <NA> English <NA> 0.996', 'SPEAKER S24-C03-R24_060848-061947 1 4.765 0.381 <NA> <NA> Arabic <NA> 1.000', 'SPEAKER S24-C03-R24_060848-061947 1 5.186 0.461 <NA> <NA> English <NA> 0.999', 'SPEAKER S24-C03-R24_060848-061947 1 5.647 0.502 <NA> <NA> Arabic <NA> 2.868', 'SPEAKER S24-C03-R24_060848-061947 1 6.149 1.885 <NA> <NA> English <NA> 3.712', 'SPEAKER S24-C03-R24_060848-061947 1 8.034 0.422 <NA> <NA> Arabic <NA> 1.743', 'SPEAKER S24-C03-R24_060848-061947 1 8.456 0.260 <NA> <NA> English <NA> 0.997', 'SPEAKER S24-C03-R24_060848-061947 1 8.716 0.662 <NA> <NA> Arabic <NA> 1.941', 'SPEAKER S24-C03-R24_060848-061947 1 9.418 0.261 <NA> <NA> English <NA> 1.903', 'SPEAKER S24-C03-R24_060848-061947 1 9.679 0.903 <NA> <NA> Arabic <NA> 3.000', 'SPEAKER S24-C03-R24_060848-061947 1 10.582 0
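A quick way to sanity-check lines like these is to parse them back and confirm that each segment starts where the previous one ended. The sketch below assumes the ten-field layout printed above (label in the eighth field, score in the tenth); `parse_rttm_line` and `find_gaps` are hypothetical helpers, and the 0.002 s tolerance is an assumption that allows for three-decimal rounding of start and duration.

```python
def parse_rttm_line(line):
    """Split one RTTM line into (file_id, start, duration, label, score_text)."""
    fields = line.split()
    return fields[1], float(fields[3]), float(fields[4]), fields[7], fields[9]

def find_gaps(lines, tolerance=0.002):
    """Return (end_of_previous, start_of_next) pairs where segments do not abut."""
    gaps = []
    previous_end = None
    for line in lines:
        _, start, duration, _, _ = parse_rttm_line(line)
        if previous_end is not None and abs(start - previous_end) > tolerance:
            gaps.append((previous_end, start))
        previous_end = start + duration
    return gaps

# The first three lines of the output above.
lines = [
    "SPEAKER S24-C03-R24_060848-061947 1 0.031 1.504 <NA> <NA> English <NA> 0.634",
    "SPEAKER S24-C03-R24_060848-061947 1 1.535 3.089 <NA> <NA> Arabic <NA> 5.756",
    "SPEAKER S24-C03-R24_060848-061947 1 4.624 0.141 <NA> <NA> English <NA> 0.996",
]
print(find_gaps(lines))
```

An empty list means the checked segments are contiguous; dropping the middle line would report one gap between roughly 1.535 s and 4.624 s.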

all audio files

In [None]:
# Write the output to a folder on Drive
output_dir = "/content/drive/MyDrive/rttm_outputs_vox"
os.makedirs(output_dir, exist_ok=True)

for sample in filtered_df["audio"]:
    try:
        audio_array = sample["array"]
        sampling_rate = sample["sampling_rate"]
        path = sample["path"]

        file_id = path.split("/")[-1].replace(".wav", "")

        # Word-level alignment loaded earlier
        aligned_result = l_results[file_id]
        word_segments = aligned_result["word_segments"]

        # Generate the RTTM lines
        rttm_lines = generate_rttm(
            model_lid,
            word_segments,
            audio_array,
            sampling_rate,
            file_id,
            play_audio_seg=False
        )

        # Save to a file on Google Drive
        rttm_path = os.path.join(output_dir, f"{file_id}.rttm")
        with open(rttm_path, "w", encoding="utf-8") as f:
            f.write("\n".join(rttm_lines))

        print(f"✅ RTTM saved to {rttm_path}")

    except Exception as e:
        print(f"❌ Error processing {sample['path']}: {e}")


✅ RTTM saved to /content/drive/MyDrive/rttm_outputs_vox/qMqPJIozZvc_214.862.rttm
🔁 Padding short segment (0.081s) to 0.5s
✅ RTTM saved to /content/drive/MyDrive/rttm_outputs_vox/S10-C03-R10_015378-016145.rttm
✅ RTTM saved to /content/drive/MyDrive/rttm_outputs_vox/I02-C03-R26_025800-026343.rttm
🔁 Padding short segment (0.060s) to 0.5s
✅ RTTM saved to /content/drive/MyDrive/rttm_outputs_vox/S33-C03-R33_047134-047965.rttm
🔁 Padding short segment (0.080s) to 0.5s
✅ RTTM saved to /content/drive/MyDrive/rttm_outputs_vox/S31-C03-R31_073080-074926.rttm
🔁 Padding short segment (0.080s) to 0.5s
🔁 Padding short segment (0.060s) to 0.5s
🔁 Padding short segment (0.060s) to 0.5s
🔁 Padding short segment (0.080s) to 0.5s
🔁 Padding short segment (0.090s) to 0.5s
✅ RTTM saved to /content/drive/MyDrive/rttm_outputs_vox/S17-C03-R17_102762-104296.rttm
🔁 Padding short segment (0.100s) to 0.5s
🔁 Padding short segment (0.040s) to 0.5s
✅ RTTM saved to /content/drive/MyDrive/rttm_outputs_vox/I01-C03-R13_099097

In [None]:
# Find which RTTM files are missing
for sample in filtered_df["audio"]:
  # Extract the path, then the file ID
  path = sample["path"]
  file_id = path.split("/")[-1].replace(".wav", "")
  # Check whether the RTTM file actually exists on Drive
  if os.path.exists(os.path.join("/content/drive/MyDrive/rttm_outputs_vox", f"{file_id}.rttm")):
    print(f"{file_id}.rttm exists")
  else:
    print(f"{file_id}.rttm does not exist")

qMqPJIozZvc_214.862.rttm exists
S10-C03-R10_015378-016145.rttm exists
I02-C03-R26_025800-026343.rttm exists
S33-C03-R33_047134-047965.rttm exists
S31-C03-R31_073080-074926.rttm exists
S17-C03-R17_102762-104296.rttm exists
I01-C03-R13_099097-101147.rttm exists
S27-C03-R27_120195-121019.rttm exists
S37-C03-R37_034007-034931.rttm exists
S21-C03-R21_105541-106131.rttm exists
S12-C03-R12_077999-078651.rttm exists
S24-C03-R24_060848-061947.rttm exists
I02-C03-R03_027523-028241.rttm exists
S25-C03-R25_019165-021569.rttm exists
S38-C03-R38_049724-050594.rttm exists
S25-C03-R25_127028-128282.rttm exists
S27-C03-R27_150531-152226.rttm exists
S17-C03-R17_100881-102687.rttm exists
1l5GgRc_PVo_1329.68.rttm exists
S27-C03-R27_018079-019184.rttm exists
I01-C03-R17_096172-097385.rttm exists
S16-C03-R16_001686-002731.rttm exists
S22-C03-R22_035601-036145.rttm exists
I01-C03-R26_093740-094748.rttm exists
S09-C03-R09_108708-109527.rttm exists
S29-C03-R29_075659-076208.rttm exists
S33-C03-R33_059024-05980
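When checking for missing files, keep in mind that `os.path.join` only builds a string, and any non-empty string is truthy; only `os.path.exists` consults the filesystem. A small self-contained demonstration with a temporary directory follows; the file names are made up.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as folder:
    present = os.path.join(folder, "present.rttm")
    missing = os.path.join(folder, "missing.rttm")
    with open(present, "w", encoding="utf-8") as f:
        f.write("")

    # A joined path is a non-empty string, so it is truthy whether or not the file exists.
    truthy = [bool(present), bool(missing)]
    # os.path.exists checks the filesystem.
    on_disk = [os.path.exists(present), os.path.exists(missing)]

print(truthy, on_disk)
```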

## Installing the Praat TextGrid converter

In [None]:
!pip3 install praat-textgrids


Collecting praat-textgrids
  Downloading praat_textgrids-1.4.0-py3-none-any.whl.metadata (9.7 kB)
Downloading praat_textgrids-1.4.0-py3-none-any.whl (25 kB)
Installing collected packages: praat-textgrids
Successfully installed praat-textgrids-1.4.0


## Base RTTM section

In [None]:
'''

Helper script that converts Praat TextGrids directly to RTTM format.

'''

# Parsing TextGrid files


from textgrids import TextGrid

import os

def textgrid_to_rttm(textgrid_path, tier_name='union', rttm_path=None):
    tg = TextGrid()
    tg.read(textgrid_path)

    file_id = os.path.splitext(os.path.basename(textgrid_path))[0]
    rttm_lines = []

    for interval in tg[tier_name]:
        # Map tier labels to RTTM labels; anything else (including empty
        # intervals) is written as non-speech
        label = interval.text.strip().lower()
        if label == 'ar':
            label = 'Arabic'
        elif label == 'en':
            label = 'English'
        else:
            label = 'non-speech'
        start = float(interval.xmin)
        duration = float(interval.xmax - interval.xmin)

        # label is never empty at this point, so every interval is written
        line = f"SPEAKER {file_id} 1 {start:.3f} {duration:.3f} <NA> <NA> {label} <NA> <NA>"
        rttm_lines.append(line)

    # Write to file
    if rttm_path is None:
        rttm_path = f"{file_id}.rttm"

    with open(rttm_path, "w") as f:
        for line in rttm_lines:
            f.write(line + "\n")

    print(f"RTTM written to: {rttm_path}")



def process_all_textgrids(input_dir="textGrids", output_dir="rttms"):
    os.makedirs(output_dir, exist_ok=True)

    # List all TextGrid files
    textgrid_files = [f for f in os.listdir(input_dir) if f.endswith(".TextGrid")]

    # Process each TextGrid file
    for tg_file in textgrid_files:
        tg_path = os.path.join(input_dir, tg_file)
        file_id = os.path.splitext(tg_file)[0]
        output_path = os.path.join(output_dir, f"{file_id}.rttm")
        textgrid_to_rttm(textgrid_path=tg_path, rttm_path=output_path)

# process_all_textgrids("textGrids", "rttms")
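
The converter writes one RTTM line per interval, and the evaluation code later reads fields 3, 4 and 7 (start, duration, label) back out. The sketch below shows that round trip on a single line. `format_rttm_line` and `parse_rttm_line` are hypothetical helper names used only for illustration; the field layout is the one written by `textgrid_to_rttm`. Because parsing splits on whitespace, labels must not contain spaces.

```python
def format_rttm_line(file_id, start, duration, label):
    """Build one RTTM line in the same layout the converter above writes."""
    return f"SPEAKER {file_id} 1 {start:.3f} {duration:.3f} <NA> <NA> {label} <NA> <NA>"


def parse_rttm_line(line):
    """Parse one RTTM line; return None for malformed or non-SPEAKER lines."""
    parts = line.split()
    if len(parts) < 9 or parts[0] != "SPEAKER":
        return None
    return {
        "file_id": parts[1],
        "start": float(parts[3]),     # onset in seconds
        "duration": float(parts[4]),  # duration in seconds
        "label": parts[7],            # language label (or non-speech)
    }


line = format_rttm_line("demo", 1.25, 0.5, "Arabic")
print(line)
print(parse_rttm_line(line))
```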

In [None]:
#unzip textgrid.zip file
!unzip textGrids.zip -d textGrids

Archive:  textGrids.zip
  inflating: textGrids/-2is0rI_tow_1203_778.TextGrid  
  inflating: textGrids/Epa9CFnSqso_1042_251.TextGrid  
  inflating: textGrids/G28SocqOwOE_212_287.TextGrid  
  inflating: textGrids/I01-C03-R01_011947-013003.TextGrid  
  inflating: textGrids/I01-C03-R02_001793-002675.TextGrid  
  inflating: textGrids/I01-C03-R02_074510-075216.TextGrid  
  inflating: textGrids/I01-C03-R02_094704-095822.TextGrid  
  inflating: textGrids/I01-C03-R03_052077-053371.TextGrid  
  inflating: textGrids/I01-C03-R03_078617-079296.TextGrid  
  inflating: textGrids/I01-C03-R03_082238-083013.TextGrid  
  inflating: textGrids/I01-C03-R05_085121-086651.TextGrid  
  inflating: textGrids/I01-C03-R07_044756-045736.TextGrid  
  inflating: textGrids/I01-C03-R08_003115-003639.TextGrid  
  inflating: textGrids/I01-C03-R08_049199-049756.TextGrid  
  inflating: textGrids/I01-C03-R09_028706-029592.TextGrid  
  inflating: textGrids/I01-C03-R09_059514-060709.TextGrid  
  inflating: textGrids/I01-C03-R

In [None]:
#define some variables.

textgrid_path_gt = "textGrids"
rttm_path_gt = "rttms_gt"

#now convert the textgrids to rttm

process_all_textgrids(textgrid_path_gt, rttm_path_gt)

RTTM written to: rttms_gt/S32-C03-R32_040037-041515.rttm
RTTM written to: rttms_gt/I01-C03-R34_097895-099160.rttm
RTTM written to: rttms_gt/I01-C03-R23_042251-043098.rttm
RTTM written to: rttms_gt/S24-C03-R24_060848-061947.rttm
RTTM written to: rttms_gt/I02-C03-R32_093198-093820.rttm
RTTM written to: rttms_gt/S21-C03-R21_050049-050918.rttm
RTTM written to: rttms_gt/I02-C03-R25_092146-093001.rttm
RTTM written to: rttms_gt/I01-C03-R09_059514-060709.rttm
RTTM written to: rttms_gt/S33-C03-R33_047134-047965.rttm
RTTM written to: rttms_gt/I01-C03-R20_094184-095746.rttm
RTTM written to: rttms_gt/I01-C03-R07_044756-045736.rttm
RTTM written to: rttms_gt/S04-C03-R04_040675-041178.rttm
RTTM written to: rttms_gt/I01-C03-R02_001793-002675.rttm
RTTM written to: rttms_gt/S24-C03-R24_019873-020374.rttm
RTTM written to: rttms_gt/I01-C03-R19_092226-092870.rttm
RTTM written to: rttms_gt/I02-C03-R07_026413-027081.rttm
RTTM written to: rttms_gt/I01-C03-R02_074510-075216.rttm
RTTM written to: rttms_gt/I01-C

In [None]:
!pip install pyannote.metrics




In [None]:
import os
from io import StringIO
from pyannote.core import Annotation, Segment
from pyannote.metrics.diarization import DiarizationErrorRate

# ========== [1] Parse RTTM String into pyannote Annotation ==========
def parse_rttm_to_annotation(rttm_str):
    """
    Parses RTTM content (as string) into a pyannote.core.Annotation object.
    """
    annotation = Annotation()
    for line in StringIO(rttm_str):
        parts = line.strip().split()
        if len(parts) < 9:
            continue
        start_time = float(parts[3])
        duration = float(parts[4])
        label = parts[7]
        segment = Segment(start_time, start_time + duration)
        annotation[segment] = label
    return annotation

# ========== [2] Compute DER from RTTM Strings ==========
def compute_der_pyannote(ref_rttm_str, hyp_rttm_str):
    """
    Computes DER using pyannote.metrics from RTTM file contents as strings.
    """
    reference = parse_rttm_to_annotation(ref_rttm_str)
    hypothesis = parse_rttm_to_annotation(hyp_rttm_str)

    metric = DiarizationErrorRate()
    der_result = metric(reference, hypothesis)

    return {
        "DER": round(der_result, 4),
    }

# ========== [3] Evaluate All Matching Files in Two RTTM Folders ==========
def evaluate_rttm_folders(vox_output_dir, reference_dir):
    ders = []
    files_matched = 0

    hyp_files = os.listdir(vox_output_dir)
    ref_files = os.listdir(reference_dir)

    for filename in hyp_files:
        if filename in ref_files:
            try:
                ref_path = os.path.join(reference_dir, filename)
                hyp_path = os.path.join(vox_output_dir, filename)

                with open(ref_path, "r") as f:
                    ref_rttm_str = f.read()
                with open(hyp_path, "r") as f:
                    hyp_rttm_str = f.read()

                result = compute_der_pyannote(ref_rttm_str, hyp_rttm_str)

                print(f"{filename} → 📊 DER: {result['DER']}")
                ders.append(result["DER"])
                files_matched += 1

            except Exception as e:
                print(f"⚠️ Error processing {filename}: {e}")

    # ========== Summary ==========
    if files_matched > 0:
        average_der = sum(ders) / len(ders)
        print(f"\n✅ Average DER across {files_matched} files: {round(average_der, 4)}")
    else:
        print("❌ No matching RTTM files found.")

# ========== [4] Run Main Evaluation ==========
# Modify these paths as needed
vox_output_dir = "/content/drive/MyDrive/rttm_outputs_vox"
reference_rttm_dir = "rttms_gt"

evaluate_rttm_folders(vox_output_dir, reference_rttm_dir)




S10-C03-R10_015378-016145.rttm → 📊 DER: 0.2147
I02-C03-R26_025800-026343.rttm → 📊 DER: 0.3937
S33-C03-R33_047134-047965.rttm → 📊 DER: 0.2965
I01-C03-R13_099097-101147.rttm → 📊 DER: 0.4661
S27-C03-R27_120195-121019.rttm → 📊 DER: 0.4316
S37-C03-R37_034007-034931.rttm → 📊 DER: 0.4197
S24-C03-R24_060848-061947.rttm → 📊 DER: 0.4287
I02-C03-R03_027523-028241.rttm → 📊 DER: 0.2817
S25-C03-R25_127028-128282.rttm → 📊 DER: 0.4994
S17-C03-R17_100881-102687.rttm → 📊 DER: 0.4053
I01-C03-R17_096172-097385.rttm → 📊 DER: 0.3577
S38-C03-R38_052741-053245.rttm → 📊 DER: 0.081
I01-C03-R16_084929-085643.rttm → 📊 DER: 0.5352
S08-C03-R08_039160-039943.rttm → 📊 DER: 0.3245
S29-C03-R29_083517-084869.rttm → 📊 DER: 0.4598
I01-C03-R01_011947-013003.rttm → 📊 DER: 0.2612
S20-C03-R20_003792-004925.rttm → 📊 DER: 0.4356
I02-C03-R25_092146-093001.rttm → 📊 DER: 0.4965
S22-C03-R22_024078-025146.rttm → 📊 DER: 0.5378
I01-C03-R23_042251-043098.rttm → 📊 DER: 0.3813
I01-C03-R21_103605-104365.rttm → 📊 DER: 0.2767
I01-C03-R34_09

In [None]:
import os
from io import StringIO
from pyannote.core import Annotation, Segment
from pyannote.metrics.diarization import DiarizationErrorRate

# ========== [1] Parse RTTM to pyannote Annotation ==========
def parse_rttm_to_annotation(rttm_str):
    annotation = Annotation()
    for line in StringIO(rttm_str):
        parts = line.strip().split()
        if len(parts) < 9:
            continue
        start_time = float(parts[3])
        duration = float(parts[4])
        label = parts[7]
        segment = Segment(start_time, start_time + duration)
        annotation[segment] = label
    return annotation

# ========== [2] Compute Detailed Metrics ==========
def compute_der_metrics(ref_rttm_str, hyp_rttm_str):
    reference = parse_rttm_to_annotation(ref_rttm_str)
    hypothesis = parse_rttm_to_annotation(hyp_rttm_str)

    metric = DiarizationErrorRate()
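    detail = metric(reference, hypothesis, detailed=True)  # important: detailed=True returns the component breakdown

    # 'diarization error rate' is a ratio; the three components below are
    # durations in seconds, so their values can exceed 1.
    return {
        "DER": round(detail['diarization error rate'], 4),
        "Missed Detection": round(detail['missed detection'], 4),
        "False Alarm": round(detail['false alarm'], 4),
        "Confusion": round(detail['confusion'], 4),
    }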

# ========== [3] Evaluate RTTM Folders ==========
def evaluate_rttm_folders(vox_output_dir, reference_dir):
    ders, missed, fa, conf = [], [], [], []
    files_matched = 0

    hyp_files = os.listdir(vox_output_dir)
    ref_files = os.listdir(reference_dir)

    for filename in hyp_files:
        if filename in ref_files:
            try:
                ref_path = os.path.join(reference_dir, filename)
                hyp_path = os.path.join(vox_output_dir, filename)

                with open(ref_path, "r") as f:
                    ref_rttm_str = f.read()
                with open(hyp_path, "r") as f:
                    hyp_rttm_str = f.read()

                result = compute_der_metrics(ref_rttm_str, hyp_rttm_str)

                # print(f"\n📁 {filename}")
                # print(f"  ➤ DER: {result['DER']}")
                # print(f"  ➤ Missed Detection: {result['Missed Detection']}")
                # print(f"  ➤ False Alarm: {result['False Alarm']}")
                # print(f"  ➤ Confusion: {result['Confusion']}")

                # Aggregate
                ders.append(result["DER"])
                missed.append(result["Missed Detection"])
                fa.append(result["False Alarm"])
                conf.append(result["Confusion"])
                files_matched += 1

            except Exception as e:
                print(f"⚠️ Error processing {filename}: {e}")

    # ========== Summary ==========
    if files_matched > 0:
        print("\n📊 AVERAGE METRICS:")
        print(f"✅ Average DER: {round(sum(ders) / files_matched, 4)}")
        print(f"✅ Average Missed Detection: {round(sum(missed) / files_matched, 4)}")
        print(f"✅ Average False Alarm: {round(sum(fa) / files_matched, 4)}")
        print(f"✅ Average Confusion: {round(sum(conf) / files_matched, 4)}")
    else:
        print("❌ No matching RTTM files found.")

# ========== [4] Run Evaluation ==========
vox_output_dir = "/content/drive/MyDrive/rttm_outputs_vox"
reference_rttm_dir = "rttms_gt"

# evaluate_rttm_folders(vox_output_dir, reference_rttm_dir)





📊 AVERAGE METRICS:
✅ Average DER: 0.3785
✅ Average Missed Detection: 0.2609
✅ Average False Alarm: 0.0068
✅ Average Confusion: 3.4608
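
A note on reading these averages: in the detailed output of `pyannote.metrics`, the DER is a ratio, whereas missed detection, false alarm and confusion are durations in seconds, which is why the average confusion can be larger than 1. The DER is the sum of the three error durations divided by the total reference duration (the `total` key of the detailed output, which the cell above does not keep). The sketch below illustrates this relation and contrasts the mean of per-file DERs with a pooled, duration-weighted DER. The function names and the numbers are illustrative assumptions, not results from this notebook.

```python
def der_from_components(missed, false_alarm, confusion, total):
    """DER as a ratio: error durations (seconds) over reference duration (seconds)."""
    if total <= 0:
        raise ValueError("total reference duration must be positive")
    return (missed + false_alarm + confusion) / total


def pooled_der(per_file):
    """Duration-weighted DER: sum the components over all files, then divide once."""
    keys = ("missed", "false_alarm", "confusion", "total")
    sums = {k: sum(f[k] for f in per_file) for k in keys}
    return der_from_components(**sums)


# Illustrative values only (seconds), not taken from the evaluation above
files = [
    {"missed": 1.0, "false_alarm": 0.0, "confusion": 3.0, "total": 10.0},
    {"missed": 0.0, "false_alarm": 0.5, "confusion": 0.5, "total": 2.0},
]
mean_of_ders = sum(der_from_components(**f) for f in files) / len(files)
print(round(mean_of_ders, 4), round(pooled_der(files), 4))
```

The two figures differ whenever files have different durations: the plain mean gives every file the same weight, while the pooled value gives longer files more weight.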


# Testing the custom LID

In [None]:
import numpy as np
import librosa

def extract_mfcc_from_array(
    audio_array: np.ndarray,
    sample_rate: int = 16000,
    n_mfcc: int = 40,
    n_fft: int = 512,
    hop_length: int = 256,
    max_frames: int = 200  # Output shape (max_frames, 3 * n_mfcc) = (200, 120)
) -> np.ndarray:

    try:
        # Compute MFCCs
        mfcc = librosa.feature.mfcc(
            y=audio_array,
            sr=sample_rate,
            n_mfcc=n_mfcc,
            n_fft=n_fft,
            hop_length=hop_length,
            win_length=n_fft,
            window='hann',
            fmin=20,
            fmax=8000
        )  # Shape: (n_mfcc, T) = (40, T)

        # Compute delta and delta-delta
        delta = librosa.feature.delta(mfcc)
        delta2 = librosa.feature.delta(mfcc, order=2)

        # Concatenate features: (40 + 40 + 40) = (120, T)
        features = np.concatenate([mfcc, delta, delta2], axis=0)

        # Normalize (feature-wise)
        features = (features - np.mean(features, axis=1, keepdims=True)) / \
                   (np.std(features, axis=1, keepdims=True) + 1e-8)

        # Pad or truncate to 200 frames
        if features.shape[1] > max_frames:
            features = features[:, :max_frames]
        else:
            pad_width = ((0, 0), (0, max_frames - features.shape[1]))
            features = np.pad(features, pad_width, mode='constant')

        return features.T  # Shape: (200, 120)

    except Exception as e:
        print(f"❌ Error extracting MFCC features: {e}")
        return None
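
The fixed output shape has a practical consequence: only the first `max_frames` frames of a segment reach the model. The sketch below works out the arithmetic with the defaults used above. It assumes librosa's default centered framing, where a signal of `n_samples` samples yields `1 + n_samples // hop_length` frames; the helper names are hypothetical.

```python
def feature_dim(n_mfcc=40):
    """MFCC + delta + delta-delta stacked along the feature axis."""
    return 3 * n_mfcc


def n_frames(n_samples, hop_length=256):
    """Frame count under centered framing (assumption: librosa's center=True)."""
    return 1 + n_samples // hop_length


def seconds_covered(max_frames=200, hop_length=256, sample_rate=16000):
    """Audio duration spanned by the first max_frames frame positions."""
    return (max_frames - 1) * hop_length / sample_rate


print(feature_dim())         # feature columns per frame
print(n_frames(16000))       # frames produced by one second of 16 kHz audio
print(seconds_covered())     # seconds kept before truncation
```

With these defaults, a one-second segment is zero-padded up to 200 frames, and anything beyond roughly 3.2 seconds is truncated.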


In [None]:
# Load the trained language-ID model from language_id_model.h5

from tensorflow import keras
custom_lid_model = keras.models.load_model('language_id_model.h5')



In [None]:
import numpy as np
from typing import List, Dict
import librosa


def predict_language_custom(model, audio_array, sample_rate=16000):
    """
    Predict language using CNN model on audio segment with robust handling of short/empty inputs.
    Returns: Dict with Arabic_prob, English_prob, predicted_label, score
    """

    def extract_mfcc_from_array(audio_array, sample_rate=16000, n_mfcc=40,
                                n_fft=512, hop_length=256, max_frames=200):
        try:
            if len(audio_array) < n_fft:
                pad_width = (0, max(0, n_fft - len(audio_array)))
                audio_array = np.pad(audio_array, pad_width, mode='constant')

            audio_array = audio_array.astype(np.float32)
            if np.max(np.abs(audio_array)) > 0:
                audio_array /= np.max(np.abs(audio_array))

            n_fft = min(n_fft, len(audio_array))
            hop_length = min(hop_length, len(audio_array) // 2)

            mfcc = librosa.feature.mfcc(
                y=audio_array, sr=sample_rate, n_mfcc=n_mfcc,
                n_fft=n_fft, hop_length=hop_length, win_length=n_fft,
                window='hann', fmin=20, fmax=8000, center=True)

            # Delta width must be odd and <= number of time frames
            time_steps = mfcc.shape[1]
            if time_steps < 3:
                raise ValueError("Too few time frames for delta calculation.")
            delta_width = min(9, time_steps if time_steps % 2 == 1 else time_steps - 1)

            delta = librosa.feature.delta(mfcc, width=delta_width)
            delta2 = librosa.feature.delta(mfcc, order=2, width=delta_width)
            features = np.concatenate([mfcc, delta, delta2], axis=0)

            features = (features - np.mean(features, axis=1, keepdims=True)) / \
                       (np.std(features, axis=1, keepdims=True) + 1e-8)

            if features.shape[1] > max_frames:
                features = features[:, :max_frames]
            else:
                pad_width = ((0, 0), (0, max_frames - features.shape[1]))
                features = np.pad(features, pad_width, mode='constant')

            return features.T

        except Exception as e:
            print(f"MFCC extraction failed: {e}")
            return None

    if audio_array.size == 0:
        print("⚠️ Empty audio input")
        return None

    segment_len = len(audio_array) / sample_rate
    if segment_len < 0.1:
        min_samples = int(0.1 * sample_rate)
        if len(audio_array) < min_samples:
            audio_array = np.pad(audio_array, (0, min_samples - len(audio_array)), mode='constant')

    features = extract_mfcc_from_array(audio_array, sample_rate)
    if features is None or features.size == 0:
        return None

    if features.shape != (200, 120):
        features = np.pad(features,
                          ((0, max(0, 200 - features.shape[0])),
                           (0, max(0, 120 - features.shape[1]))),
                          mode='constant')

    try:
        input_tensor = features.reshape(1, 200, 120, 1)
        predictions = model.predict(input_tensor, verbose=0)

        arabic_prob = float(np.maximum(0.0, predictions[0][0]))
        english_prob = float(np.maximum(0.0, predictions[0][1]))
        total = arabic_prob + english_prob

        if total > 0:
            if arabic_prob > english_prob:
                return {
                    "predicted_label": "Arabic",
                    "Arabic_prob": arabic_prob,
                    "English_prob": english_prob,
                    "score": arabic_prob / total
                }
            else:
                return {
                    "predicted_label": "English",
                    "Arabic_prob": arabic_prob,
                    "English_prob": english_prob,
                    "score": english_prob / total
                }

        return {
            "predicted_label": "Unknown",
            "Arabic_prob": 0.0,
            "English_prob": 0.0,
            "score": 0.5
        }

    except Exception as e:
        print(f"Prediction error: {e}")
        return None


def generate_rttm_custom(model, word_segments: List[Dict],
                         full_audio_array: np.ndarray,
                         sampling_rate: int,
                         file_id: str,
                         silence_threshold: float = 0.2) -> List[str]:
    """
    Generate RTTM lines using a CNN model on audio_array.
    """
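    rttm_lines = []
    segments = []

    # Nothing to label if there are no word segments
    if not word_segments:
        return rttm_lines

    # Leading gap: emit non-speech if it is long enough, otherwise stretch
    # the first word back to 0.0 (this modifies word_segments in place)
    if word_segments[0]['start'] > 0:
        initial_gap = word_segments[0]['start']
        if initial_gap >= silence_threshold:
            segments.append({'start': 0.0, 'end': word_segments[0]['start'], 'lang': 'non-speech', 'score': 1.0})
        else:
            word_segments[0]['start'] = 0.0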

    for i, seg in enumerate(word_segments):
        start = seg['start']
        end = seg['end']
        if i < len(word_segments) - 1:
            next_start = word_segments[i + 1]['start']
            gap = next_start - end
            if gap >= silence_threshold:
                segments.append({'start': start, 'end': end})
                segments.append({'start': end, 'end': next_start, 'lang': 'non-speech', 'score': 1.0})
            else:
                segments.append({'start': start, 'end': next_start})
        else:
            segments.append({'start': start, 'end': end})

    labeled_segments = []
    for seg in segments:
        start = seg['start']
        end = seg['end']
        start_sample = int(start * sampling_rate)
        end_sample = int(end * sampling_rate)
        audio_slice = full_audio_array[start_sample:end_sample]

        if seg.get("lang") == "non-speech":
            labeled_segments.append({"start": start, "end": end, "lang": "non-speech", "score": 1.0})
        else:
            pred = predict_language_custom(model, audio_slice, sampling_rate)
            if pred:
                labeled_segments.append({
                    "start": start,
                    "end": end,
                    "lang": pred['predicted_label'],
                    "score": pred['score']
                })

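    # Merge consecutive segments that share a label. Scores of merged
    # segments are summed, so the value written to the RTTM is a total, not a mean.
    merged = []
    last = None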
    for seg in labeled_segments:
        if last is None:
            last = seg.copy()
        elif seg['lang'] == last['lang']:
            last['end'] = seg['end']
            last['score'] += seg['score']
        else:
            duration = last['end'] - last['start']
            rttm_lines.append(
                f"SPEAKER {file_id} 1 {last['start']:.3f} {duration:.3f} <NA> <NA> {last['lang']} <NA> {last['score']:.3f}")
            last = seg.copy()

    if last:
        duration = last['end'] - last['start']
        rttm_lines.append(
            f"SPEAKER {file_id} 1 {last['start']:.3f} {duration:.3f} <NA> <NA> {last['lang']} <NA> {last['score']:.3f}")

    return rttm_lines
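
The last stage of `generate_rttm_custom` merges consecutive segments that carry the same label. The sketch below isolates that step on plain dictionaries so it can be tried without a model or audio. It is an illustrative variant, not the function above: `merge_adjacent` is a hypothetical name, and instead of summing scores it keeps a duration-weighted mean, which is one possible alternative when a score in the 0 to 1 range is wanted.

```python
def merge_adjacent(segments):
    """Merge consecutive segments with the same 'lang' label.

    Each input segment is a dict with 'start', 'end', 'lang' and 'score'.
    The merged score is a duration-weighted mean (a choice made for this
    sketch; the notebook function sums the scores instead).
    """
    merged = []
    for seg in segments:
        dur = seg["end"] - seg["start"]
        if merged and merged[-1]["lang"] == seg["lang"]:
            last = merged[-1]
            last["end"] = seg["end"]
            last["weighted"] += seg["score"] * dur
            last["dur"] += dur
        else:
            merged.append({
                "start": seg["start"], "end": seg["end"], "lang": seg["lang"],
                "weighted": seg["score"] * dur, "dur": dur,
            })
    return [
        {
            "start": m["start"], "end": m["end"], "lang": m["lang"],
            # Guard against zero-length segments
            "score": m["weighted"] / m["dur"] if m["dur"] > 0 else 0.0,
        }
        for m in merged
    ]


demo = [
    {"start": 0.0, "end": 1.0, "lang": "Arabic", "score": 0.8},
    {"start": 1.0, "end": 3.0, "lang": "Arabic", "score": 0.5},
    {"start": 3.0, "end": 4.0, "lang": "English", "score": 0.9},
]
print(merge_adjacent(demo))
```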


In [None]:
# Run the custom LID on every entry in l_results and store the RTTM files on Drive in "rttm_output_custom"

import os

# Create the output directory in Google Drive if it doesn't exist
output_dir_custom = "/content/drive/MyDrive/rttm_output_custom"
os.makedirs(output_dir_custom, exist_ok=True)

# Iterate through each file in l_results
for file_id, result in l_results.items():
    try:
        # Find the corresponding sample information from filtered_df
        sample_info = filtered_df[filtered_df['audio'].apply(lambda x: os.path.basename(x['path']).replace(".wav", "") == file_id)].iloc[0]['audio']
        audio_array = sample_info['array']
        sampling_rate = sample_info['sampling_rate']

        # Extract word segments using the function defined previously
        word_segments = extract_word_segments(result)

        rttm_lines = generate_rttm_custom(custom_lid_model, word_segments, audio_array, sampling_rate, file_id)

        # Save to a file in the designated Google Drive folder
        rttm_path_custom = os.path.join(output_dir_custom, f"{file_id}.rttm")
        with open(rttm_path_custom, "w", encoding="utf-8") as f:
            f.write("\n".join(rttm_lines))

        print(f"✅ Custom RTTM saved to {rttm_path_custom}")

    except Exception as e:
        print(f"❌ Error processing file {file_id}: {e}")

✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/qMqPJIozZvc_214.862.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/S10-C03-R10_015378-016145.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/I02-C03-R26_025800-026343.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/S33-C03-R33_047134-047965.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/S31-C03-R31_073080-074926.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/S17-C03-R17_102762-104296.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/I01-C03-R13_099097-101147.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/S27-C03-R27_120195-121019.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/S37-C03-R37_034007-034931.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/rttm_output_custom/S21-C03-R21_105541-106131.rttm
✅ Custom RTTM saved to /content/drive/MyDrive/

In [None]:
# Evaluate the custom LID model with the same metrics as the previous model

from pyannote.metrics.diarization import DiarizationErrorRate

# Define the directories for the hypothesis (custom model) and reference (ground truth) RTTM files
custom_output_dir = "/content/drive/MyDrive/rttm_output_custom"
reference_rttm_dir = "rttms_gt"

# Use the previously defined function to evaluate the custom model's RTTM files
print("\nEvaluating Custom LID Model Metrics:")
evaluate_rttm_folders(custom_output_dir, reference_rttm_dir)


Evaluating Custom LID Model Metrics:

📁 S10-C03-R10_015378-016145.rttm
  ➤ DER: 0.524
  ➤ Missed Detection: 0.093
  ➤ False Alarm: 0.0
  ➤ Confusion: 3.926

📁 I02-C03-R26_025800-026343.rttm
  ➤ DER: 0.3788
  ➤ Missed Detection: 0.942
  ➤ False Alarm: 0.0
  ➤ Confusion: 1.115

📁 S33-C03-R33_047134-047965.rttm
  ➤ DER: 0.5723
  ➤ Missed Detection: 0.294
  ➤ False Alarm: 0.0
  ➤ Confusion: 4.462





📁 I01-C03-R13_099097-101147.rttm
  ➤ DER: 0.4866
  ➤ Missed Detection: 1.739
  ➤ False Alarm: 0.005
  ➤ Confusion: 8.231

📁 S27-C03-R27_120195-121019.rttm
  ➤ DER: 0.4034
  ➤ Missed Detection: 0.0
  ➤ False Alarm: 0.046
  ➤ Confusion: 3.278

📁 S37-C03-R37_034007-034931.rttm
  ➤ DER: 0.4003
  ➤ Missed Detection: 0.313
  ➤ False Alarm: 0.0
  ➤ Confusion: 3.386

📁 S24-C03-R24_060848-061947.rttm
  ➤ DER: 0.4813
  ➤ Missed Detection: 0.0
  ➤ False Alarm: 0.013
  ➤ Confusion: 5.277

📁 I02-C03-R03_027523-028241.rttm
  ➤ DER: 0.5178
  ➤ Missed Detection: 0.058
  ➤ False Alarm: 0.001
  ➤ Confusion: 3.658

📁 S25-C03-R25_127028-128282.rttm
  ➤ DER: 0.4956
  ➤ Missed Detection: 0.356
  ➤ False Alarm: 0.0
  ➤ Confusion: 5.859

📁 S17-C03-R17_100881-102687.rttm
  ➤ DER: 0.4693
  ➤ Missed Detection: 0.376
  ➤ False Alarm: 0.0
  ➤ Confusion: 8.1

📁 I01-C03-R17_096172-097385.rttm
  ➤ DER: 0.4195
  ➤ Missed Detection: 0.116
  ➤ False Alarm: 0.001
  ➤ Confusion: 4.971

📁 S38-C03-R38_052741-053245.rttm
  

In [None]:
# Compare the results of the two models

from pyannote.metrics.diarization import DiarizationErrorRate

# Define the directories for the two models' outputs and the ground truth
vox_output_dir = "/content/drive/MyDrive/rttm_outputs_vox"
custom_output_dir = "/content/drive/MyDrive/rttm_output_custom"
reference_rttm_dir = "rttms_gt"

print("Evaluating VoxLingua107 Model Metrics:")
evaluate_rttm_folders(vox_output_dir, reference_rttm_dir)

print("\nEvaluating Custom LID Model Metrics:")
evaluate_rttm_folders(custom_output_dir, reference_rttm_dir)

Evaluating VoxLingua107 Model Metrics:





📊 AVERAGE METRICS:
✅ Average DER: 0.3785
✅ Average Missed Detection: 0.2609
✅ Average False Alarm: 0.0068
✅ Average Confusion: 3.4608

Evaluating Custom LID Model Metrics:

📊 AVERAGE METRICS:
✅ Average DER: 0.4094
✅ Average Missed Detection: 0.2581
✅ Average False Alarm: 0.0068
✅ Average Confusion: 3.8127
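
Averages can hide how the two models behave on individual files. A paired, per-file comparison is one way to look closer. The sketch below is an illustration only: `compare_per_file` is a hypothetical helper, and the three DER values per model are copied from the per-file outputs printed earlier in this notebook, not a full evaluation.

```python
def compare_per_file(der_a, der_b):
    """Split the files shared by both systems by which one has the lower DER.

    Returns three sorted lists: files where A is lower, files where B is
    lower, and ties.
    """
    a_lower, b_lower, ties = [], [], []
    for name in sorted(der_a.keys() & der_b.keys()):
        if der_a[name] < der_b[name]:
            a_lower.append(name)
        elif der_b[name] < der_a[name]:
            b_lower.append(name)
        else:
            ties.append(name)
    return a_lower, b_lower, ties


# Per-file DER values taken from the outputs shown above (three files only)
vox = {
    "S10-C03-R10_015378-016145": 0.2147,
    "I02-C03-R26_025800-026343": 0.3937,
    "S33-C03-R33_047134-047965": 0.2965,
}
custom = {
    "S10-C03-R10_015378-016145": 0.524,
    "I02-C03-R26_025800-026343": 0.3788,
    "S33-C03-R33_047134-047965": 0.5723,
}
vox_lower, custom_lower, ties = compare_per_file(vox, custom)
print("VoxLingua107 lower:", vox_lower)
print("Custom LID lower:", custom_lower)
```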
