# 1. Data acquisition & curation for the Arabic **verses** dataset  

This block automates the download, conversion, quality filtering, and splitting of Quranic verse audio files to build a high-quality dataset for verse-level recognition.  

- **Target surahs:** Uses a controlled subset for manageability: Al-Fatiha (001, 7 ayahs), Al-Ikhlas (112, 4), Al-Falaq (113, 5), and An-Nas (114, 6).  
- **Source & reciters:** Pulls recitations from *EveryAyah.com*, iterating through a large candidate list of reciters.  
  - **Skip list:** Removes non-audio directories (e.g., `English`, `QuranText`, `MultiLanguage`).  
  - **Alias resolution:** Handles inconsistent folder names with `_` vs. spaces and known aliases.  
- **Download rules:**  
  - For each reciter × surah × ayah, attempt multiple aliases until the file is found.  
  - Flag a reciter’s surah set as `keep` in the download manifest when ≥80% of its ayahs are available; no files are deleted at this stage.  
- **Audio normalisation:** Converts `.mp3` to `.wav` at **mono, 16 kHz, 16-bit PCM** using `ffmpeg`. Ensures consistency across reciters.  

---

# 2. Indexing & quality filtering  

- **Index build:** Parses filenames into structured metadata: `reciter`, `surah`, `ayah`, `ayah_id`, and full `wav_path`.  
- **Coverage checks:**  
  - Builds per-reciter pivot tables of surah coverage.  
  - Flags missing ayahs and computes % completion.  
- **Bitrate parsing:** Extracts kbps tags (e.g., 64 kbps, 128 kbps) from reciter names to assess recording quality.  
- **Good reciters criteria:**  
  - 100% coverage for selected surahs.  
  - Bitrate tag ≥64 kbps; reciters whose folder name carries no kbps tag are not excluded.  
- **Curated dataset:** Filters index to only include *good reciters* with complete and clear recordings.  

---

# 3. Dataset split for model training  

- **CSV export:** Writes curated index to `verses_index.csv`.  
- **Splitting:** Stratifies by `ayah_id` to preserve class balance.  
  - **Train:** 70%  
  - **Validation:** 15%  
  - **Test:** 15%  
- **Diagnostics:** Prints the number of rows per split, the number of unique ayahs in the training split, and the count of distinct reciters in each subset.  

---

**Key parameters:** Surahs `{001, 112, 113, 114}`, download `keep` flag at ≥`80%` coverage per reciter and surah, final filter = *full coverage + ≥64 kbps bitrate*, splits `70/15/15`, random seed = `42`.  

**Purpose:** Produces a clean, balanced, and reproducible **verses dataset**—standardised in sampling rate and quality—ready for feature extraction and deep learning models in Quranic recitation recognition.  


In [None]:
%pip install -q tensorflow

import os
import sys
import re
import time
import math
import csv
import random
import subprocess
from pathlib import Path
import glob

import requests
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import layers, Model

from google.colab import drive

In [None]:
# Mount Google Drive at /content/drive so everything written under BASE_DIR lives on Drive.
drive.mount('/content/drive')

# Install ffmpeg only when the version probe fails; `requests` is (re)installed quietly.
!ffmpeg -version >/dev/null 2>&1 || sudo apt-get -y install ffmpeg
!pip -q install requests

# Root folder for the downloaded MP3s, converted WAVs and exported CSV index.
BASE_DIR = "/content/drive/MyDrive/QariAI/verses_data"
os.makedirs(BASE_DIR, exist_ok=True)

# Zero-padded 3-digit surah number -> number of ayahs to fetch for that surah.
SURAH_AYAH_COUNTS = {
    "001": 7,
    "112": 4,
    "113": 5,
    "114": 6,
}

# Download endpoint and the shared HTTP session used by the fetch helper.
# NOTE(review): the base URL is plain HTTP — confirm whether the host also serves HTTPS.
BASE_URL = "http://www.everyayah.com/data"
TIMEOUT  = 60  # passed as `timeout=` to SESSION.get
SESSION  = requests.Session()
SESSION.headers.update({"User-Agent": "ColabDownloader/1.0"})


