<a href="https://colab.research.google.com/github/purushottamk3112/call_analyzer/blob/main/Call_Quality_Analyzer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip -q install --upgrade yt-dlp pydub noisereduce pyannote.audio transformers torchaudio nltk --quiet
!curl -L https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp -o /usr/local/bin/yt-dlp 2>/dev/null
!chmod a+rx /usr/local/bin/yt-dlp

# HF token: authenticate with the Hugging Face Hub before any model is loaded.
# NOTE(review): presumably needed because the diarization pipeline further down
# is loaded with use_auth_token=True — confirm which models are actually gated.
from huggingface_hub import login
login("")  # your read-token (placeholder; avoid committing a real token)

# Third-party and standard-library dependencies used by the rest of the script.
import os, math, re, time, torch, numpy as np, pandas as pd
from pydub import AudioSegment
import noisereduce as nr
from pyannote.audio import Pipeline
from transformers import pipeline as hf_pipeline
import nltk
# Fetch the 'punkt' tokenizer data that sent_tokenize is expected to use.
nltk.download('punkt', quiet=True)
from nltk.tokenize import sent_tokenize
import warnings
# Silence every warning category for the remainder of the run.
warnings.filterwarnings("ignore")
# Run the models on the GPU when torch reports one, otherwise on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# Start timing (read again at the end to report the total processing time)
start_time = time.time()

# YouTube URL of the call to analyse, and the local path the audio is saved to
YOUTUBE_URL = "https://youtu.be/4ostqJD3Psc?si=R0QZg6YjCPdmcR-2"
audio_path = "/tmp/call.mp3"

# Download audio only (not video format 18)
import glob
import subprocess

print("Downloading audio from YouTube...")
if os.path.exists(audio_path):
    os.remove(audio_path)


def _run_yt_dlp(*options):
    """Run yt-dlp on YOUTUBE_URL with *options*; return True on exit status 0.

    The command is passed as an argument list (no shell), so characters such
    as ``?`` and ``&`` in the URL are never interpreted by a shell.
    """
    try:
        completed = subprocess.run(["yt-dlp", *options, "--quiet", YOUTUBE_URL])
    except OSError as err:
        # The yt-dlp executable is missing or cannot be started.
        print(f"Could not run yt-dlp: {err}")
        return False
    return completed.returncode == 0


# Primary method: extract the best audio stream and convert it to mp3.
# A failed download shows up as a non-zero exit status or a missing file,
# never as a Python exception, so both are checked explicitly.
primary_ok = _run_yt_dlp("-x", "--audio-format", "mp3", "-o", audio_path)
if not (primary_ok and os.path.exists(audio_path)):
    print("Primary download method failed, trying alternative...")
    # Drop leftovers from earlier runs so a stale file is never picked up.
    for stale_file in glob.glob("/tmp/call_temp.*"):
        os.remove(stale_file)
    _run_yt_dlp("-f", "bestaudio", "-o", "/tmp/call_temp.%(ext)s")
    # Find the downloaded file (its extension depends on the chosen stream)
    temp_files = glob.glob("/tmp/call_temp.*")
    if temp_files:
        os.replace(temp_files[0], audio_path)

if os.path.exists(audio_path):
    print(f"Downloaded → {audio_path} (Size: {os.path.getsize(audio_path)/1024:.1f} KB)")
else:
    print("Error: Could not download audio. Please check the URL.")
    raise RuntimeError("Download failed")

