## 1. Import Required Libraries

In [None]:
!pip install boto3 botocore python-dotenv

In [None]:
import os
import sys
import json
import time
import asyncio
import re
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime
from dotenv import load_dotenv
import boto3
from botocore.exceptions import ClientError
import requests
from pydub import AudioSegment
from pydub.generators import Sine
import numpy as np
import google.generativeai as genai

# Load environment variables
load_dotenv()

print("‚úÖ All libraries imported successfully")

## 1.5 Setup Credentials (Required for Google Colab)

**Important:** If running in Google Colab, you need to set up your credentials first.

Choose one of the following methods:

### Method 1: Use Google Colab Secrets (Recommended)

1. Click the 🔑 key icon in the left sidebar
2. Add these secrets:
   - `AWS_ACCESS_KEY_ID`
   - `AWS_SECRET_ACCESS_KEY`
   - `AWS_S3_BUCKET_NAME`
   - `AWS_S3_REGION`
   - `GEMINI_API_KEY`
3. Enable notebook access for each secret
4. Run the cell below:

In [None]:
# Method 1: Load from Google Colab Secrets
# Copies each Colab secret into os.environ so later cells read credentials
# the same way no matter which setup method supplied them.
try:
    from google.colab import userdata

    for secret_name in (
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_S3_BUCKET_NAME',
        'AWS_S3_REGION',
        'GEMINI_API_KEY',
    ):
        os.environ[secret_name] = userdata.get(secret_name)
    os.environ['ENVIRONMENT'] = 'COLAB'

    print("‚úÖ Credentials loaded from Colab Secrets")
except ImportError:
    # google.colab only exists inside Colab runtimes.
    print("‚ÑπÔ∏è  Not running in Colab - will use .env file or manual setup")
except Exception as e:
    print(f"‚ö†Ô∏è  Could not load from Colab Secrets: {e}")
    print("   Please use Method 2 or 3 below")

### Method 2: Upload .env File

Upload your `.env` file to Colab and run the cell below:

In [None]:
# Method 2: Upload .env file
# Opens Colab's upload dialog, stores the uploaded `.env` in the working
# directory and reloads it so its values override anything already set.
try:
    from google.colab import files

    print("üì§ Please upload your .env file:")
    uploaded = files.upload()

    if '.env' not in uploaded:
        print("‚ùå No .env file found in upload")
    else:
        with open('.env', 'wb') as env_file:
            env_file.write(uploaded['.env'])

        # Reload environment variables
        load_dotenv(override=True)
        print("‚úÖ .env file uploaded and loaded successfully")
except ImportError:
    print("‚ÑπÔ∏è  Not running in Colab - skipping file upload")

### Method 3: Manual Entry (Secure Input)

Manually enter your credentials using secure input fields:

In [None]:
# Method 3: Manual credential entry with secure input
from getpass import getpass

# (environment variable, prompt, hide input?, default when left blank)
MANUAL_CREDENTIAL_PROMPTS = [
    ('AWS_ACCESS_KEY_ID', 'AWS Access Key ID: ', True, None),
    ('AWS_SECRET_ACCESS_KEY', 'AWS Secret Access Key: ', True, None),
    ('AWS_S3_BUCKET_NAME', 'S3 Bucket Name (default: bpo-project-bucket): ', False, 'bpo-project-bucket'),
    ('AWS_S3_REGION', 'AWS Region (default: us-east-1): ', False, 'us-east-1'),
    ('GEMINI_API_KEY', 'Gemini API Key: ', True, None),
]

# Prompt for every variable that is still missing, not only when the access
# key is missing: an earlier method may have set some values and then failed.
missing_prompts = [
    prompt_spec for prompt_spec in MANUAL_CREDENTIAL_PROMPTS
    if not os.getenv(prompt_spec[0])
]

if missing_prompts:
    print("üîê Enter your credentials (input will be hidden):\n")

    for var_name, prompt_text, hidden, default_value in missing_prompts:
        # Secrets go through getpass so they never appear in the cell output.
        entered = (getpass(prompt_text) if hidden else input(prompt_text)).strip()
        os.environ[var_name] = entered or default_value or ''
    os.environ['ENVIRONMENT'] = 'COLAB'

    print("\n‚úÖ Credentials set successfully")
else:
    print("‚ÑπÔ∏è  Credentials already configured (skipping manual entry)")

### Verify Credentials

Run this cell to verify your credentials are properly configured:

In [None]:
# Verify all required credentials are set
REQUIRED_CREDENTIAL_VARS = (
    'AWS_ACCESS_KEY_ID',
    'AWS_SECRET_ACCESS_KEY',
    'AWS_S3_BUCKET_NAME',
    'AWS_S3_REGION',
    'GEMINI_API_KEY',
)


def mask_credential(var_name: str, var_value: str) -> str:
    """Return a display-safe form of a configured credential.

    Variables whose name contains KEY or SECRET are reduced to their last
    four characters (or ``***`` when they are 8 characters or shorter), so the
    saved notebook output does not expose both ends of a secret. Other values
    (bucket name, region) are returned unchanged.
    """
    if 'KEY' in var_name or 'SECRET' in var_name:
        return f"...{var_value[-4:]}" if len(var_value) > 8 else "***"
    return var_value


required_vars = {name: os.getenv(name) for name in REQUIRED_CREDENTIAL_VARS}

print("üîç Credential Status:\n")
all_set = True
for var_name, var_value in required_vars.items():
    if var_value:
        display_value = mask_credential(var_name, var_value)
        print(f"‚úÖ {var_name}: {display_value}")
    else:
        print(f"‚ùå {var_name}: NOT SET")
        all_set = False

if all_set:
    print("\n‚úÖ All credentials are configured!")
    print("   You can now proceed with the rest of the notebook")
else:
    print("\n‚ö†Ô∏è  Some credentials are missing!")
    print("   Please use one of the methods above to set them")