In [None]:
# Candidate folder names that are appended to BASE_URL when downloading, one per
# reciter/bitrate variant. Entries marked "# skip" are also listed in SKIP_NAMES
# and are ignored by the download loop.
ALL_RECITERS = [
    "AbdulSamad_64kbps_QuranExplorer.Com",
    "Abdul_Basit_Mujawwad_128kbps",
    "Abdul_Basit_Murattal_192kbps",
    "Abdul_Basit_Murattal_64kbps",
    "Abdullaah_3awwaad_Al-Juhaynee_128kbps",
    "Abdullah_Basfar_192kbps",
    "Abdullah_Basfar_32kbps",
    "Abdullah_Basfar_64kbps",
    "Abdullah_Matroud_128kbps",
    "Abdurrahmaan_As-Sudais_192kbps",
    "Abdurrahmaan_As-Sudais_64kbps",
    "Abu Bakr Ash-Shaatree_128kbps",
    "Abu_Bakr_Ash-Shaatree_128kbps",
    "Abu_Bakr_Ash-Shaatree_64kbps",
    "Ahmed_Neana_128kbps",
    "Ahmed_ibn_Ali_al-Ajamy_128kbps_ketaballah.net",
    "Ahmed_ibn_Ali_al-Ajamy_64kbps_QuranExplorer.Com",
    "Akram_AlAlaqimy_128kbps",
    "Alafasy_128kbps",
    "Alafasy_64kbps",
    "Ali_Hajjaj_AlSuesy_128kbps",
    "Ali_Jaber_64kbps",
    "Ayman_Sowaid_64kbps",
    "English",  # skip
    "Fares_Abbad_64kbps",
    "Ghamadi_40kbps",
    "Hani_Rifai_192kbps",
    "Hani_Rifai_64kbps",
    "Hudhaify_128kbps",
    "Hudhaify_32kbps",
    "Hudhaify_64kbps",
    "Husary_128kbps",
    "Husary_128kbps_Mujawwad",
    "Husary_64kbps",
    "Husary_Muallim_128kbps",
    "Husary_Mujawwad_64kbps",
    "Ibrahim_Akhdar_32kbps",
    "Ibrahim_Akhdar_64kbps",
    "Karim_Mansoori_40kbps",
    "Khaalid_Abdullaah_al-Qahtaanee_192kbps",
    "MaherAlMuaiqly128kbps",
    "Maher_AlMuaiqly_64kbps",
    "Menshawi_16kbps",
    "Menshawi_32kbps",
    "Minshawy_Mujawwad_192kbps",
    "Minshawy_Mujawwad_64kbps",
    "Minshawy_Murattal_128kbps",
    "Minshawy_Teacher_128kbps",
    "Mohammad_al_Tablaway_128kbps",
    "Mohammad_al_Tablaway_64kbps",
    "Muhammad_AbdulKareem_128kbps",
    "Muhammad_Ayyoub_128kbps",
    "Muhammad_Ayyoub_32kbps",
    "Muhammad_Ayyoub_64kbps",
    "Muhammad_Jibreel_128kbps",
    "Muhammad_Jibreel_64kbps",
    "Muhsin_Al_Qasim_192kbps",
    "MultiLanguage",  # skip
    "Mustafa_Ismail_48kbps",
    "Nabil_Rifa3i_48kbps",
    "Nasser_Alqatami_128kbps",
    "Parhizgar_48kbps",
    "QuranText",       # skip
    "QuranText_jpg",   # skip
    "Sahl_Yassin_128kbps",
    "Salaah_AbdulRahman_Bukhatir_128kbps",
    "Salah_Al_Budair_128kbps",
    "Saood bin Ibraaheem Ash-Shuraym_128kbps",
    "Saood_ash-Shuraym_128kbps",
    "Saood_ash-Shuraym_64kbps",
    "XML",             # skip
    "Yaser_Salamah_128kbps",
    "Yasser_Ad-Dussary_128kbps",
    "ahmed_ibn_ali_al_ajamy_128kbps",
    "aziz_alili_128kbps",
    "images_png",      # skip
    "khalefa_al_tunaiji_64kbps",
    "mahmoud_ali_al_banna_32kbps",
]

# Entries of ALL_RECITERS that the download loop skips.
SKIP_NAMES = {"English","MultiLanguage","QuranText","QuranText_jpg","XML","images_png"}

KNOWN_ALIASES = {
    "Abu Bakr Ash-Shaatree_128kbps": ["Abu_Bakr_Ash-Shaatree_128kbps"],
    "MaherAlMuaiqly128kbps": ["Maher_AlMuaiqly_64kbps","MaherAlMuaiqly128kbps"],
    "Husary_64kbps": ["Husary_64kbps", "Husary_Mujawwad_64kbps"],
    "Husary_128kbps_Mujawwad": ["Husary_128kbps_Mujawwad","Husary_128kbps"],
}

def alias_candidates(name: str):
    cands = set()
    cands.add(name)
    cands.add(name.replace(" ", "_"))
    cands.add(name.replace("_", " "))
    cands.add(name.replace("–","-").replace("—","-"))
    if name in KNOWN_ALIASES:
        for a in KNOWN_ALIASES[name]:
            cands.add(a)
            cands.add(a.replace(" ", "_"))
            cands.add(a.replace("_", " "))
    return list(dict.fromkeys([c for c in cands if len(c.strip())>0]))


In [None]:
def try_fetch(reciter_alias: str, surah3: str, ayah3: str, out_mp3: Path) -> bool:
    """Download one ayah MP3 for a reciter folder into *out_mp3*.

    The body is streamed into a sibling ``<name>.part`` file and renamed onto
    *out_mp3* only once the transfer finished with a non-empty payload. An
    interrupted or empty download therefore never leaves a file at *out_mp3*
    that a later existence check would mistake for a complete one.

    Args:
        reciter_alias: Folder name under BASE_URL to try.
        surah3: Zero-padded 3-digit surah number.
        ayah3: Zero-padded 3-digit ayah number.
        out_mp3: Destination path of the MP3 file.

    Returns:
        True if the file was downloaded and stored, False on a non-200
        response, an empty body, or any ``requests`` error.
    """
    url = f"{BASE_URL}/{reciter_alias}/{surah3}{ayah3}.mp3"
    part_path = out_mp3.with_name(out_mp3.name + ".part")
    try:
        with SESSION.get(url, stream=True, timeout=TIMEOUT) as r:
            if r.status_code != 200:
                return False
            with open(part_path, "wb") as f:
                for chunk in r.iter_content(8192):
                    if chunk:
                        f.write(chunk)
        if part_path.stat().st_size > 0:
            os.replace(part_path, out_mp3)
            return True
    except requests.RequestException:
        # Best effort: a network failure just means this alias did not work.
        pass
    finally:
        # Removes a partial/empty download; no-op after a successful rename.
        part_path.unlink(missing_ok=True)
    return False

