# Speech-to-Text (STT) Notebook
A structured workflow for development and experimentation.

## Environment Setup (pip installs, conda setup, dependencies)

In [None]:
# !pip install https://github.com/huggingface/transformers/archive/main.zip torchaudio peft soundfile
# !pip install https://files.pythonhosted.org/packages/fc/ca/83398cfcd557360a3d7b2d732aee1c5f6999f68618d1645f38d53e14c9ff/vosk-0.3.45-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl
# !pip -q install vosk pydub psutil jiwer
# !pip -q install faster-whisper soundfile
# !pip -q install transformers soundfile librosa jiwer psutil


## Imports (all Python imports in one place)

In [1]:
# Pre-check for torchaudio: libgomp has to be in the global symbol table
# before torchaudio is imported, so the preload below must run first.
import os, ctypes

# Locations tried in order. The bare soname lets the dynamic loader search its
# default paths; the absolute entries are aarch64 fallbacks (adjust them if
# your system keeps libgomp elsewhere).
_LIBGOMP_CANDIDATES = (
    "libgomp.so.1",
    "/lib/aarch64-linux-gnu/libgomp.so.1",
    "/usr/lib/aarch64-linux-gnu/libgomp.so.1",
)

for _libgomp_path in _LIBGOMP_CANDIDATES:
    try:
        ctypes.CDLL(_libgomp_path, mode=os.RTLD_GLOBAL)
        break
    except OSError:
        continue  # not loadable from here; try the next location
else:
    # Same exception type as a failed ctypes.CDLL call, but the message now
    # lists every location that was attempted.
    raise OSError(
        "Could not preload libgomp; tried: " + ", ".join(_LIBGOMP_CANDIDATES)
    )

import torchaudio  # must come AFTER the preload
print("torchaudio OK")


torchaudio OK


In [3]:
import torch
# from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration, AutoProcessor, AutoModelForSpeechSeq2Seq
from datasets import load_dataset

import vosk

from huggingface_hub import hf_hub_download

## Configuration

## Data / Inputs

# **Models**

## Facebook s2t-small-librispeech-asr

### Model Setup

In [None]:
# Environment sanity check: report the installed transformers version and
# whether torch can see a CUDA device.
import transformers
import torch

print(
    "Transformers:", transformers.__version__,
    "| Torch CUDA:", torch.cuda.is_available(),
)


In [None]:
import torch
from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration
import sentencepiece

# Checkpoint to load, and the device it runs on (GPU when torch reports one).
MODEL_ID = "facebook/s2t-small-librispeech-asr"
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"

# Processor and model weights come from the same checkpoint id.
processor = Speech2TextProcessor.from_pretrained(MODEL_ID)
# No dtype cast is applied (weights stay float32); the model is only moved to DEVICE.
model = Speech2TextForConditionalGeneration.from_pretrained(MODEL_ID).to(DEVICE)

print("Loaded:", MODEL_ID, "| Device:", DEVICE)


### Compute Model

In [None]:
import time, psutil, numpy as np, soundfile as sf, librosa, torch

# NOTE(review): the Vosk and Whisper cells read "content/ref.wav" — confirm that
# every backend is benchmarked on the same recording.
AUDIO_PATH = "ref.wav"

# Load audio as mono 16 kHz float32
# (librosa handles resampling; S2T uses 16k log-mels)
wave, sr = librosa.load(AUDIO_PATH, sr=16_000, mono=True)
audio_dur_sec = len(wave) / 16_000.0

# Features
inputs = processor(wave, sampling_rate=16_000, return_tensors="pt")
input_features = inputs.input_features.to(DEVICE)
attn_mask = inputs.attention_mask.to(DEVICE) if "attention_mask" in inputs else None

# Decode.
# cpu_percent(interval=None) reports utilisation since the previous call, so a
# priming call right before the timed region makes the reading taken right
# after it cover the inference itself (sampling afterwards would only measure
# an idle machine).
psutil.cpu_percent(interval=None)
t0 = time.perf_counter()  # monotonic clock; immune to wall-clock adjustments
with torch.inference_mode():
    generated_ids = model.generate(
        input_features,
        attention_mask=attn_mask,
        max_length=448,     # safe cap; adjust if your files are long
        num_beams=1,        # greedy for speed
        do_sample=False
    )
