# Audio Transcription – Peru Audio Clips  
### **GPU-Enhanced, Progress-Aware, Multi-Threaded Pipeline (ElevenLabs Scribe)**

This notebook transcribes Spanish MP3 audio clips into **text transcripts** using
the new **ElevenLabs Scribe v1** Speech-to-Text API, replacing the previous Groq Whisper flow.

---

**Key Enhancements (relative to the Groq version)**

| Area | Upgrade |
|------|---------|
| STT engine | Switched to ElevenLabs Scribe v1 – 99-language, word-timestamped, speaker-diarized output. |
| Diarization | Native `diarize` flag with `num_speakers` hint for up to 32 speakers. |
| Spanish mode | `language_code="es"` consistently enforced. |
| Cost ledger | Updated to $0.40 per hour (≈ $0.00667 per minute) pricing. |
| API client | Uses the **official `elevenlabs` Python SDK**. |


In [None]:
# --- Install dependencies ---------------------------------------------------
import importlib.util, subprocess, sys, os

# Third-party packages this notebook needs, in Colab and in a local Jupyter kernel alike.
pkgs = [
    'elevenlabs', 'python-dotenv', 'requests', 'pandas',
    'tqdm', 'psutil', 'soundfile', 'librosa',
]

# Colab is recognised by the presence of the google.colab package.
IN_COLAB = importlib.util.find_spec('google.colab') is not None

if not IN_COLAB:
    # Local kernel: invoke pip for the running interpreter; check=True raises if pip fails.
    subprocess.run(
        [sys.executable, '-m', 'pip', 'install', '--quiet'] + pkgs,
        check=True,
    )
else:
    # Colab: go through the IPython shell escape.
    from IPython import get_ipython
    get_ipython().system('pip -q install ' + ' '.join(pkgs))

print('✅ Dependencies installed/verified')


In [None]:
# --- Environment & GPU detection -------------------------------------------
import importlib.util, subprocess, os, sys, json, time, hashlib, psutil
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, Semaphore, Event

# True when the google.colab package can be located, i.e. the notebook runs inside Colab.
IN_COLAB = importlib.util.find_spec("google.colab") is not None
# os.cpu_count() may return None, so fall back to a single CPU.
CPU_COUNT = os.cpu_count() or 1
# NOTE(review): not referenced by any cell visible here — presumably a byte buffer size; confirm before removing.
CHUNK_SIZE = 8192

# GPU detection (unchanged helper)
def detect_gpu_and_configure():
    """Probe for an NVIDIA GPU with ``nvidia-smi`` and derive concurrency settings.

    Returns:
        dict with the keys ``available`` (bool), ``memory_gb`` (0 when no GPU
        was found), ``acceleration_flag`` (``'-hwaccel cuda'`` or ``''``) and
        ``max_concurrent`` (4 for >= 16 GB, 2 for >= 8 GB, otherwise 1).

    Any failure to run or parse ``nvidia-smi`` is reported on stdout and
    treated as "no GPU"; the function never raises for that reason.
    """
    gpu = {'available': False, 'memory_gb': 0, 'acceleration_flag': '', 'max_concurrent': 1}
    try:
        out = subprocess.run(
            ['nvidia-smi', '--query-gpu=memory.total', '--format=csv,noheader,nounits'],
            capture_output=True, text=True, timeout=10)
        if out.returncode == 0:
            # nvidia-smi prints one line per GPU. Parsing the whole output with a
            # single int() fails on multi-GPU hosts, so parse line by line.
            totals_mb = [int(line) for line in out.stdout.splitlines() if line.strip()]
            if totals_mb:
                # Size the limits from the first listed GPU.
                mem_gb = totals_mb[0] / 1024
                gpu.update({'available': True,
                            'memory_gb': mem_gb,
                            'acceleration_flag': '-hwaccel cuda',
                            'max_concurrent': 4 if mem_gb >= 16 else 2 if mem_gb >= 8 else 1})
                print(f"🚀 NVIDIA GPU: {mem_gb:.1f} GB - {gpu['max_concurrent']} concurrent heavy tasks")
    except Exception as e:
        # Best-effort probe: a missing binary, a timeout or unparsable output all mean "no GPU".
        print("No NVIDIA GPU or nvidia-smi failed:", e)
    if not gpu['available']:
        print("💻 Using CPU-only mode for local tasks")
    return gpu

