<a href="https://colab.research.google.com/github/saketpandey0/call-quality-analyzer/blob/main/call_quality_analyzer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyannote.audio librosa transformers torch --quiet


  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.6/59.6 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m897.8/897.8 kB[0m [31m30.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m828.5/828.5 kB[0m [31m57.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.5/58.5 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.1/48.1 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.4/51.4 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m127.8/127.8 kB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [1]:
import os
import sys
import json
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')

In [2]:
import torch
import numpy as np
import pandas as pd
import librosa
import soundfile as sf
from pydub import AudioSegment

# from transformers import Wav2Vec2ForCTC, Wav2Vec2Tokenizer
# from pyannote.audio import Pipeline

In [3]:
# Optional-dependency guard: `transformers` supplies the `pipeline` factory used
# for sentiment analysis. TRANSFORMERS_AVAILABLE records whether the import
# succeeded so later code can skip sentiment analysis instead of crashing.
try:
  from transformers import pipeline
  TRANSFORMERS_AVAILABLE = True
except ImportError as e:
  print(f"Warning: transformer not available: {e}")
  TRANSFORMERS_AVAILABLE = False


In [4]:
# Whisper checkpoint name. Not referenced by the code visible in this section:
# the analyzer constructor carries its own "tiny.en" default.
WHISPER_MODEL_SIZE = "tiny.en"
# Sample rate (Hz) passed to librosa.load when separating stereo channels.
SAMPLE_RATE = 16000
# Default `max_length` of preprocess_audio_fast, forwarded to librosa.load as
# `duration` (i.e. the maximum number of seconds of audio that is loaded).
MAX_AUDIO_LENGTH = 300

In [5]:
class OptimisedCallQualityAnalyzer:
    """Call-quality analyzer configured for a small Whisper model.

    The constructor configures logging, stores the settings and then calls
    ``self._initialize_models()`` to load the models.

    Args:
        whisper_model_size: Whisper checkpoint name handed to the model loader.
        enable_streo_seperation: Original (misspelled) name of the stereo
            separation flag; kept so existing positional/keyword callers work.
        log_level: Name of a ``logging`` level (case-insensitive).
        enable_stereo_separation: Correctly spelled, keyword-only alias of the
            flag above. When given (not ``None``) it takes precedence.
    """

    def __init__(self, whisper_model_size: str = "tiny.en", enable_streo_seperation: bool = True, log_level: str = "INFO", *, enable_stereo_separation: Optional[bool] = None):
        logging.basicConfig(level=getattr(logging, log_level.upper()))
        self.logger = logging.getLogger(__name__)

        # Other code in this file reads/passes the correctly spelled name, so
        # accept both spellings and expose both attributes.
        if enable_stereo_separation is None:
            enable_stereo_separation = enable_streo_seperation
        self.enable_stereo_separation = enable_stereo_separation
        self.enable_streo_seperation = enable_stereo_separation

        self.whisper_model_size = whisper_model_size

        # Populated by _initialize_models().
        self.whisper_model = None
        self.sentiment_analyzer = None

        self._initialize_models()
        self.logger.info("OptimizedCallQualityAnalyzer initialized successfully")


# The rest of the file instantiates the class under the "Optimized" spelling.
OptimizedCallQualityAnalyzer = OptimisedCallQualityAnalyzer


In [6]:
def _initialize_models(self):
        """Load the Whisper model and, when available, the sentiment pipeline.

        Sets ``self.whisper_model`` and ``self.sentiment_analyzer``. The
        sentiment analyzer is left as ``None`` when ``transformers`` could not
        be imported or no sentiment model could be loaded.

        Raises:
            Exception: Any error raised while importing or loading Whisper is
                logged and re-raised.
        """

        try:
            # `whisper` is not imported at module level anywhere in this file,
            # so import it here; otherwise load_model raises NameError.
            import whisper

            # Load fastest Whisper model
            self.logger.info(f"Loading Whisper model ({self.whisper_model_size})...")
            start_time = datetime.now()
            self.whisper_model = whisper.load_model(self.whisper_model_size)
            load_time = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"Whisper loaded in {load_time:.1f}s")

            if not TRANSFORMERS_AVAILABLE:
                self.logger.warning("Transformers not available - sentiment analysis disabled")
                self.sentiment_analyzer = None
                return

            self.logger.info("Loading lightweight sentiment model...")
            try:
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    model="distilbert-base-uncased-finetuned-sst-2-english",
                    return_all_scores=True
                )
                self.logger.info("Sentiment analyzer loaded")
            except Exception as e:
                self.logger.warning(f"Failed to load sentiment model: {e}")
                # Second attempt: let the pipeline pick its default model.
                try:
                    self.sentiment_analyzer = pipeline("sentiment-analysis")
                except Exception as e2:
                    self.logger.error(f"No sentiment model available: {e2}")
                    self.sentiment_analyzer = None

        except Exception as e:
            self.logger.error(f"Error initializing models: {e}")
            raise


In [7]:
def preprocess_audio_fast(self,
                             audio_path: str,
                             target_sr: int = 16000,
                             max_length: int = MAX_AUDIO_LENGTH) -> Tuple[Optional[np.ndarray], Optional[int], Optional[float], Optional[bool]]:
        """Load, truncate and peak-normalise an audio file.

        Args:
            audio_path: Path of the audio file to load.
            target_sr: Sample rate requested from ``librosa.load``.
            max_length: Maximum number of seconds to load.

        Returns:
            ``(audio, sample_rate, duration_seconds, is_stereo)``. ``audio`` is
            scaled so that its peak magnitude is 0.95 (left untouched when it
            is all zeros). All four values are ``None`` if loading fails.
        """

        self.logger.info(f"Fast preprocessing: {Path(audio_path).name}")

        try:
            # Load audio - limit length for speed
            audio, sr = librosa.load(audio_path, sr=target_sr, duration=max_length)

            # Check whether the source file has exactly two channels. Only a
            # short excerpt is decoded: the channel count does not depend on
            # how much audio is read, so loading the whole file is wasted work.
            try:
                probe, _ = librosa.load(audio_path, sr=None, mono=False, duration=1.0)
                is_stereo = probe.ndim > 1 and probe.shape[0] == 2
            except Exception as probe_error:
                self.logger.warning(f"Could not determine channel count: {probe_error}")
                is_stereo = False

            # Simple normalization
            peak = np.max(np.abs(audio))
            if peak > 0:
                audio = audio / peak * 0.95

            duration = len(audio) / sr
            self.logger.info(f"Audio processed: {duration:.2f}s at {sr}Hz, Stereo: {is_stereo}")

            return audio, sr, duration, is_stereo

        except Exception as e:
            self.logger.error(f"Error preprocessing audio: {e}")
            return None, None, None, None

In [8]:
def separate_stereo_channels(self, audio_path: str) -> Dict[str, np.ndarray]:
        """Split an audio file into per-speaker channels.

        The first channel is assumed to be the agent and the second the
        customer.

        Args:
            audio_path: Path of the audio file to load at ``SAMPLE_RATE``.

        Returns:
            A dict with ``'agent'`` and ``'customer'`` arrays plus a
            ``'separation_method'`` string (``'mono_duplicate'``,
            ``'stereo_channels'``, ``'multi_channel'`` or ``'fallback_mono'``).
            An empty dict is returned when the file cannot be loaded at all.
        """

        self.logger.info("Separating stereo channels...")

        try:
            # Load without down-mixing so the channels stay separate
            audio, sr = librosa.load(audio_path, sr=SAMPLE_RATE, mono=False)

            if len(audio.shape) == 1:
                # Mono audio - can't separate
                self.logger.warning("Mono audio detected - cannot separate channels")
                return {
                    'agent': audio,
                    'customer': audio,
                    'separation_method': 'mono_duplicate'
                }

            if audio.shape[0] == 2:
                method = 'stereo_channels'
            else:
                # Multi-channel - use first two
                self.logger.warning(f"Multi-channel audio ({audio.shape[0]} channels) - using first two")
                method = 'multi_channel'

            # Assume agent on the first channel, customer on the second
            return {
                'agent': audio[0],
                'customer': audio[1],
                'separation_method': method
            }

        except Exception as e:
            self.logger.error(f"Error in stereo separation: {e}")
            # Fallback to mono
            try:
                audio, sr = librosa.load(audio_path, sr=SAMPLE_RATE, mono=True)
            except Exception as fallback_error:
                # Report the failure instead of swallowing it silently.
                self.logger.error(f"Mono fallback also failed: {fallback_error}")
                return {}
            return {
                'agent': audio,
                'customer': audio,
                'separation_method': 'fallback_mono'
            }


In [9]:
def transcribe_audio_fast(self, audio_path: str) -> Optional[Dict]:
        """Transcribe ``audio_path`` with the loaded Whisper model.

        Returns the dict produced by ``self.whisper_model.transcribe`` or
        ``None`` when transcription (or reading its ``'text'``) fails.
        """

        self.logger.info("Fast transcription...")
        started_at = datetime.now()

        # Speed-oriented settings: English only, no word timestamps, quiet.
        transcribe_options = {
            'language': "en",
            'word_timestamps': False,
            'verbose': False,
        }

        try:
            transcript = self.whisper_model.transcribe(audio_path, **transcribe_options)

            elapsed_seconds = (datetime.now() - started_at).total_seconds()
            char_count = len(transcript['text'])
            self.logger.info(f"Transcription completed in {elapsed_seconds:.1f}s: {char_count} chars")

            return transcript

        except Exception as e:
            self.logger.error(f"Error in fast transcription: {e}")
            return None

In [10]:
def calculate_talk_time_from_channels(self, channel_data: Dict[str, np.ndarray], sample_rate: int) -> Dict[str, float]:
        """Estimate each speaker's share of talk time from channel energy.

        Every channel is cut into consecutive 100 ms windows; a window counts
        as speech when its RMS exceeds 0.01.

        Args:
            channel_data: Mapping of speaker name to samples. A
                ``'separation_method'`` entry, if present, is ignored.
            sample_rate: Samples per second of the channel arrays.

        Returns:
            Speaker name -> percentage of the total speaking time (rounded to
            two decimals). ``{"Unknown Speaker": 100.0}`` when fewer than two
            entries are supplied, and an even 50/50 split when no speech is
            detected or an error occurs.
        """

        if not channel_data or len(channel_data) < 2:
            return {"Unknown Speaker": 100.0}

        try:
            window_size = int(0.1 * sample_rate)  # 100ms windows
            energy_threshold = 0.01  # Minimum energy to consider as speech

            channel_speaking_time = {}

            for speaker, audio in channel_data.items():
                if speaker == 'separation_method':
                    continue

                samples = np.asarray(audio)
                # Use every complete window, including the last one (the old
                # range() bound skipped it when the length was an exact
                # multiple of the window size).
                n_windows = len(samples) // window_size
                frames = samples[:n_windows * window_size].reshape(n_windows, window_size)
                rms = np.sqrt(np.mean(frames ** 2, axis=1))
                active_windows = int(np.count_nonzero(rms > energy_threshold))

                channel_speaking_time[speaker] = active_windows * window_size / sample_rate

            # Convert to percentages
            total_speaking_time = sum(channel_speaking_time.values())

            if total_speaking_time == 0:
                return {"agent": 50.0, "customer": 50.0}

            return {
                speaker: round((seconds / total_speaking_time) * 100, 2)
                for speaker, seconds in channel_speaking_time.items()
            }

        except Exception as e:
            self.logger.error(f"Error calculating talk ratios: {e}")
            return {"agent": 50.0, "customer": 50.0}


In [11]:
def count_questions_fast(self, text: str) -> Dict:
        """Count the questions in a transcript.

        A sentence counts as a question when it is terminated by ``?`` or when
        it starts with an interrogative/auxiliary word. Sentences of five
        characters or fewer are ignored to avoid spurious matches.

        Args:
            text: Transcript text.

        Returns:
            Dict with ``'count'`` (number of questions), ``'questions'`` (up to
            five lower-cased examples, each truncated to 100 characters,
            without the closing punctuation) and ``'question_rate'``
            (questions divided by the number of non-empty sentences, 0 when
            there are none).
        """

        import re

        if not text:
            return {'count': 0, 'questions': [], 'question_rate': 0}

        # Capture the sentence body and its terminator separately so the '?'
        # is still visible after splitting.
        sentence_re = re.compile(r'([^.!?]+)([.!?]*)')
        # Anchored at the sentence start: matching these words anywhere would
        # flag plain statements such as "this is fine".
        starter_re = re.compile(
            r'^(what|how|why|when|where|who|which|can|could|would|will|should|do|does|did|is|are)\b'
        )

        questions = []
        sentence_count = 0

        for match in sentence_re.finditer(text):
            sentence = match.group(1).strip().lower()
            if not sentence:
                continue
            sentence_count += 1

            is_question = '?' in match.group(2) or starter_re.match(sentence) is not None
            if is_question and len(sentence) > 5:  # Avoid very short matches
                questions.append(sentence[:100])  # Truncate for storage

        return {
            'count': len(questions),
            'questions': questions[:5],  # First 5 examples
            'question_rate': len(questions) / sentence_count if sentence_count else 0
        }

In [12]:
def find_longest_pause(self, channel_data: Dict[str, np.ndarray], sample_rate: int) -> Dict:
        """Find the longest stretch of silence in the call.

        The agent and customer channels are summed (or the first available
        array is used) and cut into consecutive 100 ms windows; a window is
        silent when its RMS is below 0.005. The longest run of consecutive
        silent windows is reported; the earliest run wins on ties.

        Args:
            channel_data: Mapping of speaker name to samples. A
                ``'separation_method'`` entry, if present, is ignored.
            sample_rate: Samples per second of the channel arrays.

        Returns:
            Dict with ``'duration'``, ``'start'`` and ``'end'`` in seconds
            (rounded to two decimals). All zeros when no silence is found, no
            audio is available, or an error occurs.
        """

        no_pause = {'duration': 0.0, 'start': 0.0, 'end': 0.0}

        try:
            # Combine channels for overall silence detection
            if 'agent' in channel_data and 'customer' in channel_data:
                agent = np.asarray(channel_data['agent'])
                customer = np.asarray(channel_data['customer'])
                # Trim to the shared length so unequal channels can be summed.
                shared_length = min(len(agent), len(customer))
                combined_audio = agent[:shared_length] + customer[:shared_length]
            else:
                # Use first available channel
                audio_arrays = [v for k, v in channel_data.items() if k != 'separation_method' and isinstance(v, np.ndarray)]
                if not audio_arrays:
                    return dict(no_pause)
                combined_audio = audio_arrays[0]

            window_size = int(0.1 * sample_rate)  # 100ms windows
            silence_threshold = 0.005  # Very low energy threshold

            # Use every complete window, including the final one.
            n_windows = len(combined_audio) // window_size
            frames = combined_audio[:n_windows * window_size].reshape(n_windows, window_size)
            silent = np.sqrt(np.mean(frames ** 2, axis=1)) < silence_threshold

            # Track the longest run of consecutive silent windows.
            best_start, best_length = 0, 0
            run_start = None
            for index, is_silent in enumerate(silent):
                if not is_silent:
                    run_start = None
                    continue
                if run_start is None:
                    run_start = index
                run_length = index - run_start + 1
                if run_length > best_length:
                    best_start, best_length = run_start, run_length

            if best_length == 0:
                return dict(no_pause)

            start_seconds = best_start * window_size / sample_rate
            end_seconds = (best_start + best_length) * window_size / sample_rate
            return {
                'duration': round(end_seconds - start_seconds, 2),
                'start': round(start_seconds, 2),
                'end': round(end_seconds, 2)
            }

        except Exception as e:
            self.logger.error(f"Error finding longest pause: {e}")
            return dict(no_pause)


In [13]:
def analyze_sentiment_fast(self, text: str) -> Dict:


        self.logger.info("Fast sentiment analysis...")

        if not self.sentiment_analyzer or not text.strip():
            return {
                'dominant_sentiment': 'neutral',
                'confidence': 0.0,
                'all_sentiments': {'neutral': 100.0}
            }

        try:
            # Process in smaller chunks for speed
            max_length = 300  # Shorter chunks for speed
            chunks = [text[i:i+max_length] for i in range(0, len(text), max_length)]
            chunks = [chunk.strip() for chunk in chunks if chunk.strip()]

            if not chunks:
                return {
                    'dominant_sentiment': 'neutral',
                    'confidence': 0.0,
                    'all_sentiments': {'neutral': 100.0}
                }

            # Limit to first 3 chunks for speed in free Colab
            chunks = chunks[:3]

            # Analyze chunks
            sentiment_scores = {'positive': 0, 'negative': 0, 'neutral': 0}
            total_confidence = 0

            for chunk in chunks:
                try:
                    result = self.sentiment_analyzer(chunk)

                    if isinstance(result, list) and result:
                        if isinstance(result[0], list):
                            # Multiple scores format
                            for score_dict in result[0]:
                                label = score_dict['label'].lower()
                                score = score_dict['score']

                                if 'pos' in label or label == 'positive':
                                    sentiment_scores['positive'] += score
                                elif 'neg' in label or label == 'negative':
                                    sentiment_scores['negative'] += score
                                else:
                                    sentiment_scores['neutral'] += score

                                total_confidence += score
                        else:
                            # Single score format
                            label = result[0]['label'].lower()
                            score = result[0]['score']

                            if 'pos' in label or label == 'positive':
                                sentiment_scores['positive'] += score
                            elif 'neg' in label or label == 'negative':
                                sentiment_scores['negative'] += score
                            else:
                                sentiment_scores['neutral'] += score

                            total_confidence += score

                except Exception as e:
                    self.logger.warning(f"Error analyzing chunk sentiment: {e}")
                    continue

            # Normalize scores
            if total_confidence > 0:
                sentiment_percentages = {
                    k: round((v / total_confidence) * 100, 2)
                    for k, v in sentiment_scores.items()
                }
            else:
                sentiment_percentages = {'neutral': 100.0}

            # Find dominant sentiment
            dominant = max(sentiment_percentages.items(), key=lambda x: x[1])

            return {
                'dominant_sentiment': dominant[0],
                'confidence': dominant[1],
                'all_sentiments': sentiment_percentages
            }

        except Exception as e:
            self.logger.error(f"Error in fast sentiment analysis: {e}")
            return {
                'dominant_sentiment': 'neutral',
                'confidence': 0.0,
                'all_sentiments': {'neutral': 100.0}
            }

In [14]:
def generate_fast_insights(self, analysis_results: Dict) -> List[str]:
        """Turn analysis metrics into short coaching/quality messages.

        Args:
            analysis_results: Result dict; the keys read are
                ``'talk_time_ratio'``, ``'questions'``, ``'file_info'``,
                ``'longest_pause'``, ``'sentiment'`` and
                ``'channel_separation'``. Missing keys fall back to defaults.

        Returns:
            At most six insight strings. A single error message is returned
            if insight generation itself fails.
        """

        insights = []

        try:
            # Talk-time insights
            talk_ratios = analysis_results.get('talk_time_ratio', {})
            if 'agent' in talk_ratios and 'customer' in talk_ratios:
                agent_ratio = talk_ratios['agent']
                customer_ratio = talk_ratios['customer']

                if agent_ratio > 70:
                    insights.append("COACHING: Agent dominated conversation (>70%). Encourage more customer engagement.")
                elif agent_ratio < 30:
                    insights.append("COACHING: Agent spoke too little (<30%). May need to provide more guidance.")
                elif 40 <= agent_ratio <= 60:
                    insights.append("EXCELLENT: Well-balanced conversation ratio.")

                # Customer engagement
                if customer_ratio > 60:
                    insights.append("POSITIVE: High customer engagement - they're actively participating.")
                elif customer_ratio < 20:
                    insights.append("ATTENTION: Low customer participation - may indicate disengagement.")

            # Question insights
            questions = analysis_results.get('questions', {})
            question_count = questions.get('count', 0)
            call_duration = analysis_results.get('file_info', {}).get('duration_minutes', 1)

            questions_per_minute = question_count / call_duration if call_duration > 0 else 0

            if questions_per_minute < 0.5:
                insights.append("COACHING: Very few questions asked. Train agent on discovery techniques.")
            elif questions_per_minute > 4:
                insights.append("COACHING: Too many questions. Agent may not be listening to responses.")
            elif questions_per_minute >= 1:
                insights.append("GOOD: Healthy questioning shows active engagement.")

            # Pause/silence insights
            longest_pause = analysis_results.get('longest_pause', {})
            pause_duration = longest_pause.get('duration', 0)

            if pause_duration > 10:
                insights.append(f"ATTENTION: Very long pause ({pause_duration}s). Check for technical issues.")
            elif pause_duration > 5:
                insights.append(f"NOTE: Extended pause ({pause_duration}s) - may indicate thinking time or connection issues.")
            elif pause_duration < 1:
                insights.append("FAST-PACED: Very few pauses - ensure customer has time to respond.")

            # Sentiment insights
            sentiment = analysis_results.get('sentiment', {})
            dominant_sentiment = sentiment.get('dominant_sentiment', 'neutral')
            confidence = sentiment.get('confidence', 0)

            if dominant_sentiment == 'negative' and confidence > 50:
                insights.append("PRIORITY: Negative sentiment detected. Review for service recovery opportunities.")
            elif dominant_sentiment == 'positive' and confidence > 60:
                insights.append("EXCELLENT: Strong positive sentiment - great customer experience!")
            elif dominant_sentiment == 'positive' and confidence > 40:
                insights.append("GOOD: Positive customer sentiment maintained.")

            # Call duration insights
            if call_duration < 1:
                insights.append("QUICK CALL: Very short interaction - ensure resolution was complete.")
            elif call_duration > 10:
                insights.append("LONG CALL: Extended duration - review for efficiency opportunities.")

            # Audio quality insights. The compiled results store the method
            # under 'method'; 'separation_method' is still accepted as a
            # fallback for callers that pass the raw channel dict.
            separation_info = analysis_results.get('channel_separation', {})
            separation_method = separation_info.get(
                'method', separation_info.get('separation_method', 'unknown')
            )
            if separation_method in ('mono_duplicate', 'mono_assumed', 'fallback_mono'):
                insights.append("AUDIO NOTE: Mono recording - speaker separation approximate.")
            elif separation_method == 'stereo_channels':
                insights.append("AUDIO QUALITY: Clear stereo separation enables accurate speaker analysis.")

            # Default insight
            if not insights:
                insights.append("STANDARD CALL: No critical issues identified in this interaction.")

            # Limit insights for readability
            return insights[:6]

        except Exception as e:
            self.logger.error(f"Error generating insights: {e}")
            return ["Unable to generate insights due to analysis error."]


In [15]:
def analyze_call_optimized(self, audio_path: str, save_results: bool = True) -> Optional[Dict]:
        """Run the full analysis pipeline on one audio file.

        Steps: preprocessing, channel separation (stereo files only, when
        enabled), transcription, metric calculation, insight generation and
        optional saving.

        Args:
            audio_path: Path of the audio file to analyse.
            save_results: When true, pass the results to ``self._save_results``.

        Returns:
            The results dict, or ``None`` when the file is missing,
            preprocessing or transcription fails, or any other error occurs.
        """

        self.logger.info(f"FAST ANALYSIS: {Path(audio_path).name}")
        self.logger.info("=" * 50)

        start_time = datetime.now()

        try:
            # Validate input
            if not Path(audio_path).exists():
                self.logger.error(f"Audio file not found: {audio_path}")
                return None

            # Step 1: Fast preprocessing
            audio_data, sample_rate, duration, is_stereo = self.preprocess_audio_fast(audio_path)
            if audio_data is None:
                return None

            # Step 2: Channel separation (if stereo). The constructor stores
            # the flag under a misspelled attribute name, so accept either
            # spelling instead of raising AttributeError.
            stereo_enabled = getattr(
                self, 'enable_stereo_separation',
                getattr(self, 'enable_streo_seperation', True)
            )

            channel_data = {}
            if stereo_enabled and is_stereo:
                channel_data = self.separate_stereo_channels(audio_path)

            if not channel_data:
                # Mono fallback (also used when separation returned nothing)
                channel_data = {
                    'agent': audio_data,
                    'customer': audio_data,
                    'separation_method': 'mono_assumed'
                }

            # Step 3: Fast transcription
            transcription_result = self.transcribe_audio_fast(audio_path)
            if not transcription_result:
                self.logger.error("Transcription failed")
                return None

            full_text = transcription_result['text']

            # Step 4: Quick calculations
            self.logger.info("Calculating metrics...")

            talk_time_ratio = self.calculate_talk_time_from_channels(channel_data, sample_rate)
            questions = self.count_questions_fast(full_text)
            longest_pause = self.find_longest_pause(channel_data, sample_rate)
            sentiment = self.analyze_sentiment_fast(full_text)

            # Compile results
            results = {
                'analysis_metadata': {
                    'analyzer_version': '1.0.0-optimized',
                    'analysis_timestamp': datetime.now().isoformat(),
                    'processing_time_seconds': (datetime.now() - start_time).total_seconds(),
                    'optimization_level': 'colab_free_tier',
                    'models_used': {
                        'whisper': self.whisper_model_size,
                        'diarization': 'stereo_channel_separation',
                        'sentiment': 'distilbert-base-uncased-finetuned-sst-2-english'
                    }
                },
                'file_info': {
                    'filename': Path(audio_path).name,
                    'file_path': str(audio_path),
                    'duration_seconds': round(duration, 2),
                    'duration_minutes': round(duration / 60, 2),
                    'sample_rate': sample_rate,
                    'is_stereo': is_stereo,
                    'file_size_mb': round(Path(audio_path).stat().st_size / (1024*1024), 2)
                },
                'talk_time_ratio': talk_time_ratio,
                'questions': questions,
                'longest_pause': longest_pause,
                'sentiment': sentiment,
                'transcription': {
                    'full_text': full_text,
                    'word_count': len(full_text.split()),
                    'character_count': len(full_text)
                },
                'channel_separation': {
                    'method': channel_data.get('separation_method', 'unknown'),
                    'channels_available': len([k for k in channel_data if k != 'separation_method'])
                }
            }

            # Generate insights
            self.logger.info("Generating insights...")
            insights = self.generate_fast_insights(results)
            results['actionable_insights'] = insights

            # Save results if requested
            if save_results:
                self._save_results(results)

            processing_time = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"ANALYSIS COMPLETED in {processing_time:.2f} seconds")
            self.logger.info("=" * 50)

            return results

        except Exception as e:
            self.logger.error(f"Error in optimized analysis: {e}")
            return None


In [16]:
def _save_results(self, results: Dict):
        """Save analysis results to a timestamped JSON file.

        The file is written to ``<root>/results/reports`` where ``<root>`` is
        the module-level ``PROJECT_ROOT`` when it is defined and the current
        working directory otherwise. Errors are logged, never raised.

        Args:
            results: Results dict; values that are not JSON-serialisable are
                written via ``str()``.
        """
        try:
            # PROJECT_ROOT is not defined in the visible part of this file;
            # fall back to the working directory rather than failing to save.
            try:
                base_dir = Path(PROJECT_ROOT)
            except NameError:
                base_dir = Path.cwd()

            # Create results directory
            results_dir = base_dir / "results" / "reports"
            results_dir.mkdir(parents=True, exist_ok=True)

            # Generate filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = results_dir / f"optimized_analysis_{timestamp}.json"

            # Save to JSON
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2, ensure_ascii=False, default=str)

            self.logger.info(f"Results saved: {filepath}")

        except Exception as e:
            self.logger.error(f"Error saving results: {e}")


