# Notebook to eval Common Voice dataset
- https://github.com/common-voice/cv-dataset
- now legacy code as the code of this notebook is refactored into eval scripts
- this notebook is for initial exploration of cv22 dataset and building eval pipeline


```bash
[lang].tar.gz/
├── clips/
│   ├── *.mp3 files                # audio clips
├── dev.tsv                        # development set (subset for ML model dev/tuning)
├── invalidated.tsv                # clips with ≥2 validations where down_votes > up_votes, 
│                                  # or ≥3 validations with down_votes = up_votes
├── other.tsv                      # clips without enough validations to determine status
├── test.tsv                       # test set (subset for model evaluation)
├── train.tsv                      # training set (largest subset for ML model training)
├── validated.tsv                  # clips with ≥2 validations where up_votes > down_votes
├── reported.tsv                   # (since Corpus 5.0) sentences flagged/reported by contributors
```

## Variables, file utils

In [9]:
from dotenv import load_dotenv
import os
import sys
from pathlib import Path

REPO_ROOT = Path("..").resolve()
if str(REPO_ROOT) not in sys.path:
    sys.path.append(str(REPO_ROOT))

load_dotenv(override=True)
import pandas as pd
import numpy as np
import json
from functools import partial
print(os.getenv("TEST"))


Ok. Reading from .env file


In [2]:
def list_dirs(path, return_full_path=False):
    path = os.path.abspath(os.path.realpath(os.path.expanduser(path)))
    dirs = []
    for f in os.listdir(path):
        full_path = os.path.join(path, f)
        if os.path.isdir(full_path):
            dirs.append(full_path if return_full_path else f)
    return dirs


def list_files(path, ext=".mp3", return_full_path=False):
    path = os.path.abspath(os.path.realpath(os.path.expanduser(path)))
    files = []
    for f in os.listdir(path):
        full_path = os.path.join(path, f)
        if os.path.isfile(full_path) and f.endswith(ext):
            files.append(full_path if return_full_path else f)
    return files

In [None]:
list_dirs(os.getenv("CV22_PATH"))
test_tsv_files = list_files(
    os.path.join(os.getenv("CV22_PATH"), "en"),
    ext="test.tsv",
    return_full_path=True,
)

print(test_tsv_files)


## CV22 manager class
- simple class to easily work with Common Voice dataset

In [7]:
import concurrent.futures
import threading
import concurrent.futures
import threading
import os
from datetime import datetime


