<a href="https://colab.research.google.com/github/sbindal2017-a11y/VoiceAgent/blob/main/Voice_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the Python packages used by the RAG / Streamlit part of the notebook
# (-q keeps pip output quiet).
!pip install -q streamlit google-generativeai faiss-cpu PyMuPDF pyngrok pandas

# Mount Google Drive at /content/drive (google.colab is only importable on Colab)
from google.colab import drive
drive.mount('/content/drive')

# Create folder structure in Google Drive
# (-p creates missing parents and does not fail if the folders already exist)
!mkdir -p /content/drive/MyDrive/InsuranceRAG/pdfs
!mkdir -p /content/drive/MyDrive/InsuranceRAG/indices

# Tell the user where to put their PDFs and what to do next
print("‚úÖ Setup complete!")
print("\nüìÅ Upload your insurance PDFs to:")
print("   /content/drive/MyDrive/InsuranceRAG/pdfs/")
print("\nüîë Now set your API keys below:")

In [None]:
import os

# Set your API keys here
# 'KEY' is a placeholder: replace it with your own value before running, and
# avoid saving or sharing the notebook with real keys filled in.
# GEMINI_API_KEY is read by rag_wrapper.py through os.getenv.
os.environ['GEMINI_API_KEY'] = 'KEY'
# NOTE(review): NGROK_TOKEN is not read by any cell visible here - presumably
# consumed by a pyngrok/Streamlit cell elsewhere; confirm.
os.environ['NGROK_TOKEN'] = 'KEY'


In [None]:
print("üì¶ Installing voice agent packages...")
print("This may take 2-3 minutes...")

!pip install -q livekit livekit-agents livekit-plugins-deepgram livekit-plugins-cartesia
!pip install -q deepgram-sdk cartesia aiohttp python-dotenv

print("\n‚úÖ All packages installed!")
print("\nInstalled:")
print("  ‚úì LiveKit (WebRTC server)")
print("  ‚úì LiveKit Agents (Voice agent SDK)")
print("  ‚úì Deepgram (Speech-to-Text)")
print("  ‚úì Cartesia (Text-to-Speech)")

In [None]:
import os

# Voice Agent API Keys
# Every 'KEY' below is a placeholder to be replaced with your own credential.
# The values are stored in os.environ, which is where the scripts written by
# later cells read them from (os.getenv).
print("üîë Setting up Voice Agent credentials...")

# LiveKit (get from: https://cloud.livekit.io/)
# LIVEKIT_URL is passed to room.connect() in sara_direct.py; the key/secret
# pair is used there to create the access token.
os.environ['LIVEKIT_URL'] = 'KEY'
os.environ['LIVEKIT_API_KEY'] = 'KEY'
os.environ['LIVEKIT_API_SECRET'] = 'KEY'

# Deepgram (get from: https://console.deepgram.com/)
os.environ['DEEPGRAM_API_KEY'] = 'KEY'

# Cartesia (get from: https://cartesia.ai/)
os.environ['CARTESIA_API_KEY'] = 'KEY'

# Static confirmation only; the keys are not validated here
print("‚úÖ API keys configured!")
print("\nüìã Services configured:")
print("  ‚úì LiveKit: Real-time audio infrastructure")
print("  ‚úì Deepgram: Speech recognition")
print("  ‚úì Cartesia: Voice synthesis")

In [None]:
%%writefile /content/test_deepgram.py
"""Deepgram test - Simple synchronous version"""
import os
import requests