wall = time.perf_counter() - t0
cpu_pct = psutil.cpu_percent(interval=None)  # system-wide average over the decode

# Text
s2t_transcript = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()

# Metrics
rtf = wall / max(audio_dur_sec, 1e-6)  # guard against zero-length audio
proc = psutil.Process()
mem_mb = proc.memory_info().rss / (1024**2)

print("— TRANSCRIPT —")
print(s2t_transcript if s2t_transcript else "(empty)")
print("\n— STATS —")
print(f"duration_sec: {audio_dur_sec:.4f}")
print(f"wall_time_sec: {wall:.4f}")
print(f"real_time_factor: {rtf:.4f}")
print(f"cpu_percent: {cpu_pct:.2f}")
print(f"mem_rss_mb: {mem_mb:.2f}")


In [None]:
# pre-check
import os, ctypes, sys

# Locations probed for libgomp, in order (adjust if ldconfig reports another path).
CANDIDATES = [
    "libgomp.so.1",
    "/lib/aarch64-linux-gnu/libgomp.so.1",
    "/usr/lib/aarch64-linux-gnu/libgomp.so.1",
]

# Stop at the first candidate that loads with RTLD_GLOBAL.
loaded = False
for p in CANDIDATES:
    try:
        ctypes.CDLL(p, mode=os.RTLD_GLOBAL)
    except OSError:
        continue  # this location did not work; move on to the next one
    loaded = True
    break

if loaded is False:
    raise RuntimeError("Could not preload libgomp; check your system path.")

# Optional, helpful on small systems: single-threaded defaults that are only
# applied when the variables are not already set.
for _thread_var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
    os.environ.setdefault(_thread_var, "1")

# With libgomp preloaded, sklearn (and transformers, etc.) can be imported.
import sklearn
print("sklearn OK:", sklearn.__version__)


In [None]:
import torch
from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration, pipeline

AUDIO_PATH = "ref.wav"
MODEL_ID   = "facebook/s2t-small-librispeech-asr"

# Fresh processor/model pair handed to the pipeline, which places the model on
# the device selected below.
processor = Speech2TextProcessor.from_pretrained(MODEL_ID)
model = Speech2TextForConditionalGeneration.from_pretrained(MODEL_ID)
asr_s2t = pipeline(
    task="automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    device=-1 if not torch.cuda.is_available() else 0,  # -1 = CPU, 0 = first GPU
)

# Chunked decoding: 15 s windows with 2 s of overlap on each side, beam search
# plus repetition controls forwarded to generate().
s2t_out = asr_s2t(
    AUDIO_PATH,
    chunk_length_s=15,
    stride_length_s=(2, 2),
    generate_kwargs={
        "num_beams": 3,
        "no_repeat_ngram_size": 3,
        "repetition_penalty": 1.2,
        "length_penalty": 1.0,
        "max_new_tokens": 200,
        "early_stopping": True,
    },
)
s2t_transcript = s2t_out["text"]
# Only the first 200 characters are shown as a preview.
print("S2T:", s2t_transcript[:200], "…")


## Vosk

### Model Setup

In [None]:
import os, zipfile, urllib.request

# Source archive, local download target and the directory the archive unpacks to.
model_url = "https://alphacephei.com/vosk/models/vosk-model-small-en-us-0.15.zip"
zip_path  = "content/vosk-model-small-en-us-0.15.zip"
model_dir = "content/vosk-model-small-en-us-0.15"

if not os.path.exists(model_dir):
    # urlretrieve does not create missing parent directories, so make sure the
    # download folder exists on a fresh checkout.
    os.makedirs(os.path.dirname(zip_path), exist_ok=True)
    print("Downloading Vosk small EN model...")
    urllib.request.urlretrieve(model_url, zip_path)
    try:
        print("Unzipping...")
        with zipfile.ZipFile(zip_path, 'r') as zf:
            # Extract next to the archive; the zip carries the model folder itself.
            zf.extractall(os.path.dirname(model_dir))
    finally:
        # Remove the archive even when extraction fails, so a corrupt download
        # is not left behind for the next run.
        os.remove(zip_path)
else:
    print("Model already present")

print("Model ready:", model_dir, "| Exists:", os.path.isdir(model_dir))


### Compute Model