def download_one_set(reciter_key: str, surah3: str, ayah_count: int):
    """Fetch every ayah of one surah for one reciter.

    Files are stored as ``BASE_DIR/<reciter_key>/<surah3>/ayahs/<surah3><ayah3>.mp3``.
    An ayah whose file already exists is counted without a request; otherwise
    each candidate folder name is tried in turn until one succeeds.

    Returns:
        ``(ok, missing, keep, out_dir)``: the number of ayahs present, the
        3-digit ayah numbers that could not be fetched, whether at least 80%
        of the ayahs are present, and the output directory.
    """
    out_dir = Path(BASE_DIR) / reciter_key / surah3 / "ayahs"
    out_dir.mkdir(parents=True, exist_ok=True)
    candidates = alias_candidates(reciter_key)

    ok = 0
    missing = []
    for number in range(1, ayah_count + 1):
        ayah3 = f"{number:03d}"
        out_mp3 = out_dir / f"{surah3}{ayah3}.mp3"
        if out_mp3.exists():
            ok += 1
            continue
        # any() stops at the first alias that downloads successfully.
        if any(try_fetch(alias, surah3, ayah3, out_mp3) for alias in candidates):
            ok += 1
        else:
            missing.append(ayah3)
        # Short pause between ayahs that required network requests.
        time.sleep(0.12)

    keep = (ok / ayah_count) >= 0.8
    return ok, missing, keep, out_dir

# One manifest row per (reciter, surah): (reciter, surah, ok, expected, keep, folder).
manifest = []
for rec in (r for r in ALL_RECITERS if r not in SKIP_NAMES):
    for surah3, n_ayah in SURAH_AYAH_COUNTS.items():
        ok, missing, keep, path = download_one_set(rec, surah3, n_ayah)
        manifest.append((rec, surah3, ok, n_ayah, keep, str(path)))
        print(f"{rec} {surah3}: {ok}/{n_ayah} | keep={keep} | missing={missing}")


In [None]:
def mp3_to_wav16k(mp3_path: Path) -> Path:
    """Convert an MP3 to a mono, 16 kHz, 16-bit WAV next to it.

    The output is ``<stem>_16k.wav`` in the same folder. An existing output is
    returned without running ffmpeg. The conversion writes to a ``.part`` file
    that is renamed onto the final name only after ffmpeg succeeded, so a
    failed or interrupted run cannot leave a truncated ``_16k.wav`` that the
    existence check would later accept.

    Args:
        mp3_path: Path of the source MP3 file.

    Returns:
        Path of the converted WAV file.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        FileNotFoundError: the ``ffmpeg`` executable is not available.
    """
    wav_path = Path(mp3_path.with_suffix("").as_posix() + "_16k.wav")
    if wav_path.exists():
        return wav_path
    tmp_path = wav_path.with_name(wav_path.name + ".part")
    # "-f wav" is required because the temporary name does not end in ".wav".
    cmd = ["ffmpeg", "-y", "-i", str(mp3_path), "-ac", "1", "-ar", "16000",
           "-sample_fmt", "s16", "-f", "wav", str(tmp_path)]
    try:
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
        os.replace(tmp_path, wav_path)
    finally:
        # Drops a partial output on failure; no-op after a successful rename.
        tmp_path.unlink(missing_ok=True)
    return wav_path

# Convert every downloaded MP3; a failure is reported on stderr and counted,
# and the loop carries on with the next file.
mp3s = glob.glob(os.path.join(BASE_DIR, "*", "*", "ayahs", "*.mp3"))
print("MP3 found:", len(mp3s))
ok = 0
fail = 0
for i, p in enumerate(mp3s, 1):
    try:
        mp3_to_wav16k(Path(p))
    except Exception as e:
        print("Fail:", p, e, file=sys.stderr)
        fail += 1
    else:
        ok += 1
    if not i % 500:
        print(f"{i}/{len(mp3s)} converted...")
print("Converted OK:", ok, "Failed:", fail)


In [None]:
def build_index(base_dir: str):
    """Index every converted WAV below *base_dir* into a DataFrame.

    Files are expected at ``<base_dir>/<reciter>/<surah3>/ayahs/<surah3><ayah3>_16k.wav``.
    The metadata is taken from the last four path components, so a trailing
    separator on *base_dir* does not matter. Files whose name or surah folder
    does not follow the pattern are ignored.

    Args:
        base_dir: Root folder of the dataset.

    Returns:
        A DataFrame with columns ``reciter``, ``surah``, ``ayah``, ``ayah_id``
        and ``wav_path``, sorted by surah, ayah and reciter. When nothing
        matches, the frame is empty but still has these columns.
    """
    columns = ["reciter", "surah", "ayah", "ayah_id", "wav_path"]
    # glob.escape keeps characters such as "[" in base_dir from acting as wildcards.
    pattern = os.path.join(glob.escape(base_dir), "*", "*", "ayahs", "*_16k.wav")
    rows = []
    for w in glob.glob(pattern):
        parts = Path(w).parts
        if len(parts) < 4:
            continue
        reciter, surah3, _, filename = parts[-4:]
        m = re.fullmatch(r"(\d{6})_16k\.wav", filename)
        if m is None or re.fullmatch(r"\d{3}", surah3) is None:
            continue
        ayah6 = m.group(1)
        rows.append({
            "reciter": reciter,
            "surah": int(surah3),
            "ayah": int(ayah6[-3:]),
            "ayah_id": int(ayah6),
            "wav_path": w,
        })
    # Passing the columns explicitly keeps sort_values valid for an empty index.
    index = pd.DataFrame(rows, columns=columns)
    return index.sort_values(["surah", "ayah", "reciter"]).reset_index(drop=True)