In [17]:
def display_results_compact(self, results: Dict):
        """Print a compact, human-readable report of an analysis result.

        Args:
            results: Results dict. ``'file_info'``, ``'talk_time_ratio'``,
                ``'questions'``, ``'longest_pause'`` and ``'sentiment'`` are
                required; the other sections are optional. A falsy value
                prints a placeholder message instead.
        """

        if not results:
            print("No results to display")
            return

        print("\n" + "="*60)
        print("FAST CALL ANALYSIS REPORT")
        print("="*60)

        # Processing info
        metadata = results.get('analysis_metadata', {})
        processing_time = metadata.get('processing_time_seconds', 0)
        print(f" Processed in: {processing_time:.1f} seconds")

        # File info
        file_info = results['file_info']
        print(f" File: {file_info['filename']} ({file_info['duration_minutes']:.1f}m)")

        # Talk ratios
        talk_ratios = results['talk_time_ratio']
        print("\n TALK-TIME:")
        for speaker, ratio in talk_ratios.items():
            emoji = "👨‍💼" if speaker == "agent" else "👤" if speaker == "customer" else "🔊"
            print(f" {emoji} {speaker.title()}: {ratio}%")

        # Questions
        questions = results['questions']
        print(f"\n Questions: {questions['count']} total")

        # Longest pause
        pause = results['longest_pause']
        if pause['duration'] > 1:
            print(f"Longest pause: {pause['duration']}s")

        # Sentiment. This must be a mapping: the previous set literal had no
        # .get() and raised AttributeError on every call.
        sentiment = results['sentiment']
        sentiment_emoji = {"positive": "😊", "negative": "😞", "neutral": "😐"}
        emoji = sentiment_emoji.get(sentiment['dominant_sentiment'], "😐")
        print(f"{emoji} Sentiment: {sentiment['dominant_sentiment'].title()} ({sentiment['confidence']:.1f}%)")

        # Key insights
        insights = results.get('actionable_insights', [])
        print("\nKEY INSIGHTS:")
        for i, insight in enumerate(insights[:3], 1):  # Show top 3
            print(f"   {i}. {insight}")

        if len(insights) > 3:
            print(f"   ... and {len(insights)-3} more insights")

        # Audio quality note
        separation = results.get('channel_separation', {})
        method = separation.get('method', 'unknown')
        if method == 'stereo_channels':
            print("\n Audio: Clear stereo separation")
        elif method == 'mono_assumed':
            print("\n Audio: Mono (approximate speaker separation)")

        print("="*60)