In [None]:
import json, time, subprocess, tempfile, psutil
from pathlib import Path
from pydub import AudioSegment
from vosk import Model, KaldiRecognizer

# Recording to transcribe with Vosk (passed to transcribe_file_vosk below).
AUDIO_PATH = Path("content/ref.wav")
# Directory of the unpacked Vosk model (passed to transcribe_file_vosk below).
MODEL_DIR  = Path("content/vosk-model-small-en-us-0.15")

def ensure_pcm16_mono_16k(in_path: Path) -> Path:
    """Convert an audio file to 16 kHz mono 16-bit PCM WAV using ffmpeg.

    Args:
        in_path: Audio file to convert (a ``Path`` or a plain string path).

    Returns:
        Path of the converted WAV, written to the system temp directory as
        ``conv_<stem>_16k_mono.wav`` (overwritten if it already exists).

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status;
            its stderr is attached to the exception and echoed to ``sys.stderr``.
        FileNotFoundError: the ``ffmpeg`` executable is not on ``PATH``.
    """
    import sys  # local import: only needed to echo ffmpeg diagnostics

    in_path = Path(in_path)  # also accept plain string paths
    out_wav = Path(tempfile.gettempdir()) / f"conv_{in_path.stem}_16k_mono.wav"
    cmd = [
        "ffmpeg", "-y",
        "-i", str(in_path),
        "-ac", "1", "-ar", "16000", "-f", "wav", "-acodec", "pcm_s16le",
        str(out_wav)
    ]
    # stderr is captured rather than discarded so a failed conversion can be
    # diagnosed; stdin is detached so ffmpeg never reads from the notebook.
    result = subprocess.run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        err_text = result.stderr.decode("utf-8", errors="replace")
        print(err_text[-2000:], file=sys.stderr)  # the tail holds the actual error
        raise subprocess.CalledProcessError(result.returncode, cmd, stderr=result.stderr)
    return out_wav

def transcribe_file_vosk(model_dir: Path, audio_path: Path):
    """Transcribe one audio file with Vosk and collect basic runtime stats.

    Model loading and the ffmpeg conversion happen before the timer starts, so
    ``wall_time_sec`` / ``real_time_factor`` cover decoding only, like the
    timings of the other backends in this notebook.

    Args:
        model_dir: Directory of an unpacked Vosk model.
        audio_path: Audio file to transcribe; it is converted to 16 kHz mono
            PCM16 first.

    Returns:
        ``(text, stats)`` where ``text`` is the recognised transcript and
        ``stats`` is a dict with ``duration_sec``, ``wall_time_sec``,
        ``real_time_factor``, ``cpu_percent`` (system-wide, averaged over the
        decode) and ``mem_rss_mb``.
    """
    # --- setup (not timed) ---
    model = Model(str(model_dir))
    rec = KaldiRecognizer(model, 16000)
    rec.SetWords(True)

    wav_path = ensure_pcm16_mono_16k(audio_path)
    pcm = AudioSegment.from_wav(wav_path)
    raw = pcm.raw_data

    # ~30 ms chunking
    chunk_ms = 30
    step = int(16000 * 2 * chunk_ms / 1000)  # bytes per 30 ms (16k * 2 bytes)

    # --- decode (timed) ---
    # cpu_percent(interval=None) measures since the previous call: prime it
    # here and read it right after decoding to get utilisation *during* the work.
    psutil.cpu_percent(interval=None)
    t0 = time.perf_counter()

    pieces = []
    for i in range(0, len(raw), step):
        # A True return marks an utterance boundary; fetch that utterance's
        # result before feeding more audio, as the Vosk example scripts do.
        if rec.AcceptWaveform(raw[i:i+step]):
            pieces.append(json.loads(rec.Result()).get("text", ""))
    pieces.append(json.loads(rec.FinalResult()).get("text", ""))
    text = " ".join(p for p in pieces if p)

    wall = time.perf_counter() - t0
    cpu_pct = psutil.cpu_percent(interval=None)

    dur  = len(pcm) / 1000.0          # pydub lengths are in milliseconds
    rtf  = wall / max(dur, 1e-6)      # guard against zero-length audio

    proc = psutil.Process()
    mem_mb = proc.memory_info().rss / (1024**2)

    stats = dict(duration_sec=dur, wall_time_sec=wall, real_time_factor=rtf,
                 cpu_percent=cpu_pct, mem_rss_mb=mem_mb)
    return text, stats