def test_deepgram():
    """Smoke-test Deepgram speech-to-text through its REST API.

    Reads ``DEEPGRAM_API_KEY`` from the environment, asks Deepgram to
    transcribe a hosted sample WAV file and prints the transcript together
    with its confidence. Every outcome (missing key, HTTP error, network
    failure, unexpected response shape) is reported with ``print``; the
    function returns ``None`` and lets no exception escape.
    """

    api_key = os.getenv("DEEPGRAM_API_KEY")

    if not api_key:
        print("‚ùå DEEPGRAM_API_KEY not set!")
        return

    print("üé§ Testing Deepgram STT (REST API)...")

    # Sample audio URL
    audio_url = "https://dpgr.am/spacewalk.wav"

    # Deepgram API endpoint
    url = "https://api.deepgram.com/v1/listen"

    headers = {
        "Authorization": f"Token {api_key}",
        "Content-Type": "application/json"
    }

    # The audio is referenced by URL, so only a small JSON body is sent
    payload = {
        "url": audio_url
    }

    params = {
        "model": "nova-2",
        "smart_format": "true"
    }

    try:
        print(f"   Sending request to Deepgram...\n")
        # Bound the wait: without a timeout a stalled connection would hang
        # the notebook cell indefinitely.
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            params=params,
            timeout=30,
        )

        if response.status_code == 200:
            result = response.json()
            # Only the first alternative of the first channel is reported
            best = result["results"]["channels"][0]["alternatives"][0]
            transcript = best["transcript"]
            confidence = best["confidence"]

            print("‚úÖ Transcription successful!")
            print(f"üìù Transcript: {transcript}")
            print(f"üéØ Confidence: {confidence:.1%}")
            print(f"\n‚úÖ Deepgram is working!")

        else:
            print(f"‚ùå Error: {response.status_code}")
            print(f"   {response.text}")

    except Exception as e:
        # Deliberately broad: this is a diagnostic script, so any failure
        # (network, timeout, malformed JSON, missing keys) is just reported.
        print(f"‚ùå Error: {e}")

# Run the smoke test only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test_deepgram()

In [None]:
# Run the Deepgram smoke test script at /content/test_deepgram.py as a separate
# Python process. The script reads DEEPGRAM_API_KEY from the environment.
!python /content/test_deepgram.py

In [None]:
%%writefile /content/test_cartesia.py
"""Quick test of Cartesia Text-to-Speech - Updated API"""
import asyncio
import os
import base64
from cartesia import Cartesia

async def test():
    """Smoke-test Cartesia text-to-speech and save the result as a WAV file.

    Reads ``CARTESIA_API_KEY`` from the environment, synthesises SARA's
    greeting through the Cartesia SDK and writes the audio to
    ``/content/sara_test.wav``. If anything in the SDK path raises, the error
    is printed and ``try_rest_api`` is attempted instead.

    The function is declared ``async`` so the script entry point can keep
    calling ``asyncio.run(test())``; the body only makes synchronous calls.
    Returns ``None``.
    """

    api_key = os.getenv("CARTESIA_API_KEY")

    if not api_key:
        print("‚ùå CARTESIA_API_KEY not set!")
        return

    print("üîä Testing Cartesia TTS...\n")

    # SARA's introduction text
    text = "Hello! I'm SARA, your insurance assistant. How can I help you today?"
    print(f"   Text: {text}\n")

    sample_rate = 24000

    try:
        # Initialize client (synchronous for simplicity)
        client = Cartesia(api_key=api_key)

        # Voice settings
        voice_id = "a0e99841-438c-4a64-b679-ae501e7d6091"  # Friendly female
        model_id = "sonic-english"

        print("   Generating speech...\n")

        # The voice is a catalogue ID, so select it by ID (the same selector
        # shape the REST fallback sends) instead of passing the ID string as
        # `voice_embedding`, which is meant for an embedding vector.
        # NOTE(review): the accepted keyword depends on the installed cartesia
        # SDK version; a mismatch raises and falls through to the REST
        # fallback below - confirm against the pinned version.
        output = client.tts.bytes(
            model_id=model_id,
            transcript=text,
            voice={"mode": "id", "id": voice_id},
            output_format={
                "container": "wav",
                "encoding": "pcm_f32le",
                "sample_rate": sample_rate,
            },
        )

        # Normalise the SDK result to one bytes object: depending on the SDK
        # version it may be raw bytes, a dict carrying an "audio" field, or an
        # iterable of byte chunks.
        if isinstance(output, (bytes, bytearray)):
            audio_bytes = bytes(output)
        elif isinstance(output, dict):
            audio_bytes = output["audio"]
        else:
            audio_bytes = b"".join(output)

        with open("/content/sara_test.wav", "wb") as f:
            f.write(audio_bytes)

        file_size = len(audio_bytes)
        # 4 bytes per sample (32-bit float). The WAV header is counted too,
        # so this is an estimate - hence the "~" in the message below.
        duration = file_size / (sample_rate * 4)

        print(f"‚úÖ Audio generated successfully!")
        print(f"üìä Size: {file_size:,} bytes")
        print(f"‚è±Ô∏è  Duration: ~{duration:.1f} seconds")
        print(f"üíæ Saved to: /content/sara_test.wav")
        print(f"\n‚úÖ Cartesia TTS working!")

    except Exception as e:
        # Deliberately broad: any SDK problem should lead to the REST attempt
        print(f"‚ùå Error: {e}")
        print(f"   Error type: {type(e).__name__}")

        # Try REST API fallback
        print("\nüîÑ Trying REST API fallback...")
        try_rest_api(api_key, text)