# Enhanced audio preprocessing for poor quality
# Produces `clean_path`: a denoised 16 kHz mono WAV, or the original download
# when any preprocessing step fails.
print("Processing audio...")
try:
    sample_rate = 16000

    # Decode the download, then resample to 16 kHz mono
    raw = AudioSegment.from_file(audio_path)
    raw = raw.set_frame_rate(sample_rate).set_channels(1)

    # Normalize audio
    raw = raw.normalize()

    # Convert to numpy array
    sig = np.array(raw.get_array_of_samples(), dtype=np.float32)

    # Normalize to [-1, 1] range (skipped for an all-zero signal)
    peak = np.max(np.abs(sig))
    if peak > 0:
        sig = sig / peak

    # Apply noise reduction with correct parameters
    print("Applying noise reduction...")
    sig_cleaned = nr.reduce_noise(
        y=sig,
        sr=sample_rate,
        stationary=False,
        prop_decrease=0.8  # Aggressive noise reduction
    )

    # Convert back to int16 for saving. The denoised signal is not guaranteed
    # to stay inside [-1, 1]; without clipping, any overshoot would wrap
    # around in the int16 cast and turn into loud clicks.
    sig_int16 = (np.clip(sig_cleaned, -1.0, 1.0) * 32767).astype(np.int16)

    # Save cleaned audio
    clean_path = "/tmp/call_clean.wav"
    clean_audio = AudioSegment(
        sig_int16.tobytes(),
        frame_rate=sample_rate,
        sample_width=2,  # bytes per sample, matching int16
        channels=1
    )
    clean_audio.export(clean_path, format="wav")
    print(f"Cleaned audio saved → {clean_path}")

except Exception as e:
    # Best effort: later stages can still run on the unprocessed download.
    print(f"Audio processing error: {e}")
    print("Using original audio without noise reduction...")
    clean_path = audio_path

# Speaker diarization with optimized parameters
# Outputs: `dia` (the diarization result, or None on failure),
# `speaker_segments` (one dict per speaker turn) and `spk_dur`
# (total seconds per speaker label).
print("Performing speaker diarization...")
try:
    dia_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=True
    )
    if DEVICE.type == 'cuda':
        dia_pipeline = dia_pipeline.to(DEVICE)

    # Run diarization, asking for exactly two speakers
    dia = dia_pipeline(clean_path, min_speakers=2, max_speakers=2)

    # One record per turn, in the order the pipeline yields them
    speaker_segments = [
        {
            'speaker': spk,
            'start': turn.start,
            'end': turn.end,
            'duration': turn.end - turn.start
        }
        for turn, _, spk in dia.itertracks(yield_label=True)
    ]

    # Accumulate the talk time of each speaker
    spk_dur = {}
    for seg in speaker_segments:
        spk_dur[seg['speaker']] = spk_dur.get(seg['speaker'], 0.0) + seg['duration']

    print(f"Diarization completed: {len(speaker_segments)} segments found")

except Exception as e:
    print(f"Diarization error: {e}")
    print("Using default speaker segments...")
    # Placeholder data: two back-to-back 30-second turns
    speaker_segments = [
        {'speaker': name, 'start': begin, 'end': begin + 30, 'duration': 30}
        for name, begin in (('SPEAKER_00', 0), ('SPEAKER_01', 30))
    ]
    spk_dur = {seg['speaker']: 30.0 for seg in speaker_segments}
    dia = None

# Handle edge cases: make sure exactly two speakers exist before picking roles
speaker_count = len(spk_dur)
if speaker_count == 0:
    print("Warning: No speakers detected, using defaults")
    spk_dur = {"SPEAKER_00": 30.0, "SPEAKER_01": 30.0}
elif speaker_count == 1:
    # Invent a near-silent second speaker under whichever default label is free
    (only_spk,) = spk_dur
    other_spk = "SPEAKER_00" if only_spk != "SPEAKER_00" else "SPEAKER_01"
    spk_dur[other_spk] = 0.1

# Identify rep vs customer (longer speaker is likely the rep)
rep_spk, rep_seconds = max(spk_dur.items(), key=lambda entry: entry[1])
remaining_speakers = [name for name in spk_dur if name != rep_spk]
cust_spk = remaining_speakers[0]
cust_seconds = spk_dur[cust_spk]

print(f"Identified speakers: Rep={rep_spk} ({rep_seconds:.1f}s), Customer={cust_spk} ({cust_seconds:.1f}s)")