# Run the Vosk backend once, then report the transcript and its stats.
vosk_text, stats = transcribe_file_vosk(MODEL_DIR, AUDIO_PATH)

print("— TRANSCRIPT —")
print(vosk_text or "(empty)")
print("\n— STATS —")
for k, v in stats.items():
    # Floats are shown with four decimals; anything else is printed as-is.
    if isinstance(v, float):
        print(f"{k}: {v:.4f}")
    else:
        print(f"{k}: {v}")


## Facebook - Wav2Vec2-Base-960h

### Model Setup

In [None]:
import torch, psutil
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC, pipeline

MODEL_ID = "facebook/wav2vec2-base-960h"
# Relative path, matching the Vosk and Whisper cells; the former absolute
# "/content/ref.wav" only resolves on a machine that has a top-level /content
# folder.
AUDIO_PATH = "content/ref.wav"

device = "cuda" if torch.cuda.is_available() else "cpu"
device_index = 0 if device == "cuda" else -1  # pipeline convention: -1 = CPU

# Load processor + model
processor = Wav2Vec2Processor.from_pretrained(MODEL_ID)
model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID)

# Optional: dynamic quantization on CPU to reduce RAM (safe to skip on GPU)
if device == "cpu":
    model = torch.quantization.quantize_dynamic(
        model, {torch.nn.Linear}, dtype=torch.qint8
    )

model.to(device).eval()

# Build a pipeline so we get robust long-audio chunking
asr = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    device=device_index,
)
print(f"Ready on {device} (quantized={device=='cpu'})")


### Model Compute

In [None]:
import time, soundfile as sf, psutil

# Audio duration for RTF (read from the file header, no decoding needed)
info = sf.info(AUDIO_PATH)
audio_dur_sec = info.frames / max(info.samplerate, 1)

# cpu_percent(interval=None) reports utilisation since the previous call, so
# prime it right before the timed region and read it right after; sampling
# once inference has finished would only measure an idle machine.
psutil.cpu_percent(interval=None)
t0 = time.perf_counter()  # monotonic clock for interval timing
out = asr(
    AUDIO_PATH,
    chunk_length_s=20,        # chunking keeps RAM flat
    stride_length_s=(2, 2),   # small overlap
    return_timestamps="word", # <-- FIX for CTC pipelines
    # ignore_warning=True,    # optional
)
wall = time.perf_counter() - t0
cpu_pct = psutil.cpu_percent(interval=None)  # system-wide average over the run
rtf = wall / max(audio_dur_sec, 1e-6)  # guard against zero-length audio

proc = psutil.Process()
mem_mb = proc.memory_info().rss / (1024**2)

# The pipeline normally returns a dict with a "text" entry; fall back to the
# string form for any other return type.
wav2vec2_text = out["text"].strip() if isinstance(out, dict) else str(out).strip()

print("— TRANSCRIPT —")
print(wav2vec2_text if wav2vec2_text else "(empty)")

print("\n— STATS —")
print(f"duration_sec: {audio_dur_sec:.4f}")
print(f"wall_time_sec: {wall:.4f}")
print(f"real_time_factor: {rtf:.4f}")
print(f"cpu_percent: {cpu_pct:.2f}")
print(f"mem_rss_mb: {mem_mb:.2f}")


## OpenAI - Whisper

### Model Setup

In [4]:
import torch, psutil, time, soundfile as sf
from faster_whisper import WhisperModel

AUDIO_PATH = "content/ref.wav"

# Checkpoint size: "tiny" or "base" ("small" and "medium" also exist but are heavier).
MODEL_SIZE = "base"

# Device and precision are picked together: float16 on a CUDA GPU, int8 on CPU.
DEVICE, COMPUTE_TYPE = (
    ("cuda", "float16") if torch.cuda.is_available() else ("cpu", "int8")
)

print(f"Using model={MODEL_SIZE}, device={DEVICE}, compute_type={COMPUTE_TYPE}")
model = WhisperModel(MODEL_SIZE, device=DEVICE, compute_type=COMPUTE_TYPE)


ImportError: /home/nvidia/Documents/STT-Model-Transformer/venv/lib/python3.8/site-packages/ctranslate2/../ctranslate2.libs/libgomp-d22c30c5.so.1.0.0: cannot allocate memory in static TLS block