In [18]:
def quick_analyze_call(audio_path: str, display_results: bool = True) -> Optional[Dict]:
    """Analyse one call with a default-configured analyzer.

    Args:
        audio_path: Path of the audio file to analyse.
        display_results: When true, print the compact report for a
            successful analysis.

    Returns:
        The results dict from ``analyze_call_optimized`` or ``None`` on failure.
    """
    # The class is declared in this file as ``OptimisedCallQualityAnalyzer``;
    # the "Optimized" spelling used here before raised NameError.
    analyzer = OptimisedCallQualityAnalyzer()
    results = analyzer.analyze_call_optimized(audio_path)

    if results and display_results:
        analyzer.display_results_compact(results)

    return results


In [19]:
def analyze_demo_call(audio_path: str = None) -> Optional[Dict]:
    """Demo entry point: validate ``audio_path`` and run ``quick_analyze_call``.

    Prints usage help and returns ``None`` when no path is given or the file
    does not exist; otherwise returns the analysis results.
    """
    if audio_path is None:
        for line in (
            "Please provide audio_path parameter",
            "Usage: analyze_demo_call('path/to/your/audio.wav')",
        ):
            print(line)
        return None

    audio_file = Path(audio_path)
    if not audio_file.exists():
        for line in (
            f"Audio file not found: {audio_path}",
            "Please upload an audio file to Colab first",
        ):
            print(line)
        return None

    banner = (
        "Starting optimized call analysis...",
        f"File: {audio_file.name}",
        "Optimized for free Colab (<30s processing)",
        "",
    )
    for line in banner:
        print(line)

    return quick_analyze_call(audio_path, display_results=True)