# Probe the GPU once; the result sizes the semaphore below.
GPU_INFO = detect_gpu_and_configure()
# Caps concurrent gated sections at max_concurrent; None when no GPU was detected.
GPU_SEMAPHORE = Semaphore(GPU_INFO['max_concurrent']) if GPU_INFO['available'] else None
# Set this event to make the background GPU monitor loop exit.
GPU_MONITOR_STOP = Event()

def start_gpu_monitor(interval=60):
    """Start a daemon thread that prints GPU utilisation and memory periodically.

    Args:
        interval: seconds to wait between ``nvidia-smi`` polls.

    Returns:
        The started ``threading.Thread``, or ``None`` when no GPU was detected.

    The loop stops once ``GPU_MONITOR_STOP`` is set; that is also registered
    with ``atexit``.
    """
    import atexit
    import threading

    if not GPU_INFO['available']:
        return None

    def _loop():
        while not GPU_MONITOR_STOP.is_set():
            try:
                out = subprocess.run(
                    ['nvidia-smi', '--query-gpu=utilization.gpu,memory.used,memory.total',
                     '--format=csv,noheader,nounits'],
                    capture_output=True, text=True, timeout=5)
                if out.returncode == 0:
                    # One CSV row per GPU: splitting the whole output on ',' breaks
                    # as soon as there is more than one row, so handle rows separately.
                    rows = [line for line in out.stdout.splitlines() if line.strip()]
                    for gpu_index, row in enumerate(rows):
                        util, used, total = map(int, row.split(','))
                        tag = "[GPU]" if len(rows) == 1 else f"[GPU {gpu_index}]"
                        print(f"{tag} util {util}%  mem {used}/{total} MB")
            except Exception as e:
                # Monitoring is best-effort; report and keep polling.
                print("[GPU-mon] err",e)
            # Event.wait doubles as an interruptible sleep.
            GPU_MONITOR_STOP.wait(interval)

    t = threading.Thread(target=_loop, daemon=True)
    t.start()
    atexit.register(GPU_MONITOR_STOP.set)
    return t


In [None]:
# --- Adaptive worker counts -------------------------------------------------
def configure_workers():
    """Choose a thread-pool size bounded by CPU count, available RAM and a hard cap.

    Returns:
        An int >= 1: the smallest of ``min(2 * CPU_COUNT, 16)``, 80 % of the
        available memory in GiB (truncated), and 12.
    """
    available_gib = psutil.virtual_memory().available / 2**30
    cpu_cap = min(CPU_COUNT * 2, 16)
    memory_cap = int(available_gib * 0.8)
    chosen = min(cpu_cap, memory_cap, 12)
    # Never return fewer than one worker, even on a memory-starved host.
    return max(1, chosen)

# Thread-pool size, computed once from the CPU count and currently available memory.
MAX_WORKERS = configure_workers()
# Checkpoint cadence: the progress CSV is written once this many save_progress calls have accumulated.
PROGRESS_SAVE_INTERVAL = 5
print(f"🖥️  CPU {CPU_COUNT}  → workers {MAX_WORKERS}")


In [None]:
# --- Drive / paths ----------------------------------------------------------
import os
from pathlib import Path
if not IN_COLAB:
    # Local run: PERU_DATA_ROOT may point at the project directory; otherwise use the cwd.
    ROOT = Path(os.getenv('PERU_DATA_ROOT', Path.cwd()))
else:
    # Colab: mount Google Drive and work inside the shared project folder.
    from google.colab import drive as _gd
    _gd.mount('/content/drive')
    ROOT = Path('/content/drive/My Drive/world bank/data/Peru')

# Input, checkpoint and output tables.
INPUT_CSV = ROOT / 'evals' / 'formattedData' / 'peru_with_audio_clips.csv'
PROGRESS_CSV = ROOT / 'evals' / 'formattedData' / 'peru_transcript_progress.csv'
FINAL_CSV = ROOT / 'evals' / 'formattedData' / 'peru_with_transcripts.csv'

# Transcript artefacts and the cost ledger.
TRANS_DIR = ROOT / 'transcripts' / 'processed'
CACHE_DIR = ROOT / 'transcripts' / 'cache'
COST_LOG = ROOT / 'transcripts' / 'cost_tracking.json'