### Compute Model

In [None]:
import itertools

# Audio duration for RTF (read from the file header)
info = sf.info(AUDIO_PATH)
audio_dur_sec = info.frames / max(info.samplerate, 1)

# cpu_percent(interval=None) reports utilisation since the previous call, so
# prime it right before the timed region and read it right after; sampling
# once transcription has finished would only measure an idle machine.
psutil.cpu_percent(interval=None)
t0 = time.perf_counter()  # monotonic clock for interval timing
segments, info_rt = model.transcribe(
    AUDIO_PATH,
    language="en",           # English only (skips auto-detect)
    task="transcribe",       # not "translate"
    beam_size=1,             # greedy for speed; set >1 for accuracy
    vad_filter=True,         # basic VAD can help on noisy files
)
# Collect text (order preserved). The join stays inside the timed region
# because it is what consumes `segments`.
whisper_text = " ".join(s.text.strip() for s in segments)

wall = time.perf_counter() - t0
cpu_pct = psutil.cpu_percent(interval=None)  # system-wide average over the run
rtf = wall / max(audio_dur_sec, 1e-6)  # guard against zero-length audio

proc = psutil.Process()
mem_mb = proc.memory_info().rss / (1024**2)

print("— TRANSCRIPT —")
print(whisper_text if whisper_text else "(empty)")
print("\n— STATS —")
print(f"duration_sec: {audio_dur_sec:.4f}")
print(f"wall_time_sec: {wall:.4f}")
print(f"real_time_factor: {rtf:.4f}")
print(f"cpu_percent: {cpu_pct:.2f}")
print(f"mem_rss_mb: {mem_mb:.2f}")

# Extra info from faster-whisper (when available). Each field is looked up
# individually so a missing attribute shows as "n/a" instead of silently
# cutting the report short.
if info_rt is not None:
    print("\n— RUNTIME INFO —")
    print(f"language: {getattr(info_rt, 'language', 'n/a')}")
    _lang_prob = getattr(info_rt, "language_probability", None)
    print("language_probability: " + (f"{_lang_prob:.4f}" if _lang_prob is not None else "n/a"))
    _reported_dur = getattr(info_rt, "duration", None)
    print("duration (reported): " + (f"{_reported_dur:.4f}" if _reported_dur is not None else "n/a"))


## Model Comparison

In [None]:
# Ground-truth transcript that every backend's output is scored against in the
# comparison cell. Kept verbatim: editing its wording changes the computed scores.
ref_str = "It looks like your descending, sir I need to make sure you are climbing not descending, Gold is climbing 22G, say altitude 2500, 22G, 22G low altitude alert, climb the airplane maintain 5000, climb the airplane please, just level off the plane and climb the airplane up to 5000, when you can, sir, you appear to be descending again sir, Are you... say altitude. Tower the aircraft has crashed uhh... about half a mile in front of us into the houses"

In [None]:
# %% ASR 4-way comparison with Accuracy + Correctness/Precision/F1
# Expects these variables to be defined earlier:
#   ref_str, vosk_text, whisper_text, wav2vec2_text, s2t_transcript
# Optional installs (uncomment if needed):
# !pip -q install pandas matplotlib

import re, os, csv
import pandas as pd
import matplotlib.pyplot as plt

# ---------- Normalization tuned for ATC-ish speech ----------
def normalize_atc(s: str) -> str:
    """Normalise a transcript for ATC-style scoring.

    Lower-cases the text, spells out the domain tokens seen in the reference
    ("22g", "5000"/"5,000", "2500"), maps "gold" to "golf", removes "uh"/"um"
    fillers, strips punctuation and collapses whitespace.

    Domain tokens are matched as whole words only, so longer tokens that merely
    contain them (e.g. "golden", "15000") are left untouched.

    Args:
        s: Raw transcript, or ``None``.

    Returns:
        The normalised, single-space-separated string ("" for ``None``).
    """
    if s is None:
        return ""
    s = s.lower()
    # map common domain tokens (whole-word matches only)
    s = re.sub(r"\b22g\b", "two two golf", s)
    s = re.sub(r"\b5,?000\b", "five thousand", s)
    s = re.sub(r"\b2500\b", "two thousand five hundred", s)
    s = re.sub(r"\bgold\b", "golf", s)
    # drop fillers, punctuation, collapse spaces
    s = re.sub(r"\b(?:uh+|um+)\b", " ", s)
    s = re.sub(r"[‐–—…]", " ", s)          # normalize dashes/ellipses
    s = re.sub(r"[^\w\s]", " ", s)         # strip punctuation
    s = re.sub(r"\s+", " ", s).strip()
    return s