# Build the index of every converted WAV and write it to verses_index.csv
# inside BASE_DIR, then print a short summary.
df = build_index(BASE_DIR)
csv_path = os.path.join(BASE_DIR, "verses_index.csv")
df.to_csv(csv_path, index=False)
print("Wrote:", csv_path)
print("Stats -> rows:", len(df), "| reciters:", df.reciter.nunique(), "| ayahs:", df.ayah_id.nunique())
# Last expression of the cell: the notebook renders the first rows.
df.head()


In [None]:
# Quick inventory of what is on disk: reciter folders, MP3s and converted WAVs.
BASE_DIR = "/content/drive/MyDrive/QariAI/verses_data"

mp3s = glob.glob(os.path.join(BASE_DIR, "*", "*", "ayahs", "*.mp3"))
wavs = glob.glob(os.path.join(BASE_DIR, "*", "*", "ayahs", "*_16k.wav"))

print("📦 Download summary")
print("Total reciter folders:", len([d for d in os.listdir(BASE_DIR) if os.path.isdir(os.path.join(BASE_DIR,d))]))
print("Total MP3 files found :", len(mp3s))
print("Total 16k WAVs found  :", len(wavs))

# Show WAV paths when any exist, otherwise fall back to MP3 paths.
sample = wavs[:10] if wavs else mp3s[:10]
# Use a dedicated name so the global index frame `df` is not overwritten by
# this 10-row preview.
sample_df = pd.DataFrame(sample, columns=["example_paths"])
print("\n🔎 First few files:")
print(sample_df.to_string(index=False))


In [None]:
# Re-declared so this cell can run without the setup cell.
BASE_DIR = "/content/drive/MyDrive/QariAI/verses_data"
# Zero-padded surah number -> expected number of ayahs.
SURAH_AYAH_COUNTS = {"001":7, "112":4, "113":5, "114":6}

# Every converted WAV currently on disk.
wavs = glob.glob(os.path.join(BASE_DIR, "*", "*", "ayahs", "*_16k.wav"))

def parse(path):
    """Split a converted-WAV path into ``(reciter, surah3, ayah3, path)``.

    The path must end in ``/<reciter>/<surah3>/ayahs/<6 digits>_16k.wav``; the
    ayah number is the last three of those six digits. All parts are returned
    as strings. Returns None when the path does not match.
    """
    found = re.search(r"/([^/]+)/(\d{3})/ayahs/(\d{6})_16k\.wav$", path)
    if found is None:
        return None
    reciter = found.group(1)
    surah3 = found.group(2)
    ayah3 = found.group(3)[-3:]
    return reciter, surah3, ayah3, path

# Parse every WAV path and drop the ones that do not follow the naming scheme.
rows = [parse(p) for p in wavs]
rows = [r for r in rows if r is not None]
# Here `surah` and `ayah` are zero-padded strings, as returned by parse().
df = pd.DataFrame(rows, columns=["reciter","surah","ayah","path"])

# Reciter x surah table of file counts; a surah with no files gets a zero column.
pivot = df.groupby(["reciter","surah"]).size().unstack(fill_value=0)
for s in SURAH_AYAH_COUNTS.keys():
    if s not in pivot.columns: pivot[s] = 0
pivot = pivot[sorted(SURAH_AYAH_COUNTS.keys())]

# Completion ratio per reciter over all selected surahs.
pivot["total"] = pivot.sum(axis=1)
pivot["expected"] = sum(SURAH_AYAH_COUNTS.values())
pivot["pct"] = (pivot["total"] / pivot["expected"]).round(3)

print("Per‑reciter counts (first 15 rows):")
display(pivot.head(15))

# Reciters with at least one missing file, least complete first.
missing_tbl = pivot[pivot["pct"] < 1.0].sort_values("pct")
print(f"\nReciters with missing files: {len(missing_tbl)}")
display(missing_tbl.head(20))

def missing_for_reciter(rec):
    """List the ``(surah3, ayah3)`` pairs that reciter *rec* lacks.

    Compares the pairs present in the global ``df`` for this reciter with the
    full set implied by SURAH_AYAH_COUNTS and returns the difference, sorted.
    """
    owned = df[df.reciter == rec]
    have = set(zip(owned["surah"], owned["ayah"]))
    expected = set()
    for s, n in SURAH_AYAH_COUNTS.items():
        for i in range(1, n + 1):
            expected.add((s, f"{i:03d}"))
    return sorted(expected - have)

# Show the gaps of the least complete reciter as an example (at most 20 items).
if len(missing_tbl):
    rec0 = missing_tbl.index[0]
    print(f"\nExample missing items for: {rec0}")
    print(missing_for_reciter(rec0)[:20])

# Dataset-wide totals; "max possible" assumes every reciter has every ayah.
print("\nGlobal totals:")
print("Found wavs:", len(df))
print("Unique reciters:", df['reciter'].nunique())
print("Expected per reciter:", sum(SURAH_AYAH_COUNTS.values()))
print("Max possible files:", df['reciter'].nunique() * sum(SURAH_AYAH_COUNTS.values()))


In [None]:
# Re-declared so this cell can run without the earlier cells.
BASE_DIR = "/content/drive/MyDrive/QariAI/verses_data"
SURAH_AYAH_COUNTS = {"001":7, "112":4, "113":5, "114":6}
# Number of ayah files a complete reciter must have.
EXPECTED_PER_RECITER = sum(SURAH_AYAH_COUNTS.values())

assert os.path.isdir(BASE_DIR), f"Base not found: {BASE_DIR}"

# Every converted WAV currently on disk.
wavs = glob.glob(os.path.join(BASE_DIR, "*", "*", "ayahs", "*_16k.wav"))
print("Found WAVs:", len(wavs))

