# 1. Importing Libraries

In [1]:
import os
import json
import time
import uuid
import requests
from datetime import datetime
from typing import Dict, List
import pandas as pd
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient
import sys
from io import StringIO
import glob
import base64
from IPython.display import Audio, display
import itertools

# Load environment variables
load_dotenv()

True

In [2]:
# Root directory that holds the per-set audio folders.
AUDIO_ROOT = "audio_emotions/files"

# Output locations for the artefacts written by the pipeline below.
transcription_folder = "azure/transcriptions"
convo_folder = "azure/convo"
batch_folder = "azure/batch_summary"
folders = ["azure", transcription_folder, convo_folder, batch_folder]

# Create each output folder once and report what happened for every entry.
for folder in folders:
    if os.path.exists(folder):
        print(f"ℹ️  Already exists: {folder}")
        continue
    os.makedirs(folder)
    print(f"✅ Created folder: {folder}")

ℹ️  Already exists: azure
ℹ️  Already exists: azure/transcriptions
ℹ️  Already exists: azure/convo
ℹ️  Already exists: azure/batch_summary


# 2. Defining Classes

In [3]:
class AudioProcessorConfig:
    """Settings for the Speech, Language and Blob Storage services, read from the environment.

    Raises:
        ValueError: if any required environment variable is unset or empty.
            The message lists the missing variable names.
    """

    def __init__(self):
        self.speech_subscription_key = os.getenv("SPEECH_SUBSCRIPTION_KEY")
        # `or ""` keeps an unset endpoint from raising AttributeError on .rstrip()
        # before the validation below can report it properly.
        self.speech_endpoint = (os.getenv("SPEECH_ENDPOINT") or "").rstrip('/')
        self.language_subscription_key = os.getenv("LANGUAGE_SUBSCRIPTION_KEY")
        self.language_endpoint = (os.getenv("LANGUAGE_ENDPOINT") or "").rstrip('/')
        self.storage_connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
        self.container_name = os.getenv("BLOB_CONTAINER_NAME", "audio-files")
        self.locale = "en-US"
        self.language = "en"
        self.use_stereo_audio = True  # Set to True if your audio is stereo

        # Validate required configurations and name the ones that are missing
        required_configs = {
            "SPEECH_SUBSCRIPTION_KEY": self.speech_subscription_key,
            "SPEECH_ENDPOINT": self.speech_endpoint,
            "LANGUAGE_SUBSCRIPTION_KEY": self.language_subscription_key,
            "LANGUAGE_ENDPOINT": self.language_endpoint,
            "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection_string,
        }
        missing = [name for name, value in required_configs.items() if not value]
        if missing:
            raise ValueError(
                f"Missing required environment variables: {', '.join(missing)}. "
                "Check your .env file."
            )

class TranscriptionPhrase:
    """One recognized phrase: its text forms, speaker number and position in the audio."""

    def __init__(self, id: int, text: str, itn: str, lexical: str, speaker_number: int, offset: str, offset_in_ticks: float):
        # Identity and the three text renderings of the phrase
        self.id, self.text = id, text
        self.itn, self.lexical = itn, lexical
        # Who spoke it and where it starts
        self.speaker_number = speaker_number
        self.offset, self.offset_in_ticks = offset, offset_in_ticks

class SentimentAnalysisResult:
    """Sentiment document for one phrase, tagged with the phrase's speaker and offset."""

    def __init__(self, speaker_number: int, offset_in_ticks: float, document: Dict):
        self.speaker_number, self.offset_in_ticks = speaker_number, offset_in_ticks
        # The sentiment document is stored as given (same object, not a copy)
        self.document = document

class ConversationAnalysisSummaryItem:
    """A single conversation summary entry: the aspect it covers and its text."""

    def __init__(self, aspect: str, summary: str):
        self.aspect, self.summary = aspect, summary

class ConversationAnalysisPiiItem:
    """A single PII entity: its category and the matched text."""

    def __init__(self, category: str, text: str):
        self.category, self.text = category, text

class ConversationAnalysisForSimpleOutput:
    """Bundle of conversation-level summaries and per-item PII entity lists."""

    def __init__(self, summary: List[ConversationAnalysisSummaryItem], pii_analysis: List[List[ConversationAnalysisPiiItem]]):
        # Both collections are stored as given (no copies are made)
        self.summary, self.pii_analysis = summary, pii_analysis

# 3. Helper Functions

### 3.1 Generic REST API request handler