# Create the output directories up front (no error if they already exist).
for p in (TRANS_DIR, CACHE_DIR):
    p.mkdir(parents=True, exist_ok=True)

print("ROOT →",ROOT)


In [None]:
# --- ElevenLabs API init ----------------------------------------------------
from dotenv import load_dotenv; load_dotenv()
from elevenlabs import ElevenLabs
import math
from pathlib import Path

# The key comes from the environment (load_dotenv above may have populated it).
API_KEY = os.environ.get('ELEVENLABS_API_KEY')
if not API_KEY:
    raise RuntimeError('Set ELEVENLABS_API_KEY env var')

MODEL_ID = 'scribe_v1'
client = ElevenLabs(api_key=API_KEY)
print("Model →", MODEL_ID)

# Keyword arguments forwarded unchanged to every speech-to-text request.
TRANSCRIPTION_PARAMS = {
    'model_id': MODEL_ID,
    'language_code': 'es',             # Spanish
    'diarize': True,                   # enable speaker diarization
    'num_speakers': None,              # set when the speaker count is known
    'timestamps_granularity': 'word',
    'tag_audio_events': True,
}

# ElevenLabs bills $0.40 per hour → $0.0066667 per minute
COST_PER_MIN = 0.0066667


In [None]:
# --- Processor class with JSON and column-based storage ---------------------
import librosa, soundfile
import pandas as pd
import json
from tqdm.auto import tqdm
from datetime import datetime
from threading import Lock