In [20]:
class CoLabCallAnalyzer:
    """Thin convenience wrapper around the call-quality analyzer for Colab.

    Builds an analyzer with the "tiny.en" Whisper model, stereo separation
    enabled and WARNING-level logging, and never saves results to disk.
    """

    def __init__(self):
        print("Initializing Colab Call Analyzer...")
        # The class is declared in this file as ``OptimisedCallQualityAnalyzer``
        # and its stereo flag keyword is misspelled there, so the flag is
        # passed positionally (second argument) to avoid a TypeError.
        self.analyzer = OptimisedCallQualityAnalyzer(
            "tiny.en",  # Fastest model
            True,  # Enable stereo channel separation
            log_level="WARNING"  # Reduce output for cleaner Colab experience
        )
        print("Ready for analysis!")

    def analyze(self, audio_file_path: str) -> Optional[Dict]:
        """Analyse ``audio_file_path`` without saving; return results or None."""
        return self.analyzer.analyze_call_optimized(audio_file_path, save_results=False)

    def show_results(self, results: Dict):
        """Print the compact report, or a placeholder when ``results`` is falsy."""
        if results:
            self.analyzer.display_results_compact(results)
        else:
            print("No results to display")

    def analyze_and_show(self, audio_file_path: str):
        """Analyse the file, print the report and return the results."""
        print("Analyzing call...")
        results = self.analyze(audio_file_path)
        self.show_results(results)
        return results