In [4]:
def send_request(method: str, uri: str, headers: Dict = None, json_data: Dict = None,
                 expected_status_codes: List = None, timeout: float = 60):
    """Generic REST API request handler.

    Args:
        method: HTTP verb; GET, POST and DELETE are supported (case-insensitive).
        uri: Target URL.
        headers: Request headers (defaults to none).
        json_data: JSON body; only sent for POST.
        expected_status_codes: Status codes treated as success (defaults to [200]).
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive endpoint cannot block the notebook forever.

    Returns:
        Dict with "status_code", "headers" and either "json" (parsed body) or
        "text" (raw body when it is not valid JSON).

    Raises:
        ValueError: for an unsupported HTTP method.
        Exception: on a transport error or an unexpected status code.
    """
    if headers is None:
        headers = {}
    if expected_status_codes is None:
        expected_status_codes = [200]

    # Reject bad verbs up front so the error is not re-wrapped as a request failure.
    verb = method.upper()
    if verb not in ("GET", "POST", "DELETE"):
        raise ValueError(f"Unsupported HTTP method: {method}")

    try:
        if verb == "GET":
            response = requests.get(uri, headers=headers, timeout=timeout)
        elif verb == "POST":
            response = requests.post(uri, headers=headers, json=json_data, timeout=timeout)
        else:
            response = requests.delete(uri, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        # Only transport-level problems are wrapped; the cause stays attached.
        raise Exception(f"Request failed: {str(e)}") from e

    if response.status_code not in expected_status_codes:
        raise Exception(f"Request failed with status {response.status_code}: {response.text}")

    result = {"status_code": response.status_code, "headers": dict(response.headers)}
    try:
        result["json"] = response.json()
    except ValueError:
        # JSON decode errors are ValueError subclasses; fall back to the raw body.
        result["text"] = response.text

    return result

### 3.2 Function to Upload to Azure Blob Storage

In [5]:
def upload_audio_to_blob(local_file_path: str, config: AudioProcessorConfig) -> str:
    """Upload a local audio file to Azure Blob Storage and return its blob URL.

    Args:
        local_file_path: Path of the file to upload.
        config: Supplies the storage connection string and container name.

    Returns:
        URL of the form https://<account>.blob.core.windows.net/<container>/<blob>.
        No SAS token is appended here.

    Raises:
        Exception: if the upload fails or the storage account name cannot be
            determined; the original error is chained as the cause.
    """
    try:
        # Initialize blob service client
        blob_service_client = BlobServiceClient.from_connection_string(config.storage_connection_string)

        # Generate a timestamp-prefixed blob name
        filename = os.path.basename(local_file_path)
        blob_name = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{filename}"

        # Upload file
        blob_client = blob_service_client.get_blob_client(
            container=config.container_name,
            blob=blob_name
        )

        print(f"Uploading {local_file_path} to blob storage...")
        with open(local_file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)

        # AZURE_STORAGE_ACCOUNT_NAME is not validated by the config, so fall back
        # to the account name from the connection string instead of emitting
        # "https://None.blob.core.windows.net/...".
        account_name = os.getenv('AZURE_STORAGE_ACCOUNT_NAME') or blob_service_client.account_name
        if not account_name:
            raise ValueError("Storage account name could not be determined; set AZURE_STORAGE_ACCOUNT_NAME")

        blob_url = f"https://{account_name}.blob.core.windows.net/{config.container_name}/{blob_name}"
        print(f"File uploaded successfully: {blob_url}")
        return blob_url

    except Exception as e:
        raise Exception(f"Failed to upload file to blob storage: {str(e)}") from e

### 3.2 Transcription Functions

In [6]:
def create_transcription(audio_url: str, config: AudioProcessorConfig) -> str:
    """Create a batch transcription job for a dual-channel, 8kHz telephony WAV.

    The service must be able to read the recording URL: it has to be publicly
    accessible or carry a valid SAS token. To avoid "Authentication failed for
    recordings URI":
      1. Generate a container-level SAS (Read + List) in the Azure Portal:
         Storage Account -> Containers -> [Your Container] -> Shared access signature.
      2. Assign the token to `config.sas_token` (with or without the leading '?'),
         or append it to `audio_url` yourself (e.g. https://...wav?sv=...).
      3. Verify the full URL downloads the WAV in a browser or with `curl`.
      4. Check the storage account firewall/networking settings, or set the
         container access level to Blob (anonymous read) if acceptable.

    Args:
        audio_url: URL of the recording.
        config: Supplies the speech endpoint, key and locale. Optional attributes
            `sas_token` and `channel_count` (default 2) are read when present.

    Returns:
        The transcription ID (a UUID string) taken from the job's "self" link.

    Raises:
        Exception: if the job is not created or the returned ID is not a UUID.
    """
    # Append the SAS token if present. Accept it with or without a leading '?',
    # and join with '&' when the URL already carries a query string.
    sas_token = getattr(config, 'sas_token', None)
    if sas_token:
        separator = '&' if '?' in audio_url else '?'
        url = f"{audio_url}{separator}{sas_token.lstrip('?&')}"
    else:
        url = audio_url

    uri = f"{config.speech_endpoint}/speechtotext/v3.2/transcriptions"
    headers = {
        "Ocp-Apim-Subscription-Key": config.speech_subscription_key,
        "Content-Type": "application/json"
    }
    # UTC timestamp; time.gmtime() avoids the deprecated datetime.utcnow().
    now = time.strftime('%Y%m%d_%H%M%SZ', time.gmtime())
    payload = {
        "contentUrls": [url],
        "properties": {
            "diarizationEnabled": True,
            "diarizationMode": "SpeakerDiarization",
            "diarizationSpeakerCount": getattr(config, 'channel_count', 2),
            "punctuationMode": "DictatedAndAutomatic",
            "profanityFilterMode": "Masked",
            "wordLevelTimestampsEnabled": True,
            "timeToLive": "PT30M"
        },
        "locale": config.locale,
        "displayName": f"call_center_{now}",
    }

    resp = send_request("POST", uri, headers, payload, expected_status_codes=[201])
    # send_request stores a non-JSON body under "text", so "json" may be absent.
    body = resp.get("json")
    transcription_uri = body.get("self") if isinstance(body, dict) else None
    if not transcription_uri:
        raise Exception(f"Failed to create transcription: {resp.get('text') or resp.get('json')}")

    tid = transcription_uri.rstrip('/').split('/')[-1]
    try:
        uuid.UUID(tid)
    except ValueError:
        raise Exception(f"Invalid transcription ID received: {tid}") from None
    return tid


def get_transcription_status(transcription_id: str, config: AudioProcessorConfig) -> bool:
    """Fetch the job status once.

    Returns True when the job has succeeded and False for any other status
    except "failed", which is printed and raised as an Exception.
    """
    status_uri = f"{config.speech_endpoint}/speechtotext/v3.2/transcriptions/{transcription_id}"
    auth_headers = {"Ocp-Apim-Subscription-Key": config.speech_subscription_key}

    response = send_request("GET", status_uri, auth_headers, expected_status_codes=[200])
    payload = response.get("json", {})
    status = payload.get("status", "").lower()
    print(f"Status for {transcription_id}: {status.upper()}")

    # Anything other than a failure is reported as "done or not done".
    if status != "failed":
        return status == "succeeded"

    # Take the first non-empty error description, most specific first.
    err = (
        payload.get('properties', {}).get('error', {}).get('message')
        or payload.get('properties', {}).get('error', {}).get('failureReason')
        or payload.get('error', {}).get('message')
        or response.get('text')
        or 'No error info'
    )
    print(f"Transcription failed: {err}")
    raise Exception(f"Transcription failed: {err}")


def wait_for_transcription(transcription_id: str, config: AudioProcessorConfig, wait_seconds: int = 10,
                           max_wait_seconds: float = None):
    """Poll the transcription until it succeeds or fails.

    Args:
        transcription_id: ID of the job to poll.
        config: Passed through to get_transcription_status.
        wait_seconds: Pause between polls.
        max_wait_seconds: Optional upper bound on total waiting time. None (the
            default) keeps polling until the job reaches a final state.

    Raises:
        TimeoutError: if max_wait_seconds elapses before the job succeeds.
        Exception: propagated from get_transcription_status when the job fails.
    """
    print(f"Polling status for {transcription_id} every {wait_seconds}s...")
    deadline = None if max_wait_seconds is None else time.monotonic() + max_wait_seconds

    while True:
        # A failed job raises inside get_transcription_status and ends the loop.
        if get_transcription_status(transcription_id, config):
            print("Transcription succeeded.")
            return
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Transcription {transcription_id} did not complete within {max_wait_seconds} seconds"
            )
        time.sleep(wait_seconds)