def try_rest_api(api_key, text):
    """Fallback: synthesise ``text`` through the Cartesia REST API directly.

    Args:
        api_key: Cartesia API key, sent in the ``X-API-Key`` header.
        text: Transcript to convert to speech.

    On HTTP 200 the response body is written to ``/content/sara_test.wav``.
    Any other status, and any exception raised while calling the API or
    writing the file, is reported with ``print``. Returns ``None``.
    """
    import requests

    try:
        url = "https://api.cartesia.ai/tts/bytes"

        headers = {
            "X-API-Key": api_key,
            "Cartesia-Version": "2024-06-10",
            "Content-Type": "application/json"
        }

        payload = {
            "model_id": "sonic-english",
            "transcript": text,
            "voice": {
                "mode": "id",
                "id": "a0e99841-438c-4a64-b679-ae501e7d6091"
            },
            "output_format": {
                "container": "wav",
                "encoding": "pcm_f32le",
                "sample_rate": 24000
            }
        }

        print("   Calling Cartesia REST API...")
        # Bound the wait so a stalled connection cannot hang the script; 30 s
        # matches the timeout used by the notebook's full-pipeline cell.
        response = requests.post(url, headers=headers, json=payload, timeout=30)

        if response.status_code == 200:
            with open("/content/sara_test.wav", "wb") as f:
                f.write(response.content)

            print(f"‚úÖ REST API worked!")
            print(f"üìä Size: {len(response.content):,} bytes")
            print(f"üíæ Saved to: /content/sara_test.wav")
        else:
            print(f"‚ùå REST API failed: {response.status_code}")
            print(f"   {response.text}")

    except Exception as e2:
        # Last resort of a diagnostic script: report instead of raising
        print(f"‚ùå REST API also failed: {e2}")

# Script entry point: run the async test to completion when executed directly.
if __name__ == "__main__":
    asyncio.run(test())

In [None]:
# Run the Cartesia smoke test script as a separate Python process
!python /content/test_cartesia.py

# Verify and play
import os
from IPython.display import Audio

# Both the SDK path and the REST fallback in test_cartesia.py write to this
# same file, so its existence means one of them succeeded.
if os.path.exists("/content/sara_test.wav"):
    file_size = os.path.getsize("/content/sara_test.wav")
    print(f"‚úÖ Audio file created: {file_size:,} bytes")
    print("üîä Playing audio...\n")
    # NOTE(review): `rate` presumably only matters when raw samples are passed;
    # for a WAV file the header likely determines the rate - confirm.
    display(Audio("/content/sara_test.wav", rate=24000))
else:
    print("‚ùå Audio file not found. Check test_cartesia.py output above.")

In [None]:
%%writefile /content/rag_wrapper.py
"""
MOCK RAG - Indian Health Insurance Q&A
========================================
Sample data for testing voice pipeline
Replace later with real RAG
"""

import google.generativeai as genai
import os

# Configure Gemini
# Runs at import time. os.getenv returns None when GEMINI_API_KEY is unset, so
# in that case None is what gets passed as the key here.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