In [21]:
def benchmark_performance(audio_path: str, num_runs: int = 3) -> Dict:
    """Time repeated analyses of one file and print a summary.

    Args:
        audio_path: Path of the audio file to analyse on every run.
        num_runs: Number of timed runs; must be at least 1.

    Returns:
        Dict with ``'average_time'``, ``'min_time'``, ``'max_time'``,
        ``'std_deviation'`` (seconds, rounded to two decimals),
        ``'all_times'``, ``'failed_runs'`` (runs whose analysis returned
        ``None``) and ``'colab_compatible'`` (slowest run under 30 s).

    Raises:
        ValueError: If ``num_runs`` is smaller than 1.
    """
    # Fail fast: with no runs there is nothing to aggregate (min()/max() of
    # an empty list would raise a less helpful ValueError later on).
    if num_runs < 1:
        raise ValueError("num_runs must be at least 1")

    print(f"Benchmarking performance with {num_runs} runs...")

    times = []
    failed_runs = 0
    # The class is declared in this file as ``OptimisedCallQualityAnalyzer``.
    analyzer = OptimisedCallQualityAnalyzer(log_level="ERROR")

    for i in range(num_runs):
        print(f"Run {i+1}/{num_runs}...", end=" ")
        start_time = datetime.now()

        results = analyzer.analyze_call_optimized(audio_path, save_results=False)

        elapsed = (datetime.now() - start_time).total_seconds()
        times.append(elapsed)
        if results is None:
            # A failed analysis returns early, so its timing is not representative.
            failed_runs += 1

        print(f"{elapsed:.1f}s")

    stats = {
        'average_time': round(float(np.mean(times)), 2),
        'min_time': round(min(times), 2),
        'max_time': round(max(times), 2),
        'std_deviation': round(float(np.std(times)), 2),
        'all_times': times,
        'failed_runs': failed_runs,
        'colab_compatible': max(times) < 30  # Under 30s requirement
    }

    print(f"\n PERFORMANCE RESULTS:")
    print(f"   Average: {stats['average_time']}s")
    print(f"   Range: {stats['min_time']}s - {stats['max_time']}s")
    print(f"   Colab Compatible: {'YES' if stats['colab_compatible'] else 'NO'}")
    if failed_runs:
        print(f"   Failed runs: {failed_runs}/{num_runs}")

    return stats


