# Call Quality Analyzer (Colab)

This notebook is a Colab-ready conversion of the provided Python script. Run cells in order. Short notes:
- Designed to run on free Colab (use `MODEL_SIZE='tiny'` for fastest runtime).
- It downloads the test YouTube file, preprocesses audio, diarizes (lightweight), transcribes with Whisper, and computes metrics.


In [1]:
# Install dependencies (may take ~1-2 minutes on first run)
!pip install -q yt-dlp ffmpeg-python librosa soundfile transformers openai-whisper resemblyzer webrtcvad scikit-learn torch numpy


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/177.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m177.1/177.1 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/803.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m803.2/803.2 kB[0m [31m45.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.2/66.2 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.6/78.6 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?2

In [2]:
# Imports
import os
import subprocess
import math
import tempfile
from pathlib import Path
from collections import defaultdict

import numpy as np
import librosa
import soundfile as sf

import whisper
from resemblyzer import VoiceEncoder, preprocess_wav
from sklearn.cluster import AgglomerativeClustering
from transformers import pipeline


In [3]:
# Helper: download YouTube audio and convert to mono 16k wav
YOUTUBE_URL = "https://www.youtube.com/watch?v=4ostqJD3Psc"  # test file (given)
OUT_WAV = "call_audio.wav"


def download_youtube_audio(youtube_url, out_wav="call_audio.wav"):
    """
    Robust downloader: tries yt-dlp first, falls back to pytube.
    Ensures output WAV is 16kHz mono at path out_wav.
    """
    import subprocess, shlex
    from pathlib import Path
    try:
        # Try yt-dlp
        tmp_audio = "tmp_audio"
        cmd = f"yt-dlp -f bestaudio -o {tmp_audio}.%(ext)s {shlex.quote(youtube_url)}"
        print("Trying yt-dlp...")
        subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True, timeout=180)
        found = None
        for f in Path('.').glob('tmp_audio.*'):
            found = str(f)
            break
        if not found:
            raise FileNotFoundError("yt-dlp did not produce tmp_audio.* file")
        # Convert to 16k mono WAV
        cmd2 = f"ffmpeg -y -i {shlex.quote(found)} -ar 16000 -ac 1 {shlex.quote(out_wav)}"
        subprocess.run(cmd2, shell=True, check=True, capture_output=True, text=True)
        try:
            Path(found).unlink()
        except:
            pass
        return out_wav
    except Exception as e:
        print("yt-dlp failed or timed out:", repr(e))
        print("Falling back to pytube...")
        try:
            from pytube import YouTube
            yt = YouTube(youtube_url)
            stream = yt.streams.filter(only_audio=True).order_by('abr').desc().first()
            if stream is None:
                raise RuntimeError("No audio stream found via pytube")
            temp_file = "pytube_tmp.mp4"
            stream.download(filename=temp_file)
            cmd2 = f"ffmpeg -y -i {shlex.quote(temp_file)} -ar 16000 -ac 1 {shlex.quote(out_wav)}"
            subprocess.run(cmd2, shell=True, check=True, capture_output=True, text=True)
            try:
                Path(temp_file).unlink()
            except:
                pass
            return out_wav
        except Exception as e2:
            print("pytube fallback failed:", repr(e2))
            raise RuntimeError("Both yt-dlp and pytube failed to download the video. Check network, video restrictions, or provide cookies.") from e2


def preprocess_audio(path, target_sr=16000):
    y, sr = librosa.load(path, sr=target_sr, mono=True)
    y = y / (np.max(np.abs(y)) + 1e-9)
    y, _ = librosa.effects.trim(y, top_db=40)
    sf.write(path, y, target_sr)
    return path, y, target_sr


In [4]:
# Fast speaker segmentation (energy-based windows + embeddings clustering)
def sliding_windows(y, sr, win_s=2.0, hop_s=1.0):
    win = int(win_s * sr)
    hop = int(hop_s * sr)
    segments = []
    for start in range(0, max(1, len(y) - win + 1), hop):
        end = start + win
        segments.append((start / sr, end / sr, y[start:end]))
    return segments

def get_speaker_clusters(wav_path, y, sr, n_speakers=2):
    encoder = VoiceEncoder()
    segs = sliding_windows(y, sr, win_s=2.0, hop_s=1.0)
    wavs = []
    times = []
    for s_e, e_e, seg in segs:
        if len(seg) < 0.2 * sr:
            continue
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as t:
            sf.write(t.name, seg, sr)
            wavs.append(preprocess_wav(t.name))
            times.append((s_e, e_e))
            try:
                os.unlink(t.name)
            except:
                pass
    if len(wavs) == 0:
        return [], []
    embeds = [encoder.embed_utterance(w) for w in wavs]
    X = np.vstack(embeds)
    clustering = AgglomerativeClustering(n_clusters=n_speakers).fit(X)
    labels = clustering.labels_
    timeline = []
    for (s,e), lab in zip(times, labels):
        timeline.append({'start': s, 'end': e, 'label': int(lab)})
    return timeline, X


In [5]:
# Transcribe with Whisper (small/tiny available)
MODEL_SIZE = "small"  # change to 'tiny' for faster runtime
model = whisper.load_model(MODEL_SIZE)

def transcribe_with_timestamps(wav_path):
    res = model.transcribe(wav_path, language='en', verbose=False)
    segments = res.get('segments', [])
    return segments


100%|████████████████████████████████████████| 461M/461M [00:03<00:00, 135MiB/s]


In [6]:
# Map transcription segments to speaker labels using embeddings similarity
def map_segments_to_speakers(segments, timeline, y, sr):
    if not timeline:
        for seg in segments:
            seg['speaker'] = 0
        return segments
    encoder = VoiceEncoder()
    label_embeds = defaultdict(list)
    for t in timeline:
        s, e = t['start'], t['end']
        start_sample = int(max(0, s * sr))
        end_sample = int(min(len(y), e * sr))
        seg = y[start_sample:end_sample]
        if len(seg) < 1600:
            continue
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tfile:
            sf.write(tfile.name, seg, sr)
            wav = preprocess_wav(tfile.name)
            emb = encoder.embed_utterance(wav)
            try:
                os.unlink(tfile.name)
            except:
                pass
        label_embeds[t['label']].append(emb)
    centroids = {}
    for lab, es in label_embeds.items():
        if len(es) > 0:
            centroids[lab] = np.mean(es, axis=0)
    if not centroids:
        for seg in segments:
            seg['speaker'] = 0
        return segments
    for seg in segments:
        s, e = seg['start'], seg['end']
        start_sample = int(max(0, s * sr))
        end_sample = int(min(len(y), e * sr))
        seg_audio = y[start_sample:end_sample]
        if len(seg_audio) < 1600:
            seg['speaker'] = 0
            continue
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tfile:
            sf.write(tfile.name, seg_audio, sr)
            wav = preprocess_wav(tfile.name)
            emb = encoder.embed_utterance(wav)
            try:
                os.unlink(tfile.name)
            except:
                pass
        best_lab = None
        best_dist = float('inf')
        for lab, cent in centroids.items():
            d = np.linalg.norm(emb - cent)
            if d < best_dist:
                best_dist = d
                best_lab = lab
        seg['speaker'] = int(best_lab)
    return segments


In [7]:
# Metrics extraction and heuristics
sentiment_pipe = pipeline('sentiment-analysis')

QUESTION_WORDS = set(['who','what','when','where','why','how','is','are','do','does','did','can','could','would','will','shall'])

def is_question(text):
    text = text.strip().lower()
    if text.endswith('?'):
        return True
    first_word = text.split()[0] if text.split() else ''
    if first_word in QUESTION_WORDS:
        return True
    return False

def analyze_call(segments):
    durations = defaultdict(float)
    questions = 0
    segments_sorted = sorted(segments, key=lambda x: x['start'])
    monologues = []
    cur = None
    for seg in segments_sorted:
        sp = seg.get('speaker', 0)
        dur = seg['end'] - seg['start']
        durations[sp] += dur
        if is_question(seg.get('text','')):
            questions += 1
        if cur is None:
            cur = {'speaker': sp, 'start': seg['start'], 'end': seg['end']}
        else:
            if sp == cur['speaker'] and seg['start'] <= cur['end'] + 1.0:
                cur['end'] = max(cur['end'], seg['end'])
            else:
                monologues.append(cur)
                cur = {'speaker': sp, 'start': seg['start'], 'end': seg['end']}
    if cur:
        monologues.append(cur)
    longest_monologue = max((m['end'] - m['start'] for m in monologues), default=0.0)
    full_text = ' '.join([s['text'] for s in segments_sorted])
    sentiment = sentiment_pipe(full_text[:512])
    sentiment_label = sentiment[0]['label']
    total = sum(durations.values()) if durations else 1.0
    dominant = max(durations.items(), key=lambda x: x[1])[0] if durations else 0
    talk_ratio = {k: (v/total)*100 for k,v in durations.items()}
    insight = ''
    if talk_ratio.get(dominant,0) > 70:
        insight = f"Speaker {dominant} dominated the call ({talk_ratio[dominant]:.0f}%). Consider letting the other party speak more."
    elif questions < 2:
        insight = "Few questions were asked. Encourage more probing questions to understand the customer's needs."
    else:
        insight = "Balanced conversation. Continue focusing on open-ended questions."
    return {
        'talk_ratio': talk_ratio,
        'questions': questions,
        'longest_monologue_s': longest_monologue,
        'sentiment': sentiment_label,
        'actionable_insight': insight,
        'dominant_speaker': dominant
    }


No model was supplied, defaulted to distilbert/distilbert-base-uncased-finetuned-sst-2-english and revision 714eb0f (https://huggingface.co/distilbert/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

Device set to use cpu


In [8]:
# Main pipeline: download -> preprocess -> diarize -> transcribe -> map -> analyze
def run_pipeline(youtube_url="https://www.youtube.com/watch?v=4ostqJD3Psc"):
    print('Downloading audio...')
    wav = download_youtube_audio(youtube_url, OUT_WAV)
    print('Preprocessing audio...')
    wav, y, sr = preprocess_audio(wav)
    print('Extracting speaker windows and embeddings...')
    timeline, _ = get_speaker_clusters(wav, y, sr, n_speakers=2)
    print('Transcribing...')
    segments = transcribe_with_timestamps(wav)
    print(f'Transcribed {len(segments)} segments')
    print('Mapping segments to speakers...')
    segments = map_segments_to_speakers(segments, timeline, y, sr)
    print('Analyzing call...')
    results = analyze_call(segments)
    return results, segments

# Run pipeline
results, segments = run_pipeline()
print('\n=== Results ===')
for sp, val in results['talk_ratio'].items():
    print(f'  Speaker {sp}: {val:.1f}%')
print('Questions asked:', results['questions'])
print('Longest monologue (s):', f"{results['longest_monologue_s']:.1f}")
print('Sentiment:', results['sentiment'])
print('Actionable insight:', results['actionable_insight'])
print('Dominant speaker (heuristic):', results['dominant_speaker'])


Downloading audio...
Trying yt-dlp...
yt-dlp failed or timed out: CalledProcessError(2, "yt-dlp -f bestaudio -o tmp_audio.%(ext)s 'https://www.youtube.com/watch?v=4ostqJD3Psc'")
Falling back to pytube...
pytube fallback failed: ModuleNotFoundError("No module named 'pytube'")


RuntimeError: Both yt-dlp and pytube failed to download the video. Check network, video restrictions, or provide cookies.

In [9]:
# Robust YouTube download: install tools, try yt-dlp (verbose), fallback to pytube
# Run this cell in Colab before calling run_pipeline()

# 1) Ensure ffmpeg and packages are installed
!apt-get update -y && apt-get install -y ffmpeg -qq
!pip install -q yt-dlp pytube3 ffmpeg-python

import subprocess, shlex, sys
from pathlib import Path


def download_youtube_audio(youtube_url, out_wav="call_audio.wav"):
    # Import pytube inside the function to avoid ModuleNotFoundError
    from pytube import YouTube
    tmp_audio = "tmp_audio"
    # Try yt-dlp first (verbose so we see errors)
    cmd = f"yt-dlp -f bestaudio -o {tmp_audio}.%(ext)s {shlex.quote(youtube_url)}"
    print("Trying yt-dlp...\n  CMD:", cmd)
    try:
        proc = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True, timeout=120)
        print("yt-dlp stdout:\n", proc.stdout[:1000])
        print("yt-dlp stderr (tail):\n", proc.stderr[-1000:])
        # find downloaded file
        found = None
        for f in Path('.').glob('tmp_audio.*'):
            found = str(f)
            break
        if not found:
            raise FileNotFoundError("yt-dlp did not produce tmp_audio.* file")
        # convert to wav 16k mono using ffmpeg
        cmd2 = f"ffmpeg -y -i {shlex.quote(found)} -ar 16000 -ac 1 {shlex.quote(out_wav)}"
        subprocess.run(cmd2, shell=True, check=True, capture_output=True, text=True)
        try:
            Path(found).unlink()
        except:
            pass
        return out_wav
    except Exception as e:
        print("yt-dlp failed:", repr(e))
        # print more info if available
        try:
            print("Attempting fallback: pytube")
            yt = YouTube(youtube_url)
            stream = yt.streams.filter(only_audio=True).order_by('abr').desc().first()
            if stream is None:
                raise RuntimeError("No audio stream found with pytube")
            temp_file = "pytube_tmp.mp4"
            print("Downloading with pytube (this may take a while)...")
            stream.download(filename=temp_file)
            # convert to wav 16k mono
            cmd2 = f"ffmpeg -y -i {shlex.quote(temp_file)} -ar 16000 -ac 1 {shlex.quote(out_wav)}"
            subprocess.run(cmd2, shell=True, check=True, capture_output=True, text=True)
            try:
                Path(temp_file).unlink()
            except:
                pass
            return out_wav
        except Exception as e2:
            print("pytube fallback failed:", repr(e2))
            raise RuntimeError("Both yt-dlp and pytube failed to download the video. See messages above.") from e2

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/129 kB 11%] [Connected to cloud.r                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [1 InRelease 129 kB/129 kB 100%] [Connected to cloud.r                                                                               Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
                                                                               Hit:5 https://cli.github.com/packages stable InRelease
0% [3 InRelease 85.1 kB/128 kB 66%] [Connected to r2u.stat.illinois.edu (192.17                                                                   

In [10]:
# Optional: Upload a local audio/video file (use this if YouTube download fails).
# After running this cell and selecting a file, it will be converted to 'call_audio.wav' (16k mono).
try:
    from google.colab import files
    uploaded = files.upload()  # use the widget to choose a file (mp3/mp4/wav)
    if uploaded:
        import shlex, subprocess, os
        fname = next(iter(uploaded.keys()))
        print('Uploaded:', fname)
        # ensure ffmpeg installed
        try:
            subprocess.run('ffmpeg -version', shell=True, check=True, capture_output=True, text=True)
        except:
            print('Installing ffmpeg...')
            subprocess.run('apt-get update -y && apt-get install -y ffmpeg -qq', shell=True, check=True)
        # convert to 16k mono wav
        out = 'call_audio.wav'
        cmd = f"ffmpeg -y -i {shlex.quote(fname)} -ar 16000 -ac 1 {shlex.quote(out)}"
        print('Converting to 16k mono WAV...')
        subprocess.run(cmd, shell=True, check=True)
        print('Saved as', out)
    else:
        print('No file uploaded.')
except Exception as e:
    print('Upload helper only works in Colab. Error:', e)


KeyboardInterrupt: 

In [14]:
# Wrapper pipeline: prefer local 'call_audio.wav' if present, else download from YouTube.
def run_pipeline_auto(youtube_url="https://www.youtube.com/watch?v=4ostqJD3Psc", prefer_local=True):
    from pathlib import Path
    audio_path = Path('call_audio.wav')
    if prefer_local and audio_path.exists():
        print('Using local call_audio.wav (uploaded or pre-placed). Skipping download.')
        # skip download, but ensure preprocess and rest run on this file
        wav = str(audio_path)
        wav, y, sr = preprocess_audio(wav)
    else:
        print('No local file found or prefer_local=False — downloading from YouTube.')
        wav = download_youtube_audio(youtube_url, OUT_WAV)
        wav, y, sr = preprocess_audio(wav)
    print('Extracting speaker windows and embeddings...')
    timeline, _ = get_speaker_clusters(wav, y, sr, n_speakers=2)
    print('Transcribing...')
    segments = transcribe_with_timestamps(wav)
    print(f'Transcribed {len(segments)} segments')
    print('Mapping segments to speakers...')
    segments = map_segments_to_speakers(segments, timeline, y, sr)
    print('Analyzing call...')
    results = analyze_call(segments)
    return results, segments

# Example run (uncomment to execute):
# results, segments = run_pipeline_auto(prefer_local=True)
# print(results)