# ============================================================
# MOCK INDIAN HEALTH INSURANCE DATA
# ============================================================
MOCK_POLICY_DATA = """
HEALTH INSURANCE POLICY INFORMATION
Policy Number: HLTH/DEL/2024/123456
Insured: Rajesh Kumar
Policy Type: Individual Health Insurance

COVERAGE DETAILS:

Sum Insured:
- Base coverage: ‚Çπ10,00,000 (10 Lakhs) per year (Page 3, Paragraph 1)
- Cumulative bonus: Additional ‚Çπ50,000 per claim-free year, up to ‚Çπ50,00,000 (Page 3, Paragraph 2)

Room Rent:
- Private AC room covered up to ‚Çπ5,000 per day (Page 4, Paragraph 1)
- ICU charges: Up to ‚Çπ8,000 per day (Page 4, Paragraph 2)

Pre and Post Hospitalization:
- Pre-hospitalization: 60 days before admission (Page 5, Paragraph 1)
- Post-hospitalization: 90 days after discharge (Page 5, Paragraph 2)

Cashless Network:
- 8,500+ network hospitals across India (Page 2, Paragraph 3)
- Cashless facility available at all network hospitals (Page 6, Paragraph 1)

WAITING PERIODS:

Initial Waiting Period:
- 30 days from policy start date (Page 7, Paragraph 1)
- Except for accidents - covered from day 1 (Page 7, Paragraph 2)

Pre-existing Diseases:
- Covered after 48 months (4 years) (Page 7, Paragraph 3)
- Examples: Diabetes, hypertension, asthma (Page 7, Paragraph 4)

Specific Diseases:
- 24 months waiting for: Hernia, cataract, kidney stones, joint replacement (Page 8, Paragraph 1)

EXCLUSIONS (Not Covered):

Permanent Exclusions:
- Cosmetic/aesthetic surgery (Page 9, Paragraph 1)
- Dental treatment (unless due to accident) (Page 9, Paragraph 2)
- HIV/AIDS related treatment (Page 9, Paragraph 3)
- Self-inflicted injuries (Page 9, Paragraph 4)
- War or nuclear contamination (Page 9, Paragraph 5)

Temporary Exclusions:
- Maternity expenses (covered after 9 months) (Page 10, Paragraph 1)
- Newborn baby coverage (from day 1 if born during policy period) (Page 10, Paragraph 2)

PREMIUM DETAILS:

Annual Premium: ‚Çπ12,500 (Page 11, Paragraph 1)
Payment Options:
- Annual: ‚Çπ12,500 (no extra charge)
- Half-yearly: ‚Çπ6,500 x 2 (‚Çπ13,000 total)
- Quarterly: ‚Çπ3,300 x 4 (‚Çπ13,200 total)
- Monthly: ‚Çπ1,150 x 12 (‚Çπ13,800 total)

Tax Benefits:
- Eligible for deduction under Section 80D up to ‚Çπ25,000 (Page 11, Paragraph 3)
- For senior citizens: up to ‚Çπ50,000 (Page 11, Paragraph 4)

CLAIM PROCESS:

Cashless Claims:
1. Inform TPA (Third Party Administrator) at least 48 hours before planned hospitalization (Page 12, Paragraph 1)
2. Fill pre-authorization form at hospital (Page 12, Paragraph 2)
3. TPA approves within 2-4 hours for emergency, 24 hours for planned (Page 12, Paragraph 3)
4. Hospital directly settles with insurance company (Page 12, Paragraph 4)

Reimbursement Claims:
1. Pay hospital bills upfront (Page 13, Paragraph 1)
2. Submit claim within 15 days of discharge (Page 13, Paragraph 2)
3. Required documents: Discharge summary, bills, prescriptions, diagnostic reports (Page 13, Paragraph 3)
4. Claim processed within 30 days (Page 13, Paragraph 4)

Helpline: 1800-123-4567 (Toll-free, 24x7) (Page 14, Paragraph 1)
Email: claims@healthinsurance.co.in (Page 14, Paragraph 2)

RENEWALS:

Grace Period: 30 days after policy expiry (Page 15, Paragraph 1)
Lifetime Renewability: Policy can be renewed for lifetime (Page 15, Paragraph 2)
Portability: Can port to another insurer after 1 year (Page 15, Paragraph 3)

SPECIAL FEATURES:

Daycare Procedures: 150+ procedures covered (Page 16, Paragraph 1)
Ambulance Charges: Up to ‚Çπ2,000 per hospitalization (Page 16, Paragraph 2)
Health Check-up: Free annual health check-up after 4 continuous claim-free years (Page 16, Paragraph 3)
AYUSH Treatment: Covered up to ‚Çπ50,000 per year (Ayurveda, Yoga, Unani, Siddha, Homeopathy) (Page 16, Paragraph 4)
"""