class CommonVoiceDataset:
    def __init__(self, root_path, random_seed=42):
        """Initialize Common Voice dataset handler."""
        self.root_path = os.path.abspath(os.path.realpath(os.path.expanduser(root_path)))
        self.languages = self._list_languages()
        self.random_seed = random_seed
        self.__valid_splits = ["train", "dev", "test", "validated", "invalidated", "other", "reported"]

        print(f"Found {len(self.languages)} languages: {self.languages[:5]}...")

    # Private methods
    def __validate_language(self, lang_code):
        if lang_code not in self.languages:
            raise ValueError(f"Language '{lang_code}' not found")

    def __validate_split(self, split):
        if split not in self.__valid_splits:
            raise ValueError(f"Invalid split '{split}'")

    # Protected methods
    def _list_languages(self):
        """List all language directories."""
        return [d for d in os.listdir(self.root_path) if os.path.isdir(os.path.join(self.root_path, d))]

    def _get_language_path(self, lang_code):
        self.__validate_language(lang_code)
        return os.path.join(self.root_path, lang_code)

    def _get_split_tsv_path(self, lang_code, split):
        self.__validate_split(split)
        lang_path = self._get_language_path(lang_code)
        tsv_file = os.path.join(lang_path, f"{split}.tsv")

        if not os.path.exists(tsv_file):
            available = self._get_available_splits(lang_code)
            raise ValueError(f"Split '{split}' not found for {lang_code}. Available: {available}")

        return tsv_file

    def _get_available_splits(self, lang_code):
        lang_path = self._get_language_path(lang_code)
        return [s for s in self.__valid_splits if os.path.exists(os.path.join(lang_path, f"{s}.tsv"))]

    def _load_dataframe(self, tsv_path):
        df = pd.read_csv(tsv_path, sep="\t", low_memory=False)

        # Clean missing data
        if "path" in df.columns and "sentence" in df.columns:
            df = df.dropna(subset=["path", "sentence"])
            df = df[df["sentence"].str.strip() != ""]

        return df

    def _apply_quality_filters(self, df, min_up_votes=0, require_gender=False, require_age=False):
        if "up_votes" in df.columns and min_up_votes > 0:
            df = df[df["up_votes"] >= min_up_votes]

        if require_gender and "gender" in df.columns:
            df = df[df["gender"].notna()]

        if require_age and "age" in df.columns:
            df = df[df["age"].notna()]

        return df

    def _sample_deterministically(self, df, n_samples):
        np.random.seed(self.random_seed)

        if len(df) > n_samples:
            return df.sample(n=n_samples, random_state=self.random_seed)
        return df

    def _build_audio_path(self, lang_code, filename):
        lang_path = self._get_language_path(lang_code)

        # Try clips subdirectory first
        audio_path = os.path.join(lang_path, "clips", filename)
        if os.path.exists(audio_path):
            return audio_path

        # Try direct path
        audio_path = os.path.join(lang_path, filename)
        if os.path.exists(audio_path):
            return audio_path

        raise FileNotFoundError(f"Audio file not found: {filename}")

    def _extract_sample_metadata(self, row):
        return {
            "client_id": row.get("client_id", ""),
            "age": row.get("age", ""),
            "gender": row.get("gender", ""),
            "accents": row.get("accents", ""),
            "up_votes": row.get("up_votes", 0),
            "down_votes": row.get("down_votes", 0),
        }

    # Public methods
    def load_split(self, lang_code, split="test"):
        """Load a dataset split."""
        tsv_path = self._get_split_tsv_path(lang_code, split)
        return self._load_dataframe(tsv_path)

    def get_samples(
        self, lang_code, n_samples=100, split="test", min_up_votes=2, require_gender=False, require_age=False
    ):
        """Get n deterministic samples from a language split."""
        df = self.load_split(lang_code, split)
        df = self._apply_quality_filters(df, min_up_votes, require_gender, require_age)
        df = self._sample_deterministically(df, n_samples)

        if len(df) < n_samples:
            print(f"Warning: {lang_code}/{split} has only {len(df)}/{n_samples} samples")

        return df.reset_index(drop=True)

    def get_audio_file_path(self, lang_code, audio_filename):
        """Get full path to audio file."""
        return self._build_audio_path(lang_code, audio_filename)

    def get_sample_with_audio(self, lang_code, sample_row):
        """Get audio path and metadata for a sample."""
        metadata = self._extract_sample_metadata(sample_row)

        return {
            "audio_path": self._build_audio_path(lang_code, sample_row["path"]),
            "text": sample_row["sentence"],
            "lang_code": lang_code,
            **metadata,
        }

    def iter_language_samples(self, n_samples=100, split="test", languages=None, min_up_votes=2, skip_errors=True):
        """Iterate through samples for each language."""
        langs_to_process = languages if languages else sorted(self.languages)

        for lang_code in langs_to_process:
            try:
                samples = self.get_samples(lang_code, n_samples, split, min_up_votes=min_up_votes)

                if len(samples) > 0:
                    yield lang_code, samples

            except Exception as e:
                print(f"Error processing {lang_code}: {e}")
                if not skip_errors:
                    raise

    def get_batch_generator(self, lang_code, samples_df, batch_size=16):
        """Generate batches of samples."""
        for i in range(0, len(samples_df), batch_size):
            batch = []

            for _, row in samples_df.iloc[i : i + batch_size].iterrows():
                try:
                    sample = self.get_sample_with_audio(lang_code, row)
                    batch.append(sample)
                except Exception as e:
                    print(f"Skipping sample: {e}")
                    continue

            if batch:
                yield batch

    def get_language_stats(self, lang_code, split="test"):
        """Get statistics for a language/split."""
        try:
            df = self.load_split(lang_code, split)

            stats = {
                "total_samples": len(df),
                "available_splits": self._get_available_splits(lang_code),
            }

            if "client_id" in df.columns:
                stats["unique_speakers"] = df["client_id"].nunique()

            if "up_votes" in df.columns:
                stats["avg_up_votes"] = df["up_votes"].mean()
                stats["high_quality_samples"] = len(df[df["up_votes"] >= 2])

            return stats

        except Exception as e:
            return {"error": str(e)}

    def iter_language_samples_with_checkpoint(
        self, n_samples=100, split="test", languages=None, checkpoint_file="wer_checkpoint.json"
    ):
        """Iterate through languages, skipping already completed ones."""
        # Load checkpoint if exists
        completed = set()
        if os.path.exists(checkpoint_file):
            with open(checkpoint_file, "r") as f:
                checkpoint = json.load(f)
                completed = set(checkpoint.get("completed", []))
                print(f"Resuming from checkpoint: {len(completed)} languages already done")

        langs_to_process = languages if languages else sorted(self.languages)
        langs_to_process = [l for l in langs_to_process if l not in completed]

        for lang_code in langs_to_process:
            try:
                samples = self.get_samples(lang_code, n_samples, split)
                if len(samples) > 0:
                    yield lang_code, samples
            except Exception as e:
                print(f"[{datetime.now()}] Error with {lang_code}: {e}")
                yield lang_code, None  # Return None to signal error but continue

In [69]:
# print size of all the languages in cv22
cv = CommonVoiceDataset(os.getenv("CV22_PATH"))
all_stats = {}
for lang in cv.languages:
    stats = cv.get_language_stats(lang, split="test")
    all_stats[lang] = stats
    print(f"{lang}: {stats}")