In [22]:
def setup_colab_environment():
    """Install the analyzer's dependencies and pre-download its models.

    Packages are installed with the pip that belongs to the running
    interpreter, one group at a time, and any group whose install exits
    non-zero is reported instead of being silently ignored. Afterwards the
    Whisper ``tiny.en`` model and the DistilBERT SST-2 sentiment model are
    loaded once so they are cached before the first analysis. A model
    download failure is reported as a warning and does not raise.
    """
    # Local import: ``subprocess`` is not among the notebook's top-level imports.
    import subprocess

    print("Setting up Colab environment...")

    # Each inner list is installed by one pip invocation.
    # ``openai-whisper`` is the official distribution providing ``import whisper``.
    package_groups = [
        ["openai-whisper"],
        ["librosa"],
        ["transformers"],
        ["torch", "torchaudio"],
        ["soundfile"],
        ["pydub"],
    ]

    failed_groups = []
    for packages in package_groups:
        label = " ".join(packages)
        print(f"Installing: {label}")
        # ``sys.executable -m pip`` targets the interpreter running this code;
        # a bare ``pip`` on PATH may belong to a different environment.
        completed = subprocess.run(
            [sys.executable, "-m", "pip", "install", "-q", *packages],
            check=False,
        )
        if completed.returncode != 0:
            failed_groups.append(label)

    if failed_groups:
        print(f"Package install warning: failed to install {', '.join(failed_groups)}")
    else:
        print("Packages installed")

    # Pre-download models for faster first run
    print("Pre-downloading models...")
    try:
        # Download Whisper tiny.en
        import whisper
        whisper.load_model("tiny.en")
        print("Whisper tiny.en ready")

        # Download sentiment model
        from transformers import pipeline
        pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
        print("Sentiment model ready")

    except Exception as e:
        # Best effort: the models can still be fetched lazily on first use.
        print(f"Model download warning: {e}")

    print("Colab environment ready for call analysis!")