# Enhanced ASR with Whisper
# Outputs: `words` (timestamped chunks) and `full_text` (the whole transcript).
print("Transcribing audio with Whisper...")
try:
    asr = hf_pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-base",
        device=-1 if DEVICE.type != 'cuda' else 0,
        chunk_length_s=30,
        batch_size=8
    )

    # First try word-level timestamps; if that yields no chunks, retry once
    # with the pipeline's default (segment-level) timestamps.
    for timestamp_mode in ("word", True):
        if timestamp_mode is True:
            print("Word-level timestamps not available, trying sentence-level...")
        result = asr(
            clean_path,
            return_timestamps=timestamp_mode,
            # A fresh dict per call, exactly as two separate literals would be
            generate_kwargs={"language": "en", "task": "transcribe"}
        )
        words = result.get("chunks", [])
        full_text = result.get("text", "")
        if words:
            break

    print(f"Transcribed {len(words)} segments")
    print(f"Full transcript length: {len(full_text)} characters")

except Exception as e:
    print(f"ASR error: {e}")
    # Placeholder values so the remaining stages can still run
    words = []
    full_text = "Sample transcript for error case"

# Map words/segments to speakers
# Each usable ASR chunk becomes one record labelled "Rep" or "Customer".
utterances_raw = []
_NO_SPEAKER = object()  # marks "no diarization turn contains this chunk"

if dia and words:  # Only if we have both diarization and transcription
    for chunk in words:
        # Skip chunks without a complete (start, end) timestamp
        if not chunk.get("timestamp"):
            continue
        chunk_start, chunk_end = chunk["timestamp"]
        if chunk_start is None or chunk_end is None:
            continue

        chunk_text = chunk.get("text", "").strip()
        if not chunk_text:
            continue

        # The chunk belongs to the first turn that contains its midpoint
        mid = (chunk_start + chunk_end) / 2
        owner = next(
            (
                label
                for turn, _, label in dia.itertracks(yield_label=True)
                if turn.start <= mid <= turn.end
            ),
            _NO_SPEAKER,
        )

        # Otherwise fall back to the segment whose nearest edge is closest
        if owner is _NO_SPEAKER:
            if not speaker_segments:
                continue
            nearest = min(
                speaker_segments,
                key=lambda seg: min(abs(mid - seg['start']), abs(mid - seg['end'])),
            )
            owner = nearest['speaker']

        utterances_raw.append({
            "speaker": "Rep" if owner == rep_spk else "Customer",
            "text": chunk_text,
            "start": chunk_start,
            "end": chunk_end,
            "dur": chunk_end - chunk_start
        })

# Create DataFrame: one row per timestamped chunk that was mapped to a speaker
df = pd.DataFrame(utterances_raw)

# If no utterances detected, create from full text
if df.empty and full_text:
    print("No timestamped segments, analyzing full text...")
    try:
        sentences = sent_tokenize(full_text)
    except LookupError:
        # sent_tokenize raises LookupError when its tokenizer data is not
        # installed (newer NLTK releases look for 'punkt_tab' rather than the
        # 'punkt' package this script downloads). Fall back to a plain split
        # on sentence-ending punctuation instead of aborting the analysis.
        sentences = [
            part
            for part in re.split(r'(?<=[.!?])\s+', full_text.strip())
            if part
        ]
    # No timing or speaker information exists here, so sentences are given
    # alternating speakers and a nominal 2-second slot each.
    temp_data = [
        {
            "speaker": "Rep" if i % 2 == 0 else "Customer",
            "text": sent,
            "start": i * 2,
            "end": (i + 1) * 2,
            "dur": 2
        }
        for i, sent in enumerate(sentences[:50])  # Limit to first 50 sentences
    ]
    df = pd.DataFrame(temp_data)

# Fallback if still empty: placeholder rows keep the later stages runnable
if df.empty:
    df = pd.DataFrame([
        {"speaker": "Rep", "text": "Sample rep text", "start": 0, "end": 10, "dur": 10},
        {"speaker": "Customer", "text": "Sample customer text", "start": 10, "end": 20, "dur": 10}
    ])