def get_transcription_result(transcription_id: str, config: AudioProcessorConfig) -> Dict:
    """List the job's files, find the transcription entry and download its JSON content."""
    files_uri = f"{config.speech_endpoint}/speechtotext/v3.2/transcriptions/{transcription_id}/files"
    auth_headers = {"Ocp-Apim-Subscription-Key": config.speech_subscription_key}

    listing = send_request("GET", files_uri, auth_headers, expected_status_codes=[200])
    files = listing.get("json", {}).get("values", [])

    # Pick the first file whose kind is "transcription" (case-insensitive).
    transcription_file = None
    for entry in files:
        if entry.get("kind", "").lower() == "transcription":
            transcription_file = entry
            break
    if not transcription_file:
        raise Exception(f"Transcription file not found; files: {files}")

    download_url = transcription_file.get("links", {}).get("contentUrl")
    if not download_url:
        raise Exception(f"Missing contentUrl in transcription_file: {transcription_file}")

    # The content URL is fetched without the subscription-key header.
    downloaded = send_request("GET", download_url, {}, expected_status_codes=[200])
    return downloaded.get("json", {})


def parse_transcription_phrases(transcription: Dict, config: AudioProcessorConfig) -> List[TranscriptionPhrase]:
    """Parse recognized phrases into structured objects, ordered by start offset.

    Args:
        transcription: Transcription JSON; its "recognizedPhrases" list is read.
        config: Unused here; kept so the signature matches the other pipeline steps.

    Returns:
        One TranscriptionPhrase per recognized phrase, with ids assigned in
        offset order. The speaker number is `speaker - 1` when a "speaker"
        value is present, otherwise the "channel" value (default 0).
    """
    recognized = transcription.get("recognizedPhrases") or []
    # `or 0` also covers an explicit null offset, which would otherwise break the sort.
    ordered = sorted(recognized, key=lambda x: x.get("offsetInTicks") or 0)

    phrases = []
    for idx, p in enumerate(ordered):
        # An empty or missing nBest list yields empty text instead of an IndexError.
        candidates = p.get("nBest") or [{}]
        best = candidates[0]
        speaker = p.get("speaker")
        speaker_num = (speaker - 1) if speaker else p.get("channel", 0)
        phrases.append(TranscriptionPhrase(
            id=idx,
            text=best.get("display", ""),
            itn=best.get("itn", ""),
            lexical=best.get("lexical", ""),
            speaker_number=speaker_num,
            offset=p.get("offset"),
            offset_in_ticks=p.get("offsetInTicks")
        ))
    return phrases

### 3.3 Function for Sentiment Analysis (Positive, Neutral, Negative)

In [7]:
def analyze_sentiment(phrases: List[TranscriptionPhrase], config: AudioProcessorConfig) -> List[SentimentAnalysisResult]:
    """Run sentiment analysis on every phrase, sending documents in batches of 10.

    Each returned result pairs the service's document with the speaker number
    and offset of the phrase whose id matches the document id.
    """
    batch_size = 10
    request_uri = f"{config.language_endpoint}/language/:analyze-text?api-version=2024-11-01"
    request_headers = {
        "Ocp-Apim-Subscription-Key": config.language_subscription_key,
        "Content-Type": "application/json"
    }

    # Lookup from phrase id to (speaker, offset), used to tag the returned documents.
    origin_by_id = {p.id: (p.speaker_number, p.offset_in_ticks) for p in phrases}
    documents = [
        {"id": str(p.id), "language": config.language, "text": p.text}
        for p in phrases
    ]

    results = []
    for start in range(0, len(documents), batch_size):
        body = {
            "kind": "SentimentAnalysis",
            "analysisInput": {"documents": documents[start:start + batch_size]},
        }
        response = send_request("POST", request_uri, request_headers, body, [200])

        for document in response["json"]["results"]["documents"]:
            speaker_num, offset_ticks = origin_by_id[int(document["id"])]
            results.append(SentimentAnalysisResult(speaker_num, offset_ticks, document))

    return results

### 3.4 Conversation Analysis Function

In [8]:
def analyze_conversation(phrases: List[TranscriptionPhrase], config: AudioProcessorConfig) -> Dict:
    """Submit a conversation analysis job (summary + PII) and wait for it to finish.

    Args:
        phrases: Transcribed phrases; speaker 0 is sent as "Agent", all others
            as "Customer".
        config: Supplies the language endpoint, key and language code.

    Returns:
        The JSON body of the final job status response.

    Raises:
        Exception: if the job cannot be tracked (no operation-location header)
            or ends in a failed/cancelled state.
    """
    uri = f"{config.language_endpoint}/language/analyze-conversations/jobs?api-version=2024-11-01"
    headers = {
        "Ocp-Apim-Subscription-Key": config.language_subscription_key,
        "Content-Type": "application/json"
    }

    # Convert phrases to conversation items
    conversation_items = [
        {
            "id": str(phrase.id),
            "text": phrase.text,
            "itn": phrase.itn,
            "lexical": phrase.lexical,
            "role": "Agent" if phrase.speaker_number == 0 else "Customer",
            "participantId": str(phrase.speaker_number)
        }
        for phrase in phrases
    ]

    content = {
        "displayName": f"call_center_{datetime.now()}",
        "analysisInput": {
            "conversations": [{
                "id": "conversation1",
                "language": config.language,
                "modality": "transcript",
                "conversationItems": conversation_items,
            }],
        },
        "tasks": [
            {
                "taskName": "summary_1",
                "kind": "ConversationalSummarizationTask",
                "parameters": {
                    "modelVersion": "latest",
                    "summaryAspects": ["Issue", "Resolution"]
                }
            },
            {
                "taskName": "PII_1",
                "kind": "ConversationalPIITask",
                "parameters": {
                    "piiCategories": ["All"],
                    "includeAudioRedaction": False,
                    "redactionSource": "text",
                    "modelVersion": "2024-11-01-preview",
                    "loggingOptOut": False
                }
            }
        ]
    }

    response = send_request("POST", uri, headers, content, [202])

    # send_request returns headers as a plain (case-sensitive) dict, so look the
    # header up case-insensitively rather than relying on the server's casing.
    analysis_url = next(
        (value for name, value in response["headers"].items() if name.lower() == "operation-location"),
        None,
    )
    if not analysis_url:
        raise Exception("Conversation analysis response did not include an operation-location header")

    # Wait for analysis to complete
    print("Waiting for conversation analysis to complete...")
    poll_headers = {"Ocp-Apim-Subscription-Key": config.language_subscription_key}
    while True:
        status_response = send_request("GET", analysis_url, poll_headers, expected_status_codes=[200])
        job = status_response["json"]

        status = job["status"].lower()
        if status == "succeeded":
            break
        if status == "partiallycompleted":
            # Final state: some tasks finished, others did not. Return what exists.
            print("Warning: conversation analysis only partially completed")
            break
        if status in ("failed", "cancelled"):
            details = job.get("errors")
            suffix = f": {details}" if details else ""
            raise Exception(f"Conversation analysis {status}{suffix}")

        print("Waiting 10 seconds...")
        time.sleep(10)

    print("Conversation analysis completed!")
    return status_response["json"]