def normalize_raw(s: str) -> str:
    """Minimal normalisation: lower-case, punctuation to spaces, single spacing.

    Returns "" when ``s`` is ``None``.
    """
    if s is None:
        return ""
    without_punct = re.sub(r"[^\w\s]", " ", s.lower())
    return re.sub(r"\s+", " ", without_punct).strip()

# ---------- Levenshtein alignment to get S/D/I + matches ----------
def align_counts(ref_tokens, hyp_tokens):
    """Align two token sequences by edit distance and count the operations.

    Returns a dict with the number of substitutions ("S"), deletions ("D"),
    insertions ("I") and exact matches ("C") on one minimum-cost alignment,
    plus the reference length ("N") and hypothesis length ("M").
    """
    n_ref, n_hyp = len(ref_tokens), len(hyp_tokens)

    # cost[r][h] = edit distance between the first r reference tokens and the
    # first h hypothesis tokens.
    cost = [[0] * (n_hyp + 1) for _ in range(n_ref + 1)]
    for r in range(1, n_ref + 1):
        cost[r][0] = r
    for h in range(1, n_hyp + 1):
        cost[0][h] = h
    for r in range(1, n_ref + 1):
        above, current = cost[r - 1], cost[r]
        for h in range(1, n_hyp + 1):
            if ref_tokens[r - 1] == hyp_tokens[h - 1]:
                current[h] = above[h - 1]
            else:
                current[h] = 1 + min(above[h],        # deletion
                                     current[h - 1],  # insertion
                                     above[h - 1])    # substitution

    # Walk back from the bottom-right corner. Preference order on ties:
    # match, substitution, deletion, insertion.
    subs = dels = ins = hits = 0
    r, h = n_ref, n_hyp
    while r > 0 or h > 0:
        here = cost[r][h]
        on_diagonal = r > 0 and h > 0
        if on_diagonal and ref_tokens[r - 1] == hyp_tokens[h - 1] and here == cost[r - 1][h - 1]:
            hits += 1
            r -= 1
            h -= 1
        elif on_diagonal and here == cost[r - 1][h - 1] + 1:
            subs += 1
            r -= 1
            h -= 1
        elif r > 0 and here == cost[r - 1][h] + 1:
            dels += 1
            r -= 1
        else:
            ins += 1
            h -= 1
    return {"S": subs, "D": dels, "I": ins, "C": hits, "N": n_ref, "M": n_hyp}

def metrics_from_tokens(ref_toks, hyp_toks):
    """Compute word-level scores from reference and hypothesis token lists.

    Returns a dict with ``wer``, ``acc`` (1 - WER, floored at 0),
    ``correctness`` (matches / reference length), ``precision``
    (matches / hypothesis length), their harmonic mean ``f1``, the true token
    counts and the raw S/D/I/C counts.
    """
    counts = align_counts(ref_toks, hyp_toks)
    subs, dels, ins, hits = (counts[key] for key in ("S", "D", "I", "C"))

    # Lengths are floored at 1 so empty inputs never divide by zero.
    ref_len = max(1, counts["N"])
    hyp_len = max(1, counts["M"])

    wer = (subs + dels + ins) / ref_len
    recall = hits / ref_len   # share of reference words recovered
    prec = hits / hyp_len     # share of hypothesis words that are correct
    denom = recall + prec
    if denom == 0:
        f1 = 0.0
    else:
        f1 = 2 * recall * prec / denom

    return {
        "wer": wer,
        "acc": max(0.0, 1.0 - wer),
        "correctness": recall,
        "precision": prec,
        "f1": f1,
        "ref_words": counts["N"],
        "hyp_words": counts["M"],
        "S": subs, "D": dels, "I": ins, "C": hits,
    }