def parse_meta(path: str):
    """Turn a converted-WAV path into an index record.

    The path must end in ``/<reciter>/<surah3>/ayahs/<6 digits>_16k.wav``.
    Returns a dict with ``reciter`` (str), ``surah``, ``ayah`` and ``ayah_id``
    (ints; ``ayah_id`` is the six digits read as one number) and ``wav_path``,
    or None when the path does not match.
    """
    found = re.search(r"/([^/]+)/(\d{3})/ayahs/(\d{6})_16k\.wav$", path)
    if found is None:
        return None
    reciter, surah3, ayah6 = found.group(1, 2, 3)
    return dict(
        reciter=reciter,
        surah=int(surah3),
        ayah=int(ayah6[-3:]),
        ayah_id=int(ayah6),
        wav_path=path,
    )

# Build the full index; paths that do not follow the naming scheme are dropped.
rows = [parse_meta(p) for p in wavs]
rows = [r for r in rows if r is not None]
index_df = pd.DataFrame(rows).sort_values(["surah","ayah","reciter"]).reset_index(drop=True)

print("Index stats -> rows:", len(index_df),
      "| reciters:", index_df.reciter.nunique(),
      "| ayahs:", index_df.ayah_id.nunique())

# Reciter x surah table of file counts. Surah columns are ints here because
# parse_meta converts the surah number with int().
pivot = index_df.groupby(["reciter","surah"]).size().unstack(fill_value=0)
for s in SURAH_AYAH_COUNTS:
    if int(s) not in pivot.columns:
        pivot[int(s)] = 0
pivot = pivot[sorted(pivot.columns)]

# Completion ratio per reciter over all selected surahs.
pivot["total"] = pivot.sum(axis=1)
pivot["expected"] = EXPECTED_PER_RECITER
pivot["pct"] = (pivot["total"] / pivot["expected"]).round(3)

print("\nPer‑reciter coverage (first 12 rows):")
display(pivot.head(12))

# Reciters with at least one missing ayah, least complete first.
missing_tbl = pivot[pivot["pct"] < 1.0].sort_values("pct")
print(f"\nReciters with any missing ayahs: {len(missing_tbl)}")
display(missing_tbl.head(10))

def parse_bitrate(name: str):
    """Extract the kbps figure from a reciter folder name.

    Finds the first run of digits followed (optionally after whitespace) by
    "kbps" in any letter case and returns it as an int, or None when the name
    carries no such tag.
    """
    hit = re.search(r"(\d+)\s*kbps", name, flags=re.IGNORECASE)
    if hit is None:
        return None
    return int(hit.group(1))

# Bitrate per reciter, parsed from the folder name (None when there is no kbps tag).
bitrate_map = {r: parse_bitrate(r) for r in index_df.reciter.unique()}
bitrate_series = pd.Series(bitrate_map, name="bitrate").sort_index()

# One row per reciter: file count, completion ratio and bitrate, best first.
qual = pd.DataFrame({
    "total_files": index_df.groupby("reciter").size(),
    "pct_complete": pivot["pct"],
    "bitrate": bitrate_series
}).sort_values(["pct_complete","bitrate","total_files"], ascending=[False, False, False])

print("\nQuality summary (top 15):")
display(qual.head(15))

# Complete reciters at 64 kbps or more. fillna(999) lets reciters without a
# kbps tag pass the bitrate test.
GOOD_RECITERS = qual[(qual["pct_complete"] >= 1.0) & (qual["bitrate"].fillna(999) >= 64)].index.tolist()
print(f"\nSuggested GOOD_RECITERS (coverage=100% & bitrate>=64kbps): {len(GOOD_RECITERS)}")
print(GOOD_RECITERS[:20], "..." if len(GOOD_RECITERS) > 20 else "")

# Preview of the index restricted to the good reciters.
good_df = index_df[index_df.reciter.isin(GOOD_RECITERS)]
print("\nIf we keep GOOD_RECITERS only -> rows:", len(good_df),
      "| reciters:", good_df.reciter.nunique())

print("\nSample index rows:")
display(index_df.head(10))


In [None]:
# Curate the index (good reciters, bitrate, full coverage) and export it to CSV.
assert 'index_df' in globals() and 'GOOD_RECITERS' in globals(), "Run Cell 1 first."

# Filter switches and the output path of the curated index.
USE_ONLY_GOOD_RECITERS   = True
DROP_LOW_BITRATE_TAGS    = True
REQUIRE_FULL_COVERAGE    = True
OUT_CSV                  = os.path.join(BASE_DIR, "verses_index.csv")

def parse_bitrate(name: str):
    """Return the kbps figure found in a reciter name as an int, or None."""
    m = re.search(r"(\d+)\s*kbps", name, flags=re.IGNORECASE)
    return int(m.group(1)) if m else None

df = index_df.copy()

if USE_ONLY_GOOD_RECITERS:
    df = df[df.reciter.isin(GOOD_RECITERS)].copy()

if DROP_LOW_BITRATE_TAGS:
    # Reciters without a kbps tag are kept; only tagged bitrates below 64 are dropped.
    df["bitrate"] = df.reciter.apply(parse_bitrate)
    df = df[(df["bitrate"].isna()) | (df["bitrate"] >= 64)].copy()
    df = df.drop(columns=["bitrate"])