### 3.5 Function to Process Results

In [9]:
def process_results(phrases: List[TranscriptionPhrase], 
                   sentiment_results: List[SentimentAnalysisResult],
                   conversation_analysis: Dict) -> Dict:
    """Combine phrases, sentiment and conversation analysis into one result dict.

    Sentiment documents and PII conversation items are matched to phrases by
    their "id" field (the stringified phrase id), not by list position, so a
    document missing from the service response cannot shift later results.

    Args:
        phrases: Parsed transcription phrases.
        sentiment_results: Per-phrase sentiment results.
        conversation_analysis: Job JSON; "tasks" -> "items" is read for the
            tasks named "summary_1" and "PII_1".

    Returns:
        Dict with "phrases" (one dict per phrase), "overall_summary" (list of
        ConversationAnalysisSummaryItem) and "metadata".
    """
    # Extract conversation summary and PII tasks; a missing task yields None
    # instead of raising StopIteration.
    tasks = conversation_analysis["tasks"]["items"]
    summary_task = next((t for t in tasks if t["taskName"] == "summary_1"), None)
    pii_task = next((t for t in tasks if t["taskName"] == "PII_1"), None)

    summaries = []
    if summary_task and "results" in summary_task:
        for conversation in (summary_task["results"].get("conversations") or [])[:1]:
            for summary in conversation.get("summaries", []):
                summaries.append(ConversationAnalysisSummaryItem(
                    summary["aspect"],
                    summary["text"]
                ))

    # Extract PII information, keyed by conversation item id
    pii_by_id = {}
    if pii_task and "results" in pii_task:
        for conversation in (pii_task["results"].get("conversations") or [])[:1]:
            for item in conversation.get("conversationItems", []):
                pii_by_id[str(item.get("id"))] = [
                    {"category": entity["category"], "text": entity["text"]}
                    for entity in item.get("entities", [])
                ]

    # Sentiment documents keyed by document id
    sentiment_by_id = {
        str(result.document.get("id")): result.document
        for result in sentiment_results
    }

    # Create results dictionary
    results = {
        "phrases": [],
        "overall_summary": summaries,
        "metadata": {
            "total_phrases": len(phrases),
            "speakers": list(set(p.speaker_number for p in phrases)),
            "processing_time": datetime.now().isoformat()
        }
    }

    # Combine phrase-level results
    for phrase in phrases:
        key = str(phrase.id)
        sentiment_doc = sentiment_by_id.get(key)
        results["phrases"].append({
            "id": phrase.id,
            "text": phrase.text,
            "speaker": phrase.speaker_number,
            "offset": phrase.offset,
            "sentiment": sentiment_doc.get("sentiment") if sentiment_doc else None,
            "sentiment_scores": sentiment_doc.get("confidenceScores") if sentiment_doc else None,
            "pii_entities": pii_by_id.get(key, [])
        })

    return results

# 4. Audio to Insights

### 4.1 Process a single audio (.wav) file

In [10]:
def _save_intermediate_json(folder: str, filename: str, payload: Dict) -> str:
    """Write `payload` as indented JSON to folder/filename and return the path."""
    os.makedirs(folder, exist_ok=True)
    file_path = os.path.join(folder, filename)
    with open(file_path, 'w') as f:
        json.dump(payload, f, indent=2)
    return file_path


def _delete_transcription_job(transcription_id: str, config: AudioProcessorConfig) -> None:
    """Best-effort deletion of a transcription job; failures are only printed."""
    try:
        delete_uri = f"{config.speech_endpoint}/speechtotext/v3.2/transcriptions/{transcription_id}"
        headers = {"Ocp-Apim-Subscription-Key": config.speech_subscription_key}
        send_request("DELETE", delete_uri, headers, expected_status_codes=[204])
        print("Cleaned up transcription job")
    except Exception as cleanup_error:
        print(f"Warning: Failed to cleanup transcription job: {cleanup_error}")


def _delete_uploaded_blob(blob_name: str, config: AudioProcessorConfig) -> None:
    """Best-effort deletion of an uploaded blob; failures are only printed."""
    try:
        blob_service_client = BlobServiceClient.from_connection_string(config.storage_connection_string)
        blob_client = blob_service_client.get_blob_client(
            container=config.container_name,
            blob=blob_name
        )
        blob_client.delete_blob()
        print("Cleaned up uploaded blob")
    except Exception as cleanup_error:
        print(f"Warning: Failed to cleanup blob: {cleanup_error}")


def process_audio_file(audio_file_path: str, config: AudioProcessorConfig = None, 
                      skip_upload: bool = False, audio_url: str = None,
                      save_intermediate: bool = False, cleanup_blob: bool = True) -> Dict:
    """
    Main function to process audio file through the complete pipeline

    Steps: upload (or reuse a URL), create the transcription, wait for it,
    download and parse it, analyze sentiment, analyze the conversation, then
    combine the results. Cleanup of the transcription job and of any blob
    uploaded by this call runs whether the pipeline succeeds or fails.

    Args:
        audio_file_path: Path to local audio file
        config: Configuration object (will create default if None)
        skip_upload: If True and audio_url provided, skip blob upload
        audio_url: Direct URL to audio file (if already uploaded)
        save_intermediate: Save intermediate results to files
        cleanup_blob: Delete uploaded blob after processing

    Returns:
        Dict containing all analysis results

    Raises:
        FileNotFoundError: if an upload is needed and the local file is missing.
        Exception: any pipeline error, re-raised after cleanup.
    """

    if config is None:
        config = AudioProcessorConfig()

    # The local file is only optional when a ready-made URL replaces the upload.
    use_provided_url = bool(skip_upload and audio_url)
    if not use_provided_url and not os.path.exists(audio_file_path):
        raise FileNotFoundError(f"Audio file not found: {audio_file_path}")

    print(f"Processing audio file: {audio_file_path}")
    start_time = time.time()
    blob_name = None         # Set only when this call uploaded a blob
    transcription_id = None  # Set once the job exists, so it can be deleted

    try:
        # Step 1: Upload to blob storage (or use provided URL)
        if use_provided_url:
            print(f"\n=== Step 1: Using provided audio URL ===")
            print(f"Audio URL: {audio_url}")
        else:
            print("\n=== Step 1: Uploading to Azure Blob Storage ===")
            audio_url = upload_audio_to_blob(audio_file_path, config)
            # Extract blob name for cleanup (ignore any query string)
            blob_name = audio_url.split('/')[-1].split('?')[0]

        # Step 2: Create transcription
        print("\n=== Step 2: Creating Speech Transcription ===")
        transcription_id = create_transcription(audio_url, config)
        print(f"Transcription ID: {transcription_id}")

        # Step 3: Wait for transcription
        print("\n=== Step 3: Waiting for Transcription ===")
        wait_for_transcription(transcription_id, config)

        # Step 4: Get transcription result
        print("\n=== Step 4: Getting Transcription Result ===")
        transcription = get_transcription_result(transcription_id, config)
        phrases = parse_transcription_phrases(transcription, config)
        print(f"Found {len(phrases)} phrases")

        # Save intermediate result if requested
        if save_intermediate:
            saved_path = _save_intermediate_json(
                transcription_folder, f"transcription_{transcription_id}.json", transcription
            )
            print(f"Saved transcription to: {saved_path}")

        # Step 5: Analyze sentiment
        print("\n=== Step 5: Analyzing Sentiment ===")
        sentiment_results = analyze_sentiment(phrases, config)
        print(f"Analyzed sentiment for {len(sentiment_results)} phrases")

        # Step 6: Analyze conversation
        print("\n=== Step 6: Analyzing Conversation ===")
        conversation_analysis = analyze_conversation(phrases, config)

        # Save intermediate result if requested
        if save_intermediate:
            saved_path = _save_intermediate_json(
                convo_folder, f"conversation_analysis_{transcription_id}.json", conversation_analysis
            )
            print(f"Saved conversation analysis to: {saved_path}")

        # Step 7: Process and combine results
        print("\n=== Step 7: Processing Results ===")
        final_results = process_results(phrases, sentiment_results, conversation_analysis)

        # Add processing metadata (duration excludes the cleanup below)
        processing_time = round(time.time() - start_time, 2)
        final_results["metadata"].update({
            "processing_duration_seconds": processing_time,
            "transcription_id": transcription_id,
            "audio_url": audio_url,
            "original_file": audio_file_path
        })

    except Exception as e:
        print(f"Error processing audio file: {str(e)}")
        raise

    finally:
        # One cleanup path for success and failure; both helpers swallow their
        # own errors, so they never mask the pipeline's exception.
        if transcription_id:
            _delete_transcription_job(transcription_id, config)
        if cleanup_blob and blob_name:
            _delete_uploaded_blob(blob_name, config)

    print(f"\n=== Processing Complete in {processing_time} seconds! ===")
    return final_results