Found 10 languages: ['ru', 'it', 'en', 'es', 'ja']...
ru: {'total_samples': 10244, 'available_splits': ['train', 'dev', 'test', 'validated', 'invalidated', 'other', 'reported'], 'unique_speakers': 2033, 'avg_up_votes': 2.0807301835220615, 'high_quality_samples': 10244}
it: {'total_samples': 15177, 'available_splits': ['train', 'dev', 'test', 'validated', 'invalidated', 'other', 'reported'], 'unique_speakers': 3848, 'avg_up_votes': 2.1311853462476114, 'high_quality_samples': 15177}
en: {'total_samples': 16396, 'available_splits': ['train', 'dev', 'test', 'validated', 'invalidated', 'other', 'reported'], 'unique_speakers': 12082, 'avg_up_votes': 2.2098072700658697, 'high_quality_samples': 16396}
es: {'total_samples': 15893, 'available_splits': ['train', 'dev', 'test', 'validated', 'invalidated', 'other', 'reported'], 'unique_speakers': 6550, 'avg_up_votes': 2.0812936512930222, 'high_quality_samples': 15893}
en: {'total_samples': 16396, 'available_splits': ['train', 'dev', 'test', 'valida

In [33]:
# use the class
cv_dataset = CommonVoiceDataset(os.getenv("CV22_PATH"))
print(cv_dataset.languages)

# get the first 5 samples of english test set
samples_df = cv_dataset.get_samples("en", split="test", n_samples=5)

# get the first sample of english test set with audio path and metadata
sample_with_audio = cv_dataset.get_sample_with_audio("en", samples_df.iloc[0])
from pprint import pprint

pprint(sample_with_audio["audio_path"])
pprint(sample_with_audio["text"])

Found 10 languages: ['ru', 'it', 'en', 'es', 'ja']...
['ru', 'it', 'en', 'es', 'ja', 'de', 'pl', 'pt', 'fr', 'zh-CN']
'/root/data/common_voice_22/cv-corpus-22.0-2025-06-20/en/clips/common_voice_en_77702.mp3'
'He was thinking about omens, and someone had appeared.'


## Functions to evaluate ASR models

In [None]:
# Functions to evaluate ASR models have been replaced by run_wer_evaluation
# which supports multiple services with proper checkpointing and error handling.

## Sample Transcriptions endpoints

In [54]:
!uv pip install openai groq speechmatics-python elevenlabs jiwer -q

In [57]:
from dotenv import load_dotenv

load_dotenv(override=True)

SAMPLE_AUDIO_FILE = "/root/data/common_voice_22/cv-corpus-22.0-2025-06-20/en/clips/common_voice_en_77702.mp3"
SAMPLE_TEXT = "He was thinking about omens, and someone had appeared."

### OpenAI

In [None]:
from openai import OpenAI

client = OpenAI()
audio_file = open(SAMPLE_AUDIO_FILE, "rb")

transcription = client.audio.transcriptions.create(model="gpt-4o-transcribe", file=audio_file)

print(transcription.text)

### Groq

In [None]:
import os
from groq import Groq

client = Groq()
filename = SAMPLE_AUDIO_FILE

with open(filename, "rb") as file:
    transcription = client.audio.transcriptions.create(
        file=(filename, file.read()),
        model="whisper-large-v3-turbo",
        response_format="verbose_json",
    )
    print(transcription.text)

 He was thinking about omens and someone had appeared.


### ElevenLabs

In [None]:
from elevenlabs import ElevenLabs

# Initialize client
client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY"))

# Open your audio file and send to API
with open(SAMPLE_AUDIO_FILE, "rb") as f:
    transcript = client.speech_to_text.convert(
        file=f,
        model_id="scribe_v1",  # or "scribe_v1_experimental"
        language_code="en",  # optional
    )

# 'This request exceeds your API key quota of 10. You have 0 credits remaining, while 5 credits are required for this request.'
print(transcript.text)

He was thinking about omens, and someone had appeared.


### Spechmatics


In [None]:
# https://github.com/speechmatics/speechmatics-python-sdk

from speechmatics.models import ConnectionSettings, BatchTranscriptionConfig
from speechmatics.batch_client import BatchClient
from httpx import HTTPStatusError

API_KEY = os.getenv("SPEECHMATICS_API_KEY")
PATH_TO_FILE = SAMPLE_AUDIO_FILE
LANGUAGE = "en"

settings = ConnectionSettings(
    url="https://asr.api.speechmatics.com/v2",  # Batch API endpoint
    auth_token=API_KEY,
)

with BatchClient(settings) as client:
    try:
        # Submit job
        job_id = client.submit_job(PATH_TO_FILE, BatchTranscriptionConfig(language=LANGUAGE))
        print(f"Job {job_id} submitted. Waiting for completion...")

        # Wait for results (txt, json-v2, srt, etc.)
        transcript = client.wait_for_completion(job_id, transcription_format="txt")
        print("Transcript:\n", transcript)

    except HTTPStatusError as e:
        if e.response.status_code == 401:
            print("Invalid API key – check your API_KEY.")
        elif e.response.status_code == 400:
            print("Bad request:", e.response.json().get("detail"))
        else:
            raise e


Job bfa1a4g0fx submitted. Waiting for completion...
Transcript:
 He was thinking about omens and someone had appeared.


### Gladia

In [None]:
import requests, time, os

API_KEY = os.getenv("GLADIA_API_KEY")
AUDIO_FILE = SAMPLE_AUDIO_FILE

headers = {"x-gladia-key": API_KEY}

# 1. Upload audio file
with open(AUDIO_FILE, "rb") as f:
    resp = requests.post(
        "https://api.gladia.io/v2/upload",
        headers=headers,
        files={"audio": (os.path.basename(AUDIO_FILE), f, "audio/wav")},
    )
resp.raise_for_status()
file_url = resp.json()["audio_url"]
print("Uploaded:", file_url)

# 2. Request transcription
payload = {"audio_url": file_url}
resp = requests.post(
    "https://api.gladia.io/v2/pre-recorded", headers={**headers, "Content-Type": "application/json"}, json=payload
)
resp.raise_for_status()
job = resp.json()
job_id, result_url = job["id"], job["result_url"]
print("Job ID:", job_id)

# 3. Poll until done
while True:
    resp = requests.get(result_url, headers=headers)
    resp.raise_for_status()
    data = resp.json()
    if data["status"] == "done":
        transcript = data["result"]["transcription"]["full_transcript"]
        print("\nTranscription:\n", transcript)
        break
    elif data["status"] == "error":
        print("Error:", data)
        break
    else:
        time.sleep(2)


Uploaded: https://api.gladia.io/file/2d772b19-0d18-4c02-acfc-abbdb41a1504
Job ID: 45387c1f-f0c6-445d-89ab-135070334072

Transcription:
 He was thinking about omens and someone had appeared.


In [None]:
# Transcribe functions for each service
def transcribe_openai(audio_path, model="gpt-4o-transcribe"):
    import logging
    from openai import OpenAI

    # Suppress verbose logs from httpx and other libraries
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("openai").setLevel(logging.WARNING)

    client = OpenAI()
    with open(audio_path, "rb") as audio_file:
        transcription = client.audio.transcriptions.create(model=model, file=audio_file)
    return transcription.text


def transcribe_groq(audio_path, model="whisper-large-v3-turbo"):
    import logging

    # Suppress verbose logs from all HTTP libraries before importing
    logging.getLogger("httpx").setLevel(logging.ERROR)
    logging.getLogger("urllib3").setLevel(logging.ERROR)
    logging.getLogger("requests").setLevel(logging.ERROR)
    logging.getLogger("groq").setLevel(logging.ERROR)

    from groq import Groq

    client = Groq()
    with open(audio_path, "rb") as file:
        transcription = client.audio.transcriptions.create(
            file=(os.path.basename(audio_path), file.read()),
            model=model,
            response_format="verbose_json",
        )
    return transcription.text


def transcribe_elevenlabs(audio_path, model_id="scribe_v1"):
    import logging
    from elevenlabs import ElevenLabs

    # Suppress verbose logs from httpx and other libraries
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("elevenlabs").setLevel(logging.WARNING)

    client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY"))
    with open(audio_path, "rb") as f:
        transcript = client.speech_to_text.convert(
            file=f,
            model_id=model_id,
            language_code="en",
        )
    return transcript.text


def transcribe_speechmatics(audio_path, language="en"):
    import logging
    from speechmatics.models import ConnectionSettings, BatchTranscriptionConfig
    from speechmatics.batch_client import BatchClient
    from httpx import HTTPStatusError

    # Suppress verbose logs from speechmatics and httpx
    logging.getLogger("speechmatics").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)

    API_KEY = os.getenv("SPEECHMATICS_API_KEY")
    settings = ConnectionSettings(
        url="https://asr.api.speechmatics.com/v2",
        auth_token=API_KEY,
    )

    with BatchClient(settings) as client:
        try:
            job_id = client.submit_job(audio_path, BatchTranscriptionConfig(language=language))
            transcript = client.wait_for_completion(job_id, transcription_format="txt")
            return transcript
        except HTTPStatusError as e:
            if e.response.status_code == 401:
                raise Exception("Invalid API key – check your SPEECHMATICS_API_KEY.")
            elif e.response.status_code == 400:
                raise Exception(f"Bad request: {e.response.json().get('detail')}")
            else:
                raise e


def transcribe_gladia(audio_path):
    import logging, requests, time

    # Suppress verbose logs from requests and urllib3
    logging.getLogger("requests").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)

    API_KEY = os.getenv("GLADIA_API_KEY")
    headers = {"x-gladia-key": API_KEY}

    # 1. Upload audio file
    with open(audio_path, "rb") as f:
        resp = requests.post(
            "https://api.gladia.io/v2/upload",
            headers=headers,
            files={"audio": (os.path.basename(audio_path), f, "audio/wav")},
        )
    resp.raise_for_status()
    file_url = resp.json()["audio_url"]

    # 2. Request transcription
    payload = {"audio_url": file_url}
    resp = requests.post(
        "https://api.gladia.io/v2/pre-recorded", headers={**headers, "Content-Type": "application/json"}, json=payload
    )
    resp.raise_for_status()
    job = resp.json()
    job_id, result_url = job["id"], job["result_url"]

    # 3. Poll until done
    while True:
        resp = requests.get(result_url, headers=headers)
        resp.raise_for_status()
        data = resp.json()
        if data["status"] == "done":
            transcript = data["result"]["transcription"]["full_transcript"]
            return transcript
        elif data["status"] == "error":
            raise Exception(f"Transcription error: {data}")
        else:
            time.sleep(2)

