In [1]:
import torch
# Ask PyTorch whether a CUDA device is available; as a bare expression the
# result is only displayed when this runs as the last statement of a notebook cell.
torch.cuda.is_available()

False

In [1]:
import csv
from pathlib import Path
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Any, List, Optional, Tuple
from tqdm import tqdm
import os

SPLITS = ["train", "dev", "test"]

def find_audio_file(useable_root: Path, src_audio: str) -> Tuple[Path, bool]:
    """Resolve a ``src_audio`` value to a path, probing several layouts.

    Probing order (first existing path wins):
      1. ``src_audio`` itself, when it is an absolute path;
      2. ``useable_root / src_audio``;
      3. ``useable_root / <basename>``;
      4. ``useable_root / <first component> / <basename>``, only when
         ``src_audio`` has more than one path component.

    Returns:
        ``(path, True)`` for the first candidate that exists, otherwise
        ``(useable_root / <basename>, <whether that path exists>)``.
    """
    src = Path(src_audio)
    if src.is_absolute() and src.exists():
        return src, True

    flat = useable_root / src.name
    candidates = [useable_root / src, flat]
    if len(src.parts) > 1:
        candidates.append(useable_root / src.parts[0] / src.name)

    for candidate in candidates:
        if candidate.exists():
            return candidate, True

    # Nothing matched: report the flat location as the path that was tried.
    return flat, flat.exists()

def _read_tsv_rows(useable_root: Path, split: str) -> List[Dict[str, str]]:
    """Load the ``src_audio`` / ``language`` columns of ``<split>.tsv``.

    The file is read as tab-separated UTF-8 with a header row. Values are
    stripped of surrounding whitespace; rows whose ``src_audio`` is missing
    or blank are dropped, and a missing ``language`` becomes ``""``.

    Returns:
        A list of ``{"src_audio": ..., "language": ...}`` dicts, or an empty
        list when the TSV file does not exist.
    """
    tsv_path = useable_root / f"{split}.tsv"
    if not tsv_path.exists():
        return []

    def _clean(record, column):
        # Absent columns come back as None from DictReader.get().
        return (record.get(column) or "").strip()

    with tsv_path.open("r", encoding="utf-8", newline="") as handle:
        pairs = (
            (_clean(record, "src_audio"), record)
            for record in csv.DictReader(handle, delimiter="\t")
        )
        return [
            {"src_audio": audio, "language": _clean(record, "language")}
            for audio, record in pairs
            if audio
        ]

def _prefix_from_src(src: str, language: str, covost_mode: bool=False) -> str:
    """Derive the grouping prefix for a row.

    The first path component of ``src`` is used when ``src`` has more than
    one component, or, in ``covost_mode``, whenever it has any component at
    all (so a bare file name becomes its own prefix). Otherwise the row's
    ``language`` is used, falling back to ``"unknown"`` when that is empty.
    """
    parts = Path(src).parts
    if parts:
        head = parts[0]
        if len(parts) > 1 or (covost_mode and head):
            return head
    return language or "unknown"

def _lang_folder(prefix: str) -> str:
    """Return the output folder name for ``prefix``: underscores become hyphens."""
    return "-".join(prefix.split("_"))

def _worker_check_file(
    task: Tuple[str, str, Dict[str,str]],
    useable_root_p: Path,
    out_root_p: Path
) -> Dict[str, Any]:
    """Check one TSV row against both the source tree and the output tree.

    ``task`` is a ``(prefix, split, row)`` triple where ``row["src_audio"]``
    is the path recorded in the TSV. The source is located with
    ``find_audio_file``; the destination is expected at
    ``out_root_p / _lang_folder(prefix) / split / <basename>``.

    Returns a dict with these keys:
      prefix, split  - copied from the task
      basename       - file name component of ``src_audio``
      src_field      - the raw ``src_audio`` value
      src_tried      - string form of the source path that was resolved/tried
      src_exists     - whether a source file was found
      dest_path      - string form of the expected destination path
      dest_exists    - whether the destination path exists
    """
    prefix, split_name, record = task
    raw_src = record["src_audio"]
    file_name = Path(raw_src).name

    resolved_src, src_found = find_audio_file(useable_root_p, raw_src)
    expected_dest = out_root_p / _lang_folder(prefix) / split_name / file_name
    dest_found = expected_dest.exists()

    report: Dict[str, Any] = {}
    report["prefix"] = prefix
    report["split"] = split_name
    report["basename"] = file_name
    report["src_field"] = raw_src
    report["src_tried"] = str(resolved_src)
    report["src_exists"] = src_found
    report["dest_path"] = str(expected_dest)
    report["dest_exists"] = dest_found
    return report