### 4.2 Process audio (.wav) files in batches

In [11]:
def _batch_json_default(obj):
    """JSON fallback: serialize plain objects via their attributes, anything else via str()."""
    return vars(obj) if hasattr(obj, "__dict__") else str(obj)


def process_multiple_audio_files(audio_files: List[str], config: AudioProcessorConfig = None,
                                max_concurrent: int = 3, save_results: bool = True) -> Dict:
    """
    Process multiple audio files one after another and collect the results.

    Args:
        audio_files: List of audio file paths
        config: Configuration object (a default one is created if None)
        max_concurrent: Currently unused; files are processed sequentially.
            Kept so existing callers keep working.
        save_results: Save the batch summary as JSON in the batch folder

    Returns:
        Dict with "processing_stats" (counts, timings, failed files) and
        "individual_results" (per-file results keyed by file path). A file
        that fails is recorded in the stats and does not stop the batch.
    """

    if config is None:
        config = AudioProcessorConfig()

    print(f"Processing {len(audio_files)} audio files...")

    all_results = {}
    failed_files = []
    processing_stats = {
        "total_files": len(audio_files),
        "successful": 0,
        "failed": 0,
        "start_time": datetime.now().isoformat(),
        "total_phrases": 0,
        "total_duration": 0
    }

    for i, audio_file in enumerate(audio_files, 1):
        print(f"\n{'='*60}")
        print(f"Processing file {i}/{len(audio_files)}: {os.path.basename(audio_file)}")
        print(f"{'='*60}")

        try:
            result = process_audio_file(audio_file, config)
            all_results[audio_file] = result
            processing_stats["successful"] += 1
            processing_stats["total_phrases"] += result["metadata"]["total_phrases"]
            processing_stats["total_duration"] += result["metadata"]["processing_duration_seconds"]

        except Exception as e:
            print(f"Failed to process {audio_file}: {str(e)}")
            failed_files.append({"file": audio_file, "error": str(e)})
            processing_stats["failed"] += 1

    processing_stats["end_time"] = datetime.now().isoformat()
    processing_stats["failed_files"] = failed_files

    # Create summary
    summary = {
        "processing_stats": processing_stats,
        "individual_results": all_results
    }

    if save_results:
        os.makedirs(batch_folder, exist_ok=True)
        file_path = os.path.join(batch_folder, "batch_processing_summary.json")

        with open(file_path, 'w') as f:
            # Summary items are plain objects; write their fields rather than
            # an opaque "<... object at 0x...>" string.
            json.dump(summary, f, indent=2, default=_batch_json_default)
        print(f"\nBatch processing summary saved to: {file_path}")

    print(f"\n{'='*60}")
    print("BATCH PROCESSING COMPLETE")
    print(f"{'='*60}")
    print(f"Successful: {processing_stats['successful']}/{processing_stats['total_files']}")
    print(f"Failed: {processing_stats['failed']}/{processing_stats['total_files']}")
    print(f"Total phrases processed: {processing_stats['total_phrases']}")
    print(f"Total processing time: {processing_stats['total_duration']:.2f} seconds")

    return summary

### 4.3 Display insights result

In [12]:
def display_results(results: Dict):
    """Print the metadata, conversation summaries and per-phrase details of a result dict."""
    wide_rule = "=" * 80
    section_rule = "=" * 40

    def _section(title):
        # Section banner: blank line, rule, title, rule
        print("\n" + section_rule)
        print(title)
        print(section_rule)

    print(wide_rule)
    print("AUDIO PROCESSING RESULTS")
    print(wide_rule)

    # Metadata
    metadata = results["metadata"]
    print(f"\nTotal Phrases: {metadata['total_phrases']}")
    print(f"Speakers: {metadata['speakers']}")
    print(f"Processing Time: {metadata['processing_time']}")

    # Overall Summary
    _section("CONVERSATION SUMMARY")
    for item in results["overall_summary"]:
        print(f"\n{item.aspect}:")
        print(f"  {item.summary}")

    # Phrase-by-phrase analysis
    _section("DETAILED PHRASE ANALYSIS")
    for entry in results["phrases"]:
        print(f"\nPhrase {entry['id'] + 1} (Speaker {entry['speaker']}):")
        print(f"  Text: {entry['text']}")
        print(f"  Sentiment: {entry['sentiment']}")

        scores = entry['sentiment_scores']
        if scores:
            print(f"  Confidence - Positive: {scores.get('positive', 0):.2f}, "
                  f"Negative: {scores.get('negative', 0):.2f}, "
                  f"Neutral: {scores.get('neutral', 0):.2f}")

        entities = entry['pii_entities']
        if not entities:
            print("  PII Entities: None")
            continue
        pii_str = ', '.join(f"{e['category']}: {e['text']}" for e in entities)
        print(f"  PII Entities: {pii_str}")