In [23]:
def create_sample_colab_notebook():
    """
    Generate sample code for Colab notebook

    Prints a ready-to-paste snippet (install, import, upload, analyze, read
    results) framed by two 50-character ``=`` rulers. Returns ``None``.
    """

    # The install line uses ``openai-whisper``, the official distribution
    # that provides the ``whisper`` module.
    notebook_code = '''
# Call Quality Analyzer - Colab Demo
# Optimized for sub-30 second processing

# 1. Setup (run once)
!pip install -q openai-whisper librosa transformers torch soundfile pydub

# 2. Import and initialize
from analyzer_optimized import CoLabCallAnalyzer
analyzer = CoLabCallAnalyzer()

# 3. Upload your audio file to Colab
from google.colab import files
uploaded = files.upload()
audio_file = list(uploaded.keys())[0]

# 4. Analyze the call
results = analyzer.analyze_and_show(audio_file)

# 5. Access specific results
if results:
    print("\\nDetailed breakdown:")
    print(f"Agent talk time: {results['talk_time_ratio']['agent']}%")
    print(f"Questions asked: {results['questions']['count']}")
    print(f"Sentiment: {results['sentiment']['dominant_sentiment']}")
    '''

    ruler = "=" * 50
    print(" Sample Colab Notebook Code:")
    print(ruler)
    print(notebook_code)
    print(ruler)


if __name__ == "__main__":
    # Usage banner shown when the module is run directly; an empty entry
    # produces a blank line.
    banner_lines = (
        "Optimized Call Quality Analyzer",
        "Designed for Google Colab free tier",
        "Processes calls in <30 seconds",
        "",
        "Quick start options:",
        "1. analyzer = CoLabCallAnalyzer()",
        "2. results = analyzer.analyze_and_show('audio.wav')",
        "3. Or use: quick_analyze_call('audio.wav')",
        "",
        "For Colab setup: setup_colab_environment()",
        "For sample code: create_sample_colab_notebook()",
    )
    for banner_line in banner_lines:
        print(banner_line)


Optimized Call Quality Analyzer
Designed for Google Colab free tier
Processes calls in <30 seconds

Quick start options:
1. analyzer = CoLabCallAnalyzer()
2. results = analyzer.analyze_and_show('audio.wav')
3. Or use: quick_analyze_call('audio.wav')

For Colab setup: setup_colab_environment()
For sample code: create_sample_colab_notebook()


In [24]:
class StreamlinedAnalyzer:
    """
    Minimal analyzer for basic metrics only
    Even faster processing for simple use cases
    """

    # Words that mark a sentence as a question even without a trailing '?'.
    _QUESTION_WORDS = frozenset({'what', 'how', 'why', 'when', 'where'})

    def __init__(self):
        """Load Whisper ``tiny.en`` and, when transformers is available, a sentiment pipeline."""
        self.whisper_model = whisper.load_model("tiny.en")
        if TRANSFORMERS_AVAILABLE:
            # Name the checkpoint explicitly (the same one that
            # setup_colab_environment pre-downloads) instead of relying on
            # the pipeline's implicit default.
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model="distilbert-base-uncased-finetuned-sst-2-english",
            )
        else:
            self.sentiment_analyzer = None

    @staticmethod
    def _count_questions(text: str) -> int:
        """Count sentences in *text* that look like questions.

        A sentence counts once if it is terminated by ``?`` or contains one
        of the question words, so "What is your name?" is one question
        rather than two.
        """
        # Local import: ``re`` is not among the notebook's top-level imports.
        import re

        question_total = 0
        # Each match is a run of text plus its trailing terminator(s).
        for sentence in re.findall(r'[^.!?]+[.!?]*', text):
            lowered = sentence.strip().lower()
            if not lowered:
                continue
            words = re.findall(r'[a-z]+', lowered)
            if '?' in lowered or any(
                word in StreamlinedAnalyzer._QUESTION_WORDS for word in words
            ):
                question_total += 1
        return question_total

    def basic_analysis(self, audio_path: str,
                       max_duration_seconds: Optional[float] = 120) -> Dict:
        """
        Basic analysis with only essential metrics
        Target: <15 seconds processing time

        Args:
            audio_path: Audio file to analyze.
            max_duration_seconds: Only this many leading seconds are loaded
                and analyzed; ``None`` loads the whole file.

        Returns:
            Dict with ``duration_minutes``, ``word_count``,
            ``question_count``, ``sentiment`` (lower-cased pipeline label, or
            ``"neutral"`` when sentiment is unavailable or fails),
            ``transcription`` (first 200 characters, with ``...`` appended
            when truncated) and ``processing_time`` in seconds.
        """

        start_time = datetime.now()

        # Load audio (limited to max_duration_seconds for speed); librosa
        # resamples to the 16 kHz requested here.
        audio, sr = librosa.load(audio_path, sr=16000, duration=max_duration_seconds)
        duration = len(audio) / sr

        # Transcribe the samples that were just loaded rather than the file
        # path, so the duration cap also applies to transcription and the
        # file is decoded only once.
        result = self.whisper_model.transcribe(audio, language="en", verbose=False)
        text = result['text']

        # Basic sentiment (single chunk only)
        sentiment = "neutral"
        if self.sentiment_analyzer and text.strip():
            try:
                sent_result = self.sentiment_analyzer(text[:500])  # First 500 chars only
                sentiment = sent_result[0]['label'].lower()
            except Exception as exc:
                # Best effort: keep the "neutral" default but surface the failure.
                logging.getLogger(__name__).warning(
                    "Sentiment analysis failed; reporting 'neutral': %s", exc
                )

        question_count = self._count_questions(text)

        processing_time = (datetime.now() - start_time).total_seconds()

        return {
            'duration_minutes': round(duration / 60, 2),
            'word_count': len(text.split()),
            'question_count': question_count,
            'sentiment': sentiment,
            'transcription': text[:200] + "..." if len(text) > 200 else text,
            'processing_time': round(processing_time, 2)
        }