if REQUIRE_FULL_COVERAGE:
    # Measure coverage against the ayahs that are actually expected. Comparing
    # with the largest observed count would let incomplete reciters through
    # whenever no reciter is complete.
    expected_ids = {
        int(f"{surah3}{ayah:03d}")
        for surah3, n_ayah in SURAH_AYAH_COUNTS.items()
        for ayah in range(1, n_ayah + 1)
    }
    counts = df[df.ayah_id.isin(expected_ids)].groupby("reciter")["ayah_id"].nunique()
    full_reciters = counts[counts == len(expected_ids)].index
    df = df[df.reciter.isin(full_reciters)].copy()

df = df.sort_values(["surah","ayah","reciter"]).reset_index(drop=True)

df.to_csv(OUT_CSV, index=False)

print("✅ Wrote:", OUT_CSV)
print("Rows:", len(df), "| Reciters:", df.reciter.nunique(), "| Unique ayahs:", df.ayah_id.nunique())
print("\nPer‑reciter totals (first 12):")
display(df.groupby("reciter").size().sort_values(ascending=False).head(12))

print("\nSample rows:")
display(df.head(10))


In [None]:
# Reload the curated index and split it 70/15/15, stratified on ayah_id.
df = pd.read_csv(OUT_CSV)

# First cut: 70% train, 30% held out.
# NOTE(review): rows are split individually, so recordings of one reciter can
# land in train, val and test at the same time — confirm this is acceptable
# for the intended evaluation.
train_df, tmp_df = train_test_split(
    df, test_size=0.30, random_state=42, stratify=df["ayah_id"]
)
# Second cut: the held-out 30% is halved into validation and test.
val_df, test_df = train_test_split(
    tmp_df, test_size=0.50, random_state=42, stratify=tmp_df["ayah_id"]
)

print("Rows -> train:", len(train_df), "| val:", len(val_df), "| test:", len(test_df))
print("Unique ayahs in train:", train_df.ayah_id.nunique())

print("\nReciters per split (counts):")
print("Train:", train_df.reciter.nunique(),
      "Val:", val_df.reciter.nunique(),
      "Test:", test_df.reciter.nunique())

print("\nExample rows (train):")
display(train_df.head(5))


# 1. Data prep & TF pipeline for the Arabic **verses** model

This block takes the curated *verses* index (train/val/test DataFrames), converts raw 16 kHz WAVs into variable-length **log-mel spectrogram** tensors, builds efficient `tf.data` pipelines with on-the-fly augmentation, and trains a **BiLSTM + Attention** classifier over ayah IDs.

- **Assumptions & inputs:** Expects prebuilt `train_df`, `val_df`, `test_df` with columns `wav_path` and `ayah_id` (from your earlier acquisition/curation step).

- **Label mapping:**  
  - `label_space = sorted(train_df.ayah_id.unique())`.  
  - `tf.lookup.StaticHashTable` maps integer `ayah_id` → contiguous class indices (`0…NUM_CLASSES-1`).

- **Audio → log-mel features:**  
  - **I/O:** Reads mono WAV at **16 kHz**; asserts sample rate.  
  - **Duration control:** Pads/trims to **≤ 16 s** (`MAX_SECONDS=16`). During training, applies **random time crop** to a target length uniformly sampled in **[6 s, 12 s]** (data augmentation).  
  - **STFT/Mel:** `WIN=25 ms`, `HOP=10 ms`, `FFT=1024`, **64 mel bins** over **0–7.6 kHz**.  
  - **Log & norm:** `log(mel+1e-6)` then per-example standardization (z-score).  
  - **Shapes:** Time dimension varies by clip/augmentation; feature tensors are **(time, 64)**.

- **tf.data pipelines:**  
  - `make_ds(df, batch=16, shuffle, training)` →  
    map(`load_and_preprocess`) → **padded_batch** to `([None, 64], [])` → `prefetch(AUTOTUNE)`.  
  - Datasets: `ds_train` (shuffled + augmented), `ds_val`, `ds_test`.  
  - Prints one batch to confirm shapes (e.g., `(batch, time_pad, 64)` and labels `(batch,)`).

- **Model architecture (BiLSTM + Attention):**  
  - **Input:** `(None, 64)` log-mel sequence with `Masking`.  
  - **Encoder:** 2× **Bidirectional LSTM(256)** with `return_sequences=True`, `dropout=0.25`.  
  - **Attention pooling:** Custom `AttentionPool1D` learns weights over time and reduces to a single utterance embedding.  
  - **Classifier head:** Dense(256, ReLU, **L2=1e-5**) → Dropout(0.3) → Dense(128, ReLU, **L2=1e-5**) → Dropout(0.3) → Dense(**NUM_CLASSES**, softmax).

- **Compile & metrics:**  
  - Optimizer: **AdamW(lr=1e-3, weight_decay=1e-4)** with fallback to Adam.  
  - Loss: **sparse_categorical_crossentropy**.  
  - Metrics: `accuracy` and **SparseTopKCategoricalAccuracy(k=5)** (`top5`).

- **Training & checkpointing:**  
  - **EPOCHS=40** with callbacks:  
    - `ModelCheckpoint(..., monitor="val_accuracy", save_best_only=True)` → saves to `best_verses_bilstm_attn.keras`.  
    - `EarlyStopping(monitor="val_accuracy", patience=7, restore_best_weights=True)`.  
    - `ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5)`.  
  - Trains on `ds_train`, validates on `ds_val`.

- **Evaluation:**  
  - Reloads the best checkpoint (registering `AttentionPool1D`) and reports a metrics dict on **`ds_test`** (includes `loss`, `accuracy`, and `top5`).