### 4.4 Additional Audio Processing Methods

In [13]:
def quick_process(audio_file_path: str, display_summary: bool = True) -> Dict:
    """
    Quick processing with minimal output - useful for batch operations

    The pipeline's own log output is captured and discarded; only a short
    summary (or a failure line) is printed.

    Args:
        audio_file_path: Path to audio file
        display_summary: Whether to display a summary

    Returns:
        Processing results

    Raises:
        Exception: whatever process_audio_file raises, after printing a failure line.
    """
    from collections import Counter

    config = AudioProcessorConfig()

    # Temporarily redirect stdout to silence the pipeline logs. The `finally`
    # guarantees stdout is restored even on KeyboardInterrupt.
    old_stdout = sys.stdout
    sys.stdout = StringIO()
    try:
        results = process_audio_file(audio_file_path, config, cleanup_blob=True)
    except Exception as e:
        sys.stdout = old_stdout
        print(f"❌ Failed: {os.path.basename(audio_file_path)} - {str(e)}")
        raise
    finally:
        sys.stdout = old_stdout

    if display_summary:
        print(f"✅ Processed: {os.path.basename(audio_file_path)}")
        print(f"   Phrases: {results['metadata']['total_phrases']}")
        print(f"   Duration: {results['metadata']['processing_duration_seconds']}s")

        # Show sentiment distribution
        sentiments = [p['sentiment'] for p in results['phrases'] if p['sentiment']]
        if sentiments:
            print(f"   Sentiments: {dict(Counter(sentiments))}")

    return results

def analyze_audio_directory(directory_path: str, file_pattern: str = "*.wav") -> Dict:
    """
    Batch-process every file in a directory that matches a glob pattern.

    Args:
        directory_path: Directory containing audio files
        file_pattern: Glob pattern for the files to pick up (e.g., "*.wav", "*.mp3")

    Returns:
        Batch processing results, or an empty dict when nothing matches
    """
    search_pattern = os.path.join(directory_path, file_pattern)
    audio_files = glob.glob(search_pattern)

    # Happy path first: hand every match to the batch processor.
    if audio_files:
        print(f"Found {len(audio_files)} audio files to process")
        return process_multiple_audio_files(audio_files, save_results=True)

    print(f"No audio files found matching pattern: {search_pattern}")
    return {}

# 5. Use Cases

### 5.1 Audio Input

In [14]:
def get_audio_and_path(df, idx, emotion, timeout=30):
    """
    Locate one emotion recording for a DataFrame row and load it.

    The row's `set_id` names a folder under AUDIO_ROOT, and the recording is
    read from AUDIO_ROOT/<set_id>/<emotion>.wav. AUDIO_ROOT may be a local
    directory or an http(s) base URL.

    Args:
        df: DataFrame that contains a 'set_id' column.
        idx: Index label of the row to look up (passed to `df.at`).
        emotion: Emotion name, used verbatim as the file stem.
        timeout: Seconds to wait for a remote download (only used when
            AUDIO_ROOT is a URL).

    Returns:
        (player, full_path, audio_base64): the IPython Audio player, the path
        or URL that was loaded, and the file content as a base64 string.

    Raises:
        KeyError: if `df` has no 'set_id' column.
        FileNotFoundError: if the local .wav file does not exist.
        requests.HTTPError: if a remote download returns an error status.
    """
    # sanity checks
    if 'set_id' not in df.columns:
        raise KeyError("DataFrame must contain a 'set_id' column")

    # grab the set identifier and build the file name
    sid = df.at[idx, 'set_id']
    filename = f"{emotion}.wav"

    if AUDIO_ROOT.startswith(("http://", "https://")):
        # URLs always use forward slashes; os.path.join would insert
        # backslashes on Windows and produce an unreachable address.
        full_path = "/".join([AUDIO_ROOT.rstrip("/"), sid, filename])
        player = Audio(url=full_path)
        # A timeout keeps the notebook from hanging on an unresponsive host.
        resp = requests.get(full_path, timeout=timeout)
        resp.raise_for_status()
        audio_bytes = resp.content
    else:
        full_path = os.path.join(AUDIO_ROOT, sid, filename)
        player = Audio(filename=full_path)
        with open(full_path, "rb") as f:
            audio_bytes = f.read()

    # base64 encode for any downstream use
    audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")

    return player, full_path, audio_base64

In [15]:
# Emotion names; get_audio_and_path turns each one into an "<emotion>.wav" file name.
emotions = ["joyfully", "euphoric", "sad", "surprised"]

In [16]:
# Load the metadata table; get_audio_and_path uses its `set_id` column as the audio folder name.
# NOTE(review): the "Unnamed: 0" / "Unnamed: 0.1" columns look like index columns
# written by earlier to_csv calls — confirm before relying on them.
df = pd.read_csv("speech_emotions.csv")
print("Shape: ", df.shape)
df.head()

Shape:  (20, 6)


Unnamed: 0.1,Unnamed: 0,set_id,text,gender,age,country
0,0,00026029e0--64991b6eef1fe70609d48edc,The delicious aroma of freshly baked bread fil...,MALE,29,ZA
1,1,00026029e0--64991b72e0daf97163c09c66,I enjoy taking long walks in the peaceful coun...,FEMALE,42,NG
2,2,00026029e0--64991b7fd94c0d5726dec353,The suspenseful novel kept me on the edge of m...,FEMALE,29,VN
3,3,00026029e0--64991b907f82d9763944eba2,They celebrated their anniversary with a roman...,FEMALE,20,PK
4,4,00026029e0--64991bf2ffab6240f9f2418b,The diligent student earned top marks for her ...,MALE,30,PK


In [17]:
def fetch_audios(df, n, emotions=None):
    """
    Collect up to `n` audio clips, one per DataFrame row, cycling through `emotions`.

    Rows are visited in index order. Every visited row consumes the next
    emotion from the cycle, whether or not its audio could be loaded. Rows
    whose audio file is missing are skipped.

    Args:
        df: DataFrame with a 'set_id' column (see get_audio_and_path).
        n: Number of clips wanted.
        emotions: Emotion names to cycle through; defaults to
            ["joyfully", "euphoric", "sad", "surprised"].

    Returns:
        (players, paths, b64s): three parallel lists. They hold `n` entries
        when enough rows have audio, and fewer when the DataFrame runs out first.

    Raises:
        ValueError: if `emotions` is empty.
    """
    if emotions is None:
        emotions = ["joyfully", "euphoric", "sad", "surprised"]
    emotions = list(emotions)
    if not emotions:
        # itertools.cycle over nothing would surface as a bare StopIteration
        raise ValueError("emotions must contain at least one entry")

    players, paths, b64s = [], [], []
    # cycle() lets us loop emotions endlessly
    emotion_cycle = itertools.cycle(emotions)

    # Walk the real index labels so the loop ends with the DataFrame and
    # also works when the index is not 0..len(df)-1.
    for idx in df.index:
        if len(players) >= n:
            break

        emotion = next(emotion_cycle)
        try:
            player, path, b64 = get_audio_and_path(df, idx=idx, emotion=emotion)
        except FileNotFoundError:
            # skip missing audio, but keep the row and emotion moving
            continue

        if not path or player is None:
            continue

        players.append(player)
        paths.append(path)
        b64s.append(b64)

    if len(players) < n:
        print(f"Warning: only {len(players)} of {n} requested audio files could be loaded")

    return players, paths, b64s