In [None]:
import sys




    """

    Args:
        lang_code: Language code (e.g., 'en', 'es', 'fr', etc.)

    Returns:
    """
    if lang_code.lower() in ["en", "english"]:
    else:


# For backward compatibility, keep the old name


## Service Transcribe Functions

Define transcribe functions for each ASR service with the same interface: `transcribe(audio_path)` -> str

In [55]:
# Service and model configurations
SERVICE_MODELS = [
    ("openai", "gpt-4o-transcribe"),
    ("openai", "gpt-4o-mini-transcribe"),
    ("openai", "whisper-1"),
    ("groq", "whisper-large-v3"),
    ("groq", "whisper-large-v3-turbo"),
    ("elevenlabs", "scribe_v1"),
    ("speechmatics", "en"),
    ("gladia", "pre-recorded"),
]

# Create service functions with models
SERVICE_FUNCS = {}
for svc, model in SERVICE_MODELS:
    if svc == "openai":
        SERVICE_FUNCS[f"{svc}_{model}"] = partial(transcribe_openai, model=model)
    elif svc == "groq":
        SERVICE_FUNCS[f"{svc}_{model}"] = partial(transcribe_groq, model=model)
    elif svc == "elevenlabs":
        SERVICE_FUNCS[f"{svc}_{model}"] = partial(transcribe_elevenlabs, model_id=model)
    elif svc == "speechmatics":
        SERVICE_FUNCS[f"{svc}_{model}"] = partial(transcribe_speechmatics, language=model)
    elif svc == "gladia":
        SERVICE_FUNCS[f"{svc}_{model}"] = transcribe_gladia