print(f"Created {len(df)} utterance records")

# Enhanced speaker identification using keywords
# Phrases a sales rep is expected to use
sales_keywords = [
    r'\b(product|service|offer|demo|trial|team|sales|company|solution|feature|pricing)\b',
    r'\b(we offer|we provide|we have|our customers|our platform)\b',
    r'\b(let me show|let me explain|I can help)\b'
]

# Phrases a customer is expected to use
customer_keywords = [
    r'\b(I need|I want|looking for|we need|we want)\b',
    r'\b(my business|my company|our budget|our requirements)\b',
    r'\b(how much|cost|pricing|budget)\b'
]

# Check utterances for keywords.
# A hit "confirms" the current labels when the phrase type matches the
# speaker's label, and "contradicts" them when it does not.
if len(df) > 0:
    labels_confirmed = 0
    labels_contradicted = 0

    opening_rows = df.head(10)
    for speaker, raw_text in zip(opening_rows['speaker'], opening_rows['text']):
        lowered = raw_text.lower()
        sales_hits = sum(1 for pattern in sales_keywords if re.search(pattern, lowered, re.I))
        customer_hits = sum(1 for pattern in customer_keywords if re.search(pattern, lowered, re.I))

        # Sales phrases are expected from the "Rep"
        if speaker == "Rep":
            labels_confirmed += sales_hits
        else:
            labels_contradicted += sales_hits

        # Customer phrases are expected from the "Customer"
        if speaker == "Customer":
            labels_confirmed += customer_hits
        else:
            labels_contradicted += customer_hits

    # Swap labels if keywords suggest misidentification
    if labels_contradicted > labels_confirmed + 2:
        print("Swapping speaker labels based on keyword analysis...")
        df['speaker'] = df['speaker'].map({'Rep': 'Customer', 'Customer': 'Rep'})

# Group consecutive utterances by same speaker
# A new group starts on every row whose speaker differs from the previous row.
speaker_changed = df['speaker'].ne(df['speaker'].shift())
df['group'] = speaker_changed.cumsum()

# Collapse each group into one conversation turn
utterances = (
    df.groupby('group')
    .agg(
        speaker=('speaker', 'first'),
        text=('text', lambda parts: ' '.join(parts)),
        start=('start', 'min'),
        end=('end', 'max'),
        dur=('dur', 'sum'),
    )
    .reset_index(drop=True)
)

print(f"Grouped into {len(utterances)} conversation turns")

# Calculate KPIs

# 1. Talk-time ratio
# Seconds per role, summed from the per-chunk durations
has_rows = len(df) > 0
total_rep = df.loc[df['speaker'] == "Rep", 'dur'].sum() if has_rows else 0
total_cust = df.loc[df['speaker'] == "Customer", 'dur'].sum() if has_rows else 0
total_time = total_rep + total_cust

if not total_time > 0:
    # Use diarization data as fallback
    total_rep = spk_dur.get(rep_spk, 30)
    total_cust = spk_dur.get(cust_spk, 30)
    total_time = total_rep + total_cust

# Each role's share of the total, as a percentage
rep_ratio = (total_rep / total_time * 100)
cust_ratio = (total_cust / total_time * 100)

# 2. Enhanced question detection
# Heuristic regexes, applied case-insensitively with re.search; an utterance
# counts as a question when any one of them matches.
# Auxiliary verbs and WH-words only signal a question at the START of a
# sentence ("Do you have ...", "What is ..."). Matching them anywhere would
# flag ordinary statements such as "I have a ..." or "we are ...".
question_patterns = [
    r'\?',  # Question mark
    r'(?:^\s*|[.!?]\s+)(?:do|does|did|can|could|would|will|shall|should|is|are|was|were|have|has|had)\s+\w+',  # Sentence-initial auxiliary verbs
    r'(?:^\s*|[.!?]\s+)(?:what|when|where|who|whom|whose|which|why|how)\b',  # Sentence-initial WH-questions
    r'\b(?:right|correct|isn\'t it|don\'t you think|don\'t you agree)\s*$',  # Tag questions closing an unpunctuated utterance
    r'\b(?:tell me|explain|describe|clarify)\b',  # Indirect questions
]