In [18]:
# Pass the `emotions` list defined in the earlier cell so that editing it actually
# changes which recordings are fetched (its values match fetch_audios' default).
players, paths, b64s = fetch_audios(df, n=6, emotions=emotions)

In [19]:
# Show the Audio player objects returned by fetch_audios.
players

[<IPython.lib.display.Audio object>,
 <IPython.lib.display.Audio object>,
 <IPython.lib.display.Audio object>,
 <IPython.lib.display.Audio object>,
 <IPython.lib.display.Audio object>,
 <IPython.lib.display.Audio object>]

In [20]:
# Show the audio file paths returned by fetch_audios.
paths

['audio_emotions/files\\00026029e0--64991b6eef1fe70609d48edc\\joyfully.wav',
 'audio_emotions/files\\00026029e0--64991b72e0daf97163c09c66\\euphoric.wav',
 'audio_emotions/files\\00026029e0--64991b7fd94c0d5726dec353\\sad.wav',
 'audio_emotions/files\\00026029e0--64991b907f82d9763944eba2\\surprised.wav',
 'audio_emotions/files\\00026029e0--64991bf2ffab6240f9f2418b\\joyfully.wav',
 'audio_emotions/files\\00026029e0--64991bfc63d8b20f56d0e1fb\\euphoric.wav']

### 5.2 Initialize configuration

In [21]:
# Build the Azure settings object; AudioProcessorConfig reads its keys, endpoints and
# storage connection string from environment variables.
config = AudioProcessorConfig()

### 5.3 Single Audio File

In [22]:
# Preview the first clip, then send it through the processing pipeline.
print(paths[0])
display(players[0])

single_result = process_audio_file(paths[0], config, save_intermediate=True, cleanup_blob=True)

# Optional exports are left disabled. NOTE(review): if they are re-enabled, pass
# `single_result` and bind the CSV output to a new name so the metadata `df` is not
# overwritten, e.g. export_results_to_csv(single_result) and
# export_full_results_to_json(single_result) — confirm both helpers exist in this notebook.

audio_emotions/files\00026029e0--64991b6eef1fe70609d48edc\joyfully.wav


Processing audio file: audio_emotions/files\00026029e0--64991b6eef1fe70609d48edc\joyfully.wav

=== Step 1: Uploading to Azure Blob Storage ===
Uploading audio_emotions/files\00026029e0--64991b6eef1fe70609d48edc\joyfully.wav to blob storage...
File uploaded successfully: https://bpiaudiostorage.blob.core.windows.net/audio-files/20250619_132017_joyfully.wav

=== Step 2: Creating Speech Transcription ===
Transcription ID: e8696d7a-fc3e-46f7-896d-96e6421c148d

=== Step 3: Waiting for Transcription ===
Polling status for e8696d7a-fc3e-46f7-896d-96e6421c148d every 10s...
Status for e8696d7a-fc3e-46f7-896d-96e6421c148d: RUNNING
Status for e8696d7a-fc3e-46f7-896d-96e6421c148d: RUNNING
Status for e8696d7a-fc3e-46f7-896d-96e6421c148d: SUCCEEDED
Transcription succeeded.

=== Step 4: Getting Transcription Result ===
Found 1 phrases
Saved transcription to: transcription_e8696d7a-fc3e-46f7-896d-96e6421c148d.json

=== Step 5: Analyzing Sentiment ===
Analyzed sentiment for 1 phrases

=== Step 6: Analy

In [23]:
# Print the human-readable report for the single-file run.
display_results(single_result)

AUDIO PROCESSING RESULTS

Total Phrases: 1
Speakers: [0]
Processing Time: 2025-06-19T13:20:53.295861

CONVERSATION SUMMARY

DETAILED PHRASE ANALYSIS

Phrase 1 (Speaker 0):
  Text: The delicious aroma of freshly baked bread filled the bakery.
  Sentiment: positive
  Confidence - Positive: 0.97, Negative: 0.00, Neutral: 0.03
  PII Entities: None


In [24]:
# Inspect the raw result dictionary (phrases, overall_summary, metadata).
single_result

{'phrases': [{'id': 0,
   'text': 'The delicious aroma of freshly baked bread filled the bakery.',
   'speaker': 0,
   'offset': 'PT1.76S',
   'sentiment': 'positive',
   'sentiment_scores': {'positive': 0.97, 'neutral': 0.03, 'negative': 0.0},
   'pii_entities': []}],
 'overall_summary': [],
 'metadata': {'total_phrases': 1,
  'speakers': [0],
  'processing_time': '2025-06-19T13:20:53.295861',
  'processing_duration_seconds': 35.65,
  'transcription_id': 'e8696d7a-fc3e-46f7-896d-96e6421c148d',
  'audio_url': 'https://bpiaudiostorage.blob.core.windows.net/audio-files/20250619_132017_joyfully.wav',
  'original_file': 'audio_emotions/files\\00026029e0--64991b6eef1fe70609d48edc\\joyfully.wav'}}

### 5.4 Multiple Audio Files

In [25]:
# Preview every clip except the first one, which was already shown in section 5.3.
for audio_path, audio_player in zip(paths[1:], players[1:]):
    print(audio_path)
    display(audio_player)

audio_emotions/files\00026029e0--64991b72e0daf97163c09c66\euphoric.wav


audio_emotions/files\00026029e0--64991b7fd94c0d5726dec353\sad.wav


audio_emotions/files\00026029e0--64991b907f82d9763944eba2\surprised.wav


audio_emotions/files\00026029e0--64991bf2ffab6240f9f2418b\joyfully.wav


audio_emotions/files\00026029e0--64991bfc63d8b20f56d0e1fb\euphoric.wav


In [26]:
# Process all files
batch_results = process_multiple_audio_files(
    paths[1:],
    save_results=True  # Save individual results for each file
)

print("\nBatch processing completed!")

Processing 5 audio files...

Processing file 1/5: euphoric.wav
Processing audio file: audio_emotions/files\00026029e0--64991b72e0daf97163c09c66\euphoric.wav