# Services list (keys of SERVICE_FUNCS)
SERVICES = list(SERVICE_FUNCS.keys())
print(f"SERVICES: {SERVICES}")
print(f"Len SERVICES: {len(SERVICES)}")

SERVICES: ['openai_gpt-4o-transcribe', 'openai_gpt-4o-mini-transcribe', 'openai_whisper-1', 'groq_whisper-large-v3', 'groq_whisper-large-v3-turbo', 'elevenlabs_scribe_v1', 'speechmatics_en', 'gladia_pre-recorded']
Len SERVICES: 8


## WER Evaluation Pipeline

Functions for running WER evaluation across multiple services with retry, error handling, and checkpointing.

In [None]:
import logging
import time
from typing import Dict, List, Any
import jiwer
import concurrent.futures


def run_wer_evaluation(
    dataset_path: str,
    services: List[str],
    service_funcs: Dict[str, callable],
    languages: List[str],
    results_file: str,
    checkpoint_file: str,
    log_file: str,
    max_retries: int = 3,
    n_samples: int = 100,
) -> Dict[str, Dict[str, Any]]:
    """
    Run WER evaluation across multiple ASR services with checkpointing and error handling.

    Args:
        dataset_path: Path to Common Voice dataset
        services: List of service names
        service_funcs: Dict mapping service names to transcribe functions
        languages: List of language codes to evaluate
        results_file: Path to save final results
        checkpoint_file: Path for checkpointing progress
        log_file: Path for logging
        max_retries: Maximum retries per transcription
        n_samples: Number of samples per language

    Returns:
        Dict with results per language
    """

    # Setup logging
    logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logging.getLogger("").addHandler(console)

    # Initialize dataset
    cv_dataset = CommonVoiceDataset(dataset_path)

    # Load checkpoint if exists
    completed_services = set()
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, "r") as f:
            checkpoint = json.load(f)
            completed_services = set(checkpoint.get("completed", []))
            logging.info(f"Resuming from checkpoint: {len(completed_services)} services completed")

    results = {}

    for lang_code in languages:
        if lang_code not in cv_dataset.languages:
            logging.warning(f"Language {lang_code} not found in dataset")
            continue

        logging.info(f"Processing language: {lang_code}")

        # Get samples
        samples_df = cv_dataset.get_samples(lang_code, n_samples=n_samples, split="test")
        if len(samples_df) == 0:
            logging.warning(f"No samples found for {lang_code}")
            continue

        lang_results = {}

        for service_name in services:
            if f"{lang_code}_{service_name}" in completed_services:
                logging.info(f"Skipping completed: {lang_code}_{service_name}")
                continue

            logging.info(f"Evaluating {service_name} on {lang_code}")

            transcribe_func = service_funcs[service_name]
            predictions = []
            references = []
            timings = []

            # Process samples sequentially, but run services in parallel for each sample
            for idx, (_, row) in enumerate(samples_df.iterrows()):
                sample = cv_dataset.get_sample_with_audio(lang_code, row)
                audio_path = sample["audio_path"]
                reference_text = sample["text"]

                # Normalize reference
                normalized_ref = text_normalizer(reference_text)
                references.append(normalized_ref)

                # Transcribe with retries (single service call)
                transcription = None
                start_time = time.time()

                for attempt in range(max_retries):
                    try:
                        transcription = transcribe_func(audio_path)
                        break
                    except Exception as e:
                        logging.warning(
                            f"Attempt {attempt + 1} failed for {service_name} on {lang_code} sample {idx}: {e}"
                        )
                        if attempt == max_retries - 1:
                            logging.error(f"Failed to transcribe {lang_code} sample {idx} after {max_retries} attempts")
                            transcription = ""  # Empty transcription on failure

                end_time = time.time()
                timings.append(end_time - start_time)

                # Normalize prediction
                if transcription:
                    normalized_pred = text_normalizer(transcription)
                else:
                    normalized_pred = ""
                predictions.append(normalized_pred)

                if (idx + 1) % 10 == 0:
                    logging.info(f"Processed {idx + 1}/{len(samples_df)} samples for {service_name}")

            # Calculate WER
            if predictions and references:
                wer = jiwer.wer(references, predictions)
                avg_time = sum(timings) / len(timings)
            else:
                wer = 1.0
                avg_time = 0.0

            lang_results[service_name] = {"wer": wer, "timing": avg_time, "n_samples": len(predictions)}

            logging.info(
                f"{service_name} on {lang_code}: WER={wer:.4f}, Avg Time={avg_time:.2f}s, Samples={len(predictions)}"
            )

            # Save checkpoint
            completed_services.add(f"{lang_code}_{service_name}")
            checkpoint = {"completed": list(completed_services)}
            with open(checkpoint_file, "w") as f:
                json.dump(checkpoint, f)

        results[lang_code] = lang_results

        # Save intermediate results
        with open(results_file, "w") as f:
            json.dump(results, f, indent=2)

    logging.info("Evaluation completed")
    return results