def verify_cvss_copy_streaming(
    useable_root: str,
    out_root: str,
    filter_language: Optional[str] = None,
    covost_mode: bool = False,
    max_workers: int = 8,
    show_progress: bool = True,
) -> Dict[str, Any]:
    """Verify that every audio file listed in the split TSVs was copied.

    Rows are read from ``<useable_root>/<split>.tsv`` for each split in
    ``SPLITS``. Each row is checked on a thread pool (source lookup plus
    destination existence) and the results are aggregated per
    ``(prefix, split)`` group while a single progress bar advances.

    Args:
        useable_root: Directory holding the split TSVs and the source audio.
        out_root: Directory holding the per-language output folders.
        filter_language: When not ``None``, keep only rows whose prefix
            equals or contains this string (also compared with ``_``
            replaced by ``-``).
        covost_mode: Passed to ``_prefix_from_src`` to select the prefix rule.
        max_workers: Size of the thread pool.
        show_progress: Wrap the result stream in a ``tqdm`` progress bar.

    Returns:
        ``{"groups": {(prefix, split): {...}}, "summary": {...}}`` on
        success. When no rows are available (before or after filtering) the
        result is ``{"error": <message>, "groups": {}}`` with no
        ``"summary"`` key.

    Side effects:
        Prints a short human-readable summary to stdout.
    """
    sample_cap = 100  # max number of example entries kept per group

    useable_root_p = Path(useable_root).absolute()
    out_root_p = Path(out_root).absolute()

    # 1. Load all rows and build the task list of (prefix, split, row).
    tasks: List[Tuple[str, str, Dict[str, str]]] = []
    for split in SPLITS:
        for r in _read_tsv_rows(useable_root_p, split):
            pref = _prefix_from_src(r["src_audio"], r.get("language", ""), covost_mode=covost_mode)
            tasks.append((pref, split, r))

    if filter_language is not None:
        def _match(p: str, flt: str) -> bool:
            dashed = p.replace("_", "-")
            return p == flt or dashed == flt or (flt in p) or (flt in dashed)
        tasks = [t for t in tasks if _match(t[0], filter_language)]
        if not tasks:
            return {"error": f"no tasks after applying filter_language='{filter_language}'", "groups": {}}

    total_tasks = len(tasks)
    if total_tasks == 0:
        return {"error": "no tasks found (check your useable_root *.tsv files)", "groups": {}}

    # 2. Initialise one bucket per (prefix, split); sorted so the order of
    #    the returned groups is reproducible between runs.
    prefixes = sorted({t[0] for t in tasks})
    groups_agg: Dict[Tuple[str, str], Dict[str, Any]] = {
        (p, s): {
            "prefix": p,
            "split": s,
            "expected_rows": 0,
            "found_on_dest": 0,
            "missing_dest": [],       # capped sample
            "missing_src": [],        # capped sample
            "missing_src_count": 0,   # true count, independent of the sample cap
            "duplicates_counter": Counter(),
        }
        for p in prefixes
        for s in SPLITS
    }

    # Expected row counts and basename multiplicities come straight from the TSV rows.
    for pref, split, row in tasks:
        g = groups_agg[(pref, split)]
        g["expected_rows"] += 1
        g["duplicates_counter"][Path(row["src_audio"]).name] += 1

    def _check(task: Tuple[str, str, Dict[str, str]]) -> Dict[str, Any]:
        return _worker_check_file(task, useable_root_p, out_root_p)

    def _map_iter():
        # Executor.map's chunksize argument has no effect on thread pools, so
        # it is not passed. Results are yielded in task order.
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            yield from ex.map(_check, tasks)

    iterator = _map_iter()
    if show_progress:
        iterator = tqdm(iterator, total=total_tasks, desc="verifying files", unit="file")

    # 3. Stream results and aggregate.
    for res in iterator:
        g = groups_agg[(res["prefix"], res["split"])]
        if res["dest_exists"]:
            g["found_on_dest"] += 1
        elif len(g["missing_dest"]) < sample_cap:
            g["missing_dest"].append({
                "basename": res["basename"],
                "expected_dest": res["dest_path"],
                "src_exists": res["src_exists"],
                "src_tried": res["src_tried"],
            })
        if not res["src_exists"]:
            # Count every miss; only the stored examples are capped.
            g["missing_src_count"] += 1
            if len(g["missing_src"]) < sample_cap:
                g["missing_src"].append({"src_field": res["src_field"], "tried": res["src_tried"]})

    # 4. Finalise per-group records and overall totals.
    final_groups = {}
    total_expected = 0
    total_found = 0
    total_missing_dest = 0
    total_missing_src = 0
    groups_with_problems = 0
    for key, g in groups_agg.items():
        dup_list = [b for b, c in g["duplicates_counter"].items() if c > 1]
        missing_dest_count = g["expected_rows"] - g["found_on_dest"]
        missing_src_count = g["missing_src_count"]
        total_expected += g["expected_rows"]
        total_found += g["found_on_dest"]
        total_missing_dest += missing_dest_count
        total_missing_src += missing_src_count
        if missing_dest_count or missing_src_count or dup_list:
            groups_with_problems += 1
        final_groups[key] = {
            "prefix": g["prefix"],
            "split": g["split"],
            "expected_rows": g["expected_rows"],
            "found_on_dest": g["found_on_dest"],
            "missing_dest_count": missing_dest_count,
            "missing_dest_sample": g["missing_dest"],
            "missing_src_count": missing_src_count,
            "missing_src_sample": g["missing_src"],
            "duplicates_count": len(dup_list),
            "duplicates_sample": dup_list[:50],
        }

    summary = {
        "useable_root": str(useable_root_p),
        "out_root": str(out_root_p),
        "covost_mode": covost_mode,
        "total_tasks": total_tasks,
        "total_expected": total_expected,
        "total_found_on_dest": total_found,
        "total_missing_on_dest": total_missing_dest,
        "total_missing_sources": total_missing_src,
        "groups_with_problems": groups_with_problems,
    }

    # concise print
    print("==== verify_cvss_copy_streaming summary ====")
    print(f"Total files checked: {summary['total_tasks']}")
    print(f"Total expected: {summary['total_expected']}, found on dest: {summary['total_found_on_dest']}, missing on dest: {summary['total_missing_on_dest']}")
    print(f"Missing sources: {summary['total_missing_sources']}")
    if summary["groups_with_problems"] == 0:
        print("All good ✅")
    else:
        print(f"Issues in {summary['groups_with_problems']} group(s). Inspect 'groups' for details.")

    return {"groups": final_groups, "summary": summary}