def score_pair(name, hyp_raw, ref_raw):
    """Score one backend's hypothesis against the reference.

    WER and accuracy are reported twice: on lightly normalised text (``*_raw``)
    and on domain-normalised text (``*_norm``). The remaining metrics and counts
    come from the domain-normalised alignment. ``hyp_preview`` carries at most
    the first 120 characters of the hypothesis.
    """
    # Lightly normalised tokens
    raw = metrics_from_tokens(
        normalize_raw(ref_raw).split(),
        normalize_raw(hyp_raw).split(),
    )
    # Domain-normalised tokens
    norm = metrics_from_tokens(
        normalize_atc(ref_raw).split(),
        normalize_atc(hyp_raw).split(),
    )

    # Truncate long hypotheses for display; a missing/empty one becomes "".
    if hyp_raw and len(str(hyp_raw)) > 120:
        preview = str(hyp_raw)[:120] + "…"
    else:
        preview = str(hyp_raw or "")

    result = {"backend": name}
    result["wer_raw"] = round(raw["wer"], 4)
    result["acc_raw"] = round(raw["acc"], 4)
    for key in ("wer", "acc"):
        result[f"{key}_norm"] = round(norm[key], 4)
    for key in ("correctness", "precision", "f1"):
        result[key] = round(norm[key], 4)
    for key in ("ref_words", "hyp_words", "S", "D", "I", "C"):
        result[key] = norm[key]
    result["hyp_preview"] = preview
    return result

# Collect available hypotheses
def _get(name):
    """Return the notebook-level variable ``name``, or None if it is not defined."""
    return globals().get(name)

if "ref_str" not in globals():
    raise RuntimeError("Please define `ref_str` (ground-truth transcript) first.")

pairs = [
    ("whisper",   _get("whisper_text")),
    ("wav2vec2",  _get("wav2vec2_text")),
    ("vosk",      _get("vosk_text")),
    ("s2t-small", _get("s2t_transcript")),
]

# Backends whose transcript variable is missing (cell not run) are skipped.
rows = [score_pair(name, hyp, ref_str) for name, hyp in pairs if hyp is not None]

# Column order mirrors the keys produced by score_pair. Passing it explicitly
# keeps the frame well-formed even when no backend has been run yet; without
# it an empty `rows` yields a column-less frame and sort_values raises KeyError.
_SCORE_COLUMNS = [
    "backend", "wer_raw", "acc_raw", "wer_norm", "acc_norm",
    "correctness", "precision", "f1", "ref_words", "hyp_words",
    "S", "D", "I", "C", "hyp_preview",
]
if not rows:
    print("No transcripts found - run at least one backend cell before comparing.")
df = (
    pd.DataFrame(rows, columns=_SCORE_COLUMNS)
    .sort_values(["wer_norm", "backend"])
    .reset_index(drop=True)
)
display(df)

# Save CSV next to the other notebook artefacts. The path is relative, like the
# model and audio paths under "content/"; an absolute "/content/..." only
# exists on machines with a top-level /content folder.
csv_path = "content/asr_eval_summary.csv"
os.makedirs(os.path.dirname(csv_path), exist_ok=True)  # to_csv does not create folders
df.to_csv(csv_path, index=False)
print(f"Saved → {csv_path}")

# Charts: one bar per backend, first normalized WER, then normalized accuracy.
fig, ax = plt.subplots(figsize=(6,4))
ax.bar(df["backend"], df["wer_norm"])
ax.set_title("ASR Comparison — Normalized WER (lower is better)")
ax.set_xlabel("backend")
ax.set_ylabel("WER (normalized)")
# Leave 15% headroom above the tallest bar, with a 0.05 minimum axis height.
ax.set_ylim(0, max(0.05, df["wer_norm"].max()*1.15))
fig.tight_layout()
plt.show()

fig, ax = plt.subplots(figsize=(6,4))
ax.bar(df["backend"], df["acc_norm"])
ax.set_title("ASR Comparison — Normalized Accuracy (higher is better)")
ax.set_xlabel("backend")
ax.set_ylabel("Accuracy (normalized)")
ax.set_ylim(0, 1.0)
fig.tight_layout()
plt.show()

# Optional: name the backend with the highest normalized accuracy.
if not df.empty:
    best = df.sort_values("acc_norm", ascending=False).iloc[0]
    print(f"Best by acc_norm: {best['backend']} ({best['acc_norm']*100:.2f}%)")