def run_wer_evaluation_parallel(
    dataset_path: str,
    services: List[str],
    service_funcs: Dict[str, callable],
    languages: List[str],
    results_file: str,
    checkpoint_file: str,
    log_file: str,
    max_retries: int = 3,
    n_samples: int = 100,
    max_workers: int = 8,
) -> Dict[str, Dict[str, Any]]:
    """
    Run WER evaluation across multiple ASR services with parallel processing and checkpointing.

    Args:
        dataset_path: Path to Common Voice dataset
        services: List of service names
        service_funcs: Dict mapping service names to transcribe functions
        languages: List of language codes to evaluate
        results_file: Path to save final results
        checkpoint_file: Path for checkpointing progress
        log_file: Path for logging
        max_retries: Maximum retries per transcription
        n_samples: Number of samples per language
        max_workers: Maximum number of parallel workers (services per sample)

    Returns:
        Dict with results per language
    """

    # Suppress verbose HTTP logging globally before any service imports
    import logging

    logging.getLogger("httpx").setLevel(logging.ERROR)
    logging.getLogger("urllib3").setLevel(logging.ERROR)
    logging.getLogger("requests").setLevel(logging.ERROR)
    logging.getLogger("openai").setLevel(logging.ERROR)
    logging.getLogger("groq").setLevel(logging.ERROR)
    logging.getLogger("elevenlabs").setLevel(logging.ERROR)
    logging.getLogger("speechmatics").setLevel(logging.ERROR)

    # Setup logging
    logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logging.getLogger("").addHandler(console)

    # Initialize dataset
    cv_dataset = CommonVoiceDataset(dataset_path)

    # Load checkpoint if exists
    completed_services = set()
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, "r") as f:
            checkpoint = json.load(f)
            completed_services = set(checkpoint.get("completed", []))
            logging.info(f"Resuming from checkpoint: {len(completed_services)} services completed")

    results = {}

    def transcribe_with_retry(service_name, audio_path, sample_idx, lang_code):
        """Transcribe a single audio file with retries for one service."""
        transcribe_func = service_funcs[service_name]
        start_time = time.time()

        for attempt in range(max_retries):
            try:
                transcription = transcribe_func(audio_path)
                end_time = time.time()
                return {
                    "service": service_name,
                    "transcription": transcription,
                    "timing": end_time - start_time,
                    "success": True,
                }
            except Exception as e:
                logging.warning(
                    f"Attempt {attempt + 1} failed for {service_name} on {lang_code} sample {sample_idx}: {e}"
                )
                if attempt == max_retries - 1:
                    logging.error(f"Failed to transcribe {lang_code} sample {sample_idx} after {max_retries} attempts")
                    end_time = time.time()
                    return {
                        "service": service_name,
                        "transcription": "",
                        "timing": end_time - start_time,
                        "success": False,
                    }

    for lang_code in languages:
        if lang_code not in cv_dataset.languages:
            logging.warning(f"Language {lang_code} not found in dataset")
            continue

        logging.info(f"Processing language: {lang_code}")

        # Get samples
        samples_df = cv_dataset.get_samples(lang_code, n_samples=n_samples, split="test")
        if len(samples_df) == 0:
            logging.warning(f"No samples found for {lang_code}")
            continue

        lang_results = {}

        # Check which services need to be processed
        services_to_process = [s for s in services if f"{lang_code}_{s}" not in completed_services]

        if not services_to_process:
            logging.info(f"All services already completed for {lang_code}, loading from checkpoint")
            # Load existing results if available
            if os.path.exists(results_file):
                with open(results_file, "r") as f:
                    existing_results = json.load(f)
                    if lang_code in existing_results:
                        lang_results = existing_results[lang_code]
            results[lang_code] = lang_results
            continue

        logging.info(f"Evaluating {len(services_to_process)} services on {lang_code}")

        # Process each sample, running all services in parallel for that sample
        all_predictions = {service: [] for service in services_to_process}
        all_references = []
        all_timings = {service: [] for service in services_to_process}

        for idx, (_, row) in enumerate(samples_df.iterrows()):
            sample = cv_dataset.get_sample_with_audio(lang_code, row)
            audio_path = sample["audio_path"]
            reference_text = sample["text"]

            # Normalize reference
            normalized_ref = text_normalizer(reference_text)
            all_references.append(normalized_ref)

            # Run all services in parallel for this sample
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all service transcription tasks for this sample
                future_to_service = {
                    executor.submit(transcribe_with_retry, service, audio_path, idx, lang_code): service
                    for service in services_to_process
                }

                # Collect results as they complete
                for future in concurrent.futures.as_completed(future_to_service):
                    result = future.result()
                    service = result["service"]
                    transcription = result["transcription"]
                    timing = result["timing"]

                    # Normalize prediction
                    if transcription:
                        normalized_pred = text_normalizer(transcription)
                    else:
                        normalized_pred = ""

                    all_predictions[service].append(normalized_pred)
                    all_timings[service].append(timing)

            if (idx + 1) % 5 == 0:
                logging.info(f"Processed {idx + 1}/{len(samples_df)} samples")

        # Calculate WER for each service
        for service_name in services_to_process:
            predictions = all_predictions[service_name]
            timings = all_timings[service_name]

            if predictions and all_references:
                wer = jiwer.wer(all_references, predictions)
                avg_time = sum(timings) / len(timings)
            else:
                wer = 1.0
                avg_time = 0.0

            lang_results[service_name] = {"wer": wer, "timing": avg_time, "n_samples": len(predictions)}

            logging.info(
                f"{service_name} on {lang_code}: WER={wer:.4f}, Avg Time={avg_time:.2f}s, Samples={len(predictions)}"
            )

            # Save checkpoint
            completed_services.add(f"{lang_code}_{service_name}")
            checkpoint = {"completed": list(completed_services)}
            with open(checkpoint_file, "w") as f:
                json.dump(checkpoint, f)

        results[lang_code] = lang_results

        # Save intermediate results
        with open(results_file, "w") as f:
            json.dump(results, f, indent=2)

    logging.info("Evaluation completed")
    return results