=== Step 1: Uploading to Azure Blob Storage ===
Uploading audio_emotions/files\00026029e0--64991b72e0daf97163c09c66\euphoric.wav to blob storage...
File uploaded successfully: https://bpiaudiostorage.blob.core.windows.net/audio-files/20250619_132053_euphoric.wav

=== Step 2: Creating Speech Transcription ===
Transcription ID: 6b2085a9-d500-4327-97e0-dc9f7588e58e

=== Step 3: Waiting for Transcription ===
Polling status for 6b2085a9-d500-4327-97e0-dc9f7588e58e every 10s...
Status for 6b2085a9-d500-4327-97e0-dc9f7588e58e: RUNNING
Status for 6b2085a9-d500-4327-97e0-dc9f7588e58e: RUNNING
Status for 6b2085a9-d500-4327-97e0-dc9f7588e58e: SUCCEEDED
Transcription succeeded.

=== Step 4: Getting Transcription Result ===
Found 1 phrases

=== Step 5: Analyzing Sentiment ===
Analyzed sentiment for 1 phrases

=== Step 6: Analyzing Conversation

In [27]:
# Inspect the batch output: aggregate processing_stats plus individual_results keyed by file path.
batch_results

{'processing_stats': {'total_files': 5,
  'successful': 5,
  'failed': 0,
  'start_time': '2025-06-19T13:20:53.860945',
  'total_phrases': 6,
  'total_duration': 202.32,
  'end_time': '2025-06-19T13:24:18.847556',
  'failed_files': []},
 'individual_results': {'audio_emotions/files\\00026029e0--64991b72e0daf97163c09c66\\euphoric.wav': {'phrases': [{'id': 0,
     'text': 'I enjoy taking long walks in the peaceful countryside.',
     'speaker': 0,
     'offset': 'PT2.84S',
     'sentiment': 'positive',
     'sentiment_scores': {'positive': 1.0, 'neutral': 0.0, 'negative': 0.0},
     'pii_entities': []}],
   'overall_summary': [],
   'metadata': {'total_phrases': 1,
    'speakers': [0],
    'processing_time': '2025-06-19T13:21:27.201888',
    'processing_duration_seconds': 33.34,
    'transcription_id': '6b2085a9-d500-4327-97e0-dc9f7588e58e',
    'audio_url': 'https://bpiaudiostorage.blob.core.windows.net/audio-files/20250619_132053_euphoric.wav',
    'original_file': 'audio_emotions/file

### 5.5 Quick processing

In [28]:
# Re-run the first clip through quick_process, which prints only a short summary
# (file name, phrase count, duration, sentiment counts) instead of the step-by-step log.
# Note: this is a fresh run of the pipeline — the result carries a new transcription_id.
quick_result = quick_process(paths[0])

✅ Processed: joyfully.wav
   Phrases: 1
   Duration: 33.79s
   Sentiments: {'positive': 1}


In [29]:
# Inspect the result dictionary returned by quick_process.
quick_result

{'phrases': [{'id': 0,
   'text': 'The delicious aroma of freshly baked bread filled the bakery.',
   'speaker': 0,
   'offset': 'PT1.76S',
   'sentiment': 'positive',
   'sentiment_scores': {'positive': 0.97, 'neutral': 0.03, 'negative': 0.0},
   'pii_entities': []}],
 'overall_summary': [],
 'metadata': {'total_phrases': 1,
  'speakers': [0],
  'processing_time': '2025-06-19T13:24:52.664650',
  'processing_duration_seconds': 33.79,
  'transcription_id': '7d944fb3-a284-4f0b-8535-3d0651d1ce9e',
  'audio_url': 'https://bpiaudiostorage.blob.core.windows.net/audio-files/20250619_132418_joyfully.wav',
  'original_file': 'audio_emotions/files\\00026029e0--64991b6eef1fe70609d48edc\\joyfully.wav'}}

### 5.6 Directory Processing

In [30]:
# Run the batch pipeline on every file in one set_id folder that matches
# analyze_audio_directory's default pattern ("*.wav").
directory_results = analyze_audio_directory("audio_emotions/files/00026029e0--64991c0dd94c0d5726df032a")

Found 4 audio files to process
Processing 4 audio files...

Processing file 1/4: euphoric.wav
Processing audio file: audio_emotions/files/00026029e0--64991c0dd94c0d5726df032a\euphoric.wav

=== Step 1: Uploading to Azure Blob Storage ===
Uploading audio_emotions/files/00026029e0--64991c0dd94c0d5726df032a\euphoric.wav to blob storage...
File uploaded successfully: https://bpiaudiostorage.blob.core.windows.net/audio-files/20250619_132453_euphoric.wav

=== Step 2: Creating Speech Transcription ===
Transcription ID: 4f294ab1-2c09-489c-92ef-fdc359626c8c

=== Step 3: Waiting for Transcription ===
Polling status for 4f294ab1-2c09-489c-92ef-fdc359626c8c every 10s...
Status for 4f294ab1-2c09-489c-92ef-fdc359626c8c: RUNNING
Status for 4f294ab1-2c09-489c-92ef-fdc359626c8c: RUNNING
Status for 4f294ab1-2c09-489c-92ef-fdc359626c8c: SUCCEEDED
Transcription succeeded.

=== Step 4: Getting Transcription Result ===
Found 2 phrases

=== Step 5: Analyzing Sentiment ===
Analyzed sentiment for 2 phrases

===

In [31]:
# Inspect the directory run: aggregate processing_stats plus individual_results keyed by file path.
directory_results

{'processing_stats': {'total_files': 4,
  'successful': 4,
  'failed': 0,
  'start_time': '2025-06-19T13:24:53.122806',
  'total_phrases': 6,
  'total_duration': 144.29999999999998,
  'end_time': '2025-06-19T13:27:19.303037',
  'failed_files': []},
 'individual_results': {'audio_emotions/files/00026029e0--64991c0dd94c0d5726df032a\\euphoric.wav': {'phrases': [{'id': 0,
     'text': 'The Kuru still regularly.',
     'speaker': 0,
     'offset': 'PT1.36S',
     'sentiment': 'neutral',
     'sentiment_scores': {'positive': 0.0, 'neutral': 1.0, 'negative': 0.0},
     'pii_entities': []},
    {'id': 1,
     'text': 'Explores the Mass Museum exits.',
     'speaker': 1,
     'offset': 'PT2.76S',
     'sentiment': 'neutral',
     'sentiment_scores': {'positive': 0.0, 'neutral': 1.0, 'negative': 0.0},
     'pii_entities': []}],
   'overall_summary': [],
   'metadata': {'total_phrases': 2,
    'speakers': [0, 1],
    'processing_time': '2025-06-19T13:25:26.630686',
    'processing_duration_second