# ============================================================
# MOCK RAG CLASS
# ============================================================
class InsuranceRAG:
    """
    Stand-in RAG backend for the Indian health insurance demo.

    Questions are answered by Gemini from the fixed sample policy text; no
    index is searched. A short per-session history is kept in memory so that
    follow-up questions carry conversational context.
    """

    def __init__(self, index_path=None):
        """Set up the Gemini model, the sample policy text and the session store.

        ``index_path`` is accepted but not used by this mock.
        """
        print("üîÑ Initializing MOCK RAG (Indian Health Insurance)...")
        self.model = genai.GenerativeModel('gemini-1.5-flash-latest')
        self.policy_data = MOCK_POLICY_DATA
        # session_id -> list of {"query", "answer", "confidence"} turns
        self.sessions = {}

        # Placeholder object exposing only `.ntotal`, which callers read as
        # the number of indexed chunks.
        fake_index_type = type('obj', (object,), {'ntotal': 75})
        self.index = fake_index_type()

        for line in (
            "‚úÖ Mock RAG ready (75 mock chunks)",
            "üáÆüá≥ Using Indian Health Insurance sample data",
            "‚ö†Ô∏è  Replace with real RAG later",
        ):
            print(line)

    def query(self, text: str, session_id: str = None):
        """
        Answer ``text`` with Gemini, grounded in the sample policy data.

        When ``session_id`` is truthy, recent turns of that session are added
        to the prompt and this turn is recorded afterwards.

        Returns a dict with the keys ``answer``, ``citations``,
        ``confidence`` and ``num_citations_used``. If generation raises, the
        answer is an apology containing the first 50 characters of the error,
        the citations are empty and the confidence is "Low".
        """

        # Conversation history is only looked up for a truthy session id
        history_text = ""
        if session_id:
            history_text = self._get_context(session_id)

        # Assemble the prompt: instructions + policy, optional history, question
        prompt_parts = [
            f"""You are SARA, a helpful health insurance assistant for Indian customers. Answer questions using the policy information provided.

CRITICAL RULES:
1. Keep answer under 500 characters (for voice)
2. Use Indian terminology: Lakhs instead of hundred thousands, Crores instead of millions
3. Use Rupee symbol (‚Çπ) for amounts
4. Mention page numbers naturally: "According to page 5, paragraph 3..."
5. Be warm, friendly, and conversational in Hindi-English style if natural
6. If asked about something not in policy, politely say you don't have that information

POLICY INFORMATION:
{self.policy_data}
"""
        ]
        if history_text:
            prompt_parts.append(f"\nPREVIOUS CONVERSATION:\n{history_text}\n")
        prompt_parts.append(
            f"\nCUSTOMER QUESTION: {text}\n\nYour helpful answer (under 500 characters, use Indian terminology):"
        )
        prompt = "".join(prompt_parts)

        try:
            response = self.model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=0.3,
                    max_output_tokens=150,
                )
            )
            answer = response.text
        except Exception as e:
            answer = f"I apologize, I'm having trouble accessing your policy details right now. Please try again. Error: {str(e)[:50]}"
            citations = []
            confidence = "Low"
        else:
            # Fixed placeholder citation: this mock does not retrieve anything
            citations = [
                {
                    "id": 1,
                    "page": 5,
                    "section": "Coverage Details",
                    "text": "Sample health insurance policy text"
                }
            ]
            confidence = "High"

        # Failed turns are recorded as well, exactly like successful ones
        if session_id:
            self._update_session(session_id, text, answer, confidence)

        return {
            "answer": answer,
            "citations": citations,
            "confidence": confidence,
            "num_citations_used": len(citations)
        }

    def _get_context(self, session_id):
        """Render the last four turns of a session as prompt text.

        Answers longer than 100 characters are cut to 100 and suffixed with
        "...". An unknown session yields an empty string.
        """
        try:
            turns = self.sessions[session_id]
        except KeyError:
            return ""

        lines = []
        for turn in turns[-4:]:
            lines.append(f"Customer: {turn['query']}")
            reply = turn['answer']
            clipped = reply[:100]
            if len(reply) > 100:
                clipped = clipped + "..."
            lines.append(f"SARA: {clipped}")

        return "\n".join(lines)

    def _update_session(self, session_id, query, answer, confidence):
        """Append one turn to a session, keeping only the ten newest turns."""
        turns = self.sessions.setdefault(session_id, [])
        turns.append({
            "query": query,
            "answer": answer,
            "confidence": confidence
        })

        if len(turns) > 10:
            self.sessions[session_id] = turns[-10:]

    def get_stats(self):
        """Report the fixed mock chunk count and the number of sessions held."""
        session_count = len(self.sessions)
        return {
            "chunks_indexed": 75,
            "active_sessions": session_count
        }