**Key hyperparameters:** `SR=16000`, `N_MELS=64`, `WIN=0.025 s`, `HOP=0.010 s`, `FFT=1024`, `FMIN=0`, `FMAX=7600`, `MAX_SECONDS=16`, random crop in **[6 s, 12 s]**, `BATCH=16`, `EPOCHS=40`, `L2=1e-5`, `Dropout=0.25/0.3`, `lr=1e-3`.

**Purpose:** End-to-end preparation and training for **verse-level** recognition with variable-length inputs—augmenting in time, pooling with attention, and reporting accuracy/top-5 on a held-out test set for robust evaluation.


In [None]:
# Feature-extraction settings.
SR = 16000
N_MELS = 64
WIN = int(0.025 * SR)   # 25 ms analysis window, in samples
HOP = int(0.010 * SR)   # 10 ms hop, in samples
FFT = 1024
FMIN, FMAX = 0.0, 7600.0

# Upper bound on clip length, in seconds and in samples.
MAX_SECONDS = 16
MAX_SAMPLES = SR * MAX_SECONDS

# Map each ayah_id seen in the training split to a contiguous class index.
# Ids that are not in the table look up as -1.
label_space = sorted(train_df.ayah_id.unique())
NUM_CLASSES = len(label_space)
keys = tf.constant(label_space, dtype=tf.int64)
vals = tf.range(NUM_CLASSES, dtype=tf.int64)
table = tf.lookup.StaticHashTable(tf.lookup.KeyValueTensorInitializer(keys, vals), default_value=-1)

def wav_to_logmel(audio_1d):
    """Convert a 1-D waveform into a standardised log-mel spectrogram.

    Computes a Hann-windowed STFT, squares the magnitudes, projects them onto
    N_MELS mel bands between FMIN and FMAX, takes ``log(mel + 1e-6)`` and
    standardises the result with the mean and standard deviation taken over
    the whole spectrogram.
    """
    spectrum = tf.signal.stft(
        audio_1d,
        frame_length=WIN,
        frame_step=HOP,
        fft_length=FFT,
        window_fn=tf.signal.hann_window,
        pad_end=True,
    )
    power = tf.abs(spectrum) ** 2
    mel_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins=N_MELS,
        num_spectrogram_bins=FFT // 2 + 1,
        sample_rate=SR,
        lower_edge_hertz=FMIN,
        upper_edge_hertz=FMAX,
    )
    mel_power = tf.tensordot(power, mel_matrix, 1)
    # tensordot loses the static shape; restore it from the two operands.
    mel_power.set_shape(power.shape[:-1].concatenate(mel_matrix.shape[-1:]))
    log_mel = tf.math.log(mel_power + 1e-6)
    mu = tf.reduce_mean(log_mel)
    sigma = tf.math.reduce_std(log_mel)
    return (log_mel - mu) / (sigma + 1e-6)

def random_time_crop(audio, target_len):
    """Return exactly *target_len* samples of *audio*.

    A clip longer than the target is cut to a window starting at a uniformly
    random offset; any other clip is zero-padded at the end up to the target.
    """
    n = tf.shape(audio)[0]

    def _pad_to_target():
        return tf.pad(audio, [[0, target_len - n]])

    def _crop_window():
        start = tf.random.uniform((), 0, n - target_len + 1, dtype=tf.int32)
        return audio[start:start + target_len]

    return tf.cond(n > target_len, _crop_window, _pad_to_target)

def load_and_preprocess(path, ayah_id, training=False):
    """Load one WAV and return ``(log-mel features, class index)``.

    The clip is decoded as mono, checked to be sampled at SR and, when
    *training* is true, randomly cropped/padded to a length drawn uniformly
    from [6 s, 12 s). It is then truncated to at most MAX_SAMPLES.

    The clip is deliberately NOT zero-padded up to MAX_SAMPLES: silence padded
    into the waveform turns into non-zero feature frames after the log and the
    standardisation, so a downstream ``Masking`` layer could not recognise
    them and every example would have the same length. Leaving the length
    variable lets ``padded_batch`` append all-zero frames instead.

    Args:
        path: Scalar string tensor with the WAV path.
        ayah_id: Scalar integer tensor with the ayah id.
        training: Python bool; enables the random crop augmentation.

    Returns:
        ``(feat, y)`` where ``feat`` is the spectrogram from wav_to_logmel and
        ``y`` is the class index from the lookup table (-1 for unknown ids).
    """
    audio_bytes = tf.io.read_file(path)
    audio, sr = tf.audio.decode_wav(audio_bytes, desired_channels=1)
    audio = tf.squeeze(audio, -1)
    tf.debugging.assert_equal(sr, tf.constant(SR, dtype=sr.dtype), "SR must be 16k")
    if training:
        target = tf.random.uniform((), minval=6*SR, maxval=12*SR, dtype=tf.int32)
        audio = random_time_crop(audio, target)
    # Cap the duration only; shorter clips keep their own length.
    audio = audio[:MAX_SAMPLES]
    feat = wav_to_logmel(audio)
    y = table.lookup(tf.cast(ayah_id, tf.int64))
    return feat, y

