# **Section 1: Environment Setup & Dependencies**
In this section, we configure the runtime environment, mount persistent storage (Google Drive), and authenticate with Hugging Face to access the required datasets.

### **1. Project Setup & Directory Initialization**
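We initialize the project structure by creating the necessary subdirectories for raw data, processed features, metadata, scripts, and model checkpoints. Run this cell only after Google Drive has been mounted (step 2); otherwise the directories are created on the local Colab disk rather than on Drive.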

In [None]:
!mkdir -p /content/drive/MyDrive/hindi_dfake/{raw,processed,metadata,scripts,checkpoints}
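The same layout can also be created from Python, which avoids relying on shell brace expansion. This is a minimal sketch: `init_project_dirs` is a hypothetical helper name, and the subdirectory names are taken from the command above.

```python
from pathlib import Path

# Subdirectory names copied from the mkdir command above.
SUBDIRS = ("raw", "processed", "metadata", "scripts", "checkpoints")

def init_project_dirs(root, subdirs=SUBDIRS):
    """Create root/<name> for each name; existing directories are left untouched."""
    root = Path(root)
    created = []
    for name in subdirs:
        d = root / name
        d.mkdir(parents=True, exist_ok=True)
        created.append(d)
    return created

# Usage in the notebook (assumes Drive is already mounted):
# init_project_dirs("/content/drive/MyDrive/hindi_dfake")
```

Calling it after `drive.mount(...)` keeps the directories on Drive instead of the ephemeral local disk.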

### **2. Environment Configuration**
We mount Google Drive to ensure persistent storage and dynamically define the root paths (`ROOT_DIR`, `OUT_DIR`, etc.) to avoid hardcoding errors across different sessions.

In [None]:
from google.colab import drive

drive.mount('/content/drive', force_remount=True)

import os, glob
MOUNT_PATH = "/content/drive" if os.path.isdir("/content/drive/MyDrive") else "/content/gdrive"
print("Drive is already mounted here →", MOUNT_PATH)
ROOT_DIR = f"{MOUNT_PATH}/MyDrive/hindi_dfake"
OUT_DIR  = f"{ROOT_DIR}/raw/real_clean/ivr"
CSV_PATH = f"{ROOT_DIR}/metadata/master_real.csv"

Mounted at /content/drive
Drive is already mounted here → /content/drive


### **3. Hugging Face Authentication**
Authentication is required to download the **IndicVoices-R** and **Common Voice** datasets directly from the Hugging Face Hub.

In [None]:
from huggingface_hub import notebook_login
notebook_login()


# **Section 2: Preprocessing**
We preprocess the dataset into `base` (normalized), `strong` (augmented with RawBoost/EQ), and `clean` (EQ-only) profiles to ensure robust training and evaluation.

### **1. Metadata Audit**
We scan all available CSV files to verify that our `real` and `fake` labels are consistent before merging them into a final training set.

In [None]:
import pandas as pd, glob, os
root = "/content/drive/MyDrive/hindi_dfake/metadata"
for f in sorted(glob.glob(f"{root}/*.csv")):
    try:
        df = pd.read_csv(f, nrows=5)  # peek header fast
        if "label" in df.columns:
            df = pd.read_csv(f, usecols=["label"])
            print(f, "\n", df["label"].value_counts(dropna=False), "\n")
        else:
            print(f"{f} (no 'label' column), skipped\n")
    except Exception as e:
        print(f"{f} (read issue): {e}\n")

/content/drive/MyDrive/hindi_dfake/metadata/attacks.clean.csv 
 label
real        20672
fake        17334
NaN          5287
tts_edge     1800
Name: count, dtype: int64 

/content/drive/MyDrive/hindi_dfake/metadata/attacks.csv 
 label
real        20672
fake        17334
tts_edge     1800
Name: count, dtype: int64 

/content/drive/MyDrive/hindi_dfake/metadata/attacks.labeled.csv 
 label
real    23658
fake    21435
Name: count, dtype: int64 

/content/drive/MyDrive/hindi_dfake/metadata/master_fake.csv 
 label
fake    23731
Name: count, dtype: int64 

/content/drive/MyDrive/hindi_dfake/metadata/master_real.csv 
 label
real    25965
Name: count, dtype: int64 

/content/drive/MyDrive/hindi_dfake/metadata/proc_manifest.csv (no 'label' column), skipped
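
The audit output shows that some files carry values other than `real` and `fake` (for example `tts_edge` and missing labels in `attacks.clean.csv`). A small check like the following can flag such values explicitly. It is a sketch only: `audit_labels` is a hypothetical helper, and the in-memory CSV rows are made up for illustration.

```python
import csv
import io
from collections import Counter

ALLOWED = {"real", "fake"}

def audit_labels(rows, allowed=ALLOWED):
    """Count labels and collect the values that fall outside the allowed set.

    `rows` is any iterable of dicts with an optional "label" key.
    Missing or empty labels are counted under the key "<missing>".
    """
    counts = Counter()
    for row in rows:
        lab = (row.get("label") or "").strip().lower()
        counts[lab or "<missing>"] += 1
    unexpected = {k: v for k, v in counts.items() if k not in allowed}
    return counts, unexpected

# Tiny in-memory CSV standing in for a metadata file (made-up rows).
sample = io.StringIO("path,label\na.wav,real\nb.wav,fake\nc.wav,tts_edge\nd.wav,\n")
counts, unexpected = audit_labels(csv.DictReader(sample))
print(dict(counts))
print(unexpected)
```

Any non-empty `unexpected` dictionary indicates rows that need relabeling or exclusion before the files are merged.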



### **2. Pre-Processing Probe**
We test the FFmpeg filter chain (silence removal and 16 kHz resampling) on a single file to confirm that it runs cleanly before bulk processing.

In [None]:
# --- PROBE ONE FILE to see ffmpeg/filter errors clearly ---
import pandas as pd
from pathlib import Path
import subprocess, tempfile, soundfile as sf, numpy as np

ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT/"metadata"
REAL = pd.read_csv(META/"master_real.csv")
FAKE = pd.read_csv(META/"master_fake.csv")
df = pd.concat([REAL, FAKE], ignore_index=True)
df = df[df["path"].apply(lambda p: Path(p).exists())]
sample = df.iloc[0]["path"]
print("Sample file:", sample)

TRIM_THR_DB = -45
TRIM_DUR_S  = 0.20
GAIN_DB     = 0.0  # just to check filter wiring
PROFILE     = "base"

filt_parts = []
filt_parts.append("highpass=f=20")
# no EQ for base
# per-file gain
filt_parts.append(f"volume={GAIN_DB}dB")
# NOTE: use start_duration/stop_duration (not start_silence/stop_silence)
filt_parts.append(f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:"
                  f"stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB")
FILTER = ",".join(filt_parts)

tmp = Path(tempfile.gettempdir())/ "probe_out.wav"
cmd = [
    "ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
    "-i", sample,
    "-ac","1","-ar","16000","-af", FILTER, "-sample_fmt","s16",
    str(tmp)
]
print("Filter:", FILTER)
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print("Return code:", p.returncode)
if p.returncode != 0:
    print("----- STDERR -----\n", p.stderr[:1200])
else:
    try:
        # sf and np are already imported at the top of this cell
        x, sr = sf.read(str(tmp), dtype="float32")
        print("OK. Out shape:", x.shape, "sr:", sr, "dur(s):", len(x)/sr)
    except Exception as e:
        print("Wrote file but could not read back:", e)

Sample file: /content/drive/MyDrive/hindi_dfake/raw/real_clean/ivr/1a0b61275bc58baf.wav
Filter: highpass=f=20,volume=0.0dB,silenceremove=start_periods=1:start_duration=0.2:start_threshold=-45dB:stop_periods=1:stop_duration=0.2:stop_threshold=-45dB
Return code: 0
OK. Out shape: (129127,) sr: 16000 dur(s): 8.0704375
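
Beyond reading the samples back, the probe output can be checked against the intended format (mono, 16 kHz, 16-bit PCM) by inspecting the WAV header. The sketch below uses the standard-library `wave` module instead of `soundfile`, which is an assumption made to keep the example dependency-free; `wav_format` and `is_16k_mono_s16` are hypothetical helper names, and the demo file is synthetic.

```python
import tempfile
import wave
from pathlib import Path

def wav_format(path):
    """Return (channels, sample_rate, sample_width_bytes, n_frames) from a PCM WAV header."""
    with wave.open(str(path), "rb") as w:
        return w.getnchannels(), w.getframerate(), w.getsampwidth(), w.getnframes()

def is_16k_mono_s16(path):
    """True if the file is mono, 16 kHz, with 2-byte (16-bit) samples."""
    ch, sr, width, _ = wav_format(path)
    return ch == 1 and sr == 16000 and width == 2

# Self-check on a synthetic file: 0.5 s of silence, 16 kHz mono 16-bit.
demo = Path(tempfile.gettempdir()) / "wav_format_demo.wav"
with wave.open(str(demo), "wb") as w:
    w.setnchannels(1)
    w.setsampwidth(2)
    w.setframerate(16000)
    w.writeframes(b"\x00\x00" * 8000)

print(wav_format(demo), is_16k_mono_s16(demo))
```

In the notebook, the same check could be applied to the `probe_out.wav` file written by the probe cell.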


### **3. Profile: Base**
We normalize the dataset using the `base` profile: a 20 Hz high-pass filter, a per-file gain that moves each file toward a -26 dBFS RMS target (clipped to ±20 dB), and silence removal. The output is standardized 16 kHz mono 16-bit audio.
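
The per-file gain used by the preprocessing cell is the difference between the target level and the measured RMS level, clipped to ±20 dB. The sketch below reproduces that arithmetic with the standard library only; `gain_for` is a hypothetical name, and the sample values are synthetic.

```python
import math

TARGET_DB = -26.0       # target RMS level, same value as in the preprocessing cell
GAIN_LIMIT_DB = 20.0    # gain is clipped to +/- this many dB

def rms_dbfs(samples):
    """RMS level in dB relative to full scale, for float samples in [-1, 1]."""
    if not samples:
        return -120.0
    rms = math.sqrt(sum(s * s for s in samples) / len(samples))
    return -120.0 if rms <= 1e-9 else 20.0 * math.log10(min(rms, 1.0))

def gain_for(cur_db, target_db=TARGET_DB, limit=GAIN_LIMIT_DB):
    """Gain in dB that moves cur_db toward target_db, clipped to +/- limit."""
    return max(-limit, min(limit, target_db - cur_db))

# A constant-amplitude signal at 0.1 has an RMS of 0.1, i.e. about -20 dBFS,
# so it needs roughly -6 dB of gain to reach -26 dBFS.
level = rms_dbfs([0.1] * 1600)
print(round(level, 2), round(gain_for(level), 2))

# A near-silent file at -60 dBFS would need +34 dB; the clip holds it at +20 dB.
print(gain_for(-60.0))
```

The clip means that very quiet or very loud inputs do not land exactly on the target; they are only moved 20 dB toward it.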

In [None]:
# ========== PREPROCESSING (full run; resumable; same-FS temp) ==========
# Pipeline: DC high-pass -> (optional artifact EQ) -> per-file volume -> light trim
# Writes: /content/drive/MyDrive/hindi_dfake/processed/wav/<profile>/...
# Logs  : /content/drive/MyDrive/hindi_dfake/metadata/proc_manifest.csv

import os, csv, time, uuid, subprocess, hashlib
from pathlib import Path
import pandas as pd, numpy as np, soundfile as sf
from concurrent.futures import ThreadPoolExecutor, as_completed

# ---------------- CONFIG ----------------
ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT / "metadata"
OUT_ROOT = ROOT / "processed" / "wav"

PROFILE = "base"           # "base", "research_eq", "research_rb", "strong"
INCLUDE_ATTACKS = True     # include attacks.labeled.csv
FRACTION_REAL = 1.00       # process EVERYTHING
FRACTION_FAKE = 1.00
HASH_SEED = 2025

TARGET_DB   = -26.0        # target RMS (dBFS-ish)
TRIM_THR_DB = -45          # silence threshold for trim
TRIM_DUR_S  = 0.20         # trim head/tail if >= this length

MAX_WORKERS = 6            # bump if stable
MANIFEST = META / "proc_manifest.csv"
BATCH_WRITE = 1000

REAL_CSV  = META / "master_real.csv"
FAKE_CSV  = META / "master_fake.csv"
ATT_CSV   = META / "attacks.labeled.csv"   # cleaned labels
assert REAL_CSV.exists() and FAKE_CSV.exists(), "Missing master_real.csv/master_fake.csv"

def log(s): print(s, flush=True)

def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def stable_frac_mask(series: pd.Series, frac: float, seed: int) -> pd.Series:
    if frac >= 1.0: return pd.Series([True]*len(series), index=series.index)
    s = series.astype(str).apply(lambda p: int(hashlib.sha1((p+str(seed)).encode()).hexdigest()[:8], 16) / 0xFFFFFFFF)
    return s < frac

def out_path_for(in_path: str) -> Path:
    p = Path(in_path)
    try:
        rel = p.relative_to(ROOT)
    except ValueError:  # input lies outside ROOT
        rel = Path("_external") / p.name
    return OUT_ROOT / PROFILE / rel

def ensure_parent(p: Path): p.parent.mkdir(parents=True, exist_ok=True)

def rms_dbfs(p: Path) -> float:
    try:
        x, sr = sf.read(str(p), dtype="float32", always_2d=False)
        if x.ndim > 1: x = x.mean(axis=1)
        if len(x) == 0: return -120.0
        rms = float(np.sqrt(np.mean(np.square(x))))
        if rms <= 1e-9: return -120.0
        return 20.0*np.log10(np.clip(rms, 1e-9, 1.0))
    except Exception:
        return -120.0

def build_filter_chain(profile: str, gain_db: float) -> str:
    parts = ["highpass=f=20"]
    if profile in ("research_eq","strong"):
        parts += [
            "equalizer=f=3000:t=q:w=1.0:g=2.5",
            "equalizer=f=4800:t=q:w=0.9:g=2.0",
            "treble=g=1.0:f=6000:t=h:w=0.7"
        ]
    parts.append(f"volume={gain_db}dB")
    # portable args: start_duration/stop_duration
    parts.append(
        f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:"
        f"stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB"
    )
    return ",".join(parts)

def duration_s(p: Path) -> float:
    try:
        x, sr = sf.read(str(p), dtype="float32", always_2d=False)
        return len(x)/float(sr) if sr else 0.0
    except Exception:
        return 0.0

# -------- Load inputs --------
dfR = read_csv_safe(REAL_CSV)
dfF = read_csv_safe(FAKE_CSV)
dfs = [dfR, dfF]
if INCLUDE_ATTACKS and ATT_CSV.exists(): dfs.append(read_csv_safe(ATT_CSV))
df_all = pd.concat(dfs, ignore_index=True)

# keep only files that exist
df_all = df_all[df_all["path"].astype(str).apply(lambda p: Path(p).exists())].copy()

# normalize labels -> only {'real','fake'}
def normalize_label(lab, fake_type, src, path):
    l = str(lab).strip().lower()
    if l in {"real","fake"}: return l
    ft = str(fake_type).strip().lower()
    s  = str(src).strip().lower()
    p  = str(path)
    if "/raw/real_clean" in p or s.startswith("real_"): return "real"
    if ft in {"tts_edge","channel_attack","vc"} or "/raw/fake_" in p or s.startswith("fake_"): return "fake"
    return "fake"
df_all["label"] = [normalize_label(l, ft, s, p) for l, ft, s, p in
                   zip(df_all["label"], df_all["fake_type"], df_all["source"], df_all["path"])]

print("Input counts (normalized):")
print(df_all.groupby(df_all["label"].astype(str).str.lower()).size().to_string())

# deterministic per-label subset (here: everything)
parts = []
for lab, g in df_all.groupby(df_all["label"].astype(str).str.lower()):
    frac = FRACTION_REAL if lab == "real" else FRACTION_FAKE
    parts.append(g if frac>=1.0 else g[stable_frac_mask(g["path"], frac, HASH_SEED)])
df_in = pd.concat(parts, ignore_index=True) if parts else df_all.copy()

# -------- Manifest (resume) --------
MANIFEST.parent.mkdir(parents=True, exist_ok=True)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST)
    done_set = set(mf.loc[mf["profile"]==PROFILE, "path_out"].astype(str))
else:
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    done_set = set()

def already_done(path_in: str) -> bool:
    outp = out_path_for(path_in)
    return (str(outp) in done_set) or outp.exists()

# -------- Worker --------
def preprocess_one(row):
    pin = str(row["path"])
    pout = out_path_for(pin)
    if str(pout) in done_set or pout.exists():
        return None

    ensure_parent(pout)

    cur_db  = rms_dbfs(Path(pin))
    gain_db = float(np.clip(TARGET_DB - cur_db, -20.0, 20.0))
    FILTER  = build_filter_chain(PROFILE, gain_db)

    # temp in SAME dir to avoid cross-device link
    tmp = pout.parent / f".{pout.stem}.tmp-{uuid.uuid4().hex}.wav"
    try:
        cmd = [
            "ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
            "-i", pin, "-ac","1","-ar","16000","-af", FILTER, "-sample_fmt","s16",
            str(tmp)
        ]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.returncode != 0:
            try: tmp.unlink(missing_ok=True)
            except: pass
            return {"error": p.stderr.decode("utf-8","ignore")[:300], "path_in": pin}

        os.replace(str(tmp), str(pout))  # atomic within same FS

        # quick stats
        try:
            x, sr = sf.read(str(pout), dtype="float32", always_2d=False)
            if x.ndim > 1: x = x.mean(axis=1)
            dur = len(x)/float(sr) if sr else 0.0
            peak = float(np.max(np.abs(x))) if len(x) else 0.0
            rms  = float(np.sqrt(np.mean(np.square(x)))) if len(x) else 0.0
            rms_db = -120.0 if rms<=1e-9 else 20*np.log10(np.clip(rms, 1e-9, 1.0))
            peak_db = -120.0 if peak<=1e-9 else 20*np.log10(np.clip(peak, 1e-9, 1.0))
        except Exception:
            dur, rms_db, peak_db = 0.0, -120.0, -120.0

        return {"utt_id": row.get("utt_id",""), "path_in": pin, "path_out": str(pout),
                "profile": PROFILE, "duration": round(dur,3),
                "rms_db": round(rms_db,2), "peak_db": round(peak_db,2)}
    except Exception as e:
        try: tmp.unlink(missing_ok=True)
        except: pass
        return {"error": str(e), "path_in": pin}

# -------- Run --------
planned = len(df_in)
already = sum(1 for _ , r in df_in.iterrows() if already_done(str(r["path"])))
todo = planned - already
log(f"\n[preproc:{PROFILE}] PLANNED={planned:,} | ALREADY_DONE={already:,} | TO_DO={todo:,}")

created = 0; errs = 0
buf, errbuf = [], []
t0 = time.time()

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    futs = [ex.submit(preprocess_one, r) for _, r in df_in.iterrows() if not already_done(str(r["path"]))]
    done_ct = 0
    for f in as_completed(futs):
        done_ct += 1
        res = f.result()
        if res is None:
            pass
        elif isinstance(res, dict) and "error" in res:
            errs += 1
            if len(errbuf) < 5: errbuf.append(res)  # capture first few errors for inspection
        else:
            buf.append(res)
            created += 1

        if done_ct % 500 == 0 or done_ct == todo:
            print(f"[{PROFILE}] {done_ct:,}/{todo:,}  (+{created} ok, {errs} err)  | elapsed {time.time()-t0:.1f}s", flush=True)

        if len(buf) >= BATCH_WRITE or (done_ct == todo and buf):
            with open(MANIFEST, "a", newline="", encoding="utf-8") as fcsv:
                w = csv.DictWriter(fcsv, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
                for row in buf: w.writerow(row)
            buf.clear()

print(f"[{PROFILE}] FINISHED: +{created} files, {errs} errors | total elapsed {time.time()-t0:.1f}s")
if errbuf:
    print("\n--- First few errors (trimmed) ---")
    for e in errbuf:
        print(e["path_in"], "->", e["error"])

# -------- Summary --------
def hours_from_manifest(profile: str):
    try:
        m = pd.read_csv(MANIFEST)
        m = m[m["profile"]==profile]
        return float(pd.to_numeric(m["duration"], errors="coerce").fillna(0).sum()/3600.0), len(m)
    except Exception:
        return 0.0, 0

h, n = hours_from_manifest(PROFILE)
print(f"[{PROFILE}] Manifest rows: {n:,} | hours: {h:.2f}h")

Input counts (normalized):
label
fake    45166
real    49623

[preproc:base] PLANNED=94,789 | ALREADY_DONE=20,774 | TO_DO=74,015
[base] 500/74,015  (+500 ok, 0 err)  | elapsed 116.6s
[base] 1,000/74,015  (+1000 ok, 0 err)  | elapsed 158.5s
[base] 1,500/74,015  (+1500 ok, 0 err)  | elapsed 222.7s
[base] 2,000/74,015  (+2000 ok, 0 err)  | elapsed 290.0s
[base] 2,500/74,015  (+2500 ok, 0 err)  | elapsed 358.5s
[base] 3,000/74,015  (+3000 ok, 0 err)  | elapsed 425.7s
[base] 3,500/74,015  (+3500 ok, 0 err)  | elapsed 489.8s
[base] 4,000/74,015  (+4000 ok, 0 err)  | elapsed 555.2s
[base] 4,500/74,015  (+4500 ok, 0 err)  | elapsed 618.7s
[base] 5,000/74,015  (+5000 ok, 0 err)  | elapsed 682.8s
[base] 5,500/74,015  (+5500 ok, 0 err)  | elapsed 746.5s
[base] 6,000/74,015  (+6000 ok, 0 err)  | elapsed 811.0s
[base] 6,500/74,015  (+6500 ok, 0 err)  | elapsed 875.9s
[base] 7,000/74,015  (+7000 ok, 0 err)  | elapsed 939.7s
[base] 7,500/74,015  (+7500 ok, 0 err)  | elapsed 1003.9s
[base] 8,000/74,01
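
The run above is resumable because each output is written to a temporary file in the target directory and renamed into place only on success, while existing outputs are skipped on the next run. The following is a minimal sketch of that pattern with plain bytes instead of audio; `write_atomically` and `process_all` are hypothetical names, and how strictly the rename is atomic on a mounted Drive folder is not verified here.

```python
import os
import tempfile
import uuid
from pathlib import Path

def write_atomically(out_path, data):
    """Write bytes to a hidden temp file next to out_path, then rename into place.

    os.replace is atomic when source and target are on the same filesystem,
    so out_path either does not exist or holds a complete file.
    """
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = out_path.parent / f".{out_path.stem}.tmp-{uuid.uuid4().hex}{out_path.suffix}"
    try:
        tmp.write_bytes(data)
        os.replace(tmp, out_path)
    finally:
        tmp.unlink(missing_ok=True)  # no-op after a successful rename

def process_all(items, out_dir, done_set):
    """Skip outputs already recorded or present on disk; return newly written paths."""
    written = []
    for name, payload in items:
        out = Path(out_dir) / name
        if str(out) in done_set or out.exists():
            continue
        write_atomically(out, payload)
        written.append(str(out))
    return written

work = tempfile.mkdtemp()
items = [("a.bin", b"one"), ("sub/b.bin", b"two")]
first = process_all(items, work, done_set=set())
second = process_all(items, work, done_set=set(first))  # simulated re-run
print(len(first), len(second))
```

Keeping the temp file in the same directory as the final output is what avoids the cross-device rename error mentioned in the worker's comment.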

### **4. Base Profile Audit**
We verify that every processed file on disk is correctly indexed in `proc_manifest.csv`, backfilling any missing entries to ensure the dataset is complete.

In [None]:
# ================= FAST PREPROC AUDIT + REPAIR (no full disk-hours scan) =================
from pathlib import Path
import pandas as pd, csv
import soundfile as sf

ROOT      = Path("/content/drive/MyDrive/hindi_dfake")
META      = ROOT / "metadata"
OUT_ROOT  = ROOT / "processed" / "wav"
MANIFEST  = META / "proc_manifest.csv"
PROFILE   = "base"   # change/add more profiles later if needed

def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def out_path_for(profile: str, in_path: str) -> Path:
    p = Path(in_path)
    try:
        rel = p.relative_to(ROOT)
    except ValueError:  # input lies outside ROOT
        rel = Path("_external") / p.name
    return OUT_ROOT / profile / rel

def in_path_from_out(profile: str, out_path: Path) -> Path:
    base = OUT_ROOT / profile
    try:
        rel = out_path.relative_to(base)
        return ROOT / rel
    except Exception:
        return ROOT / "_unknown_source" / out_path.name

def fast_duration(path: Path) -> float:
    try:
        info = sf.info(str(path))
        if info.samplerate and info.frames:
            return float(info.frames) / float(info.samplerate)
    except Exception:
        pass
    return 0.0

# Universe of inputs that actually exist
dfs = [read_csv_safe(META/"master_real.csv"), read_csv_safe(META/"master_fake.csv")]
att = META/"attacks.labeled.csv"
if att.exists():
    dfs.append(read_csv_safe(att))
df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all[df_all["path"].astype(str).apply(lambda p: Path(p).exists())].copy()

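# Build lookup from input path -> row (for backfill metadata).
# Duplicate paths are dropped first: to_dict("index") requires a unique index.
in_index = df_all.drop_duplicates(subset="path").set_index("path").to_dict("index")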

# Manifest slice
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST)
else:
    MANIFEST.parent.mkdir(parents=True, exist_ok=True)
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    mf = pd.read_csv(MANIFEST)

mf_p = mf[mf["profile"] == PROFILE].copy()
set_mani = set(mf_p["path_out"].astype(str))

# On-disk outputs for this profile
disk_paths = list((OUT_ROOT / PROFILE).rglob("*.wav"))
set_disk = set(str(p) for p in disk_paths)

print(f"[counts] profile={PROFILE} | outputs_on_disk={len(set_disk):,} | manifest_rows={len(mf_p):,}")

missing_in_manifest = sorted(set_disk - set_mani)
print(f"[diff] on-disk but not in manifest: {len(missing_in_manifest):,}")

# ----- Backfill only the missing ones (fast) -----
if missing_in_manifest:
    rows = []
    for out_str in missing_in_manifest:
        outp = Path(out_str)
        in_p = in_path_from_out(PROFILE, outp)
        base = in_index.get(str(in_p), {})  # may be {}
        dur = fast_duration(outp)

        rows.append({
            "utt_id": base.get("utt_id",""),
            "path_in": str(in_p),
            "path_out": out_str,
            "profile": PROFILE,
            "duration": round(dur,3),
            "rms_db": "",   # unknown here (optional to compute later)
            "peak_db": "",  # unknown here
        })

    with open(MANIFEST, "a", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows: w.writerow(r)
    print(f"[fix] backfilled {len(rows):,} manifest rows.")

    # reload
    mf = pd.read_csv(MANIFEST)
    mf_p = mf[mf["profile"] == PROFILE].copy()

# ----- Dedup manifest by (profile, path_out) -----
before = len(mf)
mf = mf.drop_duplicates(subset=["profile","path_out"], keep="first").reset_index(drop=True)
after = len(mf)
if after != before:
    mf.to_csv(MANIFEST, index=False)
    print(f"[fix] dedup manifest: -{before-after} rows")

# Final counts
mf_p = mf[mf["profile"] == PROFILE].copy()
print(f"[post] manifest_rows={len(mf_p):,}")
print("[ok] FAST audit/repair done.")

[counts] profile=base | outputs_on_disk=94,789 | manifest_rows=94,789
[diff] on-disk but not in manifest: 0
[post] manifest_rows=94,789
[ok] FAST audit/repair done.
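
The core of the audit is a set difference between the files on disk and the `path_out` values in the manifest. The sketch below shows that comparison on a synthetic directory. `reconcile` is a hypothetical helper, and unlike the cell above it also reports the reverse direction (manifest rows whose file is missing), which is an addition for illustration.

```python
import csv
import tempfile
from pathlib import Path

def reconcile(out_dir, manifest_csv, profile):
    """Compare *.wav files under out_dir with manifest rows for one profile.

    Returns (on_disk_not_in_manifest, in_manifest_not_on_disk) as sorted lists.
    """
    on_disk = {str(p) for p in Path(out_dir).rglob("*.wav")}
    with open(manifest_csv, newline="", encoding="utf-8") as f:
        listed = {r["path_out"] for r in csv.DictReader(f) if r["profile"] == profile}
    return sorted(on_disk - listed), sorted(listed - on_disk)

# Synthetic layout: two files on disk; the manifest knows one of them plus a stale row.
root = Path(tempfile.mkdtemp())
out_dir = root / "wav" / "base"
out_dir.mkdir(parents=True)
(out_dir / "a.wav").write_bytes(b"")
(out_dir / "b.wav").write_bytes(b"")
manifest = root / "proc_manifest.csv"
with open(manifest, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["path_out", "profile"])
    w.writerow([str(out_dir / "a.wav"), "base"])
    w.writerow([str(out_dir / "gone.wav"), "base"])

missing_rows, stale_rows = reconcile(out_dir, manifest, "base")
print(missing_rows, stale_rows)
```

Entries in the first list are candidates for backfilling; entries in the second point to outputs that were deleted or never written.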


### **5. Profile: Strong**
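We generate a robust training set by applying RawBoost-style augmentation and an artifact EQ to all files. The augmentation adds band-limited noise (2.5-6 kHz, 25 dB SNR) to every file, sparse impulses with probability 0.5, and a short reverb with probability 0.6. The EQ boosts the regions around 3 kHz and 4.8 kHz and applies a high shelf at 6 kHz.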
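
Two ideas in this profile are worth isolating: the noise is seeded from the file path, so reprocessing a file yields the same augmentation, and it is scaled to a fixed signal-to-noise ratio. The sketch below illustrates both with the standard library. It uses white noise without band-limiting or clipping, which is a simplification of the cell that follows; `rng_from_key` and `add_noise_at_snr` are hypothetical names, and the input tone is synthetic.

```python
import hashlib
import math
import random

def rng_from_key(key):
    """Seed a generator from the first 32 bits of SHA-1(key): same key, same noise."""
    seed = int(hashlib.sha1(key.encode("utf-8")).hexdigest()[:8], 16)
    return random.Random(seed)

def rms(x):
    """Root-mean-square of a non-empty sequence of floats."""
    return math.sqrt(sum(v * v for v in x) / len(x))

def add_noise_at_snr(x, snr_db, key):
    """Add Gaussian noise scaled so that 20*log10(rms(x)/rms(noise)) equals snr_db."""
    rng = rng_from_key(key)
    noise = [rng.gauss(0.0, 1.0) for _ in x]
    scale = rms(x) / (10.0 ** (snr_db / 20.0)) / rms(noise)
    noise = [scale * n for n in noise]
    return [a + b for a, b in zip(x, noise)], noise

# 0.1 s of a 440 Hz tone at 16 kHz (synthetic input).
sr = 16000
tone = [0.5 * math.sin(2 * math.pi * 440 * t / sr) for t in range(1600)]
noisy, noise = add_noise_at_snr(tone, 25.0, "demo.wav|strong")
measured = 20.0 * math.log10(rms(tone) / rms(noise))
print(round(measured, 6))
```

Because the seed depends only on the key, an interrupted run can be resumed without changing which noise any given file receives.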

In [None]:
# ================= PREPROCESSING — STRONG (EQ + RawBoost v3) =================
# Writes: /content/drive/MyDrive/hindi_dfake/processed/wav/strong/...
# Logs to: /content/drive/MyDrive/hindi_dfake/metadata/proc_manifest.csv (profile=strong)

import os, csv, uuid, time, hashlib, subprocess
from pathlib import Path
import pandas as pd, numpy as np, soundfile as sf
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

# ------------ CONFIG ------------
ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT / "metadata"
OUT_ROOT = ROOT / "processed" / "wav"

PROFILE = "strong"          # strong = base + artifact EQ + RawBoost v3
INCLUDE_ATTACKS = True
FRACTION_REAL = 1.00        # process all
FRACTION_FAKE = 1.00
HASH_SEED = 2025

# Loudness & trim
TARGET_DB   = -26.0
TRIM_THR_DB = -45
TRIM_DUR_S  = 0.20

# Concurrency / batching
MAX_WORKERS   = 6           # 6–8 is reasonable on Colab CPU
INFLIGHT_MULT = 3
BATCH_WRITE   = 500

# RawBoost v3–style knobs
RB_ENABLE           = True
RB_BANDPASS_HZ      = (2500, 6000)
RB_SNR_DB           = 25.0
RB_NOISE_PROB       = 1.0
RB_IMPULSE_PROB     = 0.5
RB_IMPULSES_PER_SEC = 1.0
RB_IMPULSE_GAIN     = 0.08
RB_REVERB_PROB      = 0.6
RB_REVERB_T_SEC     = 0.03
RB_REVERB_DECAY     = 0.35
RB_REVERB_WET       = 0.18

REAL_CSV  = META / "master_real.csv"
FAKE_CSV  = META / "master_fake.csv"
ATT_CSV   = META / "attacks.labeled.csv"
MANIFEST  = META / "proc_manifest.csv"
assert REAL_CSV.exists() and FAKE_CSV.exists()

# ------------ helpers ------------
def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def normalize_label(lab, fake_type, src, path):
    l = str(lab).strip().lower()
    if l in {"real","fake"}: return l
    ft = str(fake_type).strip().lower()
    s  = str(src).strip().lower()
    p  = str(path)
    if "/raw/real_clean" in p or s.startswith("real_"): return "real"
    if ft in {"tts_edge","channel_attack","vc"} or "/raw/fake_" in p or s.startswith("fake_"): return "fake"
    return "fake"

def stable_frac_mask(series: pd.Series, frac: float, seed: int) -> pd.Series:
    if frac >= 1.0: return pd.Series([True]*len(series), index=series.index)
    s = series.astype(str).apply(lambda p: int(hashlib.sha1((p+str(seed)).encode()).hexdigest()[:8], 16) / 0xFFFFFFFF)
    return s < frac

def out_path_for(in_path: str) -> Path:
    p = Path(in_path)
    try:
        rel = p.relative_to(ROOT)
    except ValueError:  # input lies outside ROOT
        rel = Path("_external") / p.name
    return OUT_ROOT / PROFILE / rel

def ensure_parent(p: Path): p.parent.mkdir(parents=True, exist_ok=True)

def rms_dbfs_arr(x: np.ndarray) -> float:
    if x.ndim > 1: x = x.mean(axis=1)
    if len(x) == 0: return -120.0
    rms = float(np.sqrt(np.mean(np.square(x))))
    if rms <= 1e-9: return -120.0
    return 20.0*np.log10(np.clip(rms, 1e-9, 1.0))

# ---- RawBoost v3–style (deterministic per file) ----
def _rng_from_key(key: str):
    seed = int(hashlib.sha1(key.encode("utf-8")).hexdigest()[:8], 16)
    return np.random.default_rng(seed)

def _add_colored_noise(x, sr, rng, band=(2500,6000), snr_db=25.0):
    n = len(x)
    if n == 0: return x
    wn = rng.standard_normal(n).astype(np.float32)
    X = np.fft.rfft(wn); freqs = np.fft.rfftfreq(n, d=1.0/sr)
    mask = (freqs >= band[0]) & (freqs <= band[1]); X[~mask] = 0.0
    noise = np.fft.irfft(X, n=n).astype(np.float32)
    sig_rms = np.sqrt(np.mean(np.square(x))) + 1e-9
    target_noise_rms = sig_rms / (10.0**(snr_db/20.0))
    cur_rms = np.sqrt(np.mean(np.square(noise))) + 1e-12
    noise *= (target_noise_rms / cur_rms)
    return np.clip(x + noise, -1.0, 1.0)

def _add_impulses(x, sr, rng, per_sec=1.0, gain=0.08):
    n = len(x)
    if n == 0: return x
    dur = n/float(sr); k = max(1, int(per_sec * dur))
    idx = rng.integers(0, n, size=k)
    amp = gain * (np.sqrt(np.mean(np.square(x))) + 1e-9)
    y = x.copy()
    y[idx] = np.clip(y[idx] + amp * rng.choice([-1.0, 1.0], size=k), -1.0, 1.0)
    return y

def _add_small_reverb(x, sr, rng, t_sec=0.03, decay=0.35, wet=0.18):
    n = len(x)
    if n == 0: return x
    ir_len = max(8, int(t_sec * sr))
    t = np.arange(ir_len, dtype=np.float32)
    ir = np.exp(-decay * t / ir_len).astype(np.float32)
    for _ in range(3):
        pos = int(rng.integers(0, ir_len))
        ir[pos] += float(rng.uniform(0.1, 0.3))
    ir /= (np.sum(np.abs(ir)) + 1e-9)
    y = np.convolve(x, ir, mode="full")[:n].astype(np.float32)
    return np.clip((1.0 - wet) * x + wet * y, -1.0, 1.0)

def rawboost_v3(x: np.ndarray, sr: int, key: str) -> np.ndarray:
    if x.ndim > 1: x = x.mean(axis=1)
    x = np.clip(x, -1.0, 1.0).astype(np.float32)
    if not RB_ENABLE: return x
    rng = _rng_from_key(key)
    if rng.uniform() < RB_NOISE_PROB:
        x = _add_colored_noise(x, sr, rng, RB_BANDPASS_HZ, RB_SNR_DB)
    if rng.uniform() < RB_IMPULSE_PROB:
        x = _add_impulses(x, sr, rng, RB_IMPULSES_PER_SEC, RB_IMPULSE_GAIN)
    if rng.uniform() < RB_REVERB_PROB:
        x = _add_small_reverb(x, sr, rng, RB_REVERB_T_SEC, RB_REVERB_DECAY, RB_REVERB_WET)
    return x

def build_filter_chain_strong(gain_db: float) -> str:
    return ",".join([
        "highpass=f=20",
        "equalizer=f=3000:t=q:w=1.0:g=2.5",
        "equalizer=f=4800:t=q:w=0.9:g=2.0",
        "treble=g=1.0:f=6000:t=h:w=0.7",
        f"volume={gain_db}dB",
        f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB"
    ])

# ------------ load inputs ------------
dfR = read_csv_safe(REAL_CSV)
dfF = read_csv_safe(FAKE_CSV)
dfs = [dfR, dfF]
if INCLUDE_ATTACKS and ATT_CSV.exists():
    dfs.append(read_csv_safe(ATT_CSV))
df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all[df_all["path"].astype(str).apply(lambda p: Path(p).exists())].copy()
df_all["label"] = [normalize_label(l, ft, s, p) for l, ft, s, p in
                   zip(df_all["label"], df_all["fake_type"], df_all["source"], df_all["path"])]

# stratified deterministic subset (here: all)
parts = []
for lab, g in df_all.groupby(df_all["label"].astype(str).str.lower()):
    frac = FRACTION_REAL if lab == "real" else FRACTION_FAKE
    parts.append(g if frac>=1.0 else g[stable_frac_mask(g["path"], frac, HASH_SEED)])
df_in = pd.concat(parts, ignore_index=True) if parts else df_all.copy()

# ------------ manifest skip logic ------------
MANIFEST.parent.mkdir(parents=True, exist_ok=True)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST, usecols=["profile","path_out"])
    done_set = set(mf.loc[mf["profile"]==PROFILE, "path_out"].astype(str))
else:
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    done_set = set()

def already_done(pin: str) -> bool:
    pout = out_path_for(pin)
    return pout.exists() or (str(pout) in done_set)

todo_rows = [r for _, r in df_in.iterrows() if not already_done(str(r["path"]))]
planned, todo = len(df_in), len(todo_rows)
print(f"Input counts (normalized):")
print(df_all.groupby(df_all['label']).size().to_string())
print(f"\n[preproc:{PROFILE}] PLANNED={planned:,} | ALREADY_DONE={planned-todo:,} | TO_DO={todo:,}")

# ------------ worker ------------
def preprocess_one(row):
    pin = str(row["path"])
    pout = out_path_for(pin)
    ensure_parent(pout)

    # read (and resample if needed)
    try:
        x, sr = sf.read(pin, dtype="float32", always_2d=False)
    except Exception as e:
        return {"error": f"read_fail: {e}", "path_in": pin}
    if x.ndim > 1: x = x.mean(axis=1)
    if sr != 16000:
        tmp = pout.parent / f".{pout.stem}.in-{uuid.uuid4().hex}.wav"
        cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
               "-i", pin, "-ac","1","-ar","16000","-sample_fmt","s16", str(tmp)]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.returncode != 0:
            try: tmp.unlink(missing_ok=True)
            except: pass
            return {"error": "resample_fail", "path_in": pin}
        x, sr = sf.read(str(tmp), dtype="float32", always_2d=False)
        try: tmp.unlink(missing_ok=True)
        except: pass

    x = np.clip(x, -1.0, 1.0).astype(np.float32)

    # RawBoost
    key = f"{pin}|{PROFILE}"
    try:
        y = rawboost_v3(x, sr, key)
    except Exception:
        y = x  # NOTE: silent fallback; the file is written without augmentation

    # write RB temp near target
    tmp_rb = pout.parent / f".{pout.stem}.rb-{uuid.uuid4().hex}.wav"
    sf.write(str(tmp_rb), y, 16000, subtype="PCM_16")

    # EQ + loudness + trim via ffmpeg
    gain_db = float(np.clip(TARGET_DB - rms_dbfs_arr(y), -20.0, 20.0))
    FILTER  = build_filter_chain_strong(gain_db)

    tmp_out = pout.parent / f".{pout.stem}.tmp-{uuid.uuid4().hex}.wav"
    cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
           "-i", str(tmp_rb), "-ac","1","-ar","16000",
           "-af", FILTER, "-sample_fmt","s16", str(tmp_out)]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try: tmp_rb.unlink(missing_ok=True)
    except: pass
    if p.returncode != 0:
        try: tmp_out.unlink(missing_ok=True)
        except: pass
        return {"error": p.stderr.decode("utf-8","ignore")[:300], "path_in": pin}

    os.replace(str(tmp_out), str(pout))

    # stats
    try:
        z, sr2 = sf.read(str(pout), dtype="float32", always_2d=False)
        if z.ndim > 1: z = z.mean(axis=1)
        dur = len(z)/float(sr2) if sr2 else 0.0
        peak = float(np.max(np.abs(z))) if len(z) else 0.0
        rms  = float(np.sqrt(np.mean(np.square(z)))) if len(z) else 0.0
        rms_db = -120.0 if rms<=1e-9 else 20*np.log10(np.clip(rms,1e-9,1.0))
        peak_db = -120.0 if peak<=1e-9 else 20*np.log10(np.clip(peak,1e-9,1.0))
    except Exception:
        dur, rms_db, peak_db = 0.0, -120.0, -120.0

    return {"utt_id": row.get("utt_id",""), "path_in": pin, "path_out": str(pout),
            "profile": PROFILE, "duration": round(dur,3),
            "rms_db": round(rms_db,2), "peak_db": round(peak_db,2)}

# ------------ streaming scheduler with progress ------------
created = 0; errs = 0; buf = []
start = time.time()
inflight_cap = max(MAX_WORKERS * INFLIGHT_MULT, MAX_WORKERS)
pbar = tqdm(total=len(todo_rows), desc=f"preproc:{PROFILE}", unit="file")

def flush_manifest(rows):
    if not rows: return
    with open(MANIFEST, "a", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(fcsv, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows: w.writerow(r)

it = iter(todo_rows)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    inflight = set()
    for _ in range(min(inflight_cap, len(todo_rows))):
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: break
    while inflight:
        done = next(as_completed(inflight))
        inflight.remove(done)
        res = done.result()
        pbar.update(1)
        if isinstance(res, dict) and "error" in res:
            errs += 1
        elif res:
            buf.append(res); created += 1
            if len(buf) >= BATCH_WRITE:
                flush_manifest(buf); buf.clear()
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: pass

flush_manifest(buf)
pbar.close()
print(f"\n[{PROFILE}] FINISHED: +{created} files, {errs} errors | elapsed {(time.time()-start)/60:.1f} min")

# quick manifest slice
if MANIFEST.exists():
    m = pd.read_csv(MANIFEST)
    mp = m[m["profile"]==PROFILE]
    hrs = float(pd.to_numeric(mp["duration"], errors="coerce").fillna(0).sum()/3600.0)
    print(f"[{PROFILE}] Manifest rows: {len(mp):,} | hours: {hrs:.2f} h")

Input counts (normalized):
label
fake    45166
real    49623

[preproc:strong] PLANNED=94,789 | ALREADY_DONE=40 | TO_DO=94,749


preproc:strong:   0%|          | 0/94749 [00:00<?, ?file/s]


[strong] FINISHED: +94749 files, 0 errors | elapsed 270.4 min
[strong] Manifest rows: 94,789 | hours: 181.31 h
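
The streaming scheduler in the cell above keeps only a bounded number of futures in flight (`MAX_WORKERS * INFLIGHT_MULT`) and refills the window as tasks complete. The same idea can be sketched with `concurrent.futures.wait`, as below. This is a minimal illustration rather than the notebook's exact loop: `run_bounded` and its parameters are hypothetical names, and the squaring task stands in for `preprocess_one`.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

_SENTINEL = object()  # marks iterator exhaustion (hypothetical helper)

def run_bounded(fn, items, max_workers=4, inflight_cap=8):
    """Run fn over items while keeping at most inflight_cap futures pending."""
    results = []
    it = iter(items)
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        inflight = set()
        # Prime the window with the first inflight_cap tasks.
        for item in it:
            inflight.add(ex.submit(fn, item))
            if len(inflight) >= inflight_cap:
                break
        while inflight:
            # Block until at least one task finishes; keep the rest pending.
            done, inflight = wait(inflight, return_when=FIRST_COMPLETED)
            for fut in done:
                results.append(fut.result())
                # Refill the window: one new task per completed task.
                nxt = next(it, _SENTINEL)
                if nxt is not _SENTINEL:
                    inflight.add(ex.submit(fn, nxt))
    return results

# Stand-in workload: results arrive in completion order, not input order.
squares = run_bounded(lambda n: n * n, range(20), max_workers=3, inflight_cap=6)
print(sorted(squares)[:5])
```

Bounding the window avoids creating one future per file up front, which matters when the to-do list has tens of thousands of rows.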


### **6. Strong Profile Audit**
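We audit the `strong` outputs in one pass: every expected file should exist on disk and be indexed in the manifest, missing manifest rows are backfilled, duplicate rows are dropped, and a random sample of up to 300 files is probed for format (16 kHz, mono, PCM_16).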
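
The audit reduces to set differences between three views of the data: the paths we expect, the paths on disk, and the paths logged in the manifest. A minimal sketch of that reconciliation, using toy file names rather than project data (`reconcile` is a hypothetical helper, not a function from the notebook):

```python
def reconcile(expected, on_disk, in_manifest):
    """Return the three diffs the audit reports, as sets of paths."""
    expected, on_disk, in_manifest = set(expected), set(on_disk), set(in_manifest)
    return {
        "missing_on_disk": expected - on_disk,          # planned but never written
        "missing_in_manifest": on_disk - in_manifest,   # written but not logged
        "manifest_without_file": in_manifest - on_disk, # logged but file is gone
    }

# Toy inputs (illustrative only).
diffs = reconcile(
    expected={"a.wav", "b.wav", "c.wav"},
    on_disk={"a.wav", "b.wav", "x.wav"},
    in_manifest={"a.wav", "z.wav"},
)
for name, paths in diffs.items():
    print(f"{name}: {sorted(paths)}")
```

A clean run is one where all three sets are empty, which is what the audit output reports for the `strong` profile.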

In [None]:
# ===================== STRONG: one-shot sanity audit =====================
from pathlib import Path
import pandas as pd, csv, random
import soundfile as sf

ROOT      = Path("/content/drive/MyDrive/hindi_dfake")
META      = ROOT / "metadata"
OUT_ROOT  = ROOT / "processed" / "wav"
PROFILE   = "strong"

REAL_CSV  = META / "master_real.csv"
FAKE_CSV  = META / "master_fake.csv"
ATT_CSV   = META / "attacks.labeled.csv"
MANIFEST  = META / "proc_manifest.csv"

def read_csv_safe(p):
    df = pd.read_csv(p)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def normalize_label(lab, fake_type, src, path):
    l = str(lab).strip().lower()
    if l in {"real","fake"}: return l
    ft = str(fake_type).strip().lower()
    s  = str(src).strip().lower()
    p  = str(path)
    if "/raw/real_clean" in p or s.startswith("real_"): return "real"
    if ft in {"tts_edge","channel_attack","vc"} or "/raw/fake_" in p or s.startswith("fake_"): return "fake"
    return "fake"

def out_path_for(in_path: str) -> Path:
    p = Path(in_path)
    try:
        rel = p.relative_to(ROOT)
    except Exception:
        rel = Path("_external") / p.name
    return OUT_ROOT / PROFILE / rel

# ---------- load inputs (only those that exist) ----------
dfs = [read_csv_safe(REAL_CSV), read_csv_safe(FAKE_CSV)]
if ATT_CSV.exists():
    dfs.append(read_csv_safe(ATT_CSV))
df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all[df_all["path"].astype(str).apply(lambda p: Path(p).exists())].copy()
df_all["label"] = [normalize_label(l, ft, s, p) for l, ft, s, p in
                   zip(df_all["label"], df_all["fake_type"], df_all["source"], df_all["path"])]

expected = set(str(out_path_for(p)) for p in df_all["path"].astype(str))

# ---------- what’s on disk & in manifest ----------
on_disk  = set(str(p) for p in (OUT_ROOT/PROFILE).rglob("*.wav"))
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST)
else:
    MANIFEST.parent.mkdir(parents=True, exist_ok=True)
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    mf = pd.read_csv(MANIFEST)

mf_p     = mf[mf["profile"]==PROFILE]
in_mani  = set(mf_p["path_out"].astype(str))

print("[Universe] inputs that exist:", len(df_all))
print(f"[counts:{PROFILE}] expected={len(expected):,} | on_disk={len(on_disk):,} | manifest_rows={len(in_mani):,}")

miss_disk   = expected - on_disk
extra_disk  = on_disk - expected
miss_mani   = on_disk  - in_mani
extra_mani  = in_mani  - on_disk

print(f"[diff] expected but missing on disk: {len(miss_disk):,}")
print(f"[diff] on disk but missing in manifest: {len(miss_mani):,}")
print(f"[diff] manifest rows without file: {len(extra_mani):,}")

# ---------- backfill manifest if needed ----------
def fast_duration(p):
    try:
        info = sf.info(p)
        return round(info.frames / info.samplerate, 3)
    except Exception: return 0.0

if len(miss_mani) > 0:
    print(f"[fix] backfilling {len(miss_mani):,} manifest rows …")
    rows = []
    # rough reconstruction of input path by mirroring
    for out_str in list(miss_mani):
        outp = Path(out_str)
        try:
            rel = outp.relative_to(OUT_ROOT/PROFILE)
            in_p = ROOT / rel
        except Exception:
            in_p = ROOT / "_unknown_source" / outp.name
        rows.append({
            "utt_id": "", "path_in": str(in_p), "path_out": out_str,
            "profile": PROFILE, "duration": fast_duration(str(outp)),
            "rms_db": "", "peak_db": ""
        })
    with open(MANIFEST, "a", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows: w.writerow(r)
    print("[fix] backfill done.")

# ---------- dedup manifest on (profile, path_out) ----------
mf2 = pd.read_csv(MANIFEST)
before = len(mf2)
mf2 = mf2.drop_duplicates(subset=["profile","path_out"], keep="first").reset_index(drop=True)
if len(mf2) != before:
    mf2.to_csv(MANIFEST, index=False)
    print(f"[fix] dedup manifest: -{before - len(mf2)} rows")

# ---------- spot-check audio format on a random sample ----------
sample_n = min(300, len(on_disk))
sample_paths = random.sample(list(on_disk), sample_n) if sample_n > 0 else []
sr_counts, ch_counts, subtype_counts, bad = {}, {}, {}, 0
for p in sample_paths:
    try:
        info = sf.info(p)
        sr_counts[info.samplerate] = sr_counts.get(info.samplerate, 0) + 1
        ch_counts[info.channels]   = ch_counts.get(info.channels,   0) + 1
        subtype_counts[info.subtype] = subtype_counts.get(info.subtype, 0) + 1
    except Exception:
        bad += 1

print(f"[probe] sample={sample_n} | sr={sr_counts} | ch={ch_counts} | subtype={subtype_counts} | bad={bad}")

# ---------- hours from manifest (profile=strong) ----------
mf = pd.read_csv(MANIFEST)
ms = mf[mf["profile"]==PROFILE]
hours = float(pd.to_numeric(ms["duration"], errors="coerce").fillna(0).sum()/3600.0)
print(f"[hours:{PROFILE}] rows={len(ms):,} | hours={hours:.2f}h")

print("\n[OK] Audit complete.")

[Universe] inputs that exist: 94789
[counts:strong] expected=94,789 | on_disk=94,789 | manifest_rows=94,789
[diff] expected but missing on disk: 0
[diff] on disk but missing in manifest: 0
[diff] manifest rows without file: 0
[probe] sample=300 | sr={16000: 300} | ch={1: 300} | subtype={'PCM_16': 300} | bad=0
[hours:strong] rows=94,789 | hours=181.31h

[OK] Audit complete.


### **7. Test Set Integration (Fake) - Base Profile**
We re-run the `base` processor to pick up newly added files (the fake test set). Because the run is resumable, it skips the 94,789 outputs that already exist and processes only the 3,004 new files.
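
The resume behaviour comes from one check: an input is skipped if its mirrored output already exists on disk or is already listed in the manifest. A small sketch of that planning step, run against a temporary directory (`plan_todo` and the flat `mirror` mapping are hypothetical simplifications of `already_done` and `out_path_for`):

```python
import tempfile
from pathlib import Path

def plan_todo(inputs, out_path_for, done_set):
    """Keep only inputs whose output is neither on disk nor logged in the manifest."""
    todo = []
    for p in inputs:
        out = out_path_for(p)
        if out.exists() or str(out) in done_set:
            continue  # already processed in an earlier run
        todo.append(p)
    return todo

with tempfile.TemporaryDirectory() as tmp:
    out_root = Path(tmp) / "base"
    out_root.mkdir()
    mirror = lambda p: out_root / Path(p).name     # hypothetical flat mirror
    (out_root / "old_1.wav").touch()               # output already on disk
    done = {str(out_root / "old_2.wav")}           # output already in manifest
    todo = plan_todo(["old_1.wav", "old_2.wav", "new_1.wav"], mirror, done)
    print(todo)
```

Checking both the disk and the manifest makes the run tolerant of an interrupted session in which a file was written but its manifest row was still sitting in the write buffer.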

In [None]:
# ========== PREPROCESSING (BASE profile; resumable; same style as before) ==========
# Pipeline: DC high-pass -> per-file gain to TARGET_DB -> light trim
# Writes: /content/drive/MyDrive/hindi_dfake/processed/wav/base/...
# Logs  : /content/drive/MyDrive/hindi_dfake/metadata/proc_manifest.csv (profile=base)

import os, csv, time, uuid, hashlib, subprocess
from pathlib import Path
import pandas as pd, numpy as np, soundfile as sf
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

# ---------------- CONFIG ----------------
ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT / "metadata"
OUT_ROOT = ROOT / "processed" / "wav"

PROFILE = "base"          # <- BASE profile
INCLUDE_ATTACKS = True    # include attacks.labeled.csv if present
FRACTION_REAL = 1.00      # process all (resumable)
FRACTION_FAKE = 1.00
HASH_SEED = 2025

# Loudness & trim (same as your earlier cells)
TARGET_DB   = -26.0       # target RMS (dBFS-ish)
TRIM_THR_DB = -45
TRIM_DUR_S  = 0.20

# Concurrency / batching
MAX_WORKERS   = 6
INFLIGHT_MULT = 3
BATCH_WRITE   = 500

REAL_CSV  = META / "master_real.csv"
FAKE_CSV  = META / "master_fake.csv"
ATT_CSV   = META / "attacks.labeled.csv"
MANIFEST  = META / "proc_manifest.csv"
assert REAL_CSV.exists() and FAKE_CSV.exists(), "Missing master_real.csv/master_fake.csv"

# ---------------- helpers ----------------
def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def normalize_label(lab, fake_type, src, path):
    l = str(lab).strip().lower()
    if l in {"real","fake"}: return l
    ft = str(fake_type).strip().lower()
    s  = str(src).strip().lower()
    p  = str(path)
    if "/raw/real_clean" in p or s.startswith("real_"): return "real"
    if ft in {"tts_edge","channel_attack","vc"} or "/raw/fake_" in p or s.startswith("fake_"): return "fake"
    return "fake"

def stable_frac_mask(series: pd.Series, frac: float, seed: int) -> pd.Series:
    if frac >= 1.0: return pd.Series([True]*len(series), index=series.index)
    s = series.astype(str).apply(lambda p: int(hashlib.sha1((p+str(seed)).encode()).hexdigest()[:8], 16) / 0xFFFFFFFF)
    return s < frac

def out_path_for(in_path: str) -> Path:
    p = Path(in_path)
    try:    rel = p.relative_to(ROOT)
    except: rel = Path("_external") / p.name
    return OUT_ROOT / PROFILE / rel

def ensure_parent(p: Path): p.parent.mkdir(parents=True, exist_ok=True)

def rms_dbfs(p: Path) -> float:
    try:
        x, sr = sf.read(str(p), dtype="float32", always_2d=False)
        if x.ndim > 1: x = x.mean(axis=1)
        if len(x) == 0: return -120.0
        rms = float(np.sqrt(np.mean(np.square(x))))
        if rms <= 1e-9: return -120.0
        return 20.0*np.log10(np.clip(rms, 1e-9, 1.0))
    except Exception:
        return -120.0

def build_filter_chain_base(gain_db: float) -> str:
    return ",".join([
        "highpass=f=20",
        f"volume={gain_db}dB",
        f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:"
        f"stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB"
    ])

# ---------------- load inputs ----------------
dfR = read_csv_safe(REAL_CSV)
dfF = read_csv_safe(FAKE_CSV)
dfs = [dfR, dfF]
if INCLUDE_ATTACKS and ATT_CSV.exists(): dfs.append(read_csv_safe(ATT_CSV))

df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all[df_all["path"].astype(str).apply(lambda p: Path(p).exists())].copy()
df_all["label"] = [normalize_label(l, ft, s, p) for l, ft, s, p in
                   zip(df_all["label"], df_all["fake_type"], df_all["source"], df_all["path"])]

print("Input counts (normalized):")
print(df_all.groupby(df_all["label"].astype(str).str.lower()).size().to_string())

# deterministic per-label subset (here: everything; still resumable)
parts = []
for lab, g in df_all.groupby(df_all["label"].astype(str).str.lower()):
    frac = FRACTION_REAL if lab == "real" else FRACTION_FAKE
    parts.append(g if frac>=1.0 else g[stable_frac_mask(g["path"], frac, HASH_SEED)])
df_in = pd.concat(parts, ignore_index=True) if parts else df_all.copy()

# ---------------- manifest / resume ----------------
MANIFEST.parent.mkdir(parents=True, exist_ok=True)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST, usecols=["profile","path_out"])
    done_set = set(mf.loc[mf["profile"]==PROFILE, "path_out"].astype(str))
else:
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    done_set = set()

def already_done(pin: str) -> bool:
    pout = out_path_for(pin)
    return pout.exists() or (str(pout) in done_set)

todo_rows = [r for _, r in df_in.iterrows() if not already_done(str(r["path"]))]
planned, todo = len(df_in), len(todo_rows)
print(f"\n[preproc:{PROFILE}] PLANNED={planned:,} | ALREADY_DONE={planned-todo:,} | TO_DO={todo:,}")

# ---------------- worker (BASE chain) ----------------
def preprocess_one(row):
    pin = str(row["path"])
    pout = out_path_for(pin)
    ensure_parent(pout)

    # If the input is not 16 kHz mono, convert it first (temp file on the same filesystem as the output)
    try:
        x, sr = sf.read(pin, dtype="float32", always_2d=False)
    except Exception as e:
        return {"error": f"read_fail: {e}", "path_in": pin}
    if sr != 16000 or (x.ndim > 1):
        tmp_in = pout.parent / f".{pout.stem}.in-{uuid.uuid4().hex}.wav"
        cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
               "-i", pin, "-ac","1","-ar","16000","-sample_fmt","s16", str(tmp_in)]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.returncode != 0:
            try: tmp_in.unlink(missing_ok=True)
            except: pass
            return {"error": "resample_fail", "path_in": pin}
        pin_for_chain = str(tmp_in)
    else:
        pin_for_chain = pin

    gain_db = float(np.clip(TARGET_DB - rms_dbfs(Path(pin_for_chain)), -20.0, 20.0))
    FILTER  = build_filter_chain_base(gain_db)

    tmp_out = pout.parent / f".{pout.stem}.tmp-{uuid.uuid4().hex}.wav"
    cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
           "-i", pin_for_chain, "-ac","1","-ar","16000",
           "-af", FILTER, "-sample_fmt","s16", str(tmp_out)]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # clean temp (if created)
    if pin_for_chain != pin:
        try: Path(pin_for_chain).unlink(missing_ok=True)
        except: pass

    if p.returncode != 0:
        try: tmp_out.unlink(missing_ok=True)
        except: pass
        return {"error": p.stderr.decode("utf-8","ignore")[:300], "path_in": pin}

    os.replace(str(tmp_out), str(pout))

    # quick stats
    try:
        z, sr2 = sf.read(str(pout), dtype="float32", always_2d=False)
        if z.ndim > 1: z = z.mean(axis=1)
        dur = len(z)/float(sr2) if sr2 else 0.0
        peak = float(np.max(np.abs(z))) if len(z) else 0.0
        rms  = float(np.sqrt(np.mean(np.square(z)))) if len(z) else 0.0
        rms_db = -120.0 if rms<=1e-9 else 20*np.log10(np.clip(rms,1e-9,1.0))
        peak_db = -120.0 if peak<=1e-9 else 20*np.log10(np.clip(peak,1e-9,1.0))
    except Exception:
        dur, rms_db, peak_db = 0.0, -120.0, -120.0

    return {"utt_id": row.get("utt_id",""), "path_in": pin, "path_out": str(pout),
            "profile": PROFILE, "duration": round(dur,3),
            "rms_db": round(rms_db,2), "peak_db": round(peak_db,2)}

# ---------------- streaming scheduler + progress ----------------
created = 0; errs = 0; buf = []
start = time.time()
inflight_cap = max(MAX_WORKERS * INFLIGHT_MULT, MAX_WORKERS)
pbar = tqdm(total=len(todo_rows), desc=f"preproc:{PROFILE}", unit="file")

def flush_manifest(rows):
    if not rows: return
    with open(MANIFEST, "a", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(fcsv, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows: w.writerow(r)

it = iter(todo_rows)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    inflight = set()
    for _ in range(min(inflight_cap, len(todo_rows))):
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: break
    while inflight:
        done = next(as_completed(inflight))
        inflight.remove(done)
        res = done.result()
        pbar.update(1)
        if isinstance(res, dict) and "error" in res:
            errs += 1
        elif res:
            buf.append(res); created += 1
            if len(buf) >= BATCH_WRITE:
                flush_manifest(buf); buf.clear()
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: pass

flush_manifest(buf)
pbar.close()
print(f"\n[{PROFILE}] FINISHED: +{created} files, {errs} errors | elapsed {(time.time()-start)/60:.1f} min")

# ---------------- quick audit / summary ----------------
def hours_from_manifest(profile: str):
    try:
        m = pd.read_csv(MANIFEST)
        m = m[m["profile"]==profile]
        return float(pd.to_numeric(m["duration"], errors="coerce").fillna(0).sum()/3600.0), len(m)
    except Exception:
        return 0.0, 0

outs = list((OUT_ROOT/PROFILE).rglob("*.wav"))
print(f"[{PROFILE}] outputs on disk: {len(outs):,} (showing a few)")
for p in outs[:8]:
    print("  ", p)

hrs, nrows = hours_from_manifest(PROFILE)
print(f"[{PROFILE}] manifest rows: {nrows:,} | hours: {hrs:.2f} h")

# keep manifest deduplicated (safety)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST)
    before = len(mf)
    mf = mf.drop_duplicates(subset=["profile","path_out"], keep="first").reset_index(drop=True)
    if len(mf) != before:
        mf.to_csv(MANIFEST, index=False)
        print(f"[fix] dedup manifest: -{before-len(mf)} rows")
print("[OK] BASE preprocessing done.")


Input counts (normalized):
label
fake    48170
real    49623

[preproc:base] PLANNED=97,793 | ALREADY_DONE=94,789 | TO_DO=3,004


preproc:base:   0%|          | 0/3004 [00:00<?, ?file/s]


[base] FINISHED: +3004 files, 0 errors | elapsed 9.0 min
[base] outputs on disk: 97,793 (showing a few)
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_6859d4f039aea68f.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_19e1c8d7cad439c0.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_a5ea2af5277b57c4.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_7aee2f81107fe9de.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_c103d799a208b3c8.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_8883f29f2fb7bfdf.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_e43afa0cec72f0aa.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_47ea9317998de3aa.wav
[base] manifest rows: 97,793 | hours: 78.72 h
[OK] BASE preprocessing done.
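
The cell above keeps a `stable_frac_mask` hook for deterministic subsetting, even though both fractions are set to 1.0 here. The idea is to hash each path together with a seed and keep the file when the hash, scaled to [0, 1], falls below the requested fraction. A pure-Python sketch of the same rule (`stable_keep` is a hypothetical per-path variant, and the paths are made up for illustration):

```python
import hashlib

def stable_keep(path: str, frac: float, seed: int) -> bool:
    """Deterministically decide whether a path belongs to a frac-sized subset."""
    h = int(hashlib.sha1((path + str(seed)).encode()).hexdigest()[:8], 16)
    return (h / 0xFFFFFFFF) < frac

# Made-up paths, only to show the behaviour.
paths = [f"raw/fake_tts/utt_{i:05d}.wav" for i in range(10_000)]
kept_a = [p for p in paths if stable_keep(p, 0.25, seed=2025)]
kept_b = [p for p in paths if stable_keep(p, 0.25, seed=2025)]
kept_half = {p for p in paths if stable_keep(p, 0.50, seed=2025)}

print(f"kept fraction: {len(kept_a) / len(paths):.3f}")
print("repeatable:", kept_a == kept_b)
print("nested:", set(kept_a) <= kept_half)
```

Because membership depends only on the path and the seed, the subset is identical across sessions, and raising the fraction only adds files to the earlier selection.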


### **8. Test Set Integration (Fake) - Strong Profile**
We apply the full augmentation pipeline (RawBoost + EQ) to the newly added **fake test set** files, ensuring they are consistent with the rest of the `strong` dataset.
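
Two properties of the augmentation are worth illustrating: the random generator is seeded from the file path, so re-processing a file reproduces the same perturbation, and the noise is scaled to a target signal-to-noise ratio. The sketch below shows both in pure Python. It is a simplification under stated assumptions: it adds white noise instead of the band-limited noise used in the notebook, and `rng_from_key`, `add_noise_at_snr`, and the test tone are illustrative names and data.

```python
import hashlib
import math
import random

def rng_from_key(key: str) -> random.Random:
    """Seed a generator from a string key so each file gets a repeatable stream."""
    seed = int(hashlib.sha1(key.encode("utf-8")).hexdigest()[:8], 16)
    return random.Random(seed)

def rms(x):
    return math.sqrt(sum(v * v for v in x) / len(x)) if x else 0.0

def add_noise_at_snr(x, key, snr_db=25.0):
    """Add white noise (simplification: not band-limited) at the requested SNR."""
    rng = rng_from_key(key)
    noise = [rng.gauss(0.0, 1.0) for _ in x]
    # Noise RMS that yields the target SNR relative to the signal RMS.
    target_noise_rms = rms(x) / (10.0 ** (snr_db / 20.0))
    scale = target_noise_rms / (rms(noise) + 1e-12)
    return [max(-1.0, min(1.0, s + scale * n)) for s, n in zip(x, noise)]

sr = 16000
tone = [0.1 * math.sin(2 * math.pi * 440 * t / sr) for t in range(sr)]  # 1 s test tone

noisy_1 = add_noise_at_snr(tone, "utt_0001.wav|strong")
noisy_2 = add_noise_at_snr(tone, "utt_0001.wav|strong")
other = add_noise_at_snr(tone, "utt_0002.wav|strong")

residual = [n - s for n, s in zip(noisy_1, tone)]
measured_snr = 20 * math.log10(rms(tone) / rms(residual))
print(f"same key repeatable: {noisy_1 == noisy_2}, measured SNR: {measured_snr:.2f} dB")
```

Keying the generator on `path|profile` means the new test-set files receive perturbations drawn the same way as the files processed in the earlier run.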

In [None]:
# ================= PREPROCESSING — STRONG (EQ + RawBoost v3) =================
# Writes: /content/drive/MyDrive/hindi_dfake/processed/wav/strong/...
# Logs to: /content/drive/MyDrive/hindi_dfake/metadata/proc_manifest.csv (profile=strong)

import os, csv, uuid, time, hashlib, subprocess
from pathlib import Path
import pandas as pd, numpy as np, soundfile as sf
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

# ------------ CONFIG ------------
ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT / "metadata"
OUT_ROOT = ROOT / "processed" / "wav"

PROFILE = "strong"          # strong = base + artifact EQ + RawBoost v3
INCLUDE_ATTACKS = True
FRACTION_REAL = 1.00        # process all
FRACTION_FAKE = 1.00
HASH_SEED = 2025

# Loudness & trim
TARGET_DB   = -26.0
TRIM_THR_DB = -45
TRIM_DUR_S  = 0.20

# Concurrency / batching (safe defaults for Colab)
MAX_WORKERS   = 6
INFLIGHT_MULT = 3
BATCH_WRITE   = 500

# RawBoost v3–style knobs (same as we used earlier)
RB_ENABLE           = True
RB_BANDPASS_HZ      = (2500, 6000)
RB_SNR_DB           = 25.0
RB_NOISE_PROB       = 1.0
RB_IMPULSE_PROB     = 0.5
RB_IMPULSES_PER_SEC = 1.0
RB_IMPULSE_GAIN     = 0.08
RB_REVERB_PROB      = 0.6
RB_REVERB_T_SEC     = 0.03
RB_REVERB_DECAY     = 0.35
RB_REVERB_WET       = 0.18

REAL_CSV  = META / "master_real.csv"
FAKE_CSV  = META / "master_fake.csv"
ATT_CSV   = META / "attacks.labeled.csv"
MANIFEST  = META / "proc_manifest.csv"
assert REAL_CSV.exists() and FAKE_CSV.exists()

# ------------ helpers ------------
def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def normalize_label(lab, fake_type, src, path):
    l = str(lab).strip().lower()
    if l in {"real","fake"}: return l
    ft = str(fake_type).strip().lower()
    s  = str(src).strip().lower()
    p  = str(path)
    if "/raw/real_clean" in p or s.startswith("real_"): return "real"
    if ft in {"tts_edge","channel_attack","vc"} or "/raw/fake_" in p or s.startswith("fake_"): return "fake"
    return "fake"

def stable_frac_mask(series: pd.Series, frac: float, seed: int) -> pd.Series:
    if frac >= 1.0: return pd.Series([True]*len(series), index=series.index)
    s = series.astype(str).apply(lambda p: int(hashlib.sha1((p+str(seed)).encode()).hexdigest()[:8], 16) / 0xFFFFFFFF)
    return s < frac

def out_path_for(in_path: str) -> Path:
    p = Path(in_path)
    try:    rel = p.relative_to(ROOT)
    except: rel = Path("_external") / p.name
    return OUT_ROOT / PROFILE / rel

def ensure_parent(p: Path): p.parent.mkdir(parents=True, exist_ok=True)

def rms_dbfs_arr(x: np.ndarray) -> float:
    if x.ndim > 1: x = x.mean(axis=1)
    if len(x) == 0: return -120.0
    rms = float(np.sqrt(np.mean(np.square(x))))
    if rms <= 1e-9: return -120.0
    return 20.0*np.log10(np.clip(rms, 1e-9, 1.0))

# ---- RawBoost v3–style (deterministic per file) ----
def _rng_from_key(key: str):
    seed = int(hashlib.sha1(key.encode("utf-8")).hexdigest()[:8], 16)
    return np.random.default_rng(seed)

def _add_colored_noise(x, sr, rng, band=(2500,6000), snr_db=25.0):
    n = len(x)
    if n == 0: return x
    wn = rng.standard_normal(n).astype(np.float32)
    X = np.fft.rfft(wn); freqs = np.fft.rfftfreq(n, d=1.0/sr)
    X[~((freqs >= band[0]) & (freqs <= band[1]))] = 0.0
    noise = np.fft.irfft(X, n=n).astype(np.float32)
    sig_rms = np.sqrt(np.mean(np.square(x))) + 1e-9
    target_noise_rms = sig_rms / (10.0**(snr_db/20.0))
    cur_rms = np.sqrt(np.mean(np.square(noise))) + 1e-12
    noise *= (target_noise_rms / cur_rms)
    return np.clip(x + noise, -1.0, 1.0)

def _add_impulses(x, sr, rng, per_sec=1.0, gain=0.08):
    n = len(x)
    if n == 0: return x
    dur = n/float(sr); k = max(1, int(per_sec * dur))
    idx = rng.integers(0, n, size=k)
    amp = gain * (np.sqrt(np.mean(np.square(x))) + 1e-9)
    y = x.copy()
    y[idx] = np.clip(y[idx] + amp * rng.choice([-1.0, 1.0], size=k), -1.0, 1.0)
    return y

def _add_small_reverb(x, sr, rng, t_sec=0.03, decay=0.35, wet=0.18):
    n = len(x)
    if n == 0: return x
    ir_len = max(8, int(t_sec * sr))
    t = np.arange(ir_len, dtype=np.float32)
    ir = np.exp(-decay * t / ir_len).astype(np.float32)
    for _ in range(3):
        pos = int(rng.integers(0, ir_len))
        ir[pos] += float(rng.uniform(0.1, 0.3))
    ir /= (np.sum(np.abs(ir)) + 1e-9)
    y = np.convolve(x, ir, mode="full")[:n].astype(np.float32)
    return np.clip((1.0 - wet) * x + wet * y, -1.0, 1.0)

def rawboost_v3(x: np.ndarray, sr: int, key: str) -> np.ndarray:
    if x.ndim > 1: x = x.mean(axis=1)
    x = np.clip(x, -1.0, 1.0).astype(np.float32)
    if not RB_ENABLE: return x
    rng = _rng_from_key(key)
    if rng.uniform() < RB_NOISE_PROB:
        x = _add_colored_noise(x, sr, rng, RB_BANDPASS_HZ, RB_SNR_DB)
    if rng.uniform() < RB_IMPULSE_PROB:
        x = _add_impulses(x, sr, rng, RB_IMPULSES_PER_SEC, RB_IMPULSE_GAIN)
    if rng.uniform() < RB_REVERB_PROB:
        x = _add_small_reverb(x, sr, rng, RB_REVERB_T_SEC, RB_REVERB_DECAY, RB_REVERB_WET)
    return x

def build_filter_chain_strong(gain_db: float) -> str:
    return ",".join([
        "highpass=f=20",
        "equalizer=f=3000:t=q:w=1.0:g=2.5",
        "equalizer=f=4800:t=q:w=0.9:g=2.0",
        "treble=g=1.0:f=6000:t=h:w=0.7",
        f"volume={gain_db}dB",
        f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB"
    ])

# ------------ load inputs ------------
dfR = read_csv_safe(REAL_CSV)
dfF = read_csv_safe(FAKE_CSV)
dfs = [dfR, dfF]
if INCLUDE_ATTACKS and ATT_CSV.exists():
    dfs.append(read_csv_safe(ATT_CSV))
df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all[df_all["path"].astype(str).apply(lambda p: Path(p).exists())].copy()
df_all["label"] = [normalize_label(l, ft, s, p) for l, ft, s, p in
                   zip(df_all["label"], df_all["fake_type"], df_all["source"], df_all["path"])]

# deterministic per-label subset (here: everything, but we still keep the hook)
parts = []
for lab, g in df_all.groupby(df_all["label"].astype(str).str.lower()):
    frac = FRACTION_REAL if lab == "real" else FRACTION_FAKE
    parts.append(g if frac>=1.0 else g[stable_frac_mask(g["path"], frac, HASH_SEED)])
df_in = pd.concat(parts, ignore_index=True) if parts else df_all.copy()

# ------------ manifest skip logic ------------
MANIFEST.parent.mkdir(parents=True, exist_ok=True)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST, usecols=["profile","path_out"])
    done_set = set(mf.loc[mf["profile"]==PROFILE, "path_out"].astype(str))
else:
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    done_set = set()

def already_done(pin: str) -> bool:
    pout = out_path_for(pin)
    return pout.exists() or (str(pout) in done_set)

# Only process files that don't have a STRONG output yet
todo_rows = [r for _, r in df_in.iterrows() if not already_done(str(r["path"]))]

planned, todo = len(df_in), len(todo_rows)
print("Input counts (normalized):")
print(df_all.groupby(df_all['label']).size().to_string())
print(f"\n[preproc:{PROFILE}] PLANNED={planned:,} | ALREADY_DONE={planned-todo:,} | TO_DO={todo:,}")

# ------------ worker ------------
def preprocess_one(row):
    pin = str(row["path"])
    pout = out_path_for(pin)
    ensure_parent(pout)

    # read (and resample if needed)
    try:
        x, sr = sf.read(pin, dtype="float32", always_2d=False)
    except Exception as e:
        return {"error": f"read_fail: {e}", "path_in": pin}
    if x.ndim > 1: x = x.mean(axis=1)
    if sr != 16000:
        tmp = pout.parent / f".{pout.stem}.in-{uuid.uuid4().hex}.wav"
        cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
               "-i", pin, "-ac","1","-ar","16000","-sample_fmt","s16", str(tmp)]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.returncode != 0:
            try: tmp.unlink(missing_ok=True)
            except: pass
            return {"error": "resample_fail", "path_in": pin}
        x, sr = sf.read(str(tmp), dtype="float32", always_2d=False)
        try: tmp.unlink(missing_ok=True)
        except: pass

    x = np.clip(x, -1.0, 1.0).astype(np.float32)

    # RawBoost v3–style
    key = f"{pin}|{PROFILE}"
    try:    y = rawboost_v3(x, sr, key)
    except Exception: y = x   # fall back to the unaugmented signal if augmentation fails

    # write RB temp near target (avoid cross-device link)
    tmp_rb = pout.parent / f".{pout.stem}.rb-{uuid.uuid4().hex}.wav"
    sf.write(str(tmp_rb), y, 16000, subtype="PCM_16")

    # EQ + loudness + trim via ffmpeg
    gain_db = float(np.clip(TARGET_DB - rms_dbfs_arr(y), -20.0, 20.0))
    FILTER  = build_filter_chain_strong(gain_db)

    tmp_out = pout.parent / f".{pout.stem}.tmp-{uuid.uuid4().hex}.wav"
    cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
           "-i", str(tmp_rb), "-ac","1","-ar","16000",
           "-af", FILTER, "-sample_fmt","s16", str(tmp_out)]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try: tmp_rb.unlink(missing_ok=True)
    except: pass
    if p.returncode != 0:
        try: tmp_out.unlink(missing_ok=True)
        except: pass
        return {"error": p.stderr.decode("utf-8","ignore")[:300], "path_in": pin}

    os.replace(str(tmp_out), str(pout))

    # stats
    try:
        z, sr2 = sf.read(str(pout), dtype="float32", always_2d=False)
        if z.ndim > 1: z = z.mean(axis=1)
        dur = len(z)/float(sr2) if sr2 else 0.0
        peak = float(np.max(np.abs(z))) if len(z) else 0.0
        rms  = float(np.sqrt(np.mean(np.square(z)))) if len(z) else 0.0
        rms_db = -120.0 if rms<=1e-9 else 20*np.log10(np.clip(rms,1e-9,1.0))
        peak_db = -120.0 if peak<=1e-9 else 20*np.log10(np.clip(peak,1e-9,1.0))
    except Exception:
        dur, rms_db, peak_db = 0.0, -120.0, -120.0

    return {"utt_id": row.get("utt_id",""), "path_in": pin, "path_out": str(pout),
            "profile": PROFILE, "duration": round(dur,3),
            "rms_db": round(rms_db,2), "peak_db": round(peak_db,2)}

# ------------ streaming scheduler with progress ------------
created = 0; errs = 0; buf = []
start = time.time()
inflight_cap = max(MAX_WORKERS * INFLIGHT_MULT, MAX_WORKERS)
pbar = tqdm(total=len(todo_rows), desc=f"preproc:{PROFILE}", unit="file")

def flush_manifest(rows):
    if not rows: return
    with open(MANIFEST, "a", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(fcsv, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows: w.writerow(r)

it = iter(todo_rows)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    inflight = set()
    for _ in range(min(inflight_cap, len(todo_rows))):
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: break
    while inflight:
        done = next(as_completed(inflight))
        inflight.remove(done)
        res = done.result()
        pbar.update(1)
        if isinstance(res, dict) and "error" in res:
            errs += 1
        elif res:
            buf.append(res); created += 1
            if len(buf) >= BATCH_WRITE:
                flush_manifest(buf); buf.clear()
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: pass

flush_manifest(buf)
pbar.close()
print(f"\n[{PROFILE}] FINISHED: +{created} files, {errs} errors | elapsed {(time.time()-start)/60:.1f} min")

# quick manifest slice
if MANIFEST.exists():
    m = pd.read_csv(MANIFEST)
    mp = m[m["profile"]==PROFILE]
    hrs = float(pd.to_numeric(mp["duration"], errors="coerce").fillna(0).sum()/3600.0)
    print(f"[{PROFILE}] Manifest rows: {len(mp):,} | hours: {hrs:.2f} h")

print("[OK] STRONG preprocessing done.")

Input counts (normalized):
label
fake    48170
real    49623

[preproc:strong] PLANNED=97,793 | ALREADY_DONE=94,789 | TO_DO=3,004


preproc:strong:   0%|          | 0/3004 [00:00<?, ?file/s]


[strong] FINISHED: +3004 files, 0 errors | elapsed 10.1 min
[strong] Manifest rows: 97,793 | hours: 185.21 h
[OK] STRONG preprocessing done.
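
Each run ends by summing the manifest's `duration` column for one profile. When several profiles share the same manifest, it can be handy to total them side by side. A standard-library sketch, assuming the manifest columns shown in the cells above; the rows below are toy values, not project data, and `hours_by_profile` is a hypothetical helper:

```python
import csv
import io
from collections import defaultdict

def hours_by_profile(manifest_file):
    """Sum duration (seconds) per profile and return hours; bad values count as 0."""
    totals = defaultdict(float)
    for row in csv.DictReader(manifest_file):
        try:
            dur = float(row["duration"])
        except (TypeError, ValueError):
            dur = 0.0  # blank or malformed duration, e.g. a backfilled row
        totals[row["profile"]] += dur
    return {profile: secs / 3600.0 for profile, secs in totals.items()}

# Toy manifest (illustrative values only).
demo = io.StringIO(
    "utt_id,path_in,path_out,profile,duration,rms_db,peak_db\n"
    "u1,in/a.wav,out/base/a.wav,base,1800.0,-26.0,-3.1\n"
    "u2,in/a.wav,out/strong/a.wav,strong,3600.0,-26.0,-2.0\n"
    "u3,in/b.wav,out/strong/b.wav,strong,,,\n"
)
totals = hours_by_profile(demo)
print(totals)
```

To run it on the real manifest, pass an open file handle for `proc_manifest.csv` instead of the in-memory demo.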


### **9. Test Set Integration (Real) - Base Profile**
We process the **FLEURS** dataset (real test set), normalizing its 604 files with the same `base` chain (16 kHz, mono, RMS gain toward -26 dBFS, light silence trim) for consistent evaluation.
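
The loudness step is a per-file gain: measure the RMS level in dBFS, subtract it from the target, and clamp the result so that near-silent files are not amplified without bound. A pure-Python sketch of that arithmetic, using synthetic signals (`gain_to_target` is a hypothetical name; the -26 dB target and the ±20 dB clamp mirror the constants in the code cells):

```python
import math

def rms_dbfs(x):
    """RMS level in dBFS for samples in [-1, 1]; silence maps to -120 dB."""
    if not x:
        return -120.0
    rms = math.sqrt(sum(v * v for v in x) / len(x))
    if rms <= 1e-9:
        return -120.0
    return 20.0 * math.log10(min(rms, 1.0))

def gain_to_target(x, target_db=-26.0, max_abs_gain_db=20.0):
    """Gain in dB that moves the RMS level toward target_db, clamped to the limit."""
    raw = target_db - rms_dbfs(x)
    return max(-max_abs_gain_db, min(max_abs_gain_db, raw))

sr = 16000
loud = [0.5 * math.sin(2 * math.pi * 220 * t / sr) for t in range(sr)]  # synthetic tone
quiet = [0.0] * sr                                                      # digital silence

gain_loud = gain_to_target(loud)
gain_quiet = gain_to_target(quiet)   # would be +94 dB unclamped
leveled = [v * 10.0 ** (gain_loud / 20.0) for v in loud]

print(f"loud:  {rms_dbfs(loud):.2f} dBFS, gain {gain_loud:+.2f} dB, after {rms_dbfs(leveled):.2f} dBFS")
print(f"quiet: gain {gain_quiet:+.2f} dB (clamped)")
```

The clamp is why the description says "toward" the target: a file more than 20 dB away from -26 dBFS ends up closer to the target but not exactly on it.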

In [None]:
# ========== PREPROCESS FLEURS (BASE profile; exact same chain as MMS-TTS) ==========
# Pipeline: DC high-pass -> per-file gain to TARGET_DB -> light trim
# Writes: /content/drive/MyDrive/hindi_dfake/processed/wav/base/...
# Logs  : /content/drive/MyDrive/hindi_dfake/metadata/proc_manifest.csv (profile=base)

import os, csv, time, uuid, hashlib, subprocess
from pathlib import Path
import pandas as pd, numpy as np, soundfile as sf
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

# ---------------- CONFIG ----------------
ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT / "metadata"
OUT_ROOT = ROOT / "processed" / "wav"

PROFILE = "base"          # <- BASE profile (unchanged)
HASH_SEED = 2025

# Loudness & trim (identical to your MMS-TTS run)
TARGET_DB   = -26.0
TRIM_THR_DB = -45
TRIM_DUR_S  = 0.20

# Concurrency / batching
MAX_WORKERS   = 6
INFLIGHT_MULT = 3
BATCH_WRITE   = 500

FLEURS_CSV = META / "thirdparty_real_test.fleurs.csv"   # <-- input list (FLEURS only)
MANIFEST   = META / "proc_manifest.csv"

assert FLEURS_CSV.exists(), f"Missing {FLEURS_CSV}"

# ---------------- helpers (unchanged) ----------------
def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def normalize_label(lab, fake_type, src, path):
    l = str(lab).strip().lower()
    if l in {"real","fake"}: return l
    ft = str(fake_type).strip().lower()
    s  = str(src).strip().lower()
    p  = str(path)
    if "/raw/real_clean" in p or s.startswith("real_"): return "real"
    if ft in {"tts_edge","channel_attack","vc"} or "/raw/fake_" in p or s.startswith("fake_"): return "fake"
    return "fake"

def out_path_for(in_path: str) -> Path:
    p = Path(in_path)
    try:    rel = p.relative_to(ROOT)
    except ValueError: rel = Path("_external") / p.name  # path lies outside ROOT
    return OUT_ROOT / PROFILE / rel

def ensure_parent(p: Path): p.parent.mkdir(parents=True, exist_ok=True)

def rms_dbfs(p: Path) -> float:
    try:
        x, sr = sf.read(str(p), dtype="float32", always_2d=False)
        if hasattr(x, "ndim") and x.ndim > 1: x = x.mean(axis=1)
        if len(x) == 0: return -120.0
        rms = float(np.sqrt(np.mean(np.square(x))))
        if rms <= 1e-9: return -120.0
        return 20.0*np.log10(np.clip(rms, 1e-9, 1.0))
    except Exception:
        return -120.0

def build_filter_chain_base(gain_db: float) -> str:
    return ",".join([
        "highpass=f=20",
        f"volume={gain_db}dB",
        f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:"
        f"stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB"
    ])

# ---------------- load inputs (FLEURS ONLY) ----------------
df_fl = read_csv_safe(FLEURS_CSV)
# keep only files that exist on disk
df_fl = df_fl[df_fl["path"].astype(str).apply(lambda p: Path(p).exists())].copy()
# force label to real (FLEURS is real speech)
df_fl["label"] = [normalize_label("real", ft, s, p) for ft, s, p in
                  zip(df_fl["fake_type"], df_fl["source"], df_fl["path"])]

print("Input counts (FLEURS only, normalized):")
print(df_fl.groupby(df_fl["label"].astype(str).str.lower()).size().to_string())

df_in = df_fl.copy()  # all fleurs; resume logic below will skip already-done

# ---------------- manifest / resume ----------------
MANIFEST.parent.mkdir(parents=True, exist_ok=True)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST, usecols=["profile","path_out"])
    done_set = set(mf.loc[mf["profile"]==PROFILE, "path_out"].astype(str))
else:
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    done_set = set()

def already_done(pin: str) -> bool:
    pout = out_path_for(pin)
    return pout.exists() or (str(pout) in done_set)

todo_rows = [r for _, r in df_in.iterrows() if not already_done(str(r["path"]))]
planned, todo = len(df_in), len(todo_rows)
print(f"\n[preproc:{PROFILE} | FLEURS] PLANNED={planned:,} | ALREADY_DONE={planned-todo:,} | TO_DO={todo:,}")

# ---------------- worker (BASE chain; identical) ----------------
def preprocess_one(row):
    pin = str(row["path"])
    pout = out_path_for(pin)
    ensure_parent(pout)

    # If input SR != 16k or multichannel, resample first
    try:
        x, sr = sf.read(pin, dtype="float32", always_2d=False)
    except Exception as e:
        return {"error": f"read_fail: {e}", "path_in": pin}
    if (sr != 16000) or (hasattr(x, "ndim") and x.ndim > 1):
        tmp_in = pout.parent / f".{pout.stem}.in-{uuid.uuid4().hex}.wav"
        cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
               "-i", pin, "-ac","1","-ar","16000","-sample_fmt","s16", str(tmp_in)]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.returncode != 0:
            try: tmp_in.unlink(missing_ok=True)
            except: pass
            return {"error": "resample_fail", "path_in": pin}
        pin_for_chain = str(tmp_in)
    else:
        pin_for_chain = pin

    gain_db = float(np.clip(TARGET_DB - rms_dbfs(Path(pin_for_chain)), -20.0, 20.0))
    FILTER  = build_filter_chain_base(gain_db)

    tmp_out = pout.parent / f".{pout.stem}.tmp-{uuid.uuid4().hex}.wav"
    cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
           "-i", pin_for_chain, "-ac","1","-ar","16000",
           "-af", FILTER, "-sample_fmt","s16", str(tmp_out)]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # clean temp (if created)
    if pin_for_chain != pin:
        try: Path(pin_for_chain).unlink(missing_ok=True)
        except: pass

    if p.returncode != 0:
        try: tmp_out.unlink(missing_ok=True)
        except: pass
        return {"error": p.stderr.decode("utf-8","ignore")[:300], "path_in": pin}

    os.replace(str(tmp_out), str(pout))

    # quick stats
    try:
        z, sr2 = sf.read(str(pout), dtype="float32", always_2d=False)
        if hasattr(z, "ndim") and z.ndim > 1: z = z.mean(axis=1)
        dur = len(z)/float(sr2) if sr2 else 0.0
        peak = float(np.max(np.abs(z))) if len(z) else 0.0
        rms  = float(np.sqrt(np.mean(np.square(z)))) if len(z) else 0.0
        rms_db = -120.0 if rms<=1e-9 else 20*np.log10(np.clip(rms,1e-9,1.0))
        peak_db = -120.0 if peak<=1e-9 else 20*np.log10(np.clip(peak,1e-9,1.0))
    except Exception:
        dur, rms_db, peak_db = 0.0, -120.0, -120.0

    return {"utt_id": row.get("utt_id",""), "path_in": pin, "path_out": str(pout),
            "profile": PROFILE, "duration": round(dur,3),
            "rms_db": round(rms_db,2), "peak_db": round(peak_db,2)}

# ---------------- scheduler + progress (unchanged) ----------------
created = 0; errs = 0; buf = []
start = time.time()
inflight_cap = max(MAX_WORKERS * INFLIGHT_MULT, MAX_WORKERS)
pbar = tqdm(total=len(todo_rows), desc=f"preproc:{PROFILE}:FLEURS", unit="file")

def flush_manifest(rows):
    if not rows: return
    with open(MANIFEST, "a", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(fcsv, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows: w.writerow(r)

it = iter(todo_rows)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    inflight = set()
    for _ in range(min(inflight_cap, len(todo_rows))):
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: break
    while inflight:
        done = next(as_completed(inflight))
        inflight.remove(done)
        res = done.result()
        pbar.update(1)
        if isinstance(res, dict) and "error" in res:
            errs += 1
        elif res:
            buf.append(res); created += 1
            if len(buf) >= BATCH_WRITE:
                flush_manifest(buf); buf.clear()
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: pass

flush_manifest(buf)
pbar.close()
print(f"\n[{PROFILE}:FLEURS] FINISHED: +{created} files, {errs} errors | elapsed {(time.time()-start)/60:.1f} min")

# ---------------- quick audit / summary (unchanged) ----------------
def hours_from_manifest(profile: str):
    try:
        m = pd.read_csv(MANIFEST)
        m = m[m["profile"]==profile]
        return float(pd.to_numeric(m["duration"], errors="coerce").fillna(0).sum()/3600.0), len(m)
    except Exception:
        return 0.0, 0

outs = list((OUT_ROOT/PROFILE).rglob("*.wav"))
print(f"[{PROFILE}] outputs on disk: {len(outs):,} (showing a few)")
for p in outs[:8]:
    print("  ", p)

hrs, nrows = hours_from_manifest(PROFILE)
print(f"[{PROFILE}] manifest rows: {nrows:,} | hours: {hrs:.2f} h")

# keep manifest deduplicated (safety)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST)
    before = len(mf)
    mf = mf.drop_duplicates(subset=["profile","path_out"], keep="first").reset_index(drop=True)
    if len(mf) != before:
        mf.to_csv(MANIFEST, index=False)
        print(f"[fix] dedup manifest: -{before-len(mf)} rows")
print("[OK] BASE preprocessing for FLEURS done.")

Input counts (FLEURS only, normalized):
label
real    604

[preproc:base | FLEURS] PLANNED=604 | ALREADY_DONE=0 | TO_DO=604


preproc:base:FLEURS:   0%|          | 0/604 [00:00<?, ?file/s]


[base:FLEURS] FINISHED: +604 files, 0 errors | elapsed 2.2 min
[base] outputs on disk: 98,397 (showing a few)
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_6859d4f039aea68f.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_19e1c8d7cad439c0.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_a5ea2af5277b57c4.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_7aee2f81107fe9de.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_c103d799a208b3c8.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_8883f29f2fb7bfdf.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_e43afa0cec72f0aa.wav
   /content/drive/MyDrive/hindi_dfake/processed/wav/base/raw/fake_tts/tts_edge_47ea9317998de3aa.wav
[base] manifest rows: 98,397 | hours: 79.59 h
[OK] BASE preprocessing for FLEURS done.
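
The scheduler in the cell above keeps only a bounded number of tasks submitted at any time. The same idea can be sketched with `concurrent.futures.wait` and `FIRST_COMPLETED`, which is a different standard-library call from the `next(as_completed(...))` pattern the notebook uses. The function name `run_bounded` and the toy workload are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def run_bounded(fn, items, max_workers=4, inflight_cap=8):
    """Run fn over items with at most inflight_cap futures pending at once."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        inflight = set()
        for item in items:
            inflight.add(ex.submit(fn, item))
            if len(inflight) >= inflight_cap:
                # Block until at least one task finishes, then collect it.
                done, inflight = wait(inflight, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
        # Drain whatever is still running.
        done, _ = wait(inflight)
        results.extend(f.result() for f in done)
    return results

squares = run_bounded(lambda n: n * n, range(20))
print(sorted(squares))
```

Capping the in-flight set keeps memory use flat when the to-do list is long, since futures are created only as earlier ones complete. Results arrive in completion order, not submission order.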


### **10. Test Set Integration (Real) - Strong Profile**
We apply the full `strong` augmentation pipeline (RawBoost + EQ) to the **FLEURS** dataset (real test set). This lets us evaluate the model on degraded ("hard") real speech as well as on clean speech.
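
Two properties of the augmentation are worth illustrating: the random generator is seeded from a hash of the file key, so reruns give identical output, and the noise is band-limited and scaled to a fixed SNR. The sketch below is a simplified stand-in under those assumptions. The names `rng_from_key` and `band_noise` and the synthetic input are hypothetical; the 2500–6000 Hz band and 25 dB SNR match the configuration in the cell that follows.

```python
import hashlib
import numpy as np

def rng_from_key(key: str) -> np.random.Generator:
    """Derive a reproducible generator from a string key (first 32 bits of SHA-1)."""
    seed = int(hashlib.sha1(key.encode("utf-8")).hexdigest()[:8], 16)
    return np.random.default_rng(seed)

def band_noise(x, sr, rng, band=(2500, 6000), snr_db=25.0):
    """White noise restricted to `band` (Hz), scaled to snr_db relative to x."""
    n = len(x)
    spec = np.fft.rfft(rng.standard_normal(n))
    freqs = np.fft.rfftfreq(n, d=1.0 / sr)
    spec[(freqs < band[0]) | (freqs > band[1])] = 0.0   # zero bins outside the band
    noise = np.fft.irfft(spec, n=n)
    sig_rms = np.sqrt(np.mean(x ** 2)) + 1e-9
    target_rms = sig_rms / 10 ** (snr_db / 20.0)
    noise *= target_rms / (np.sqrt(np.mean(noise ** 2)) + 1e-12)
    return noise.astype(np.float32)

sr = 16000
t = np.arange(sr) / sr
signal = (0.2 * np.sin(2 * np.pi * 220 * t)).astype(np.float32)  # hypothetical input

n1 = band_noise(signal, sr, rng_from_key("a.wav|strong"))
n2 = band_noise(signal, sr, rng_from_key("a.wav|strong"))
n3 = band_noise(signal, sr, rng_from_key("b.wav|strong"))

snr = 20 * np.log10(np.sqrt(np.mean(signal ** 2)) / np.sqrt(np.mean(n1 ** 2)))
print(np.array_equal(n1, n2), np.array_equal(n1, n3), round(float(snr), 1))
```

Keying the seed on the input path plus the profile name means a resumed or repeated run regenerates the same degraded file.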




In [None]:
# ================= PREPROCESS FLEURS — STRONG (EQ + RawBoost v3) =================
# Writes: /content/drive/MyDrive/hindi_dfake/processed/wav/strong/...
# Logs :  /content/drive/MyDrive/hindi_dfake/metadata/proc_manifest.csv (profile=strong)
# Scope:  ONLY rows from metadata/thirdparty_real_test.fleurs.csv (resume-safe)

import os, csv, uuid, time, hashlib, subprocess
from pathlib import Path
import pandas as pd, numpy as np, soundfile as sf
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

# ------------ CONFIG ------------
ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT / "metadata"
OUT_ROOT = ROOT / "processed" / "wav"

PROFILE = "strong"          # strong = base + artifact EQ + RawBoost v3 (same as MMS-TTS)
HASH_SEED = 2025

# Loudness & trim (identical to your MMS-TTS STRONG run)
TARGET_DB   = -26.0
TRIM_THR_DB = -45
TRIM_DUR_S  = 0.20

# Concurrency / batching
MAX_WORKERS   = 6
INFLIGHT_MULT = 3
BATCH_WRITE   = 500

FLEURS_CSV = META / "thirdparty_real_test.fleurs.csv"   # <-- input list (FLEURS only)
MANIFEST   = META / "proc_manifest.csv"
assert FLEURS_CSV.exists(), f"Missing {FLEURS_CSV}"

# ------------ helpers (unchanged logic from your STRONG job) ------------
def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path","duration","label","fake_type","source"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def out_path_for(in_path: str) -> Path:
    p = Path(in_path)
    try:    rel = p.relative_to(ROOT)
    except ValueError: rel = Path("_external") / p.name  # path lies outside ROOT
    return OUT_ROOT / PROFILE / rel

def ensure_parent(p: Path): p.parent.mkdir(parents=True, exist_ok=True)

def rms_dbfs_arr(x: np.ndarray) -> float:
    if x.ndim > 1: x = x.mean(axis=1)
    if len(x) == 0: return -120.0
    rms = float(np.sqrt(np.mean(np.square(x))))
    if rms <= 1e-9: return -120.0
    return 20.0*np.log10(np.clip(rms, 1.0e-9, 1.0))

# ---- RawBoost v3–style (deterministic per file) ----
def _rng_from_key(key: str):
    seed = int(hashlib.sha1(key.encode("utf-8")).hexdigest()[:8], 16)
    return np.random.default_rng(seed)

def _add_colored_noise(x, sr, rng, band=(2500,6000), snr_db=25.0):
    n = len(x)
    if n == 0: return x
    wn = rng.standard_normal(n).astype(np.float32)
    X = np.fft.rfft(wn); freqs = np.fft.rfftfreq(n, d=1.0/sr)
    X[~((freqs >= band[0]) & (freqs <= band[1]))] = 0.0
    noise = np.fft.irfft(X, n=n).astype(np.float32)
    sig_rms = np.sqrt(np.mean(np.square(x))) + 1e-9
    target_noise_rms = sig_rms / (10.0**(snr_db/20.0))
    cur_rms = np.sqrt(np.mean(np.square(noise))) + 1e-12
    noise *= (target_noise_rms / cur_rms)
    return np.clip(x + noise, -1.0, 1.0)

def _add_impulses(x, sr, rng, per_sec=1.0, gain=0.08):
    n = len(x)
    if n == 0: return x
    dur = n/float(sr); k = max(1, int(per_sec * dur))
    idx = rng.integers(0, n, size=k)
    amp = gain * (np.sqrt(np.mean(np.square(x))) + 1e-9)
    y = x.copy()
    y[idx] = np.clip(y[idx] + amp * rng.choice([-1.0, 1.0], size=k), -1.0, 1.0)
    return y

def _add_small_reverb(x, sr, rng, t_sec=0.03, decay=0.35, wet=0.18):
    n = len(x)
    if n == 0: return x
    ir_len = max(8, int(t_sec * sr))
    t = np.arange(ir_len, dtype=np.float32)
    ir = np.exp(-decay * t / ir_len).astype(np.float32)
    for _ in range(3):
        pos = int(rng.integers(0, ir_len))
        ir[pos] += float(rng.uniform(0.1, 0.3))
    ir /= (np.sum(np.abs(ir)) + 1e-9)
    y = np.convolve(x, ir, mode="full")[:n].astype(np.float32)
    return np.clip((1.0 - wet) * x + wet * y, -1.0, 1.0)

# knobs copied from your MMS-TTS config
RB_ENABLE           = True
RB_BANDPASS_HZ      = (2500, 6000)
RB_SNR_DB           = 25.0
RB_NOISE_PROB       = 1.0
RB_IMPULSE_PROB     = 0.5
RB_IMPULSES_PER_SEC = 1.0
RB_IMPULSE_GAIN     = 0.08
RB_REVERB_PROB      = 0.6
RB_REVERB_T_SEC     = 0.03
RB_REVERB_DECAY     = 0.35
RB_REVERB_WET       = 0.18

def rawboost_v3(x: np.ndarray, sr: int, key: str) -> np.ndarray:
    if x.ndim > 1: x = x.mean(axis=1)
    x = np.clip(x, -1.0, 1.0).astype(np.float32)
    if not RB_ENABLE: return x
    rng = _rng_from_key(key)
    if rng.uniform() < RB_NOISE_PROB:
        x = _add_colored_noise(x, sr, rng, RB_BANDPASS_HZ, RB_SNR_DB)
    if rng.uniform() < RB_IMPULSE_PROB:
        x = _add_impulses(x, sr, rng, RB_IMPULSES_PER_SEC, RB_IMPULSE_GAIN)
    if rng.uniform() < RB_REVERB_PROB:
        x = _add_small_reverb(x, sr, rng, RB_REVERB_T_SEC, RB_REVERB_DECAY, RB_REVERB_WET)
    return x

def build_filter_chain_strong(gain_db: float) -> str:
    return ",".join([
        "highpass=f=20",
        "equalizer=f=3000:t=q:w=1.0:g=2.5",
        "equalizer=f=4800:t=q:w=0.9:g=2.0",
        "treble=g=1.0:f=6000:t=h:w=0.7",
        f"volume={gain_db}dB",
        f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:"
        f"stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB"
    ])

# ------------ load inputs (FLEURS ONLY) ------------
df_fl = read_csv_safe(FLEURS_CSV)
df_fl = df_fl[df_fl["path"].astype(str).apply(lambda p: Path(p).exists())].copy()
df_in = df_fl.copy()

# ------------ manifest / resume ------------
MANIFEST.parent.mkdir(parents=True, exist_ok=True)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST, usecols=["profile","path_out"])
    done_set = set(mf.loc[mf["profile"]==PROFILE, "path_out"].astype(str))
else:
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    done_set = set()

def already_done(pin: str) -> bool:
    pout = out_path_for(pin)
    return pout.exists() or (str(pout) in done_set)

todo_rows = [r for _, r in df_in.iterrows() if not already_done(str(r["path"]))]
planned, todo = len(df_in), len(todo_rows)
print(f"[preproc:{PROFILE}:FLEURS] PLANNED={planned:,} | ALREADY_DONE={planned-todo:,} | TO_DO={todo:,}")

# ------------ worker ------------
def preprocess_one(row):
    pin = str(row["path"])
    pout = out_path_for(pin)
    ensure_parent(pout)

    # read + resample if needed
    try:
        x, sr = sf.read(pin, dtype="float32", always_2d=False)
    except Exception as e:
        return {"error": f"read_fail: {e}", "path_in": pin}
    if hasattr(x, "ndim") and x.ndim > 1: x = x.mean(axis=1)
    if sr != 16000:
        tmp = pout.parent / f".{pout.stem}.in-{uuid.uuid4().hex}.wav"
        cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
               "-i", pin, "-ac","1","-ar","16000","-sample_fmt","s16", str(tmp)]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.returncode != 0:
            try: tmp.unlink(missing_ok=True)
            except: pass
            return {"error": "resample_fail", "path_in": pin}
        x, sr = sf.read(str(tmp), dtype="float32", always_2d=False)
        try: tmp.unlink(missing_ok=True)
        except: pass

    x = np.clip(x, -1.0, 1.0).astype(np.float32)

    # RawBoost v3–style
    key = f"{pin}|{PROFILE}"
    try:    y = rawboost_v3(x, sr, key)
    except: y = x

    # write RB temp near target (avoid cross-device link)
    tmp_rb = pout.parent / f".{pout.stem}.rb-{uuid.uuid4().hex}.wav"
    sf.write(str(tmp_rb), y, 16000, subtype="PCM_16")

    # EQ + loudness + trim via ffmpeg
    gain_db = float(np.clip(TARGET_DB - rms_dbfs_arr(y), -20.0, 20.0))
    FILTER  = build_filter_chain_strong(gain_db)

    tmp_out = pout.parent / f".{pout.stem}.tmp-{uuid.uuid4().hex}.wav"
    cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
           "-i", str(tmp_rb), "-ac","1","-ar","16000",
           "-af", FILTER, "-sample_fmt","s16", str(tmp_out)]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try: tmp_rb.unlink(missing_ok=True)
    except: pass
    if p.returncode != 0:
        try: tmp_out.unlink(missing_ok=True)
        except: pass
        return {"error": p.stderr.decode("utf-8","ignore")[:300], "path_in": pin}

    os.replace(str(tmp_out), str(pout))

    # stats
    try:
        z, sr2 = sf.read(str(pout), dtype="float32", always_2d=False)
        if hasattr(z, "ndim") and z.ndim > 1: z = z.mean(axis=1)
        dur = len(z)/float(sr2) if sr2 else 0.0
        peak = float(np.max(np.abs(z))) if len(z) else 0.0
        rms  = float(np.sqrt(np.mean(np.square(z)))) if len(z) else 0.0
        rms_db = -120.0 if rms<=1e-9 else 20*np.log10(np.clip(rms,1e-9,1.0))
        peak_db = -120.0 if peak<=1e-9 else 20*np.log10(np.clip(peak,1e-9,1.0))
    except Exception:
        dur, rms_db, peak_db = 0.0, -120.0, -120.0

    return {"utt_id": row.get("utt_id",""), "path_in": pin, "path_out": str(pout),
            "profile": PROFILE, "duration": round(dur,3),
            "rms_db": round(rms_db,2), "peak_db": round(peak_db,2)}

# ------------ scheduler + progress ------------
created = 0; errs = 0; buf = []
start = time.time()
inflight_cap = max(MAX_WORKERS * INFLIGHT_MULT, MAX_WORKERS)
pbar = tqdm(total=len(todo_rows), desc=f"preproc:{PROFILE}:FLEURS", unit="file")

def flush_manifest(rows):
    if not rows: return
    with open(MANIFEST, "a", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(fcsv, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows: w.writerow(r)

it = iter(todo_rows)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    inflight = set()
    for _ in range(min(inflight_cap, len(todo_rows))):
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: break
    while inflight:
        done = next(as_completed(inflight))
        inflight.remove(done)
        res = done.result()
        pbar.update(1)
        if isinstance(res, dict) and "error" in res:
            errs += 1
        elif res:
            buf.append(res); created += 1
            if len(buf) >= BATCH_WRITE:
                flush_manifest(buf); buf.clear()
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: pass

flush_manifest(buf)
pbar.close()
print(f"\n[{PROFILE}:FLEURS] FINISHED: +{created} files, {errs} errors | elapsed {(time.time()-start)/60:.1f} min")

# quick manifest slice
if MANIFEST.exists():
    m = pd.read_csv(MANIFEST)
    mp = m[m["profile"]==PROFILE]
    hrs = float(pd.to_numeric(mp["duration"], errors="coerce").fillna(0).sum()/3600.0)
    print(f"[{PROFILE}] Manifest rows: {len(mp):,} | hours: {hrs:.2f} h")

print("[OK] STRONG preprocessing for FLEURS done.")

[preproc:strong:FLEURS] PLANNED=604 | ALREADY_DONE=0 | TO_DO=604


preproc:strong:FLEURS:   0%|          | 0/604 [00:00<?, ?file/s]


[strong:FLEURS] FINISHED: +604 files, 0 errors | elapsed 2.3 min
[strong] Manifest rows: 98,397 | hours: 186.23 h
[OK] STRONG preprocessing for FLEURS done.
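
The `ALREADY_DONE` and `TO_DO` counters printed above come from the resume logic: outputs already listed in the manifest for the current profile are skipped. A minimal standard-library sketch of that check follows. It uses a temporary manifest with a reduced, hypothetical column set and made-up paths, and it omits the additional on-disk existence test that the notebook performs.

```python
import csv
import tempfile
from pathlib import Path

FIELDS = ["utt_id", "path_in", "path_out", "profile"]  # reduced column set for the demo

def load_done(manifest: Path, profile: str) -> set:
    """Return the set of output paths already recorded for `profile`."""
    if not manifest.exists():
        return set()
    with open(manifest, newline="", encoding="utf-8") as f:
        return {r["path_out"] for r in csv.DictReader(f) if r["profile"] == profile}

with tempfile.TemporaryDirectory() as d:
    manifest = Path(d) / "proc_manifest.csv"
    with open(manifest, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=FIELDS)
        w.writeheader()
        w.writerow({"utt_id": "u1", "path_in": "raw/a.wav", "path_out": "strong/a.wav", "profile": "strong"})
        w.writerow({"utt_id": "u1", "path_in": "raw/a.wav", "path_out": "base/a.wav", "profile": "base"})

    done = load_done(manifest, "strong")
    planned = ["strong/a.wav", "strong/b.wav"]
    todo = [p for p in planned if p not in done]

print(f"PLANNED={len(planned)} | ALREADY_DONE={len(planned) - len(todo)} | TO_DO={len(todo)}")
```

Filtering by profile matters because `base` and `strong` share one manifest file, so a row for one profile must not mark the other as finished.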


### **11. Profile: Clean - Test Set**
We generate a separate test set using the `clean` profile. This profile applies **Artifact EQ** and loudness normalization but **omits RawBoost** (band-limited noise, impulses, and reverb), giving a baseline for model performance on high-quality, non-degraded audio.
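
At the ffmpeg level, the profiles differ only in whether the EQ stages are present. The sketch below builds the `-af` filter string with a switch. The function name `build_filter_chain` and its `with_eq` flag are hypothetical conveniences; the filter strings themselves are copied from the notebook's cells.

```python
def build_filter_chain(gain_db: float, with_eq: bool,
                       trim_thr_db: float = -45, trim_dur_s: float = 0.20) -> str:
    """Compose the ffmpeg -af string: high-pass, optional EQ, gain, silence trim."""
    parts = ["highpass=f=20"]
    if with_eq:
        # Artifact EQ used by the strong and clean profiles.
        parts += [
            "equalizer=f=3000:t=q:w=1.0:g=2.5",
            "equalizer=f=4800:t=q:w=0.9:g=2.0",
            "treble=g=1.0:f=6000:t=h:w=0.7",
        ]
    parts.append(f"volume={gain_db}dB")
    parts.append(
        f"silenceremove=start_periods=1:start_duration={trim_dur_s}:start_threshold={trim_thr_db}dB:"
        f"stop_periods=1:stop_duration={trim_dur_s}:stop_threshold={trim_thr_db}dB"
    )
    return ",".join(parts)

base_chain = build_filter_chain(-3.0, with_eq=False)   # base: no EQ
clean_chain = build_filter_chain(-3.0, with_eq=True)   # clean: EQ, no RawBoost beforehand
print(base_chain)
print(clean_chain)
```

The `strong` profile uses the same EQ chain as `clean`; its extra degradation is applied to the waveform in Python before ffmpeg runs.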

In [None]:
# ================= PREPROCESSING — CLEAN (for test set) =================
# Writes: /content/drive/MyDrive/hindi_dfake/processed/wav/clean/...
# Logs to: /content/drive/MyDrive/hindi_dfake/metadata/proc_manifest_clean.csv

import os, csv, uuid, time, subprocess
from pathlib import Path
import pandas as pd, numpy as np, soundfile as sf
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

# ------------ CONFIG ------------
ROOT = Path("/content/drive/MyDrive/hindi_dfake")
META = ROOT / "metadata"
OUT_ROOT = ROOT / "processed" / "wav"

PROFILE = "clean"  # clean = NO RawBoost, just EQ + normalize + trim
TARGET_DB   = -26.0
TRIM_THR_DB = -45
TRIM_DUR_S  = 0.20

# Concurrency
MAX_WORKERS   = 6
INFLIGHT_MULT = 3
BATCH_WRITE   = 500

# Test set CSVs (both real and fake)
TEST_REAL_CSV = META / "fs_test_real.labeled.csv"
TEST_FAKE_CSV = META / "fs_test_fake_mms.labeled.csv"
MANIFEST = META / "proc_manifest_clean.csv"

assert TEST_REAL_CSV.exists(), f"Missing {TEST_REAL_CSV}"
assert TEST_FAKE_CSV.exists(), f"Missing {TEST_FAKE_CSV}"

# ------------ helpers ------------
def read_csv_safe(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path)
    for c in ["utt_id","path_audio","duration","label"]:
        if c not in df.columns:
            df[c] = "" if c != "duration" else 0.0
    return df

def infer_raw_path(processed_path: str) -> str:
    """
    STRONG-only mapping:
      <anything>/hindi_dfake/processed/wav/strong/raw/<suffix>
    → /content/drive/MyDrive/hindi_dfake/raw/<suffix>

    Works even if the CSV contains Windows-style paths like:
      G:\\My Drive\\hindi_dfake\\processed\\wav\\strong\\raw\\...
    """
    s = str(processed_path).replace("\\", "/")
    marker = "hindi_dfake/processed/wav/strong/raw/"

    if marker not in s:
        return str(processed_path)

    suffix = s.split(marker, 1)[1]  # e.g. "real_clean/ivr/xxx.wav"
    return str((ROOT / "raw" / suffix).resolve())

def out_path_for(raw_path: str) -> Path:
    """Map raw audio path to clean output path"""
    p = Path(raw_path)
    try:
        rel = p.relative_to(ROOT / "raw")
    except ValueError:  # path lies outside ROOT/raw
        rel = Path("_external") / p.name
    return OUT_ROOT / PROFILE / rel

def ensure_parent(p: Path):
    p.parent.mkdir(parents=True, exist_ok=True)

def rms_dbfs_arr(x: np.ndarray) -> float:
    if x.ndim > 1:
        x = x.mean(axis=1)
    if len(x) == 0:
        return -120.0
    rms = float(np.sqrt(np.mean(np.square(x))))
    if rms <= 1e-9:
        return -120.0
    return 20.0*np.log10(np.clip(rms, 1e-9, 1.0))

def build_filter_chain_clean(gain_db: float) -> str:
    """Same EQ as strong, but NO RawBoost beforehand"""
    return ",".join([
        "highpass=f=20",
        "equalizer=f=3000:t=q:w=1.0:g=2.5",
        "equalizer=f=4800:t=q:w=0.9:g=2.0",
        "treble=g=1.0:f=6000:t=h:w=0.7",
        f"volume={gain_db}dB",
        f"silenceremove=start_periods=1:start_duration={TRIM_DUR_S}:start_threshold={TRIM_THR_DB}dB:stop_periods=1:stop_duration={TRIM_DUR_S}:stop_threshold={TRIM_THR_DB}dB",
    ])

# ------------ load test set ------------
df_real = read_csv_safe(TEST_REAL_CSV)
df_fake = read_csv_safe(TEST_FAKE_CSV)

# Convert path_audio (processed/strong) to raw paths (STRONG-only)
df_real["path_raw"] = df_real["path_audio"].astype(str).apply(infer_raw_path)
df_fake["path_raw"] = df_fake["path_audio"].astype(str).apply(infer_raw_path)

# Debug sanity (recommended)
print("Sample mapping (REAL):")
print(df_real[["path_audio","path_raw"]].head(3).to_string(index=False))
print("Exists?:", [Path(p).exists() for p in df_real["path_raw"].head(3)])

# Filter to existing files only
df_real = df_real[df_real["path_raw"].apply(lambda p: Path(p).exists())].copy()
df_fake = df_fake[df_fake["path_raw"].apply(lambda p: Path(p).exists())].copy()

df_test = pd.concat([df_real, df_fake], ignore_index=True)

print(f"\nTest set files found:")
print(f"  Real: {len(df_real):,}")
print(f"  Fake: {len(df_fake):,}")
print(f"  Total: {len(df_test):,}")

# ------------ manifest skip logic ------------
MANIFEST.parent.mkdir(parents=True, exist_ok=True)
if MANIFEST.exists():
    mf = pd.read_csv(MANIFEST, usecols=["profile","path_out"])
    done_set = set(mf.loc[mf["profile"] == PROFILE, "path_out"].astype(str))
else:
    with open(MANIFEST, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
    done_set = set()

def already_done(pin: str) -> bool:
    pout = out_path_for(pin)
    return pout.exists() or (str(pout) in done_set)

todo_rows = [r for _, r in df_test.iterrows() if not already_done(str(r["path_raw"]))]
planned, todo = len(df_test), len(todo_rows)
print(f"\n[preproc:{PROFILE}] PLANNED={planned:,} | ALREADY_DONE={planned-todo:,} | TO_DO={todo:,}")

# ------------ worker (NO RAWBOOST) ------------
def preprocess_one(row):
    pin = str(row["path_raw"])
    pout = out_path_for(pin)
    ensure_parent(pout)

    # Read and resample to 16k mono
    try:
        x, sr = sf.read(pin, dtype="float32", always_2d=False)
    except Exception as e:
        return {"error": f"read_fail: {e}", "path_in": pin}

    if x.ndim > 1:
        x = x.mean(axis=1)

    if sr != 16000:
        tmp = pout.parent / f".{pout.stem}.in-{uuid.uuid4().hex}.wav"
        cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
               "-i", pin, "-ac","1","-ar","16000","-sample_fmt","s16", str(tmp)]
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if p.returncode != 0:
            try: tmp.unlink(missing_ok=True)
            except: pass
            return {"error": "resample_fail", "path_in": pin}
        x, sr = sf.read(str(tmp), dtype="float32", always_2d=False)
        try: tmp.unlink(missing_ok=True)
        except: pass

    x = np.clip(x, -1.0, 1.0).astype(np.float32)

    # NO RawBoost here
    tmp_clean = pout.parent / f".{pout.stem}.clean-{uuid.uuid4().hex}.wav"
    sf.write(str(tmp_clean), x, 16000, subtype="PCM_16")

    # Compute gain and apply EQ + normalize + trim
    gain_db = float(np.clip(TARGET_DB - rms_dbfs_arr(x), -20.0, 20.0))
    FILTER = build_filter_chain_clean(gain_db)

    tmp_out = pout.parent / f".{pout.stem}.tmp-{uuid.uuid4().hex}.wav"
    cmd = ["ffmpeg","-nostdin","-hide_banner","-loglevel","error","-y",
           "-i", str(tmp_clean), "-ac","1","-ar","16000",
           "-af", FILTER, "-sample_fmt","s16", str(tmp_out)]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try: tmp_clean.unlink(missing_ok=True)
    except: pass

    if p.returncode != 0:
        try: tmp_out.unlink(missing_ok=True)
        except: pass
        return {"error": p.stderr.decode("utf-8","ignore")[:300], "path_in": pin}

    os.replace(str(tmp_out), str(pout))

    # Stats
    try:
        z, sr2 = sf.read(str(pout), dtype="float32", always_2d=False)
        if z.ndim > 1:
            z = z.mean(axis=1)
        dur = len(z)/float(sr2) if sr2 else 0.0
        peak = float(np.max(np.abs(z))) if len(z) else 0.0
        rms = float(np.sqrt(np.mean(np.square(z)))) if len(z) else 0.0
        rms_db = -120.0 if rms<=1e-9 else 20*np.log10(np.clip(rms,1e-9,1.0))
        peak_db = -120.0 if peak<=1e-9 else 20*np.log10(np.clip(peak,1e-9,1.0))
    except Exception:
        dur, rms_db, peak_db = 0.0, -120.0, -120.0

    return {
        "utt_id": row.get("utt_id",""),
        "path_in": pin,
        "path_out": str(pout),
        "profile": PROFILE,
        "duration": round(dur,3),
        "rms_db": round(rms_db,2),
        "peak_db": round(peak_db,2),
    }

# ------------ process ------------
created = 0; errs = 0; buf = []
start = time.time()
inflight_cap = max(MAX_WORKERS * INFLIGHT_MULT, MAX_WORKERS)
pbar = tqdm(total=len(todo_rows), desc=f"preproc:{PROFILE}", unit="file")

def flush_manifest(rows):
    if not rows: return
    with open(MANIFEST, "a", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(fcsv, fieldnames=["utt_id","path_in","path_out","profile","duration","rms_db","peak_db"])
        for r in rows:
            w.writerow(r)

it = iter(todo_rows)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    inflight = set()
    for _ in range(min(inflight_cap, len(todo_rows))):
        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: break

    while inflight:
        done = next(as_completed(inflight))
        inflight.remove(done)
        res = done.result()
        pbar.update(1)

        if isinstance(res, dict) and "error" in res:
            errs += 1
        elif res:
            buf.append(res); created += 1
            if len(buf) >= BATCH_WRITE:
                flush_manifest(buf); buf.clear()

        try: inflight.add(ex.submit(preprocess_one, next(it)))
        except StopIteration: pass

flush_manifest(buf)
pbar.close()
print(f"\n[{PROFILE}] FINISHED: +{created} files, {errs} errors | elapsed {(time.time()-start)/60:.1f} min")

# Summary
if MANIFEST.exists():
    m = pd.read_csv(MANIFEST)
    mp = m[m["profile"] == PROFILE]
    hrs = float(pd.to_numeric(mp["duration"], errors="coerce").fillna(0).sum()/3600.0)
    print(f"[{PROFILE}] Manifest rows: {len(mp):,} | hours: {hrs:.2f} h")

    print(f"\n[{PROFILE}] Output directories created:")
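    parents = [Path(p).parent for p in mp["path_out"].astype(str)]
    for parent in sorted(set(parents)):
        # Exact directory match; a startswith() test would also count
        # subdirectories and sibling directories that share the prefix.
        count = parents.count(parent)
        print(f"  {parent} -> {count} files")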

print("\n[OK] CLEAN test set preprocessing done.")
print(f"Output location: {OUT_ROOT / PROFILE}")

Sample mapping (REAL):
                                                                                                        path_audio                                                                                                 path_raw
G:\My Drive\hindi_dfake\processed\wav\strong\raw\real_clean\thirdparty\fleurs_hi_in\fleurs_hi_8b63ce7acd912273.wav /content/drive/MyDrive/hindi_dfake/raw/real_clean/thirdparty/fleurs_hi_in/fleurs_hi_8b63ce7acd912273.wav
G:\My Drive\hindi_dfake\processed\wav\strong\raw\real_clean\thirdparty\fleurs_hi_in\fleurs_hi_6c47be90259fefd1.wav /content/drive/MyDrive/hindi_dfake/raw/real_clean/thirdparty/fleurs_hi_in/fleurs_hi_6c47be90259fefd1.wav
G:\My Drive\hindi_dfake\processed\wav\strong\raw\real_clean\thirdparty\fleurs_hi_in\fleurs_hi_cb0357a131aedb4f.wav /content/drive/MyDrive/hindi_dfake/raw/real_clean/thirdparty/fleurs_hi_in/fleurs_hi_cb0357a131aedb4f.wav
Exists?: [True, True, True]

Test set files found:
  Real: 3,022
  Fake: 3,002
  Total: 6,024

[p

preproc:clean:   0%|          | 0/5594 [00:00<?, ?file/s]


[clean] FINISHED: +5594 files, 0 errors | elapsed 17.7 min
[clean] Manifest rows: 5,594 | hours: 8.11 h

[clean] Output directories created:
  /content/drive/MyDrive/hindi_dfake/processed/wav/clean/fake_tts_mms -> 3002 files
  /content/drive/MyDrive/hindi_dfake/processed/wav/clean/real_clean/commonvoice -> 468 files
  /content/drive/MyDrive/hindi_dfake/processed/wav/clean/real_clean/ivr -> 1950 files
  /content/drive/MyDrive/hindi_dfake/processed/wav/clean/real_clean/thirdparty/fleurs_hi_in -> 174 files

[OK] CLEAN test set preprocessing done.
Output location: /content/drive/MyDrive/hindi_dfake/processed/wav/clean
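
The sample mapping printed above converts Windows-style `strong` paths stored in the CSV into raw paths on the Colab mount. A standalone sketch of that conversion follows. It uses `PurePosixPath` so it runs without the Drive being mounted and skips the `resolve()` call from the notebook; the function name `to_raw_path` is hypothetical.

```python
from pathlib import PurePosixPath

RAW_ROOT = PurePosixPath("/content/drive/MyDrive/hindi_dfake/raw")
MARKER = "hindi_dfake/processed/wav/strong/raw/"

def to_raw_path(processed_path: str) -> str:
    """Map a processed 'strong' path (Windows or POSIX style) to its raw source path."""
    s = processed_path.replace("\\", "/")
    if MARKER not in s:
        return processed_path          # not a strong-profile path: leave unchanged
    suffix = s.split(MARKER, 1)[1]     # e.g. "real_clean/ivr/xxx.wav"
    return str(RAW_ROOT / suffix)

win = (r"G:\My Drive\hindi_dfake\processed\wav\strong\raw\real_clean"
       r"\thirdparty\fleurs_hi_in\fleurs_hi_8b63ce7acd912273.wav")
mapped = to_raw_path(win)
print(mapped)
# /content/drive/MyDrive/hindi_dfake/raw/real_clean/thirdparty/fleurs_hi_in/fleurs_hi_8b63ce7acd912273.wav
```

Paths that do not contain the marker are returned unchanged, so rows that already point at raw audio pass through untouched.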