## Run WER Evaluation

Example usage of the WER evaluation pipeline.

In [68]:
# Run evaluation for all services on 5 English samples (PARALLEL VERSION)
from datetime import datetime

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

print(f"Number of services: {len(SERVICES)}")
print(f"Services: {SERVICES}")
results = run_wer_evaluation_parallel(
    dataset_path=os.getenv("CV22_PATH"),
    services=SERVICES,  # All services
    service_funcs=SERVICE_FUNCS,
    languages=["en"],  # English only
    results_file=f"results/wer_results_{timestamp}.json",
    checkpoint_file=f"results/wer_checkpoint_{timestamp}.json",
    log_file=f"logs/wer_eval_{timestamp}.log",
    max_retries=2,  # Fewer retries for testing
    n_samples=5,  # 5 samples
    max_workers=8,  # Run up to 8 services in parallel per sample
)

print("\nFinal results:")
for lang, data in results.items():
    print(f"{lang}:")
    for service, metrics in data.items():
        print(
            f"  {service}: WER={metrics['wer']:.4f}, Avg Time={metrics['timing']:.2f}s, Samples={metrics['n_samples']}"
        )


Processing language: en
Processing language: en
Processing language: en
Processing language: en
Processing language: en
Evaluating 8 services on en
Evaluating 8 services on en
Evaluating 8 services on en
Evaluating 8 services on en
Evaluating 8 services on en
Evaluating 8 services on en


Number of services: 8
Services: ['openai_gpt-4o-transcribe', 'openai_gpt-4o-mini-transcribe', 'openai_whisper-1', 'groq_whisper-large-v3', 'groq_whisper-large-v3-turbo', 'elevenlabs_scribe_v1', 'speechmatics_en', 'gladia_pre-recorded']
Found 10 languages: ['ru', 'it', 'en', 'es', 'ja']...


HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.groq.com/openai/v1/audio/transcriptions "HTTP/1.1 200 OK"
HTTP Request: POST ht


Final results:
en:
  openai_gpt-4o-transcribe: WER=0.1351, Avg Time=1.07s, Samples=5
  openai_gpt-4o-mini-transcribe: WER=0.1622, Avg Time=1.12s, Samples=5
  openai_whisper-1: WER=0.1892, Avg Time=1.41s, Samples=5
  groq_whisper-large-v3: WER=0.2432, Avg Time=0.31s, Samples=5
  groq_whisper-large-v3-turbo: WER=0.1892, Avg Time=0.28s, Samples=5
  elevenlabs_scribe_v1: WER=0.1081, Avg Time=0.76s, Samples=5
  speechmatics_en: WER=0.1622, Avg Time=2.65s, Samples=5
  gladia_pre-recorded: WER=0.2432, Avg Time=6.39s, Samples=5


In [74]:
# Run evaluation for ALL languages, 2 samples each, all services (PARALLEL VERSION)
from datetime import datetime

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Get all available languages
cv_dataset = CommonVoiceDataset(os.getenv("CV22_PATH"))
all_languages = cv_dataset.languages
print(f"Found {len(all_languages)} languages: {all_languages}")

print(f"Number of services: {len(SERVICES)}")
print(f"Services: {SERVICES}")

results = run_wer_evaluation_parallel(
    dataset_path=os.getenv("CV22_PATH"),
    services=SERVICES,  # All services
    service_funcs=SERVICE_FUNCS,
    languages=all_languages,  # All languages
    results_file=f"results/wer_results_all_{timestamp}.json",
    checkpoint_file=f"results/wer_checkpoint_all_{timestamp}.json",
    log_file=f"logs/wer_eval_all_{timestamp}.log",
    max_retries=2,  # Fewer retries for testing
    n_samples=2,  # 2 samples per language
    max_workers=8,  # Run up to 8 services in parallel per sample
)

print("\nFinal results:")
for lang, data in results.items():
    print(f"\n{lang.upper()}:")
    for service, metrics in data.items():
        print(
            f"  {service}: WER={metrics['wer']:.4f}, Avg Time={metrics['timing']:.2f}s, Samples={metrics['n_samples']}"
        )

Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Processing language: ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru
Evaluating 8 services on ru


Found 10 languages: ['ru', 'it', 'en', 'es', 'ja']...
Found 10 languages: ['ru', 'it', 'en', 'es', 'ja', 'de', 'pl', 'pt', 'fr', 'zh-CN']
Number of services: 8
Services: ['openai_gpt-4o-transcribe', 'openai_gpt-4o-mini-transcribe', 'openai_whisper-1', 'groq_whisper-large-v3', 'groq_whisper-large-v3-turbo', 'elevenlabs_scribe_v1', 'speechmatics_en', 'gladia_pre-recorded']
Found 10 languages: ['ru', 'it', 'en', 'es', 'ja']...