In [25]:
class BatchProcessor:
    """
    Process multiple audio files efficiently in Colab
    """

    # Lower-case file suffixes that are treated as audio.
    AUDIO_EXTENSIONS = frozenset({'.wav', '.mp3', '.m4a', '.flac'})

    def __init__(self):
        """Create the shared analyzer used for every file in a batch."""
        self.analyzer = OptimizedCallQualityAnalyzer(log_level="WARNING")

    def process_folder(self, folder_path: str, max_files: Optional[int] = 5) -> List[Dict]:
        """
        Process multiple audio files (limited for Colab performance)

        Args:
            folder_path: Directory whose direct children are scanned
                (no recursion).
            max_files: Maximum number of files to analyze, taken in sorted
                path order; ``None`` means no limit, values below 0 are
                treated as 0.

        Returns:
            One results dict per successfully analyzed file, in processing
            order. A missing folder, or one without audio files, yields
            ``[]``.
        """

        folder = Path(folder_path)
        if not folder.is_dir():
            print(f"Folder not found: {folder}")
            return []

        # Sort so the subset kept by max_files is the same on every run;
        # directory listing order is otherwise arbitrary. Skip non-files
        # (e.g. a directory named "x.wav").
        audio_files = sorted(
            entry for entry in folder.iterdir()
            if entry.is_file() and entry.suffix.lower() in self.AUDIO_EXTENSIONS
        )

        # Limit files for Colab memory constraints
        if max_files is not None:
            audio_files = audio_files[:max(0, max_files)]

        print(f"Processing {len(audio_files)} files...")

        results = []
        for i, file_path in enumerate(audio_files, 1):
            print(f"File {i}/{len(audio_files)}: {file_path.name}")

            try:
                result = self.analyzer.analyze_call_optimized(str(file_path), save_results=False)
                if result:
                    results.append(result)
                    print(f"Completed in {result['analysis_metadata']['processing_time_seconds']:.1f}s")
                else:
                    print("Failed")

            except Exception as e:
                # One bad file must not abort the rest of the batch.
                print(f"Error: {e}")

        print(f"\n Batch complete: {len(results)}/{len(audio_files)} successful")
        return results


In [26]:
def create_summary_report(self, results: List[Dict]) -> Dict:
    """
    Create summary statistics from batch results

    Args:
        self: Unused. Kept so existing calls that pass an instance (or
            ``None``) as the first argument keep working.
        results: Result dicts; each is read for
            ``file_info.duration_minutes``, ``questions.count``,
            ``sentiment.dominant_sentiment``, ``talk_time_ratio`` (the
            optional ``agent`` entry) and
            ``analysis_metadata.processing_time_seconds``.

    Returns:
        ``{}`` when ``results`` is empty. Otherwise a dict with
        ``total_calls``, ``average_duration_minutes``,
        ``average_questions_per_call``, ``sentiment_distribution``,
        ``average_agent_talk_percentage`` (50.0 when no result has an
        ``agent`` talk ratio) and ``processing_times``. Averages are plain
        Python floats.
    """
    # Local import: ``Counter`` is not among the notebook's top-level imports.
    from collections import Counter

    if not results:
        return {}

    # Aggregate metrics; float() keeps NumPy scalar types out of the summary.
    avg_duration = float(np.mean([r['file_info']['duration_minutes'] for r in results]))
    avg_questions = float(np.mean([r['questions']['count'] for r in results]))

    sentiment_counts = dict(
        Counter(r['sentiment']['dominant_sentiment'] for r in results)
    )

    # Only results that report an agent share take part in the average.
    talk_ratios = [
        r['talk_time_ratio']['agent']
        for r in results
        if 'agent' in r['talk_time_ratio']
    ]
    avg_agent_talk = float(np.mean(talk_ratios)) if talk_ratios else 50.0

    summary = {
        'total_calls': len(results),
        'average_duration_minutes': round(avg_duration, 2),
        'average_questions_per_call': round(avg_questions, 1),
        'sentiment_distribution': sentiment_counts,
        'average_agent_talk_percentage': round(avg_agent_talk, 1),
        'processing_times': [r['analysis_metadata']['processing_time_seconds'] for r in results]
    }

    print(" BATCH SUMMARY:")
    print(f"   Total calls analyzed: {summary['total_calls']}")
    print(f"   Average duration: {summary['average_duration_minutes']} minutes")
    print(f"   Average questions: {summary['average_questions_per_call']}")
    print(f"   Average agent talk: {summary['average_agent_talk_percentage']}%")
    print(f"   Sentiment breakdown: {summary['sentiment_distribution']}")

    return summary


# NOTE(review): the ``self`` parameter and the position right after
# BatchProcessor suggest this was meant to be one of its methods but ended up
# in a separate cell - confirm. When the class is already defined, bind the
# function so ``BatchProcessor().create_summary_report(results)`` works too.
if "BatchProcessor" in globals():
    BatchProcessor.create_summary_report = create_summary_report