class TranscriptionProcessor:
    """Transcribe audio clips with ElevenLabs, with caching, stats and checkpoints.

    The class relies on notebook-level names defined in other cells:
    ``client``, ``TRANSCRIPTION_PARAMS``, ``COST_PER_MIN``, ``COST_LOG``,
    ``CACHE_DIR``, ``PROGRESS_CSV``, ``PROGRESS_SAVE_INTERVAL`` and
    ``GPU_SEMAPHORE``.

    A single instance is meant to be shared by several worker threads:
    ``self.lock`` guards ``self.stats``, the checkpoint counter and every
    write to the shared DataFrame.
    """

    # Number of additional attempts after the first failed one.
    _MAX_RETRIES = 2

    def __init__(self):
        self.lock = Lock()
        self.stats = dict(processed=0, success=0, fail=0, cached=0,
                          api_calls=0, cost=0.0, minutes=0.0,
                          start=time.time())
        self.progress_cnt = 0
        self.load_cost()

    # ---------- cost ledger ----------
    def load_cost(self):
        """Seed ``stats['cost']`` with the total recorded by a previous run, if readable."""
        try:
            if COST_LOG.exists():
                self.stats['cost'] = json.loads(COST_LOG.read_text()).get('total_cost', 0)
                print("💰 prev cost =",self.stats['cost'])
        except (OSError, ValueError, AttributeError) as e:
            # An unreadable or malformed ledger must not block the run; start from the default.
            print(f"⚠️ could not read cost ledger: {type(e).__name__}: {e}")

    def flush_cost(self):
        """Write the cumulative cost and a timestamp to ``COST_LOG`` as JSON."""
        COST_LOG.write_text(json.dumps({
            'total_cost': self.stats['cost'],
            'updated': datetime.now().isoformat()
        },indent=2))

    # ---------- saving ---------------
    def save_progress(self, df, force=False):
        """Checkpoint ``df`` to ``PROGRESS_CSV``.

        Every call increments a counter; the CSV is written when the counter
        reaches ``PROGRESS_SAVE_INTERVAL`` or when ``force`` is true.
        """
        with self.lock:
            self.progress_cnt += 1
            if force or self.progress_cnt >= PROGRESS_SAVE_INTERVAL:
                df.to_csv(PROGRESS_CSV, index=False)
                self.progress_cnt = 0
                print("💾 checkpoint saved", PROGRESS_CSV.name)

    # ---------- cache utils ----------
    def cache_key(self, path:Path):
        """Return an MD5 hex digest of the path, file size and mtime (raises if the file is missing)."""
        s = path.stat()
        return hashlib.md5(f"{path}{s.st_size}{s.st_mtime}".encode()).hexdigest()

    def cache_path(self, path:Path): return CACHE_DIR/f"{self.cache_key(path)}.json"

    def _read_cache(self, path:Path):
        """Return the cache entry (a dict containing ``'text'``) for ``path``, or ``None``."""
        cp = self.cache_path(path)
        if not cp.exists():
            return None
        try:
            entry = json.loads(cp.read_text())
        except (OSError, ValueError) as e:
            # A corrupt cache file is treated as a miss rather than an error.
            print(f"⚠️ ignoring unreadable cache file {cp.name}: {e}")
            return None
        if isinstance(entry, dict) and 'text' in entry:
            return entry
        return None

    def try_cache(self, path:Path):
        """Return the cached transcript text for ``path``, or ``None`` on a miss."""
        entry = self._read_cache(path)
        return entry['text'] if entry else None

    def store_cache(self, path:Path, text:str, transcript_data=None):
        """Cache ``text`` for ``path``, optionally with the full transcript dict.

        ``transcript_data`` is stored under ``'transcript'`` so that a later
        cache hit can restore words and metadata, not only the text.
        """
        entry = {'text': text, 'ts': time.time()}
        if transcript_data is not None:
            entry['transcript'] = transcript_data
        self.cache_path(path).write_text(json.dumps(entry))

    def duration_min(self, path:Path):
        """Return the clip duration in minutes.

        The audio is decoded with librosa; if that fails, the file size in
        MiB (at least 0.1) is returned as a rough stand-in.
        """
        try:
            print(f"Calculating duration for {path}")
            y, sr = librosa.load(path, sr=None, mono=True)
            duration = len(y)/sr/60
            print(f"Calculated duration: {duration}")
            return duration
        except Exception as e:
            print(f"ERROR during duration calculation: {e}")
            return max(path.stat().st_size/(1024**2),0.1)

    # ---------- JSON storage and metadata extraction ----------
    def store_transcript_data(self, df:pd.DataFrame, idx:int, prefix:str, transcript_data:dict):
        """Write the transcript JSON and derived metadata into row ``idx`` of ``df``.

        Columns are named ``prefix + suffix`` (``_JSON``, `` Text``,
        `` Language Code``, `` Language Probability``, `` Word Count``,
        `` Duration Seconds``, `` Speaker Count``, `` Has Audio Events``,
        `` First Speaker``).
        """
        words = transcript_data.get('words') or []
        spoken = [w for w in words if w.get('type') == 'word']
        # Tokens may carry no end timestamp; skip them so max() never compares None.
        end_times = [w['end'] for w in words if w.get('end') is not None]
        speakers = {w.get('speaker_id') for w in words if w.get('speaker_id')}
        # The DataFrame is shared with other threads and with save_progress().
        with self.lock:
            # Store full JSON response
            df.at[idx, prefix + '_JSON'] = json.dumps(transcript_data, ensure_ascii=False)
            # Store text and key metadata fields
            df.at[idx, prefix + ' Text'] = transcript_data.get('text', '')
            df.at[idx, prefix + ' Language Code'] = transcript_data.get('language_code', '')
            df.at[idx, prefix + ' Language Probability'] = transcript_data.get('language_probability', 0)
            df.at[idx, prefix + ' Word Count'] = len(spoken)
            df.at[idx, prefix + ' Duration Seconds'] = max(end_times, default=0)
            df.at[idx, prefix + ' Speaker Count'] = len(speakers)
            df.at[idx, prefix + ' Has Audio Events'] = any(w.get('type') == 'audio_event' for w in words)
            df.at[idx, prefix + ' First Speaker'] = next((w.get('speaker_id') for w in spoken), None)

    # ---------- core transcription with JSON response ----------
    def _transcribe_once(self, path:Path):
        """Make one attempt: serve from cache or call the API. Raises on any failure."""
        entry = self._read_cache(path)
        if entry and entry['text']:
            with self.lock: self.stats['cached'] += 1
            print("Result from cache")
            full = entry.get('transcript')
            if isinstance(full, dict):
                return full, "cached"
            # Text-only cache entry: return minimal JSON with text only.
            return {'text': entry['text'], 'language_code':'', 'language_probability':0, 'words':[]}, "cached"

        dur = self.duration_min(path)
        est_cost = dur*COST_PER_MIN
        print("Calling ElevenLabs API...")
        with open(path,'rb') as f:
            resp = client.speech_to_text.convert(file=f, **TRANSCRIPTION_PARAMS)
        # Build transcript_data dict
        text = resp.text.strip() if hasattr(resp,'text') else str(resp)
        transcript_data = {
            'language_code': getattr(resp, 'language_code', ''),
            'language_probability': getattr(resp, 'language_probability', 0),
            'text': text,
            'words': [
                {
                    'text': getattr(w, 'text', ''),
                    'start': getattr(w, 'start', 0),
                    'end': getattr(w, 'end', 0),
                    'type': getattr(w, 'type', ''),
                    'speaker_id': getattr(w, 'speaker_id', ''),
                    'logprob': getattr(w, 'logprob', None)
                }
                # `words` may be absent or None on the response object.
                for w in (getattr(resp, 'words', None) or [])
            ]
        }
        if len(text) < 10:
            print("Transcript too short, raising")
            raise ValueError("too short")
        try:
            self.store_cache(path, text, transcript_data)
        except (OSError, TypeError, ValueError) as e:
            # A cache-write failure must not throw away a transcript that was already obtained.
            print(f"⚠️ cache write failed for {path.name}: {type(e).__name__}: {e}")
        with self.lock:
            self.stats.update(success=self.stats['success']+1,
                              api_calls=self.stats['api_calls']+1,
                              cost=self.stats['cost']+est_cost,
                              minutes=self.stats['minutes']+dur)
        print("Transcription success")
        return transcript_data, "ok"

    def transcribe(self, path:Path, retries=0):
        """Transcribe ``path``, retrying with exponential backoff.

        Args:
            path: audio file to transcribe.
            retries: number of attempts already used; callers normally leave it at 0.

        Returns:
            ``(transcript_data, "ok")`` on success, ``(transcript_data, "cached")``
            on a cache hit, or ``(None, error_message)`` once attempts are exhausted.

        The concurrency slot is acquired per attempt and released before
        sleeping. Retrying by recursion while still holding the semaphore
        would deadlock when only one slot exists.
        """
        attempt = retries
        while True:
            print(f"Processing: {path}")
            print(f"File exists: {path.exists()}")
            if GPU_SEMAPHORE: GPU_SEMAPHORE.acquire()
            try:
                return self._transcribe_once(path)
            except Exception as e:
                error = e
                print(f"ERROR: {type(e).__name__}: {e}")
            finally:
                if GPU_SEMAPHORE: GPU_SEMAPHORE.release()
            if attempt >= self._MAX_RETRIES:
                with self.lock: self.stats['fail'] += 1
                return None, str(error)
            print(f"Retrying... attempt {attempt+1}")
            time.sleep(2**attempt)
            attempt += 1