In [3]:
# normal run (parallel, auto progress)
# Raw string literals keep the Windows backslashes literal instead of relying
# on unrecognised escape sequences such as "\M" or "\D", which newer Python
# versions warn about.
res = verify_cvss_copy_streaming(
    r"F:\ML-Project\Datasets\original",
    r"F:\ML-Project\Datasets\processed_datasets\cvss\cvss-c",
    max_workers=8,
)


verifying files: 100%|██████████| 210309/210309 [1:08:12<00:00, 51.39file/s]

==== verify_cvss_copy_streaming summary ====
Total files checked: 210309
Total expected: 210309, found on dest: 210309, missing on dest: 0
Missing sources: 0
All good ✅





In [2]:

# Same verification against the CoVoST output tree. Raw string literals keep
# the Windows backslashes literal rather than depending on unrecognised
# escape sequences.
res_covost = verify_cvss_copy_streaming(
    r"F:\ML-Project\Datasets\original",
    r"F:\ML-Project\Datasets\processed_datasets\covost",
    covost_mode=True,
    max_workers=8,
)

verifying files: 100%|██████████| 210309/210309 [54:32<00:00, 64.26file/s] 

==== verify_cvss_copy_streaming summary ====
Total files checked: 210309
Total expected: 210309, found on dest: 0, missing on dest: 210309
Missing sources: 0
Issues in 9 group(s). Inspect 'groups' for details.