## 2. Configuration and Constants

In [None]:
# AWS Configuration
# All values come from the environment populated by the credential cells.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_S3_BUCKET_NAME = os.environ.get("AWS_S3_BUCKET_NAME", "bpo-box-dev")
AWS_S3_REGION = os.environ.get("AWS_S3_REGION", "us-east-1")
ENVIRONMENT = os.environ.get("ENVIRONMENT", "LOCAL")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

# PII Entity Types to Detect
PII_ENTITY_TYPES = [
    "PERSON",
    "EMAIL",
    "PHONE",
    "ADDRESS",
    "CREDIT_CARD",
    "BANK_ACCOUNT",
    "BANK_ROUTING",
    "SSN",
    "DRIVER_ID",
    "PASSPORT",
    "DATE",
    "URL",
    "IP_ADDRESS",
    "MEDICAL_CONDITION",
    "MEDICAL_PROCEDURE",
    "MEDICATION",
]

# Echo only the non-secret settings.
print(f"Environment: {ENVIRONMENT}")
print(f"S3 Bucket: {AWS_S3_BUCKET_NAME}")
print(f"Region: {AWS_S3_REGION}")

## 3. AWS Transcriber Class

In [None]:
class AWSTranscriber:
    """Handles transcription using AWS Transcribe.

    Uploads call audio to S3, starts a speaker-labelled Transcribe job whose
    output is written back to the same bucket, polls the job, downloads the
    transcript JSON and deletes the job. Every public method reports failure
    by printing a message and returning ``None`` / ``False`` rather than
    raising.
    """

    # Container formats accepted by Transcribe's ``MediaFormat`` parameter.
    SUPPORTED_MEDIA_FORMATS = ("mp3", "mp4", "wav", "flac", "ogg", "amr", "webm", "m4a")

    def __init__(self, region: Optional[str] = None):
        """Initialize AWS Transcribe and S3 clients.

        Args:
            region: AWS region for both clients. When omitted, the notebook's
                ``AWS_S3_REGION`` is read at construction time (not at class
                definition time), so re-running the configuration cell takes
                effect without re-running this cell.
        """
        region = region or AWS_S3_REGION
        self.transcribe_client = boto3.client(
            "transcribe",
            region_name=region,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )
        self.s3_client = boto3.client(
            "s3",
            region_name=region,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )

    @staticmethod
    def _media_format_from_uri(uri: str) -> str:
        """Derive the Transcribe ``MediaFormat`` from a URI's file extension.

        Falls back to ``"wav"`` when the extension is missing or unsupported.
        """
        extension = os.path.splitext(uri)[1].lstrip(".").lower()
        if extension in AWSTranscriber.SUPPORTED_MEDIA_FORMATS:
            return extension
        return "wav"

    @staticmethod
    def _parse_s3_location(uri: str) -> Optional[Tuple[str, str]]:
        """Split an S3 object URI into ``(bucket, key)``.

        Understands ``s3://bucket/key`` plus unsigned path-style and
        virtual-hosted-style ``https://...amazonaws.com`` URLs. Returns
        ``None`` for anything else, including URLs with a query string
        (pre-signed URLs), which should be fetched over plain HTTPS.
        """
        from urllib.parse import unquote, urlparse

        if uri.startswith("s3://"):
            bucket, _, key = uri[len("s3://"):].partition("/")
            return (bucket, key) if bucket and key else None

        parsed = urlparse(uri)
        if parsed.scheme != "https" or parsed.query:
            return None

        host = parsed.netloc.lower()
        path = unquote(parsed.path).lstrip("/")

        # Path-style: https://s3.<region>.amazonaws.com/<bucket>/<key>
        if re.fullmatch(r"s3([.-][a-z0-9-]+)?\.amazonaws\.com(\.cn)?", host):
            bucket, _, key = path.partition("/")
            return (bucket, key) if bucket and key else None

        # Virtual-hosted-style: https://<bucket>.s3.<region>.amazonaws.com/<key>
        hosted = re.fullmatch(r"(.+)\.s3([.-][a-z0-9-]+)?\.amazonaws\.com(\.cn)?", host)
        if hosted and path:
            return hosted.group(1), path

        return None

    def upload_audio_to_s3(self, audio_file_path: str, call_id: str) -> Optional[str]:
        """Upload audio file to S3.

        The object key keeps the source file's extension so that the media
        format can later be declared correctly to Transcribe; files without
        an extension are stored as ``original.wav``.

        Returns:
            The ``s3://`` URI of the uploaded object, or ``None`` if the file
            does not exist or the upload fails.
        """
        print(f"\nüì§ Uploading audio file to S3: {audio_file_path}")

        if not os.path.exists(audio_file_path):
            print(f"‚ùå File not found: {audio_file_path}")
            return None

        try:
            extension = os.path.splitext(audio_file_path)[1].lower() or ".wav"
            s3_key = f"{ENVIRONMENT.lower()}/audio/{call_id}/original{extension}"
            self.s3_client.upload_file(audio_file_path, AWS_S3_BUCKET_NAME, s3_key)
            s3_uri = f"s3://{AWS_S3_BUCKET_NAME}/{s3_key}"
            print(f"‚úÖ Audio uploaded successfully")
            print(f"   S3 URI: {s3_uri}")
            return s3_uri
        except Exception as e:
            print(f"‚ùå Error uploading audio: {e}")
            return None

    def submit_transcription(self, s3_uri: str, call_id: str) -> Optional[str]:
        """Submit audio for transcription.

        Starts an en-US job with two-speaker diarization. The media format is
        taken from the URI's extension instead of always being declared WAV.

        Returns:
            The generated job name, or ``None`` if the request fails.
        """
        print(f"\nüìù Submitting for transcription...")
        # The timestamp suffix keeps job names unique across re-runs.
        job_name = f"transcribe-{call_id}-{int(time.time())}"

        try:
            self.transcribe_client.start_transcription_job(
                TranscriptionJobName=job_name,
                Media={"MediaFileUri": s3_uri},
                MediaFormat=self._media_format_from_uri(s3_uri),
                LanguageCode="en-US",
                OutputBucketName=AWS_S3_BUCKET_NAME,
                OutputKey=f"{ENVIRONMENT.lower()}/transcripts/{call_id}/",
                Settings={
                    "ShowAlternatives": False,
                    "MaxSpeakerLabels": 2,
                    "ShowSpeakerLabels": True,
                    "VocabularyFilterMethod": "mask",
                },
            )
            print(f"‚úÖ Transcription job submitted: {job_name}")
            return job_name
        except Exception as e:
            print(f"‚ùå Error submitting transcription: {e}")
            return None

    def wait_for_completion(self, job_name: str, max_retries: int = 120,
                            poll_interval: int = 10) -> Optional[Dict[str, Any]]:
        """Poll AWS Transcribe until transcription is complete.

        Args:
            job_name: Name returned by ``submit_transcription``.
            max_retries: Maximum number of status checks before giving up.
            poll_interval: Seconds to sleep between status checks.

        Returns:
            The ``TranscriptionJob`` dict once the status is ``COMPLETED``;
            ``None`` if the job failed, polling raised, or the retry budget
            ran out.
        """
        print(f"\n‚è≥ Waiting for transcription to complete...")
        retry_count = 0

        while retry_count < max_retries:
            try:
                response = self.transcribe_client.get_transcription_job(
                    TranscriptionJobName=job_name
                )
                job = response["TranscriptionJob"]
                status = job["TranscriptionJobStatus"]

                if status == "COMPLETED":
                    print(f"‚úÖ Transcription completed!")
                    return job
                elif status == "FAILED":
                    error = job.get("FailureReason", "Unknown error")
                    print(f"‚ùå Transcription failed: {error}")
                    return None
                else:
                    print(f"   Status: {status} ({retry_count * poll_interval}s elapsed)")
                    time.sleep(poll_interval)
                    retry_count += 1
            except Exception as e:
                print(f"‚ùå Error polling status: {e}")
                return None

        print(f"‚ùå Transcription timeout")
        return None

    def get_transcript_content(self, job: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Download and parse transcription result.

        Because the job writes to the caller's own bucket, the transcript URI
        is an unsigned S3 location; it is read through the authenticated S3
        client. Only URIs that cannot be mapped to a bucket/key (for example
        pre-signed URLs) are fetched with a plain HTTPS request.

        Returns:
            The parsed transcript JSON, or ``None`` on any error.
        """
        try:
            transcript_uri = job["Transcript"]["TranscriptFileUri"]
            print(f"\nüì• Downloading transcript...")

            location = self._parse_s3_location(transcript_uri)
            if location is not None:
                bucket, key = location
                response = self.s3_client.get_object(Bucket=bucket, Key=key)
                content = json.loads(response["Body"].read())
            else:
                # A timeout keeps a stalled connection from hanging the cell.
                response = requests.get(transcript_uri, timeout=30)
                response.raise_for_status()
                content = response.json()

            print(f"‚úÖ Transcript downloaded")
            return content
        except Exception as e:
            print(f"‚ùå Error downloading transcript: {e}")
            return None

    def delete_transcription_job(self, job_name: str) -> bool:
        """Delete transcription job.

        Returns:
            ``True`` when the delete call succeeds, ``False`` otherwise.
        """
        try:
            self.transcribe_client.delete_transcription_job(TranscriptionJobName=job_name)
            print(f"\nüóëÔ∏è  Transcription job deleted")
            return True
        except Exception as e:
            print(f"‚ùå Error deleting job: {e}")
            return False

print("‚úÖ AWSTranscriber class defined")

## 4. PII Redactor Class

In [None]:
class PIIRedactor:
    """Handles PII detection and redaction using AWS Comprehend.

    Detects PII entities in transcript text, replaces them with ``[TYPE]``
    placeholders, and overwrites the matching stretches of the call audio
    with a tone or with silence.
    """

    # Redaction windows of this length or shorter are skipped.
    MIN_REDACTION_MS = 50
    # Frequency and attenuation of the masking tone.
    TONE_FREQUENCY_HZ = 1000
    TONE_ATTENUATION_DB = 15
    # How far past the previous word a word may be found while aligning word
    # timings to the transcript text; larger jumps are treated as a miss.
    MAX_ALIGNMENT_SKIP = 32

    def __init__(self, region: Optional[str] = None):
        """Create the Comprehend client.

        Args:
            region: AWS region. When omitted, the notebook's ``AWS_S3_REGION``
                is read at construction time.
        """
        region = region or AWS_S3_REGION
        self.comprehend_client = boto3.client(
            "comprehend",
            region_name=region,
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )

    def detect_pii_entities(self, text: str) -> List[Dict[str, Any]]:
        """Detect PII entities in text.

        Prints the total and a per-type count.

        Returns:
            The ``Entities`` list from Comprehend, or ``[]`` on any error.
        """
        try:
            response = self.comprehend_client.detect_pii_entities(
                Text=text, LanguageCode="en"
            )
            entities = response.get("Entities", [])
            print(f"\nüîí PII Detection: {len(entities)} entities found")

            entity_types = {}
            for entity in entities:
                entity_type = entity.get("Type")
                entity_types[entity_type] = entity_types.get(entity_type, 0) + 1

            if entity_types:
                for entity_type, count in sorted(entity_types.items()):
                    print(f"   - {entity_type}: {count}")

            return entities
        except Exception as e:
            print(f"‚ùå Error detecting PII: {e}")
            return []

    @staticmethod
    def _entities_with_offsets(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return entities that carry both offsets, last-in-text first."""
        usable = [
            entity for entity in entities
            if entity.get("BeginOffset") is not None and entity.get("EndOffset") is not None
        ]
        return sorted(usable, key=lambda entity: entity["BeginOffset"], reverse=True)

    def redact_text(self, text: str, entities: List[Dict[str, Any]]) -> str:
        """Redact PII entities from text.

        Each entity span is replaced with ``[<Type>]``. Entities without a
        ``BeginOffset``/``EndOffset`` are ignored instead of raising.
        """
        if not entities:
            return text

        redacted_text = text
        # Replacing from the end of the text backwards keeps the offsets of
        # the not-yet-processed entities valid.
        for entity in self._entities_with_offsets(entities):
            placeholder = f"[{entity.get('Type', 'UNKNOWN')}]"
            begin, end = entity["BeginOffset"], entity["EndOffset"]
            redacted_text = redacted_text[:begin] + placeholder + redacted_text[end:]

        return redacted_text

    def extract_word_timings(self, transcript_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract word-level timing from transcript.

        Returns:
            One ``{"word", "start_time", "end_time"}`` dict (times in seconds)
            per ``pronunciation`` item. If an item is malformed, a warning is
            printed and the words collected so far are returned.
        """
        words = []
        try:
            results = transcript_content.get("results", {})
            items = results.get("items", [])

            for item in items:
                if item.get("type") == "pronunciation":
                    words.append({
                        "word": item.get("alternatives", [{}])[0].get("content", ""),
                        "start_time": float(item.get("start_time", 0)),
                        "end_time": float(item.get("end_time", 0)),
                    })
        except Exception as e:
            print(f"   Warning: Could not extract word timings: {e}")

        return words

    @staticmethod
    def _align_words_to_text(original_text: str,
                             words: List[Dict[str, Any]]) -> List[Tuple[int, int, float, float]]:
        """Locate each timed word in the transcript text.

        Words are searched for in order, each starting where the previous one
        ended. A word that is not found within ``MAX_ALIGNMENT_SKIP``
        characters is skipped so one mismatch cannot shift later words.

        Returns:
            ``(begin_char, end_char, start_time_s, end_time_s)`` tuples.
        """
        spans = []
        cursor = 0
        for word in words:
            token = word.get("word", "")
            if not token:
                continue
            found_at = original_text.find(token, cursor)
            if found_at == -1 or found_at - cursor > PIIRedactor.MAX_ALIGNMENT_SKIP:
                continue
            end_at = found_at + len(token)
            spans.append((found_at, end_at, word["start_time"], word["end_time"]))
            cursor = end_at
        return spans

    @staticmethod
    def _entity_time_range_ms(begin_char: int, end_char: int,
                              word_spans: List[Tuple[int, int, float, float]],
                              text_length: int,
                              audio_duration_ms: int) -> Optional[Tuple[int, int]]:
        """Map an entity's character span to an audio window in milliseconds.

        Uses the timings of every aligned word overlapping the span. Without
        any overlapping word, the position is estimated by assuming characters
        are spread evenly over the audio. The result is clamped to the audio
        length; ``None`` is returned when no window can be computed.
        """
        overlapping = [
            span for span in word_spans
            if span[0] < end_char and span[1] > begin_char
        ]
        if overlapping:
            start_ms = int(round(min(span[2] for span in overlapping) * 1000))
            end_ms = int(round(max(span[3] for span in overlapping) * 1000))
        elif text_length > 0 and audio_duration_ms > 0:
            start_ms = int(begin_char / text_length * audio_duration_ms)
            end_ms = int(end_char / text_length * audio_duration_ms)
        else:
            return None

        return max(0, start_ms), min(audio_duration_ms, end_ms)

    def redact_audio(self, audio_file_path: str, original_text: str,
                     entities: List[Dict[str, Any]], transcript_content: Optional[Dict[str, Any]] = None,
                     redaction_mode: str = "tone") -> Optional[bytes]:
        """Redact audio by replacing PII segments.

        Args:
            audio_file_path: Path of the audio file to load.
            original_text: Transcript text the entity offsets refer to.
            entities: PII entities with ``BeginOffset``/``EndOffset``.
            transcript_content: Transcript JSON; when given, its word timings
                are used to locate each entity in the audio.
            redaction_mode: ``"silence"`` mutes the segment; any other value
                (default ``"tone"``) overlays a sine tone.

        Returns:
            WAV-encoded bytes of the redacted audio, or ``None`` when there
            are no entities or an error occurs.
        """
        import io

        try:
            print(f"\nüîä Redacting audio ({redaction_mode} mode)...")
            if not entities:
                print("   No PII to redact")
                return None

            audio = AudioSegment.from_file(audio_file_path)
            audio_duration_ms = len(audio)
            print(f"   Audio duration: {audio_duration_ms / 1000:.2f}s")

            # Use word-level timing if available
            words = self.extract_word_timings(transcript_content) if transcript_content else []
            word_spans = self._align_words_to_text(original_text, words)
            if word_spans:
                print(f"   Using word-level timing ({len(words)} words)")
            else:
                print("   Word-level timing unavailable; estimating from character offsets")

            for entity in self._entities_with_offsets(entities):
                entity_type = entity.get("Type", "UNKNOWN")
                time_range = self._entity_time_range_ms(
                    entity["BeginOffset"], entity["EndOffset"],
                    word_spans, len(original_text), audio_duration_ms,
                )
                if time_range is None:
                    continue

                start_ms, end_ms = time_range
                duration_ms = end_ms - start_ms
                if duration_ms > self.MIN_REDACTION_MS:
                    if redaction_mode == "silence":
                        redaction_segment = AudioSegment.silent(
                            duration=duration_ms, frame_rate=audio.frame_rate
                        )
                    else:
                        tone = Sine(self.TONE_FREQUENCY_HZ).to_audio_segment(duration=duration_ms)
                        redaction_segment = tone - self.TONE_ATTENUATION_DB
                    # The replacement has the same length as the removed
                    # slice, so later time offsets stay valid.
                    audio = audio[:start_ms] + redaction_segment + audio[end_ms:]
                    print(f"   ‚úì Redacted {entity_type} at {start_ms}ms-{end_ms}ms")

            # Export into memory so no temporary file handle is left open.
            buffer = io.BytesIO()
            audio.export(buffer, format="wav")
            audio_bytes = buffer.getvalue()
            print(f"‚úÖ Audio redaction completed")
            return audio_bytes
        except Exception as e:
            print(f"‚ùå Error redacting audio: {e}")
            return None

print("‚úÖ PIIRedactor class defined")

## 5. Gemini Sentiment Analyzer Class

In [None]:
class GeminiSentimentAnalyzer:
    """Handles sentiment analysis using Google Gemini API.

    Builds speaker segments from an AWS Transcribe result, merges fragmented
    segments, and asks Gemini for a sentiment label per segment.
    """

    def __init__(self, api_key: str, model_name: str = 'gemini-pro'):
        """Configure the Gemini SDK and create the model handle.

        Args:
            api_key: Gemini API key.
            model_name: Model identifier passed to ``GenerativeModel``; the
                default keeps the model this notebook was written against.
        """
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name)

    def format_time(self, seconds: float) -> str:
        """Convert seconds to MM:SS format (fractions are truncated)."""
        minutes = int(seconds // 60)
        secs = int(seconds % 60)
        return f"{minutes:02d}:{secs:02d}"

    @staticmethod
    def _speaker_label(raw_speaker: str) -> str:
        """Map a diarization label to a display name.

        ``spk_0`` becomes ``Agent`` and ``spk_1`` becomes ``Customer``; other
        labels only have their ``spk_`` prefix rewritten to ``Speaker ``.
        """
        if raw_speaker == "spk_0":
            return "Agent"
        if raw_speaker == "spk_1":
            return "Customer"
        return raw_speaker.replace("spk_", "Speaker ")

    @staticmethod
    def _parse_sentiment_json(response_text: str) -> List[Any]:
        """Extract the JSON array from a model reply.

        Accepts a bare array, or one wrapped in a Markdown code fence with or
        without a ``json`` tag.

        Raises:
            ValueError: If no JSON array can be parsed from the reply
                (``json.JSONDecodeError`` is a subclass of ``ValueError``).
        """
        text = response_text.strip()
        fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
        candidate = fenced.group(1) if fenced else text

        start = candidate.find("[")
        end = candidate.rfind("]")
        if start == -1 or end < start:
            raise ValueError("Gemini response did not contain a JSON array")

        parsed = json.loads(candidate[start:end + 1])
        if not isinstance(parsed, list):
            raise ValueError("Gemini response JSON is not an array")
        return parsed

    def _build_result(self, order: int, segment: Dict[str, Any],
                      sentiment_data: Dict[str, Any]) -> Dict[str, Any]:
        """Combine one segment with its sentiment data into an output row.

        Missing fields default to neutral / 0.9 / "Neutral tone", and the
        confidence is coerced to ``float``.
        """
        try:
            confidence = float(sentiment_data.get("confidence", 0.9))
        except (TypeError, ValueError):
            confidence = 0.9

        return {
            "order": order,
            "speaker": self._speaker_label(segment["speaker"]),
            "text": segment["text"],
            "start_time": self.format_time(segment["start_time"]),
            "end_time": self.format_time(segment["end_time"]),
            "sentiment": sentiment_data.get("sentiment", "neutral"),
            "confidence": confidence,
            "tone_note": sentiment_data.get("tone_note", "Neutral tone"),
        }

    def extract_speaker_segments(self, transcript_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract speaker segments from AWS Transcribe results.

        For every diarization segment, collects the pronunciation items whose
        time range lies inside the segment and appends a punctuation item
        that directly follows a word. Segments without text are dropped.

        Returns:
            Dicts with ``speaker``, ``text``, ``start_time`` and ``end_time``
            (seconds). On error a message is printed and the segments built
            so far are returned.
        """
        segments = []
        try:
            results = transcript_content.get("results", {})
            items = results.get("items", [])
            speaker_labels = results.get("speaker_labels", {})
            segments_data = speaker_labels.get("segments", [])

            for segment in segments_data:
                speaker = segment.get("speaker_label", "Unknown")
                start_time = float(segment.get("start_time", 0))
                end_time = float(segment.get("end_time", 0))

                text_parts = []
                for i, item in enumerate(items):
                    if item.get("type") == "pronunciation":
                        item_start = float(item.get("start_time", 0))
                        item_end = float(item.get("end_time", 0))

                        if item_start >= start_time and item_end <= end_time:
                            word = item.get("alternatives", [{}])[0].get("content", "")
                            text_parts.append(word)

                            # Punctuation items have no timing; attach one to
                            # the word it immediately follows.
                            if i + 1 < len(items) and items[i + 1].get("type") == "punctuation":
                                punct = items[i + 1].get("alternatives", [{}])[0].get("content", "")
                                if punct and text_parts:
                                    text_parts[-1] = text_parts[-1] + punct

                text = " ".join(text_parts)
                if text.strip():
                    segments.append({
                        "speaker": speaker,
                        "text": text.strip(),
                        "start_time": start_time,
                        "end_time": end_time
                    })
        except Exception as e:
            print(f"‚ùå Error extracting segments: {e}")

        return segments

    def merge_consecutive_segments(self, segments: List[Dict[str, Any]], max_gap: float = 2.0) -> List[Dict[str, Any]]:
        """Merge consecutive segments from same speaker and fix diarization errors.

        Pass 1 collapses runs of three or more short (< 3 s), closely spaced
        (gap <= 0.5 s) segments that begin with a speaker change into one
        segment attributed to a single speaker. Pass 2 joins neighbouring
        segments of the same speaker separated by at most ``max_gap`` seconds.
        """
        if not segments:
            return segments

        # First pass: Fix speaker diarization errors
        fixed_segments = []
        i = 0
        while i < len(segments):
            current = segments[i]

            if i + 1 < len(segments):
                next_seg = segments[i + 1]
                current_duration = current["end_time"] - current["start_time"]
                next_duration = next_seg["end_time"] - next_seg["start_time"]
                time_gap = next_seg["start_time"] - current["end_time"]

                # Detect suspicious short segments alternating speakers
                if (current_duration < 3.0 and next_duration < 3.0 and
                    time_gap <= 0.5 and current["speaker"] != next_seg["speaker"]):

                    # Grow the cluster while segments stay short and close.
                    cluster = [current]
                    j = i + 1
                    while j < len(segments):
                        seg = segments[j]
                        seg_duration = seg["end_time"] - seg["start_time"]
                        gap = seg["start_time"] - cluster[-1]["end_time"]

                        if seg_duration < 3.0 and gap <= 0.5:
                            cluster.append(seg)
                            j += 1
                        else:
                            break

                    # Merge cluster if 3+ segments
                    if len(cluster) >= 3:
                        if i > 0:
                            # Attribute the run to the speaker just before it.
                            correct_speaker = segments[i - 1]["speaker"]
                        elif i + len(cluster) < len(segments):
                            # No earlier segment: use the opposite of the
                            # speaker that follows the run.
                            next_speaker = segments[i + len(cluster)]["speaker"]
                            correct_speaker = "spk_0" if next_speaker == "spk_1" else "spk_1"
                        else:
                            correct_speaker = cluster[0]["speaker"]

                        merged_text = " ".join(seg["text"] for seg in cluster)
                        merged_segment = {
                            "speaker": correct_speaker,
                            "text": merged_text.strip(),
                            "start_time": cluster[0]["start_time"],
                            "end_time": cluster[-1]["end_time"]
                        }
                        fixed_segments.append(merged_segment)
                        i = j
                        continue

            fixed_segments.append(current)
            i += 1

        # Second pass: Merge same-speaker segments
        if not fixed_segments:
            return []

        merged = []
        # Work on copies so the caller's segment dicts are not modified.
        current_segment = fixed_segments[0].copy()

        for segment in fixed_segments[1:]:
            if (segment["speaker"] == current_segment["speaker"] and
                segment["start_time"] - current_segment["end_time"] <= max_gap):
                current_segment["text"] += " " + segment["text"]
                current_segment["end_time"] = segment["end_time"]
            else:
                merged.append(current_segment)
                current_segment = segment.copy()

        merged.append(current_segment)
        return merged

    def analyze_sentiment_with_gemini(self, segments: List[Dict[str, Any]],
                                     full_transcript: str = "",
                                     pii_entities: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
        """Analyze sentiment for each segment using Gemini.

        All segments are sent in one prompt; the reply is expected to be a
        JSON array with one object per segment. If the call or the parsing
        fails, every segment gets the neutral default, and the returned list
        always has exactly one row per input segment.

        Args:
            segments: Output of ``merge_consecutive_segments``.
            full_transcript: Accepted for interface compatibility; not used.
            pii_entities: Accepted for interface compatibility; not used.
                NOTE(review): segment text is sent to Gemini and returned
                unredacted - confirm this is acceptable for PII-bearing calls.

        Returns:
            Rows with ``order``, ``speaker``, ``text``, ``start_time``,
            ``end_time`` (MM:SS), ``sentiment``, ``confidence``, ``tone_note``.
        """
        print(f"\nü§ñ Analyzing sentiment with Gemini for {len(segments)} segments...")
        if not segments:
            return []

        try:
            prompt = """Analyze this customer service conversation. For each segment, provide:
1. sentiment: "positive", "negative", or "neutral"
2. confidence: 0.0-1.0
3. tone_note: Brief description

Segments:\n"""

            for i, segment in enumerate(segments, 1):
                prompt += f"{i}. [{segment['speaker']}]: {segment['text']}\n"

            prompt += "\nReturn ONLY valid JSON array: [{\"sentiment\": \"neutral\", \"confidence\": 0.9, \"tone_note\": \"...\"}]"

            response = self.model.generate_content(prompt)
            sentiment_results = self._parse_sentiment_json(response.text)

            # Built in a local list so a failure part-way through cannot leave
            # partial rows mixed with the fallback rows.
            analyzed_segments = []
            for i, segment in enumerate(segments):
                sentiment_data = sentiment_results[i] if i < len(sentiment_results) else {}
                if not isinstance(sentiment_data, dict):
                    sentiment_data = {}
                analyzed_segments.append(self._build_result(i + 1, segment, sentiment_data))

            print(f"‚úÖ Sentiment analysis completed")
            return analyzed_segments
        except Exception as e:
            print(f"‚ùå Error with Gemini: {e}")
            # Fallback with neutral sentiment
            return [
                self._build_result(i + 1, segment, {})
                for i, segment in enumerate(segments)
            ]

print("‚úÖ GeminiSentimentAnalyzer class defined")

## 6. S3 Manager Class

In [None]:
class S3Manager:
    """Stores redacted call audio in an S3 bucket."""

    def __init__(self, access_key: str, secret_key: str, bucket_name: str, region: str = "us-east-1"):
        """Remember the target bucket/region and build the S3 client."""
        self.bucket_name = bucket_name
        self.region = region
        client_options = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
            "region_name": region,
        }
        self.s3_client = boto3.client("s3", **client_options)

    def upload_redacted_audio(self, audio_bytes: bytes, call_id: str, audio_format: str = "wav") -> Optional[str]:
        """Upload redacted audio to S3.

        The object is stored under
        ``<environment>/transcriptions/<call_id>/redacted_audio.<format>``.

        Returns:
            The virtual-hosted HTTPS URL of the object, or ``None`` if the
            upload raised.
        """
        print(f"\n‚òÅÔ∏è  Uploading redacted audio to S3...")
        try:
            folder = f"{ENVIRONMENT.lower()}/transcriptions/{call_id}"
            object_key = f"{folder}/redacted_audio.{audio_format}"
            # Anything that is not WAV is labelled as MPEG audio.
            mime_type = "audio/mpeg" if audio_format != "wav" else "audio/wav"

            put_request = {
                "Bucket": self.bucket_name,
                "Key": object_key,
                "Body": audio_bytes,
                "ContentType": mime_type,
            }
            self.s3_client.put_object(**put_request)

            object_url = f"https://{self.bucket_name}.s3.{self.region}.amazonaws.com/{object_key}"
            print(f"‚úÖ Uploaded successfully ({len(audio_bytes) / (1024*1024):.2f} MB)")
            return object_url
        except Exception as e:
            print(f"‚ùå Error uploading to S3: {e}")
            return None

print("‚úÖ S3Manager class defined")

## 7. Main Processing Pipeline

Set your audio file path and call ID here:

### Option A: Upload Audio File (Interactive Widget)

Use the file upload widget below (it appears after the directory listing) to select an audio file from your computer:

### Available Audio Files in Current Directory

In [None]:
# List audio files in current directory
import glob

# Patterns are matched against the working directory only (no recursion).
audio_extensions = ['*.mp3', '*.wav', '*.m4a', '*.flac', '*.ogg', '*.aac']
audio_files = [
    match
    for pattern in audio_extensions
    for match in glob.glob(pattern)
]

if not audio_files:
    print("‚ÑπÔ∏è  No audio files found in current directory")
    print("   You can upload a file using the widget below or specify a path manually")
else:
    print(f"üìÇ Found {len(audio_files)} audio file(s) in current directory:")
    for i, file in enumerate(audio_files, 1):
        size = os.path.getsize(file) / (1024 * 1024)
        print(f"   {i}. {file} ({size:.2f} MB)")

In [None]:
from ipywidgets import FileUpload, Button, Output, VBox, HBox, Label
from IPython.display import display, clear_output
import ipywidgets as widgets

# Create file upload widget
uploader = FileUpload(
    accept='.mp3,.wav,.m4a,.flac',  # Accept common audio formats
    multiple=False,
    description='Choose Audio File'
)

# Output area for messages
upload_output = Output()

# Variable to store uploaded file info
uploaded_audio_info = {'filename': None, 'content': None}


def extract_uploaded_file(upload_value):
    """Return ``(filename, content_bytes)`` for the first uploaded file.

    Handles both layouts of ``FileUpload.value``:

    * ipywidgets 7: dict keyed by filename, each entry holding
      ``{'metadata': {'name': ...}, 'content': bytes}``
    * ipywidgets 8: tuple of dicts with ``'name'`` and ``'content'``
      (a memoryview)

    Returns ``None`` when nothing has been uploaded. Only the base name of
    the reported filename is kept so the file is always written to the
    working directory.
    """
    if not upload_value:
        return None

    if isinstance(upload_value, dict):
        first_file = next(iter(upload_value.values()))
        reported_name = first_file['metadata']['name']
    else:
        first_file = upload_value[0]
        reported_name = first_file['name']

    return os.path.basename(reported_name), bytes(first_file['content'])


def on_upload_change(change):
    """Handle file upload: save the file to disk and record it for later cells."""
    with upload_output:
        clear_output()
        uploaded_file = extract_uploaded_file(uploader.value)
        if uploaded_file:
            filename, content = uploaded_file

            # Save to disk
            with open(filename, 'wb') as f:
                f.write(content)

            uploaded_audio_info['filename'] = filename
            uploaded_audio_info['content'] = content

            print(f"‚úÖ File uploaded: {filename}")
            print(f"   Size: {len(content) / (1024*1024):.2f} MB")
            print(f"   Saved to: {os.path.abspath(filename)}")

uploader.observe(on_upload_change, names='value')

# Display the widget
display(VBox([
    Label('Upload an audio file (MP3, WAV, M4A, FLAC):'),
    uploader,
    upload_output
]))

### Option B: Specify Audio File Path

Or manually specify the path to an audio file on your system:

In [None]:
# Pick the audio file to process: the widget upload wins when one exists,
# otherwise fall back to the manually specified path.
uploaded_name = uploaded_audio_info['filename']
if not uploaded_name:
    # Option 2: Manually specify path
    AUDIO_FILE = "10min.mp3"  # Change this to your audio file path
    print(f"üìÅ Using specified file: {AUDIO_FILE}")
else:
    # Option 1: Use uploaded file from widget above
    AUDIO_FILE = uploaded_name
    print(f"‚úÖ Using uploaded file: {AUDIO_FILE}")

# Verify file exists
if not os.path.exists(AUDIO_FILE):
    print(f"‚ùå File not found: {AUDIO_FILE}")
    print(f"   Please upload a file using the widget above or check the file path")
else:
    file_size = os.path.getsize(AUDIO_FILE) / (1024 * 1024)
    print(f"   File size: {file_size:.2f} MB")
    print(f"   Full path: {os.path.abspath(AUDIO_FILE)}")

### Generate Call ID

In [None]:
# Generate unique call ID with timestamp (second resolution, local time)
call_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
CALL_ID = f"test_{call_timestamp}"

print(f"üìã Call ID: {CALL_ID}")
print(f"üéôÔ∏è  Ready to process: {AUDIO_FILE}")

### Step 1: Initialize Services

In [None]:
# Initialize all services
# The transcriber and redactor read AWS settings from the configuration cell;
# the S3 manager receives them explicitly.
s3_settings = {
    "access_key": AWS_ACCESS_KEY_ID,
    "secret_key": AWS_SECRET_ACCESS_KEY,
    "bucket_name": AWS_S3_BUCKET_NAME,
    "region": AWS_S3_REGION,
}

transcriber = AWSTranscriber()
pii_redactor = PIIRedactor()
gemini_analyzer = GeminiSentimentAnalyzer(api_key=GEMINI_API_KEY)
s3_manager = S3Manager(**s3_settings)

print("‚úÖ All services initialized")

### Step 2: Upload Audio to S3

In [None]:
# Upload the source audio; stop the pipeline here if that fails, because
# every later step needs the S3 URI.
s3_uri = transcriber.upload_audio_to_s3(AUDIO_FILE, CALL_ID)
if not s3_uri:
    raise RuntimeError("Failed to upload audio to S3")

print(f"S3 URI: {s3_uri}")

### Step 3: Submit Transcription Job

In [None]:
# Start the transcription job; the job name is needed for polling and cleanup.
job_name = transcriber.submit_transcription(s3_uri, CALL_ID)
if not job_name:
    raise RuntimeError("Failed to submit transcription")

print(f"Job Name: {job_name}")

### Step 4: Wait for Transcription

In [None]:
# Block until the job finishes; a falsy result means it failed or timed out.
job = transcriber.wait_for_completion(job_name)
if not job:
    raise RuntimeError("Transcription failed or timed out")

### Step 5: Download Transcript

In [None]:
transcript_content = transcriber.get_transcript_content(job)
if not transcript_content:
    raise RuntimeError("Failed to download transcript")

# Extract text
results = transcript_content.get("results", {})
transcriptions = results.get("transcripts", [])
original_text = transcriptions[0].get("transcript", "") if transcriptions else ""

# NOTE: this preview is the unredacted transcript; clear the cell output
# before sharing the notebook.
print(f"\nOriginal Transcript ({len(original_text)} chars):")
# Only show the ellipsis when the preview was actually cut short.
print(original_text[:500] + ("..." if len(original_text) > 500 else ""))

### Step 6: Detect and Redact PII

In [None]:
# Detect PII
pii_entities = pii_redactor.detect_pii_entities(original_text)

# Redact text
redacted_text = pii_redactor.redact_text(original_text, pii_entities)

print(f"\nRedacted Transcript:")
# Only show the ellipsis when the preview was actually cut short.
print(redacted_text[:500] + ("..." if len(redacted_text) > 500 else ""))

### Step 7: Speaker Segmentation & Sentiment Analysis

In [None]:
# Extract speaker segments
speaker_segments = gemini_analyzer.extract_speaker_segments(transcript_content)
print(f"Original segments: {len(speaker_segments)}")

# Merge consecutive segments
merged_segments = gemini_analyzer.merge_consecutive_segments(speaker_segments, max_gap=2.0)
print(f"Merged segments: {len(merged_segments)}")

# Analyze sentiment
sentiment_request = {
    "full_transcript": original_text,
    "pii_entities": pii_entities,
}
sentiment_analysis_results = gemini_analyzer.analyze_sentiment_with_gemini(
    merged_segments, **sentiment_request
)

# Save results next to the notebook, one file per call ID
sentiment_file = f"sentiment_analysis_{CALL_ID}.json"
with open(sentiment_file, "w") as results_file:
    json.dump(sentiment_analysis_results, results_file, indent=2)

print(f"\n‚úÖ Sentiment analysis saved to: {sentiment_file}")

### Step 8: Preview Sentiment Results

In [None]:
print(f"\nüìä Sentiment Analysis Preview (first 5 segments):\n")
for segment in sentiment_analysis_results[:5]:
    print(f"Order {segment['order']}: {segment['speaker']}")
    print(f"  Text: {segment['text'][:80]}...")
    print(f"  Time: {segment['start_time']} - {segment['end_time']}")
    print(f"  Sentiment: {segment['sentiment']} (confidence: {segment['confidence']:.2f})")
    print(f"  Tone: {segment['tone_note']}")
    print()

### Step 9: Redact Audio

In [None]:
# Produce the redacted audio as in-memory WAV bytes.
redacted_audio_bytes = pii_redactor.redact_audio(
    audio_file_path=AUDIO_FILE,
    original_text=original_text,
    entities=pii_entities,
    transcript_content=transcript_content,
    redaction_mode="tone",
)

### Step 10: Upload Redacted Audio to S3

In [None]:
# Upload only when the redaction step produced audio.
redacted_audio_s3_url = None
if not redacted_audio_bytes:
    print("No redacted audio to upload")
else:
    redacted_audio_s3_url = s3_manager.upload_redacted_audio(
        redacted_audio_bytes, CALL_ID, audio_format="wav"
    )
    print(f"Redacted audio URL: {redacted_audio_s3_url}")

### Step 11: Cleanup & Final Summary

In [None]:
# Delete transcription job
transcriber.delete_transcription_job(job_name)

# Print final summary
banner = '=' * 80
print(f"\n{banner}")
print(f"‚úÖ PROCESSING COMPLETED")
print(f"{banner}")
print(f"\nüìä Results Summary:")
print(f"   Call ID: {CALL_ID}")
print(f"   Original transcript: {len(original_text)} characters")
print(f"   Redacted transcript: {len(redacted_text)} characters")
print(f"   PII entities found: {len(pii_entities)}")
print(f"   Speaker segments: {len(sentiment_analysis_results)}")
print(f"   Sentiment file: {sentiment_file}")
print(f"   Original audio: {s3_uri}")
# The redacted-audio line is only shown when an upload URL exists.
if redacted_audio_s3_url:
    print(f"   Redacted audio: {redacted_audio_s3_url}")
print(f"\n{banner}")