In [1]:
# Installs openai-whisper from the GitHub repository's default branch; no tag or commit is pinned,
# so a later run may resolve to a different commit than the one recorded in the output below.
# --break-system-packages tells pip to install even into an externally managed Python environment.
!pip install --break-system-packages git+https://github.com/openai/whisper.git

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-_b59tud2
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-_b59tud2
  Resolved https://github.com/openai/whisper.git to commit c0d2f624c09dc18e709e37c2ad90c039a4eb72a2
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: openai-whisper
  Building wheel for openai-whisper (pyproject.toml) ... [?25l[?25hdone
  Created wheel for openai-whisper: filename=openai_whisper-20250625-py3-none-any.whl size=803979 sha256=33c3bf938505926688b1e3da92b1b9591b2cd4e3e1f525389230f49ee1664369
  Stored in directory: /tmp/pip-ephem-wheel-cache-kwbusxpk/wheels/c3/03/25/5e0ba78bc27a3a089f137c9f1d92fdfce16d06996c071a016c
Successfully built openai-whisper
Installing collec

In [2]:
# Installs pyannote.audio and torchaudio without version pins (latest releases at install time).
# NOTE(review): only `Pipeline` from pyannote.audio is imported in this notebook and no visible
# code calls it or torchaudio — confirm these packages are still required.
!pip install --break-system-packages pyannote.audio torchaudio

Collecting pyannote.audio
  Downloading pyannote.audio-3.3.2-py2.py3-none-any.whl.metadata (11 kB)
Collecting asteroid-filterbanks>=0.4 (from pyannote.audio)
  Downloading asteroid_filterbanks-0.4.0-py3-none-any.whl.metadata (3.3 kB)
Collecting lightning>=2.0.1 (from pyannote.audio)
  Downloading lightning-2.5.4-py3-none-any.whl.metadata (39 kB)
Collecting pyannote.core>=5.0.0 (from pyannote.audio)
  Downloading pyannote.core-5.0.0-py3-none-any.whl.metadata (1.4 kB)
Collecting pyannote.database>=5.0.1 (from pyannote.audio)
  Downloading pyannote.database-5.1.3-py3-none-any.whl.metadata (1.1 kB)
Collecting pyannote.metrics>=3.2 (from pyannote.audio)
  Downloading pyannote.metrics-3.2.1-py3-none-any.whl.metadata (1.3 kB)
Collecting pyannote.pipeline>=3.0.1 (from pyannote.audio)
  Downloading pyannote.pipeline-3.0.1-py3-none-any.whl.metadata (897 bytes)
Collecting pytorch-metric-learning>=2.1.0 (from pyannote.audio)
  Downloading pytorch_metric_learning-2.9.0-py3-none-any.whl.metadata (18

In [3]:
import whisper
from pyannote.audio import Pipeline
import torch
import re
import os
import subprocess
import json
from datetime import datetime
from typing import Dict, List, Optional, Any

  torchaudio.list_audio_backends()


In [4]:
# Configuration
# Recording to analyse; a relative path, so it is resolved against the kernel's working directory.
INPUT_AUDIO_PATH = "call2.wav"
# Where the ffmpeg preprocessing step writes the normalised mono 16 kHz copy that Whisper reads.
CLEAN_AUDIO_PATH = "cleaned_audio_for_asr_and_diarization.wav"
# Hugging Face access token. It is read from the HF_TOKEN environment variable so the secret is
# never stored in the notebook; the original "hf_" placeholder is kept as the fallback value.
# NOTE(review): no code visible in this notebook uses the token — presumably intended for
# pyannote's gated diarization models; confirm before relying on it.
HUGGING_FACE_ACCESS_TOKEN = os.environ.get("HF_TOKEN", "hf_")

In [5]:
class EnhancedInsuranceASR:
    def __init__(self):
        self.model = whisper.load_model("large-v3")

        # Enhanced domain-specific vocabulary from Gemini prompt
        self.domain_terms = {
            # Insurance companies
            'axis maxlife': 'Axis Maxlife Insurance',
            'axis max life': 'Axis Maxlife Insurance',
            'access max life': 'Axis Maxlife Insurance',

            # Payment methods
            'g pay': 'Google Pay',
            'google pay': 'Google Pay',
            'phone pay': 'PhonePe',
            'phone pe': 'PhonePe',
            'pay tm': 'Paytm',
            'net banking': 'net banking',

            # Insurance terms
            'some assured': 'sum assured',
            'fund value': 'fund values',
            'premium do': 'premium due',
            'do date': 'due date',
            'late fee': 'late fee',
            'surrender value': 'surrender value',
            'maturity value': 'maturity value',
            'health declaration form': 'health declaration form',

            # Common names (only add if actually heard in audio)
            'jaya parkash': 'Jaya Parkash',
            'chandru': 'Chandru',
            'sneha': 'Sneha',
            'kowsalya': 'Kowsalya',
            'swathi': 'Swathi',
            'delphina': 'Delphina'
        }

        # Enhanced initial prompt based on Gemini approach
        self.enhanced_prompt = (
            "This is a customer support call for Axis Maxlife Insurance about policy renewal. "
            "The conversation includes policy numbers, due dates, fund values, sum assured amounts, "
            "late fees, and payment methods like Google Pay, PhonePe, Paytm, net banking, UPI, and cards. "
            "Speakers discuss premium due amounts, surrender values, maturity values, alternative mobile numbers, "
            "email IDs, and health declaration forms. Common outcomes include payment confirmations, "
            "callback requests, or providing policy information."
        )

        # Entity extraction patterns
        self.entity_patterns = {
            'policy_number': [
                r'\b(?:policy|policy number|pol no)[:\s]*([A-Z0-9]{8,15})\b',
                r'\b([A-Z]{2,4}\d{8,12})\b'  # Common policy number formats
            ],
            'due_date': [
                r'\b(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})\b',
                r'\b(\d{1,2}\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\s+\d{2,4})\b'
            ],
            'premium_due_amount': [
                r'\b(?:premium|amount|rupees?|rs\.?|₹)\s*(?:due|payable)?\s*(?:is|of)?\s*([₹]?\s*\d{1,6}(?:,\d{3})*(?:\.\d{2})?)\b',
                r'\b([₹]\s*\d{1,6}(?:,\d{3})*(?:\.\d{2})?)\s*(?:rupees?|due|premium)\b'
            ],
            'late_fee': [
                r'\b(?:late fee|penalty|additional charge)\s*(?:is|of)?\s*([₹]?\s*\d{1,6}(?:,\d{3})*(?:\.\d{2})?)\b'
            ],
            'mobile_number': [
                r'\b(?:mobile|phone|number)\s*(?:is|:)?\s*([6-9]\d{9})\b',
                r'\b([6-9]\d{9})\b'
            ],
            'email': [
                r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b'
            ]
        }

    def smart_audio_preprocessing(self, input_path: str, output_path: str) -> bool:
        """Enhanced audio preprocessing with insurance call optimization"""
        print("--- Optimized Insurance Call Audio Preprocessing ---")

        # Optimized for Indian English and regional accents in insurance calls
        ffmpeg_command = [
            "ffmpeg", "-i", input_path,
            "-acodec", "pcm_s16le",
            "-ac", "1",  # Mono
            "-ar", "16000",  # Optimal for Whisper
            "-af", "loudnorm=I=-16:TP=-1.5:LRA=11,highpass=f=85,lowpass=f=7500,afftdn=nr=8",
            "-y", output_path
        ]

        try:
            result = subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True)
            print("✅ Insurance call preprocessing successful")
            return True
        except subprocess.CalledProcessError as e:
            print(f"❌ Preprocessing failed: {e}")
            return False

    def detect_language(self, audio_path: str) -> str:
        """Detect language from predefined set"""
        # Quick detection using Whisper
        result = self.model.transcribe(audio_path, language=None, fp16=False, task="transcribe")
        detected_lang = result.get('language', 'en')

        # Map to Gemini's expected language format
        lang_mapping = {
            'ta': 'Tamil',
            'te': 'Telugu',
            'hi': 'Hindi',
            'ml': 'Malayalam',
            'kn': 'Kannada',
            'en': 'English'  # Fallback
        }

        return lang_mapping.get(detected_lang, 'Hindi')  # Default to Hindi if uncertain

    def enhanced_whisper_transcription(self, audio_path: str, detected_language: str) -> Dict[str, Any]:
        """Enhanced transcription with language-specific optimization"""
        print(f"--- Enhanced Whisper Transcription (Language: {detected_language}) ---")

        # Map language back to Whisper format
        whisper_lang_map = {
            'Tamil': 'ta',
            'Telugu': 'te',
            'Hindi': 'hi',
            'Malayalam': 'ml',
            'Kannada': 'kn',
            'English': 'en'
        }

        whisper_lang = whisper_lang_map.get(detected_language, 'hi')

        # Enhanced parameters for insurance calls
        result = self.model.transcribe(
            audio_path,
            language=whisper_lang,
            task="transcribe",  # Keep in original language first
            temperature=0.0,
            beam_size=5,
            patience=1.5,
            condition_on_previous_text=False,
            no_speech_threshold=0.6,  # More sensitive for quiet speakers
            compression_ratio_threshold=1.8,
            logprob_threshold=-0.4,
            word_timestamps=True,  # Enable for better segmentation
            initial_prompt=self.enhanced_prompt,
            verbose=True,
        )

        # If not English, also get English translation
        english_result = None
        if detected_language != 'English':
            english_result = self.model.transcribe(
                audio_path,
                language=whisper_lang,
                task="translate",  # Translate to English
                temperature=0.0,
                beam_size=5,
                patience=1.5,
                condition_on_previous_text=False,
                no_speech_threshold=0.6,
                compression_ratio_threshold=1.8,
                logprob_threshold=-0.4,
                initial_prompt=self.enhanced_prompt,
                verbose=True,
            )

        return {
            'source_result': result,
            'english_result': english_result,
            'detected_language': detected_language
        }

    def extract_entities(self, text: str) -> Dict[str, Any]:
        """Extract insurance-specific entities from transcription"""
        entities = {
            "policy_number": None,
            "due_date": None,
            "premium_due_amount": None,
            "late_fee": None,
            "payment_method": None,
            "fund_values": None,
            "sum_assured": None,
            "surrender_value": None,
            "maturity_value": None,
            "alternative_mobile": None,
            "email_id": None,
            "needs_health_declaration_form": False,
            "outcome": None
        }

        text_lower = text.lower()

        # Extract structured entities using patterns
        for entity, patterns in self.entity_patterns.items():
            for pattern in patterns:
                match = re.search(pattern, text_lower, re.IGNORECASE)
                if match:
                    if entity == 'policy_number':
                        entities['policy_number'] = match.group(1).upper()
                    elif entity == 'due_date':
                        entities['due_date'] = match.group(1)
                    elif entity in ['premium_due_amount', 'late_fee']:
                        amount = match.group(1).replace('₹', '').replace(',', '').strip()
                        entities[entity] = amount
                    elif entity == 'mobile_number':
                        entities['alternative_mobile'] = match.group(1)
                    elif entity == 'email':
                        entities['email_id'] = match.group(1)
                    break

        # Detect payment methods
        payment_methods = {
            'google pay': 'Google Pay',
            'g pay': 'Google Pay',
            'phonepe': 'PhonePe',
            'phone pe': 'PhonePe',
            'paytm': 'Paytm',
            'pay tm': 'Paytm',
            'net banking': 'net banking',
            'netbanking': 'net banking',
            'upi': 'UPI',
            'card': 'card',
            'cash': 'cash'
        }

        for method_key, method_value in payment_methods.items():
            if method_key in text_lower:
                entities['payment_method'] = method_value
                break

        # Detect outcome
        if any(word in text_lower for word in ['paid', 'payment done', 'successful']):
            entities['outcome'] = 'paid_now'
        elif any(word in text_lower for word in ['will pay', 'pay later', 'tomorrow']):
            entities['outcome'] = 'will_pay_later'
        elif any(word in text_lower for word in ['callback', 'call back', 'call later']):
            entities['outcome'] = 'callback_required'
        elif any(word in text_lower for word in ['declined', 'cannot pay', 'not possible']):
            entities['outcome'] = 'declined'
        else:
            entities['outcome'] = 'info_given'

        # Check for health declaration form
        if 'health declaration' in text_lower or 'medical form' in text_lower:
            entities['needs_health_declaration_form'] = True

        return entities

    def post_process_insurance_text(self, text: str) -> str:
        """Enhanced post-processing for insurance domain"""
        if not text:
            return ""

        # Apply domain-specific corrections
        text_lower = text.lower()
        for wrong, correct in self.domain_terms.items():
            text_lower = text_lower.replace(wrong, correct)

        # Enhanced currency formatting
        text_lower = re.sub(r'\brs[.]?\s*', '₹', text_lower)
        text_lower = re.sub(r'\brupees?\s*(\d+)', r'₹\1', text_lower)

        # Remove excessive repetitions (enhanced)
        words = text_lower.split()
        cleaned_words = []
        i = 0
        while i < len(words):
            current_word = words[i].lower()
            repetition_count = 1
            j = i + 1
            while j < len(words) and words[j].lower() == current_word:
                repetition_count += 1
                j += 1
            # Keep max 2 repetitions for emphasis, 1 for excessive repetition
            keep_count = min(repetition_count, 2) if repetition_count <= 3 else 1
            for _ in range(keep_count):
                cleaned_words.append(words[i])
            i += repetition_count

        text_lower = ' '.join(cleaned_words)

        # Clean up spacing and punctuation
        text_lower = re.sub(r'\s{2,}', ' ', text_lower)
        text_lower = re.sub(r'\s+([,.!?])', r'\1', text_lower)

        # Capitalize sentences
        text_lower = re.sub(r'(^|[.!?]\s+)([a-z])',
                           lambda m: m.group(1) + m.group(2).upper(),
                           text_lower)

        return text_lower.strip()

    def create_gemini_style_output(self, transcription_result: Dict[str, Any],
                                 dialogue: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create output matching Gemini's JSON structure"""

        source_result = transcription_result['source_result']
        english_result = transcription_result['english_result']
        detected_language = transcription_result['detected_language']

        # Combine all text for entity extraction
        full_source_text = ' '.join([seg['text'] for seg in source_result['segments']])
        full_english_text = ' '.join([seg['text'] for seg in english_result['segments']]) if english_result else full_source_text

        # Extract entities from English text (more reliable)
        entities = self.extract_entities(full_english_text)

        # Create dialogue with timestamps
        formatted_dialogue = []
        for entry in dialogue:
            start_min = int(entry['start_time'] // 60)
            start_sec = int(entry['start_time'] % 60)
            timestamp = f"{start_min:02d}:{start_sec:02d}"

            # Map speaker to Agent/Customer (simple heuristic)
            speaker_name = entry['speaker']
            if 'speaker_00' in speaker_name.lower() or '0' in speaker_name:
                speaker_label = "Agent"
            elif 'speaker_01' in speaker_name.lower() or '1' in speaker_name:
                speaker_label = "Customer"
            else:
                speaker_label = speaker_name

            formatted_dialogue.append({
                "ts": timestamp,
                "speaker": speaker_label,
                "text": entry['text']
            })

        # Quality flags
        quality_flags = {
            "noisy_audio": len([seg for seg in source_result['segments'] if seg.get('no_speech_prob', 0) > 0.8]) > 0,
            "mixed_language": detected_language == 'English' and any(word in full_source_text.lower()
                                                                   for word in ['tamil', 'hindi', 'telugu']),
            "missing_key_info": []
        }

        # Check for missing key information
        if not entities['policy_number']:
            quality_flags['missing_key_info'].append('policy_number')
        if not entities['premium_due_amount']:
            quality_flags['missing_key_info'].append('premium_due_amount')
        if not entities['payment_method'] and entities['outcome'] == 'paid_now':
            quality_flags['missing_key_info'].append('payment_method')

        return {
            "source_language": detected_language,
            "transcript_source": full_source_text,
            "transcript_english": full_english_text,
            "dialogue": formatted_dialogue,
            "entities": entities,
            "quality_flags": quality_flags,
            "processing_metadata": {
                "model_used": "whisper-large-v3",
                "processing_time": datetime.now().isoformat(),
                "total_duration": source_result.get('duration', 0),
                "total_speakers": len(set(d['speaker'] for d in dialogue))
            }
        }

    def process_audio(self, audio_path: str) -> Dict[str, Any]:
        """Main processing pipeline"""
        print("🎯 Starting Enhanced Insurance ASR Pipeline")
        print("=" * 60)

        # Step 1: Audio preprocessing
        if not self.smart_audio_preprocessing(audio_path, CLEAN_AUDIO_PATH):
            raise Exception("Audio preprocessing failed")

        # Step 2: Language detection
        detected_language = self.detect_language(CLEAN_AUDIO_PATH)
        print(f"🌐 Detected language: {detected_language}")

        # Step 3: Enhanced transcription
        transcription_result = self.enhanced_whisper_transcription(CLEAN_AUDIO_PATH, detected_language)

        # Step 4: Process segments and remove repetitions
        source_segments = transcription_result['source_result']['segments']
        cleaned_segments = self.detect_and_remove_repetitions(source_segments)

        # Step 5: Post-process text
        processed_segments = []
        for segment in cleaned_segments:
            processed_text = self.post_process_insurance_text(segment['text'])
            if processed_text.strip() and len(processed_text.strip()) > 5:
                segment_copy = segment.copy()
                segment_copy['text'] = processed_text
                processed_segments.append(segment_copy)

        # Step 6: Speaker diarization
        dialogue = self.create_dialogue_from_segments(processed_segments)

        # Step 7: Create Gemini-style output
        final_output = self.create_gemini_style_output(transcription_result, dialogue)

        return final_output

    def detect_and_remove_repetitions(self, segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Enhanced repetition detection for insurance calls"""
        # Your existing repetition detection logic here
        # (I'll keep your implementation as it's already quite good)
        print("🔍 Starting enhanced repetition detection...")
        cleaned_segments = []

        for segment in segments:
            text = segment['text'].strip()
            words = text.split()

            if len(words) < 2:
                continue

            # Check for word dominance
            word_counts = {}
            for word in words:
                word_lower = word.lower().strip('.,!?')
                word_counts[word_lower] = word_counts.get(word_lower, 0) + 1

            max_word_count = max(word_counts.values()) if word_counts else 0
            word_dominance = max_word_count / len(words) if words else 0

            if word_dominance > 0.4:
                continue

            # Check consecutive repetitions
            max_consecutive = 0
            consecutive_repeats = 0
            for j in range(1, len(words)):
                if words[j].lower().strip('.,!?') == words[j-1].lower().strip('.,!?'):
                    consecutive_repeats += 1
                    max_consecutive = max(max_consecutive, consecutive_repeats + 1)
                else:
                    consecutive_repeats = 0

            if max_consecutive > 3:
                continue

            cleaned_segments.append(segment)

        print(f"📊 Cleaning: {len(segments)} → {len(cleaned_segments)} segments")
        return cleaned_segments

    def create_dialogue_from_segments(self, segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Create dialogue structure from processed segments"""
        # Simple implementation - you can enhance with your diarization logic
        dialogue = []

        for i, segment in enumerate(segments):
            # Simple speaker alternation (enhance with actual diarization)
            speaker = f"Speaker_{i % 2}"

            dialogue.append({
                'speaker': speaker,
                'text': segment['text'],
                'start_time': segment.get('start', i * 3),  # Approximate timing
                'end_time': segment.get('end', (i + 1) * 3)
            })

        return dialogue

In [None]:
def main():
    """Process ``INPUT_AUDIO_PATH``, save the report as JSON and print a summary.

    The report is written to ``insurance_call_analysis.json`` (UTF-8, non-ASCII
    text kept as is). The summary lists the key entities and the first ten
    dialogue turns. Any exception is caught and reported as a one-line message.
    """
    report_path = 'insurance_call_analysis.json'
    rule = "=" * 60

    try:
        pipeline = EnhancedInsuranceASR()
        result = pipeline.process_audio(INPUT_AUDIO_PATH)

        # Save results in Gemini-compatible format
        with open(report_path, 'w', encoding='utf-8') as report_file:
            json.dump(result, report_file, indent=2, ensure_ascii=False)

        print("\n" + rule)
        print("📋 EXTRACTED INFORMATION")
        print(rule)
        print(f"Language: {result['source_language']}")
        summary_fields = (
            ("Policy Number", 'policy_number'),
            ("Premium Due", 'premium_due_amount'),
            ("Payment Method", 'payment_method'),
            ("Outcome", 'outcome'),
        )
        for label, key in summary_fields:
            print(f"{label}: {result['entities'][key]}")

        print("\n" + "🎭 DIALOGUE PREVIEW" + "=" * 40)
        preview = result['dialogue'][:10]  # Show first 10 exchanges
        for turn in preview:
            print(f"\n[{turn['ts']}] {turn['speaker']}:")
            print(f"  📝 {turn['text']}")

        print(f"\n💾 Complete results saved to: {report_path}")
        print("✅ Enhanced processing completed successfully!")

    except Exception as e:
        print(f"❌ Processing failed: {e}")

if __name__ == "__main__":
    main()

100%|█████████████████████████████████████| 2.88G/2.88G [01:20<00:00, 38.3MiB/s]


🎯 Starting Enhanced Insurance ASR Pipeline
--- Optimized Insurance Call Audio Preprocessing ---
✅ Insurance call preprocessing successful
🌐 Detected language: Tamil
--- Enhanced Whisper Transcription (Language: Tamil) ---