# Count questions
# Every conversation turn counts at most once, however many patterns match.
question_texts = []

if len(utterances) > 0:
    for turn_text in utterances['text']:
        looks_like_question = any(
            re.search(pattern, turn_text, re.IGNORECASE)
            for pattern in question_patterns
        )
        if looks_like_question:
            # Keep a preview, shortened to 60 characters for long turns
            preview = turn_text if len(turn_text) <= 60 else turn_text[:60] + "..."
            question_texts.append(preview)

n_questions = len(question_texts)

print(f"Detected {n_questions} questions")

# 3. Longest monologue from diarization
# Defaults apply when there are no segments or none has a positive duration.
longest_monologue = 0.0
longest_speaker = None

if speaker_segments:
    # max() returns the first segment among equally long ones
    top_segment = max(speaker_segments, key=lambda segment: segment['duration'])
    if top_segment['duration'] > longest_monologue:
        longest_monologue = top_segment['duration']
        longest_speaker = "Customer" if top_segment['speaker'] != rep_spk else "Rep"

# 4. Sentiment analysis
print("Analyzing sentiment...")


def classify_sentiment(raw_label, confidence, threshold=0.7):
    """Turn a classifier verdict into ``(sentiment, polarity)``.

    ``polarity`` lies on one 0-to-1 scale where 0.5 is neutral, values near 1
    are positive and values near 0 are negative. A confident NEGATIVE verdict
    therefore maps to ``1 - confidence``: averaging the raw confidence of a
    negative verdict would push the combined score towards "positive".

    Args:
        raw_label: Label reported by the classifier; compared
            case-insensitively against "positive" / "negative".
        confidence: Classifier confidence for that label.
        threshold: Confidence that must be exceeded for a non-neutral result.

    Returns:
        A ``(sentiment, polarity)`` tuple; ``("neutral", 0.5)`` whenever the
        confidence does not exceed the threshold or the label is unknown.
    """
    verdict = str(raw_label).lower()
    if confidence > threshold:
        if verdict == 'positive':
            return "positive", confidence
        if verdict == 'negative':
            return "negative", 1.0 - confidence
    return "neutral", 0.5


try:
    sent_analyzer = hf_pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english",
        device=0 if DEVICE.type == 'cuda' else -1
    )

    # Prepare text for sentiment analysis: everything each role said, joined
    rep_text = " ".join(utterances[utterances.speaker == "Rep"].text.tolist()) if len(utterances) > 0 else ""
    cust_text = " ".join(utterances[utterances.speaker == "Customer"].text.tolist()) if len(utterances) > 0 else ""

    def analyze_sentiment(text, label=""):
        """Return ``(sentiment, polarity)`` for *text*.

        *label* only names the speaker in the warning printed when scoring
        fails. Texts shorter than 10 non-blank characters are neutral.
        """
        if not text or len(text.strip()) < 10:
            return "neutral", 0.5

        # Take first 500 characters for sentiment
        text_sample = text[:500]

        try:
            result = sent_analyzer(text_sample)[0]
        except Exception as err:
            # Best effort: one failed scoring call should not abort the report
            print(f"Sentiment scoring failed for {label or 'text'}: {err}")
            return "neutral", 0.5
        return classify_sentiment(result['label'], result['score'])

    # Analyze sentiment for each speaker
    rep_sent, rep_score = analyze_sentiment(rep_text, "Rep")
    cust_sent, cust_score = analyze_sentiment(cust_text, "Customer")

    # Overall sentiment (polarity averaged, weighted by talk time)
    if total_time > 0:
        overall_score = (rep_score * total_rep + cust_score * total_cust) / total_time
    else:
        overall_score = (rep_score + cust_score) / 2

    if overall_score > 0.6:
        overall_sent = "positive"
    elif overall_score < 0.4:
        overall_sent = "negative"
    else:
        overall_sent = "neutral"