# ============================================================
# TEST THE MOCK RAG
# ============================================================
if __name__ == "__main__":
    # Manual smoke test: ask a fixed list of questions in a single session so
    # that later questions are answered with earlier turns as context.
    heavy_rule = "="*70
    light_rule = '‚îÄ'*70

    print("\n" + heavy_rule, "TESTING INDIAN HEALTH INSURANCE MOCK RAG", heavy_rule, sep="\n")

    rag = InsuranceRAG()

    session_id = "test_session_india"

    # Test questions in Indian context
    test_questions = [
        "What is my sum insured?",
        "What is the waiting period for pre-existing diseases?",
        "Is dental treatment covered?",
        "How do I make a cashless claim?",
        "What about maternity coverage?",
        "Do you cover AYUSH treatment?"
    ]

    i = 0
    for question in test_questions:
        i += 1
        print(f"\n{light_rule}")
        print(f"Q{i}: {question}")
        print(f"{light_rule}")

        result = rag.query(question, session_id=session_id)

        print(f"SARA: {result['answer']}")
        print(f"\nüìä Confidence: {result['confidence']}")
        print(f"üìÑ Citations: {result['num_citations_used']}")

    print("\n" + heavy_rule, "‚úÖ INDIAN HEALTH INSURANCE MOCK RAG WORKING!", heavy_rule, sep="\n")

In [None]:
%%writefile /content/sara_direct.py
"""
SARA Direct Connection - Fixed LiveKit API
"""

import asyncio
import os
import sys
import logging
import requests
from datetime import timedelta

from livekit import rtc
from livekit.api import AccessToken, VideoGrants

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sara")

# Import RAG
sys.path.append("/content")
from rag_wrapper import InsuranceRAG

# Configuration
# ROOM_NAME is the LiveKit room that main() joins and tells the user to open.
ROOM_NAME = "playground-oLGR-lQ1r"
# Speech service keys are read once at import time (None when unset).
# NOTE(review): neither constant is referenced elsewhere in this script as
# shown - presumably reserved for the STT/TTS wiring still to come; confirm.
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")
CARTESIA_API_KEY = os.getenv("CARTESIA_API_KEY")


async def create_token(room_name: str, identity: str) -> str:
    """Build a LiveKit JWT allowing ``identity`` to join ``room_name``.

    The API key and secret are read from ``LIVEKIT_API_KEY`` and
    ``LIVEKIT_API_SECRET``. The participant's display name is always
    "SARA Assistant". Nothing is awaited; the function is a coroutine only so
    that callers can ``await`` it.
    """
    builder = AccessToken(
        os.getenv('LIVEKIT_API_KEY'),
        os.getenv('LIVEKIT_API_SECRET'),
    )

    # Grant: may join, and only this one room
    join_grant = VideoGrants(room_join=True, room=room_name)

    # The with_* setters return the token itself, so they can be chained
    return (
        builder
        .with_identity(identity)
        .with_name("SARA Assistant")
        .with_grants(join_grant)
        .to_jwt()
    )