openai_gpt-4o-transcribe on ru: WER=0.5000, Avg Time=1.09s, Samples=2
openai_gpt-4o-transcribe on ru: WER=0.5000, Avg Time=1.09s, Samples=2
openai_gpt-4o-transcribe on ru: WER=0.5000, Avg Time=1.09s, Samples=2
openai_gpt-4o-transcribe on ru: WER=0.5000, Avg Time=1.09s, Samples=2
openai_gpt-4o-transcribe on ru: WER=0.5000, Avg Time=1.09s, Samples=2
openai_gpt-4o-transcribe on ru: WER=0.5000, Avg Time=1.09s, Samples=2
openai_gpt-4o-mini-transcribe on ru: WER=0.3750, Avg Time=1.53s, Samples=2
openai_gpt-4o-mini-transcribe on ru: WER=0.3750, Avg Time=1.53s, Samples=2
openai_gpt-4o-mini-transcribe on ru: WER=0.3750, Avg Time=1.53s, Samples=2
openai_gpt-4o-mini-transcribe on ru: WER=0.3750, Avg Time=1.53s, Samples=2
openai_gpt-4o-mini-transcribe on ru: WER=0.3750, Avg Time=1.53s, Samples=2
openai_gpt-4o-mini-transcribe on ru: WER=0.3750, Avg Time=1.53s, Samples=2
openai_whisper-1 on ru: WER=0.6250, Avg Time=1.46s, Samples=2
openai_whisper-1 on ru: WER=0.6250, Avg Time=1.46s, Samples=2
openai


Final results:

RU:
  openai_gpt-4o-transcribe: WER=0.5000, Avg Time=1.09s, Samples=2
  openai_gpt-4o-mini-transcribe: WER=0.3750, Avg Time=1.53s, Samples=2
  openai_whisper-1: WER=0.6250, Avg Time=1.46s, Samples=2
  groq_whisper-large-v3: WER=0.6250, Avg Time=0.27s, Samples=2
  groq_whisper-large-v3-turbo: WER=0.5000, Avg Time=0.26s, Samples=2
  elevenlabs_scribe_v1: WER=2.0000, Avg Time=0.76s, Samples=2
  speechmatics_en: WER=1.1250, Avg Time=2.26s, Samples=2
  gladia_pre-recorded: WER=0.1250, Avg Time=6.35s, Samples=2

IT:
  openai_gpt-4o-transcribe: WER=0.1053, Avg Time=1.17s, Samples=2
  openai_gpt-4o-mini-transcribe: WER=0.1053, Avg Time=0.88s, Samples=2
  openai_whisper-1: WER=0.0000, Avg Time=1.82s, Samples=2
  groq_whisper-large-v3: WER=0.0000, Avg Time=0.39s, Samples=2
  groq_whisper-large-v3-turbo: WER=0.0000, Avg Time=0.27s, Samples=2
  elevenlabs_scribe_v1: WER=0.0000, Avg Time=0.81s, Samples=2
  speechmatics_en: WER=0.9474, Avg Time=2.67s, Samples=2
  gladia_pre-recorded

In [56]:
# Test the text normalizer
test_texts = [
    "He was thinking about omens, and someone had appeared.",
    "Almost all species have some known economic value.",
    "The fruit of a fig tree is apple shaped.",
    "A knockout on all levels.",
    "This applies to adding new scripture.",
    "Reading or not, it's to you, really.",
]

print("Testing text normalizer:")
for text in test_texts:
    normalized = text_normalizer(text)
    changed = " (CHANGED)" if normalized != text.lower() else ""
    print(f"Original: {text}")
    print(f"Normalized: {normalized}{changed}")
    print()

Testing text normalizer:
Original: He was thinking about omens, and someone had appeared.
Normalized: he was thinking about omens and someone had appeared (CHANGED)

Original: Almost all species have some known economic value.
Normalized: almost all species have some known economic value (CHANGED)

Original: The fruit of a fig tree is apple shaped.
Normalized: the fruit of a fig tree is apple shaped (CHANGED)

Original: A knockout on all levels.
Normalized: a knockout on all levels (CHANGED)

Original: This applies to adding new scripture.
Normalized: this applies to adding new scripture (CHANGED)

Original: Reading or not, it's to you, really.
Normalized: reading or not it is to you really (CHANGED)



In [72]:
# Test language-specific normalizers
test_texts = {
    "en": "He was thinking about omens, and someone had appeared.",
    "es": "Él estaba pensando en augurios, y alguien había aparecido.",
    "fr": "Il pensait aux présages, et quelqu'un était apparu.",
    "de": "Er dachte an Omen, und jemand war erschienen.",
}

print("Testing language-specific normalizers:")
for lang, text in test_texts.items():
    normalizer = get_text_normalizer(lang)
    normalized = normalizer(text)
    print(f"{lang.upper()}: {text}")
    print(f"       -> {normalized}")
    print()

Testing language-specific normalizers:
EN: He was thinking about omens, and someone had appeared.
       -> he was thinking about omens and someone had appeared

ES: Él estaba pensando en augurios, y alguien había aparecido.
       -> el estaba pensando en augurios y alguien habia aparecido

FR: Il pensait aux présages, et quelqu'un était apparu.
       -> il pensait aux presages et quelqu un etait apparu

DE: Er dachte an Omen, und jemand war erschienen.
       -> er dachte an omen und jemand war erschienen



In [73]:
# Global logging suppression for ASR services
import logging
import os

# Set logging levels to ERROR for all HTTP-related libraries before any imports
logging.getLogger("httpx").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("requests").setLevel(logging.ERROR)
logging.getLogger("openai").setLevel(logging.ERROR)
logging.getLogger("groq").setLevel(logging.ERROR)
logging.getLogger("elevenlabs").setLevel(logging.ERROR)
logging.getLogger("speechmatics").setLevel(logging.ERROR)

# Also disable propagation for these loggers to prevent duplicate messages
for logger_name in ["httpx", "urllib3", "requests", "openai", "groq", "elevenlabs", "speechmatics"]:
    logger = logging.getLogger(logger_name)
    logger.propagate = False

print("Global logging suppression configured")

Global logging suppression configured