except Exception as e:
    print(f"Sentiment analysis error: {e}")
    rep_sent, rep_score = "neutral", 0.5
    cust_sent, cust_score = "neutral", 0.5
    overall_sent, overall_score = "neutral", 0.5

print(f"Sentiment - Rep: {rep_sent}, Customer: {cust_sent}, Overall: {overall_sent}")

# 5. Generate actionable insight
# Each rule pairs a condition with its message; rules are listed in priority
# order and the first two that apply make up the final insight.
rep_dominates = rep_ratio > 70
customer_dominates = (not rep_dominates) and cust_ratio > 70
well_balanced = (
    (not rep_dominates)
    and (not cust_ratio > 70)
    and 45 <= rep_ratio <= 55
)

insight_rules = [
    # Talk-time balance insight (at most one of these three applies)
    (rep_dominates, f"Rep dominated with {rep_ratio:.0f}% talk-time. Focus on customer discovery."),
    (customer_dominates, f"Customer spoke {cust_ratio:.0f}% of time. Rep should provide more guidance."),
    (well_balanced, "Good talk-time balance achieved."),
    # Questions insight
    (n_questions < 3, f"Only {n_questions} questions detected. Use more discovery questions."),
    (n_questions > 15, f"{n_questions} questions asked. Good discovery happening."),
    # Monologue insight
    (longest_monologue > 60, f"{longest_monologue:.0f}s monologue detected. Break up with checks."),
    # Sentiment insight
    (overall_sent == "negative", "Negative sentiment. Address concerns and build rapport."),
    (overall_sent == "positive", "Positive sentiment. Good time to advance the conversation."),
]
insights = [message for applies, message in insight_rules if applies]

# Select primary insight
actionable_insight = (
    " ".join(insights[:2])  # Combine top 2 insights
    if insights
    else "Continue monitoring call metrics for improvements."
)

# Create final report: one (metric, value) pair per row
report_rows = [
    ("Rep talk-time %", f"{rep_ratio:.1f}%"),
    ("Customer talk-time %", f"{cust_ratio:.1f}%"),
    ("Questions asked", n_questions),
    ("Longest monologue (s)", f"{longest_monologue:.1f}s"),
    ("Rep sentiment", f"{rep_sent}"),
    ("Customer sentiment", f"{cust_sent}"),
    ("Overall sentiment", f"{overall_sent}"),
    ("Actionable insight", actionable_insight),
]
report = pd.DataFrame(report_rows, columns=["Metric", "Value"])

# Display results between horizontal rules
rule = "=" * 60
print("\n" + rule)
print("         CALL QUALITY ANALYSIS REPORT")
print(rule)
print(report.to_string(index=False))
print(rule)

# Additional statistics
print("\nProcessing Statistics:")
print(f"• Total call duration: {total_time:.1f} seconds")
print(f"• Total utterances: {len(utterances)}")
elapsed_seconds = time.time() - start_time
print(f"• Processing time: {elapsed_seconds:.1f} seconds")

# Save transcript: one "[speaker]: text" paragraph per conversation turn
transcript_path = "/tmp/call_transcript.txt"
try:
    # Explicit UTF-8 so non-ASCII transcript text is written the same way
    # regardless of the platform's default encoding.
    with open(transcript_path, 'w', encoding='utf-8') as f:
        f.write("CALL TRANSCRIPT\n")
        f.write("="*50 + "\n\n")
        f.writelines(
            f"[{speaker}]: {text}\n\n"
            for speaker, text in zip(utterances['speaker'], utterances['text'])
        )
except OSError as err:
    # Saving is best effort, but the reason for the failure is reported.
    print(f"• Could not save transcript: {err}")
else:
    print(f"• Transcript saved to: {transcript_path}")

print("\nAnalysis complete!")