async def main():
    """Run SARA: load the RAG, join the LiveKit room and stay connected.

    Steps:
      1. Build the ``InsuranceRAG`` backend.
      2. Create a LiveKit access token for the identity ``"SARA"``.
      3. Register logging-only room event handlers and connect to
         ``ROOM_NAME`` using ``LIVEKIT_URL``.
      4. Log connection details, run one sample RAG query, then idle until
         interrupted.

    A failure in steps 1-3 is logged and ends the function early. Once the
    room is connected it is always disconnected on the way out, whether the
    function ends through Ctrl+C, task cancellation or an unexpected error.
    Cancellation is re-raised after logging so ``asyncio.run`` can finish its
    own shutdown handling.
    """

    logger.info("\n" + "="*70)
    logger.info("üöÄ STARTING SARA - Insurance Voice Assistant")
    logger.info("="*70)

    # Step 1: Initialize RAG
    logger.info("\nüìö Step 1: Loading RAG system...")
    try:
        rag = InsuranceRAG()
        logger.info("   ‚úÖ RAG loaded with Indian health insurance data")
        logger.info(f"   üìä Chunks available: {rag.index.ntotal}")
    except Exception as e:
        logger.error(f"   ‚ùå RAG failed to load: {e}")
        return

    # Step 2: Generate LiveKit token
    logger.info("\nüîë Step 2: Generating LiveKit token...")
    try:
        token = await create_token(ROOM_NAME, "SARA")
        logger.info("   ‚úÖ Token generated successfully")
    except Exception as e:
        logger.error(f"   ‚ùå Token generation failed: {e}")
        import traceback
        traceback.print_exc()
        return

    # Step 3: Connect to LiveKit room
    logger.info(f"\nüîå Step 3: Connecting to room: {ROOM_NAME}")

    room = rtc.Room()

    # Event handlers are registered before connecting. They only log; audio
    # from subscribed tracks is not consumed in this script.
    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        logger.info(f"   üë§ User joined: {participant.identity}")

    @room.on("participant_disconnected")
    def on_participant_disconnected(participant: rtc.RemoteParticipant):
        logger.info(f"   üëã User left: {participant.identity}")

    @room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            logger.info(f"   üé§ Receiving audio from: {participant.identity}")

    try:
        livekit_url = os.getenv('LIVEKIT_URL')
        await room.connect(livekit_url, token)
        logger.info("   ‚úÖ Connected successfully!")
    except Exception as e:
        logger.error(f"   ‚ùå Connection failed: {e}")
        import traceback
        traceback.print_exc()
        return

    # Everything after a successful connect runs under try/finally so the
    # room is released even if the sample query or the idle loop is
    # interrupted or fails.
    try:
        # Step 4: Success!
        logger.info("\n" + "="*70)
        logger.info("‚úÖ‚úÖ‚úÖ SARA IS LIVE AND READY ‚úÖ‚úÖ‚úÖ")
        logger.info("="*70)

        logger.info("\nüìç Room Details:")
        logger.info(f"   Room Name: {ROOM_NAME}")
        logger.info(f"   SARA Identity: {room.local_participant.identity}")
        logger.info(f"   Current Participants: {len(room.remote_participants)}")

        logger.info("\nüéØ HOW TO CONNECT:")
        logger.info("   1. Open: https://cloud.livekit.io/")
        logger.info("   2. Go to 'Playground'")
        logger.info(f"   3. Room Name: {ROOM_NAME}")
        logger.info("   4. Click 'Join Room'")
        logger.info("   5. You should see 'SARA' as a participant!")

        logger.info("\nüí° Current Status:")
        logger.info("   - SARA connected and listening")
        logger.info("   - RAG ready to answer questions")
        logger.info("   - Waiting for users to join")

        logger.info("\n" + "="*70)
        logger.info("‚è≥ Keeping SARA alive... (Ctrl+C to stop)")
        logger.info("="*70 + "\n")

        # Test RAG
        logger.info("üìã Testing RAG with sample query...")
        # rag.query makes a blocking call; run it on the default executor so
        # the event loop stays free while the room is connected.
        loop = asyncio.get_running_loop()
        test_result = await loop.run_in_executor(
            None,
            lambda: rag.query("What is my sum insured?", session_id="test"),
        )
        logger.info(f"   Sample Answer: {test_result['answer'][:80]}...")
        logger.info("   ‚úÖ RAG working!\n")

        # Keep connection alive
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("\n\nüëã Shutting down SARA...")
    except asyncio.CancelledError:
        # On newer Python versions asyncio.run turns Ctrl+C into cancellation
        # of this task; log the shutdown, then let the cancellation propagate.
        logger.info("\n\nüëã Shutting down SARA...")
        raise
    finally:
        await room.disconnect()
        logger.info("‚úÖ SARA disconnected cleanly")