In [None]:
# --- Load dataframe and initialize metadata columns ------------------------
import pandas as pd, numpy as np
df = pd.read_csv(INPUT_CSV)


def _blank(series):
    """Boolean mask of entries that are NaN or empty/whitespace-only strings."""
    return series.isna() | series.astype(str).str.strip().eq('')


for col in ['First Audio Transcript','Last Audio Transcript']:
    # Ensure base transcript columns exist
    if col not in df.columns: df[col] = ''
    # JSON and metadata columns
    df[col + '_JSON'] = ''
    df[col + ' Text'] = ''
    df[col + ' Language Code'] = ''
    # Float defaults: these two columns later receive float values.
    df[col + ' Language Probability'] = 0.0
    df[col + ' Word Count'] = 0
    df[col + ' Duration Seconds'] = 0.0
    df[col + ' Speaker Count'] = 0
    df[col + ' Has Audio Events'] = False
    df[col + ' First Speaker'] = None

# merge existing progress / final for base and metadata columns
for p in [PROGRESS_CSV, FINAL_CSV]:
    if not p.exists():
        continue
    df_old = pd.read_csv(p)
    for col in ['First Audio Transcript','Last Audio Transcript']:
        # Merge base column. read_csv turns blank cells into NaN, so "empty"
        # has to mean NaN or '' — comparing against '' alone would copy NaN
        # over the defaults.
        if col in df_old.columns:
            old_base = df_old[col].reindex(df.index)
            fill_base = _blank(df[col]) & ~_blank(old_base)
            df[col] = df[col].mask(fill_base, old_base)

        # Merge metadata columns only for rows the old file actually finished,
        # i.e. rows whose JSON cell holds a value. All metadata columns of such
        # a row are restored together, including the numeric and boolean ones.
        json_col = col + '_JSON'
        if json_col not in df_old.columns:
            continue
        done = _blank(df[json_col]) & ~_blank(df_old[json_col].reindex(df.index))
        for meta_col in [
            '_JSON', ' Text', ' Language Code', ' Language Probability',
            ' Word Count', ' Duration Seconds', ' Speaker Count',
            ' Has Audio Events', ' First Speaker'
        ]:
            full_col = col + meta_col
            if full_col in df_old.columns:
                df[full_col] = df[full_col].mask(done, df_old[full_col].reindex(df.index))