In [4]:
import csv
from pathlib import Path
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Any, List, Optional, Tuple
from tqdm import tqdm
import os

SPLITS = ["train", "dev", "test"]

def _list_files_fast(dirpath: Path) -> List[str]:
    """List the names of the regular files directly inside ``dirpath``.

    Returns an empty list when ``dirpath`` is missing or is not a directory.
    ``os.scandir`` is tried first; if it raises, the listing is retried with
    ``Path.iterdir``.
    """
    if not (dirpath.exists() and dirpath.is_dir()):
        return []
    try:
        names = []
        for entry in os.scandir(dirpath):
            if entry.is_file():
                names.append(entry.name)
        return names
    except Exception:
        return [child.name for child in dirpath.iterdir() if child.is_file()]

def _read_tsv_rows(useable_root: Path, split: str) -> List[Dict[str,str]]:
    """Read ``<useable_root>/<split>.tsv`` and keep two columns per row.

    The file is parsed as tab-separated UTF-8 with a header line. Each kept
    row is reduced to its whitespace-stripped ``src_audio`` and ``language``
    values; rows with no ``src_audio`` are skipped and a missing ``language``
    is stored as ``""``. A missing TSV file yields an empty list.
    """
    tsv_path = useable_root / f"{split}.tsv"
    if not tsv_path.exists():
        return []

    collected: List[Dict[str, str]] = []
    with tsv_path.open("r", encoding="utf-8", newline="") as handle:
        for record in csv.DictReader(handle, delimiter="\t"):
            audio = record.get("src_audio")
            audio = audio.strip() if audio else ""
            if audio:
                language = record.get("language")
                language = language.strip() if language else ""
                collected.append({"src_audio": audio, "language": language})
    return collected

def _lang_folder(prefix: str, lang_map: Optional[Dict[str,str]]=None) -> str:
    """Return the output folder name for ``prefix``.

    When ``lang_map`` contains ``prefix`` the mapped name is used, otherwise
    ``prefix`` itself; in both cases underscores are replaced by hyphens.
    """
    folder = lang_map[prefix] if (lang_map and prefix in lang_map) else prefix
    return folder.replace("_", "-")