def make_ds(frame, batch=16, shuffle=False, training=False):
    """Build a batched ``tf.data`` pipeline from an index DataFrame.

    Reads the ``wav_path`` and ``ayah_id`` columns of *frame*, optionally
    shuffles the examples, maps them through load_and_preprocess, pads each
    batch along the time axis and prefetches.

    Args:
        frame: DataFrame with ``wav_path`` and ``ayah_id`` columns.
        batch: Batch size.
        shuffle: Shuffle the examples, reshuffling on every iteration.
        training: Forwarded to load_and_preprocess.
    """
    wav_paths = frame["wav_path"].astype(str).values
    ayah_ids = frame["ayah_id"].astype(np.int64).values

    def _to_features(p, i):
        return load_and_preprocess(p, i, training=training)

    ds = tf.data.Dataset.from_tensor_slices((wav_paths, ayah_ids))
    if shuffle:
        buffer_size = min(len(frame), 8000)
        ds = ds.shuffle(buffer_size, reshuffle_each_iteration=True)
    ds = ds.map(_to_features, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.padded_batch(
        batch,
        padded_shapes=([None, N_MELS], []),
        drop_remainder=False,
    )
    return ds.prefetch(tf.data.AUTOTUNE)

# Build the three pipelines; only the training one shuffles and augments.
BATCH = 16
ds_train = make_ds(train_df, batch=BATCH, shuffle=True,  training=True)
ds_val   = make_ds(val_df,   batch=BATCH, shuffle=False, training=False)
ds_test  = make_ds(test_df,  batch=BATCH, shuffle=False, training=False)

print("✅ datasets ready")
# Pull one batch from each pipeline to confirm the tensor shapes.
for ds, name in [(ds_train,"train"),(ds_val,"val"),(ds_test,"test")]:
    for feat, y in ds.take(1):
        print(f"{name} batch -> feat:", feat.shape, "labels:", y.shape)


In [None]:
class AttentionPool1D(layers.Layer):
    """Pool a sequence into one vector with learned attention weights.

    A single-unit Dense layer scores every step along axis 1, the scores are
    softmax-normalised over that axis, and the output is the weighted sum of
    the inputs. Steps excluded by the incoming mask get a score of -1e9
    before the softmax. The layer does not pass a mask on.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.score = layers.Dense(1)

    def call(self, x, mask=None):
        logits = self.score(x)
        if mask is not None:
            # Add a trailing axis so the mask lines up with the one-unit scores.
            keep = tf.expand_dims(tf.cast(mask, tf.bool), axis=-1)
            floor = tf.fill(tf.shape(logits), tf.constant(-1e9, logits.dtype))
            logits = tf.where(keep, logits, floor)
        weights = tf.nn.softmax(logits, axis=1)
        return tf.reduce_sum(weights * x, axis=1)

    def compute_mask(self, inputs, mask=None):
        # The pooled axis is gone, so there is nothing left to mask.
        return None

# Variable-length log-mel input; Masking flags frames whose features are all 0.
inputs = layers.Input(shape=(None, N_MELS), name="logmel")
x = layers.Masking()(inputs)

# Two stacked bidirectional LSTM encoders that keep the full sequence.
x = layers.Bidirectional(layers.LSTM(256, return_sequences=True, dropout=0.25))(x)
x = layers.Bidirectional(layers.LSTM(256, return_sequences=True, dropout=0.25))(x)

# Attention pooling reduces the sequence to a single vector per example.
z = AttentionPool1D(name="attn_pool")(x)

# Classifier head with light L2 regularisation and dropout.
z = layers.Dense(256, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(1e-5))(z)
z = layers.Dropout(0.3)(z)
z = layers.Dense(128, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(1e-5))(z)
z = layers.Dropout(0.3)(z)
outputs = layers.Dense(NUM_CLASSES, activation="softmax")(z)

model = Model(inputs, outputs, name="Verses_BiLSTM_Attn")

# Resolve AdamW wherever this TensorFlow/Keras version exposes it: the stable
# location first, then the older `experimental` namespace. The fallback to
# plain Adam (no weight decay) is announced instead of happening silently.
adamw_cls = getattr(tf.keras.optimizers, "AdamW", None)
if adamw_cls is None:
    adamw_cls = getattr(getattr(tf.keras.optimizers, "experimental", None), "AdamW", None)
if adamw_cls is not None:
    opt = adamw_cls(learning_rate=1e-3, weight_decay=1e-4)
else:
    print("AdamW is not available in this TensorFlow build; using Adam without weight decay.")
    opt = tf.keras.optimizers.Adam(learning_rate=1e-3)

loss_fn = "sparse_categorical_crossentropy"
top5 = tf.keras.metrics.SparseTopKCategoricalAccuracy(k=5, name="top5")

model.compile(optimizer=opt, loss=loss_fn, metrics=["accuracy", top5])
model.summary()


In [None]:
# Training budget and the checkpoint file (stored under BASE_DIR).
EPOCHS = 40
CKPT   = os.path.join(BASE_DIR, "best_verses_bilstm_attn.keras")

cbs = [
    # Save the model only when val_accuracy improves.
    tf.keras.callbacks.ModelCheckpoint(
        CKPT, monitor="val_accuracy", save_best_only=True, verbose=1
    ),
    # Stop after 7 epochs without a val_accuracy gain and restore the best weights.
    tf.keras.callbacks.EarlyStopping(
        monitor="val_accuracy", patience=7, restore_best_weights=True, verbose=1
    ),
    # Halve the learning rate after 3 epochs without a val_loss gain (floor 1e-5).
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=3, min_lr=1e-5, verbose=1
    ),
]

history = model.fit(
    ds_train,
    validation_data=ds_val,
    epochs=EPOCHS,
    callbacks=cbs,
)

# Reload the saved checkpoint; the custom pooling layer class is taken from
# the in-memory model and registered under its class name.
print("Loading best checkpoint:", CKPT)
best_model = tf.keras.models.load_model(CKPT, custom_objects={"AttentionPool1D": type(model.get_layer('attn_pool'))})

# Report loss and metrics on the held-out test pipeline.
print("Evaluating on TEST…")
test_metrics = best_model.evaluate(ds_test, return_dict=True)
print(test_metrics)