print(f"Rows {len(df)}")

In [None]:
# --- Build job list ---------------------------------------------------------
def _has_value(value):
    """True when ``value`` is neither NaN/None nor an empty or whitespace-only string."""
    return not pd.isna(value) and str(value).strip() != ''


jobs = []
processor = TranscriptionProcessor()
for idx, row in df.iterrows():
    ident = row.get('School_Clip', f'row_{idx}')
    for ctype, clip_col, col in (
        ('first', 'First Audio Clip', 'First Audio Transcript'),
        ('last', 'Last Audio Clip', 'Last Audio Transcript'),
    ):
        # NaN is truthy, so plain truthiness would queue rows without a clip
        # and skip rows whose JSON cell is NaN; test for real values instead.
        if _has_value(row[clip_col]) and not _has_value(row[col + '_JSON']):
            jobs.append(dict(idx=idx, ref=row[clip_col], col=col, ident=ident, ctype=ctype))

print(f"Pending clips {len(jobs)}")

In [None]:
# --- Run transcription ------------------------------------------------------
start = time.time()
if jobs:
    def worker(job):
        """Transcribe one clip, store the result in ``df`` and checkpoint.

        Returns ``(job, status)`` where ``status`` is the second element
        returned by ``processor.transcribe``.
        """
        # Clip references are relative to the Drive root in Colab, to ROOT locally.
        base = Path('/content/drive/My Drive') if IN_COLAB else ROOT
        path = base / job['ref']
        transcript_data, status = processor.transcribe(path)
        if transcript_data:
            processor.store_transcript_data(df, job['idx'], job['col'], transcript_data)
        processor.save_progress(df)
        with processor.lock:
            processor.stats['processed'] += 1
        return job, status

    try:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex, tqdm(total=len(jobs), unit='clip') as bar:
            # Keep the future → job mapping so a failure can be attributed to its clip.
            futures = {ex.submit(worker, j): j for j in jobs}
            for fut in as_completed(futures):
                try:
                    job, status = fut.result()
                except Exception as e:
                    # One failing clip must not abort the remaining ones.
                    job = futures[fut]
                    print(f"ERROR: worker failed for {job['ident']} ({job['ctype']}): {type(e).__name__}: {e}")
                bar.update(1)
                bar.set_postfix(proc=processor.stats['processed'], succ=processor.stats['success'],
                                cost=f"${processor.stats['cost']:.2f}")
    finally:
        # Always write a final checkpoint, even if the loop above was interrupted.
        processor.save_progress(df, force=True)
else:
    print("Nothing to do – all transcripts present")


In [None]:
# --- Save final CSV ---------------------------------------------------------
# Write the complete table (input columns plus transcript/metadata columns) without the index.
df.to_csv(FINAL_CSV, index=False)
# Persist the cumulative cost so the next run's load_cost() can pick it up.
processor.flush_cost()
print(f"✅ Saved {FINAL_CSV.name} | rows {len(df)}")


## 🎉 Done!

This notebook automatically:
1. Detects GPUs (to cap the number of concurrent heavy tasks) and sizes the worker pool from the CPU count and available memory.
2. Saves progress after every 5 processed clips (`peru_transcript_progress.csv`).
3. Resumes from either the **progress** or **final** CSV if re-run.
4. **Uses ElevenLabs Scribe** for high-accuracy Spanish transcripts with speaker diarization.
5. Tracks cumulative ElevenLabs API spend in `cost_tracking.json` at $0.00667 per minute.

With the updated implementation:
- Full JSON responses are stored per-transcript in dedicated `_JSON` columns;
- Key metadata (text, language, word counts, durations, speaker info) are extracted into separate columns;
- Existing workflows remain compatible while enabling more detailed analysis.

You can safely interrupt and restart without re-transcribing completed audio files.