def _find_audio_covost(useable_root: Path, src_audio: str, recursive_search_depth: int = 2) -> Tuple[Path, bool]:
    """Locate a CoVoST-style ``src_audio`` such as ``en/speaker/file.wav``.

    Lookup order:
      1. ``src_audio`` itself when it is an absolute path that exists. An
         absolute path that does not exist skips straight to step 5, because
         its first component is the filesystem anchor rather than a language
         code and searching below it would walk the whole filesystem.
      2. ``useable_root / lang_code / basename``.
      3. ``useable_root / lang_code / <subdir> / basename`` for each direct
         sub-directory (one speaker level).
      4. A breadth-first search below ``useable_root / lang_code`` that scans
         directories at depth ``0..recursive_search_depth`` (the language
         directory is depth 0). Skipped when ``recursive_search_depth <= 0``.
      5. Fallback: ``useable_root / basename``.

    Directories that cannot be read are skipped (best effort).

    Args:
        useable_root: Root directory containing the per-language folders.
        src_audio: Path recorded in the TSV; its first component is taken as
            the language code.
        recursive_search_depth: Maximum directory depth for step 4.

    Returns:
        ``(path, exists)``: the first match, or the fallback path together
        with whether it exists. An empty ``src_audio`` names no file and
        yields ``(useable_root, False)``.
    """
    from collections import deque

    p = Path(src_audio)
    basename = p.name
    cand_fallback = useable_root / basename

    if p.is_absolute():
        if p.exists():
            return p, True
        return cand_fallback, cand_fallback.exists()
    if not p.parts:
        # Empty path: there is no file name to look for, so never report a hit.
        return cand_fallback, False

    lang_dir = useable_root / p.parts[0]

    # 2) directly under the language directory
    cand = lang_dir / basename
    if cand.exists():
        return cand, True

    if lang_dir.is_dir():
        # 3) one-level speaker sub-directory. The context manager closes the
        #    directory handle even when we return from inside the loop.
        try:
            with os.scandir(lang_dir) as entries:
                for entry in entries:
                    if not entry.is_dir():
                        continue
                    cand2 = Path(entry.path) / basename
                    if cand2.exists():
                        return cand2, True
        except OSError:
            pass  # unreadable directory: fall through to the next strategy

        # 4) bounded breadth-first search (avoids a full deep walk)
        if recursive_search_depth > 0:
            q = deque([(lang_dir, 0)])
            while q:
                cur_dir, depth = q.popleft()
                try:
                    with os.scandir(cur_dir) as entries:
                        for entry in entries:
                            if entry.name == basename and entry.is_file():
                                return Path(entry.path), True
                            # Only enqueue directories that are still within the limit.
                            if depth < recursive_search_depth and entry.is_dir():
                                q.append((Path(entry.path), depth + 1))
                except OSError:
                    continue  # skip directories we cannot read

    # 5) fallback: useable_root / basename
    return cand_fallback, cand_fallback.exists()

def _find_audio_general(useable_root: Path, src_audio: str) -> Tuple[Path,bool]:
    """Locate ``src_audio`` using layout-agnostic heuristics.

    Tried in order: the absolute path itself, ``useable_root / src_audio``,
    ``useable_root / <basename>``, and, for multi-component paths,
    ``useable_root / <first component> / <basename>``. The first existing
    path is returned with ``True``; otherwise the ``useable_root / <basename>``
    path is returned together with whether it exists.
    """
    src = Path(src_audio)
    if src.is_absolute() and src.exists():
        return src, True

    by_relpath = useable_root / src
    if by_relpath.exists():
        return by_relpath, True

    by_name = useable_root / src.name
    if by_name.exists():
        return by_name, True

    components = src.parts
    by_lang = useable_root / components[0] / src.name if len(components) > 1 else None
    if by_lang is not None and by_lang.exists():
        return by_lang, True

    return by_name, by_name.exists()

def _worker_covost_task(
    task: Tuple[str, str, Dict[str,str]],
    useable_root_p: Path,
    out_root_p: Path,
    covost_mode: bool,
    lang_map: Optional[Dict[str,str]],
    recursive_search_depth: int
) -> Dict[str, Any]:
    """Check one ``(prefix, split, row)`` task against source and destination.

    The source is located with ``_find_audio_covost`` when ``covost_mode`` is
    true (passing ``recursive_search_depth`` through) and with
    ``_find_audio_general`` otherwise. The destination is expected at
    ``out_root_p / _lang_folder(prefix, lang_map) / split / <basename>``.

    Returns a dict with keys ``prefix``, ``split``, ``basename``,
    ``src_field``, ``src_tried``, ``src_exists``, ``dest_path`` and
    ``dest_exists``; the two path values are strings.
    """
    prefix, split_name, record = task
    raw_src = record["src_audio"]
    file_name = Path(raw_src).name

    if not covost_mode:
        resolved_src, src_found = _find_audio_general(useable_root_p, raw_src)
    else:
        resolved_src, src_found = _find_audio_covost(
            useable_root_p, raw_src, recursive_search_depth=recursive_search_depth
        )

    expected_dest = out_root_p / _lang_folder(prefix, lang_map) / split_name / file_name
    dest_found = expected_dest.exists()

    report: Dict[str, Any] = {}
    report["prefix"] = prefix
    report["split"] = split_name
    report["basename"] = file_name
    report["src_field"] = raw_src
    report["src_tried"] = str(resolved_src)
    report["src_exists"] = src_found
    report["dest_path"] = str(expected_dest)
    report["dest_exists"] = dest_found
    return report