# Script entry point: run main() to completion and report how it ended.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C that surfaced out of asyncio.run
        print("\n\nüëã SARA stopped")
    except Exception as e:
        # Last-resort boundary: show the error and its traceback instead of
        # letting the script die with an unhandled exception.
        print(f"\n‚ùå Fatal error: {e}")
        import traceback
        traceback.print_exc()

In [None]:
# Start SARA as a separate Python process. The script keeps running (it loops
# until interrupted), so this cell stays busy until it is stopped.
!python /content/sara_direct.py

In [None]:
# Test: Question in, Audio answer out
import sys
sys.path.append("/content")

from rag_wrapper import InsuranceRAG
import requests
import os

# End-to-end check without LiveKit: text question -> RAG answer -> Cartesia
# TTS -> WAV file played inline in the notebook.
print("="*60)
print("TESTING FULL VOICE PIPELINE")
print("="*60)

# 1. Initialize RAG
print("\n1Ô∏è‚É£ Loading RAG...")
rag = InsuranceRAG()
print("   ‚úÖ RAG ready")

# 2. Ask question
question = "What is my sum insured?"
print(f"\n2Ô∏è‚É£ Question: {question}")

result = rag.query(question, session_id="test")
answer = result["answer"]
print(f"   Answer: {answer}")

# 3. Convert to voice
print("\n3Ô∏è‚É£ Converting answer to voice...")

url = "https://api.cartesia.ai/tts/bytes"
headers = {
    "X-API-Key": os.getenv("CARTESIA_API_KEY"),
    "Cartesia-Version": "2024-06-10",
    "Content-Type": "application/json"
}

# Same voice, model and output format as the standalone Cartesia test script
payload = {
    "model_id": "sonic-english",
    "transcript": answer,
    "voice": {
        "mode": "id",
        "id": "a0e99841-438c-4a64-b679-ae501e7d6091"
    },
    "output_format": {
        "container": "wav",
        "encoding": "pcm_f32le",
        "sample_rate": 24000
    }
}

response = requests.post(url, headers=headers, json=payload, timeout=30)

if response.status_code == 200:
    # Save audio
    audio_file = "/content/sara_answer.wav"
    with open(audio_file, "wb") as f:
        f.write(response.content)

    print(f"   ‚úÖ Audio generated: {len(response.content):,} bytes")
    print(f"   üíæ Saved to: {audio_file}")

    # Play audio
    from IPython.display import Audio
    print("\n4Ô∏è‚É£ Playing SARA's answer:")
    display(Audio(audio_file, rate=24000))

    print("\n" + "="*60)
    print("‚úÖ FULL PIPELINE WORKS!")
    print("="*60)
else:
    print(f"   ‚ùå TTS failed: {response.status_code}")
    # Show the error body as well (as the REST helper in test_cartesia.py
    # does); the status code alone rarely explains the failure.
    print(f"   {response.text}")