def verify_cvss_copy_covost(
    useable_root: str,
    out_root: str,
    filter_language: Optional[str] = None,
    covost_mode: bool = False,
    lang_map: Optional[Dict[str,str]] = None,
    max_workers: int = 8,
    show_progress: bool = True,
    recursive_search_depth: int = 2
) -> Dict[str,Any]:
    """Streaming copy verifier that supports the CoVoST layout.

    Rows are read from ``<useable_root>/<split>.tsv`` for each split in
    ``SPLITS``, checked on a thread pool, and aggregated per
    ``(prefix, split)`` group.

    Parameters:
      - useable_root: path with train/dev/test .tsv and source files
      - out_root: path where the converter wrote per-language folders
      - filter_language: optional filter; a row is kept when its prefix or
        its mapped name equals or contains this string. A falsy value
        (``None`` or ``""``) disables filtering.
      - covost_mode: if True, use the CoVoST-aware source lookup and take the
        first ``src_audio`` component as the prefix even for bare file names
      - lang_map: optional dict mapping source language codes to output
        folder names, e.g. {"en": "english"}
      - max_workers: size of the thread pool
      - show_progress: wrap the result stream in a ``tqdm`` progress bar
      - recursive_search_depth: how deep (in folder levels) to look under
        ``useable_root/lang_code`` (0 = disable)

    Returns:
      ``{"groups": {(prefix, split): {...}}, "summary": {...}}`` on success.
      When no rows are available (before or after filtering) the result is
      ``{"error": <message>, "groups": {}}`` with no ``"summary"`` key.

    Side effects:
      Prints a short human-readable summary to stdout.
    """
    sample_cap = 200  # max number of example entries kept per group

    useable_root_p = Path(useable_root).absolute()
    out_root_p = Path(out_root).absolute()

    # Build the task list of (prefix, split, row).
    tasks = []
    for split in SPLITS:
        for row in _read_tsv_rows(useable_root_p, split):
            parts = Path(row["src_audio"]).parts
            # Multi-component paths always use the first component; in
            # covost_mode a single-component path does too.
            if len(parts) > 1 or (covost_mode and parts and parts[0]):
                pref = parts[0]
            else:
                pref = row.get("language") or "unknown"
            tasks.append((pref, split, row))

    # Filter tasks if requested (match against the prefix or its mapped name).
    if filter_language:
        def _match(pref):
            mapped = lang_map.get(pref, pref) if lang_map else pref
            pref_check = pref == filter_language or pref.replace("_","-") == filter_language or (filter_language in pref)
            mapped_check = mapped == filter_language or mapped.replace("_","-") == filter_language or (filter_language in mapped)
            return pref_check or mapped_check
        tasks = [t for t in tasks if _match(t[0])]
        if not tasks:
            return {"error": f"no tasks after applying filter_language='{filter_language}'", "groups": {}}

    total = len(tasks)
    if total == 0:
        return {"error": "no tasks found — check useable_root .tsv files", "groups": {}}

    # One bucket per (prefix, split); sorted so the order of the returned
    # groups is reproducible between runs.
    prefixes = sorted({t[0] for t in tasks})
    groups = {
        (p, s): {
            "prefix": p, "split": s, "expected_rows": 0, "found_on_dest": 0,
            "missing_dest": [], "missing_src": [],
            "missing_src_count": 0,  # true count, independent of the sample cap
            "duplicates_counter": Counter(),
        }
        for p in prefixes
        for s in SPLITS
    }
    for pref, split, row in tasks:
        g = groups[(pref, split)]
        g["expected_rows"] += 1
        g["duplicates_counter"][Path(row["src_audio"]).name] += 1

    def _check(task):
        return _worker_covost_task(task, useable_root_p, out_root_p, covost_mode, lang_map, recursive_search_depth)

    def _map_iter():
        # Executor.map's chunksize argument has no effect on thread pools, so
        # it is not passed. Results are yielded in task order.
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            yield from ex.map(_check, tasks)

    iterator = _map_iter()
    if show_progress:
        iterator = tqdm(iterator, total=total, desc="verifying files", unit="file")

    for res in iterator:
        g = groups[(res["prefix"], res["split"])]
        if res["dest_exists"]:
            g["found_on_dest"] += 1
        elif len(g["missing_dest"]) < sample_cap:
            g["missing_dest"].append({
                "basename": res["basename"],
                "expected_dest": res["dest_path"],
                "src_exists": res["src_exists"],
                "src_tried": res["src_tried"],
            })
        if not res["src_exists"]:
            # Count every miss; only the stored examples are capped.
            g["missing_src_count"] += 1
            if len(g["missing_src"]) < sample_cap:
                g["missing_src"].append({"src_field": res["src_field"], "tried": res["src_tried"]})

    # Finalise per-group records and overall totals.
    final_groups = {}
    total_expected = total_found = total_missing_dest = total_missing_src = 0
    groups_with_problems = 0
    for key, g in groups.items():
        dup_list = [b for b, c in g["duplicates_counter"].items() if c > 1]
        missing_dest_count = g["expected_rows"] - g["found_on_dest"]
        missing_src_count = g["missing_src_count"]
        total_expected += g["expected_rows"]
        total_found += g["found_on_dest"]
        total_missing_dest += missing_dest_count
        total_missing_src += missing_src_count
        if missing_dest_count or missing_src_count or dup_list:
            groups_with_problems += 1
        final_groups[key] = {
            "prefix": g["prefix"],
            "split": g["split"],
            "expected_rows": g["expected_rows"],
            "found_on_dest": g["found_on_dest"],
            "missing_dest_count": missing_dest_count,
            "missing_dest_sample": g["missing_dest"],
            "missing_src_count": missing_src_count,
            "missing_src_sample": g["missing_src"],
            "duplicates_count": len(dup_list),
            "duplicates_sample": dup_list[:50],
        }

    summary = {
        "useable_root": str(useable_root_p),
        "out_root": str(out_root_p),
        "covost_mode": covost_mode,
        "lang_map_used": bool(lang_map),
        "groups_checked": len(final_groups),
        "total_expected_rows": total_expected,
        "total_found_on_dest": total_found,
        "total_missing_on_dest": total_missing_dest,
        "total_missing_sources": total_missing_src,
        "groups_with_problems": groups_with_problems,
    }

    # short human summary
    print("==== verify_cvss_copy_covost summary ====")
    print(f"Total files checked: {total}")
    print(f"Expected: {total_expected}, found on dest: {total_found}, missing on dest: {total_missing_dest}")
    print(f"Missing sources: {total_missing_src}")
    if summary["groups_with_problems"] == 0:
        print("All good ✅")
    else:
        print(f"Issues in {summary['groups_with_problems']} group(s). Inspect results['groups'] for details.")

    return {"groups": final_groups, "summary": summary}


In [None]:
# Example run of the CoVoST-aware verifier: CoVoST source lookup enabled,
# 12 worker threads, and the source search bounded to two directory levels
# below each language folder.
res = verify_cvss_copy_covost(
    useable_root="/mnt/covost/orig",
    out_root="/mnt/out/cvss-c",
    covost_mode=True,
    max_workers=12,
    recursive_search_depth=2
)
