# Solvathon Layer-1 End-to-End Colab Notebook

This notebook builds and demonstrates the complete voice-agent flow in one place:

1. Install dependencies.
2. Build each component (LID, STT, Agent Router, TTS).
3. Run component test cases inside the notebook.
4. Run integrated single-turn pipeline tests.
5. Launch a press-and-hold website.
6. Expose it publicly with ngrok for two-way voice interaction.

Target behavior implemented:
- Press and hold record button -> recording starts.
- Release button -> recorded chunk uploads to server.
- Server returns synthesized voice reply -> browser auto-plays.
- Purpose selector: `Hospital Kiosk`, `College Admission`, `Laptop Customer Support`.
- Language selector before speaking.
- TTS policy: Piper for `en/hi/te/ta` (Tamil with your trained IITM model), Edge TTS for `kn`.


## 1) Install Dependencies

In [None]:
%%capture
!apt-get -qq update
!apt-get -qq install -y ffmpeg
!pip -q install flask flask-cors pyngrok numpy librosa soundfile scipy requests
!pip -q install torch transformers sentencepiece accelerate
!pip -q install piper-tts edge-tts av nest_asyncio

In [None]:
import torch
print('Torch version:', torch.__version__)
print('CUDA available:', torch.cuda.is_available())
if torch.cuda.is_available():
    print('GPU:', torch.cuda.get_device_name(0))

## 2) Global Setup and Config

In [None]:
import os
import io
import av
import json
import time
import base64
import shutil
import asyncio
import random
import requests
import subprocess
import numpy as np
import librosa
import soundfile as sf
import torch  # also imported in the previous cell; repeated so this cell runs on its own
from pathlib import Path
from collections import defaultdict

import nest_asyncio
nest_asyncio.apply()

from IPython.display import Audio, display, HTML
from transformers import AutoFeatureExtractor, Wav2Vec2ForSequenceClassification
from transformers import WhisperProcessor, WhisperForConditionalGeneration

ROOT = Path('/content/solvathon_colab')
MODELS_DIR = ROOT / 'piper_models'
TEST_AUDIO_DIR = ROOT / 'test_audio'
OUTPUT_DIR = ROOT / 'outputs'
for p in [ROOT, MODELS_DIR, TEST_AUDIO_DIR, OUTPUT_DIR]:
    p.mkdir(parents=True, exist_ok=True)

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Working directory:', ROOT)
print('Device:', DEVICE)

LANGUAGE_CONFIG = {
    'en': {
        'name': 'English',
        'edge_voice': 'en-US-AriaNeural',
        'test_text': 'Hello, I need quick help from the voice assistant.'
    },
    'hi': {
        'name': 'Hindi',
        'edge_voice': 'hi-IN-SwaraNeural',
        'test_text': 'नमस्ते, मुझे वॉयस असिस्टेंट से मदद चाहिए।'
    },
    'ta': {
        'name': 'Tamil',
        'edge_voice': 'ta-IN-PallaviNeural',
        'test_text': 'வணக்கம், எனக்கு குரல் உதவி வேண்டும்.'
    },
    'te': {
        'name': 'Telugu',
        'edge_voice': 'te-IN-MohanNeural',
        'test_text': 'హలో, నాకు వాయిస్ సహాయం కావాలి.'
    },
    'kn': {
        'name': 'Kannada',
        'edge_voice': 'kn-IN-SapnaNeural',
        'test_text': 'ನಮಸ್ಕಾರ, ನನಗೆ ಧ್ವನಿ ಸಹಾಯ ಬೇಕು.'
    },
}

AGENT_TYPES = {
    'hospital_kiosk': {
        'label': 'Hospital Kiosk',
        'assumption': 'You are speaking to a hospital front-desk voice kiosk.'
    },
    'college_admission': {
        'label': 'College Admission',
        'assumption': 'You are speaking to a college admission help desk voice agent.'
    },
    'laptop_support': {
        'label': 'Laptop Customer Support',
        'assumption': 'You are speaking to a laptop technical support voice agent.'
    },
}

SESSION_MEMORY = defaultdict(list)
print('Configured languages:', ', '.join(LANGUAGE_CONFIG.keys()))
print('Configured agent types:', ', '.join(AGENT_TYPES.keys()))

## 3) Download/Prepare Piper Models (en/hi/te required, ta custom)

Tamil uses your trained model: `ta_IN-iitm-female-s1-medium.onnx`.
If the file is not already in `/content`, upload it together with its `ta_IN-iitm-female-s1-medium.onnx.json` config before running this cell.
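
The download cell in this section builds each voice URL from the language, region, voice name, and quality. As a rough sketch of that layout (the helper name `piper_voice_urls` is hypothetical and not part of the notebook), the two URLs for one voice can be derived like this:

```python
from typing import Tuple

# Same base URL the notebook assigns to PIPER_BASE.
PIPER_BASE = 'https://huggingface.co/rhasspy/piper-voices/resolve/main'


def piper_voice_urls(lang: str, region: str, name: str, quality: str = 'medium') -> Tuple[str, str]:
    """Return (model_url, config_url) for one Piper voice (hypothetical helper)."""
    stem = f'{lang}_{region}-{name}-{quality}'
    folder = f'{PIPER_BASE}/{lang}/{lang}_{region}/{name}/{quality}'
    return f'{folder}/{stem}.onnx', f'{folder}/{stem}.onnx.json'


model_url, config_url = piper_voice_urls('en', 'US', 'lessac')
print(model_url)
print(config_url)
```

Printing the URLs before downloading is a cheap way to check a voice name against the repository listing.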


In [None]:
PIPER_BASE = 'https://huggingface.co/rhasspy/piper-voices/resolve/main'


def download_file(url: str, target: Path, required: bool = True) -> bool:
    target.parent.mkdir(parents=True, exist_ok=True)
    r = requests.get(url, timeout=120)
    if r.status_code == 200:
        target.write_bytes(r.content)
        return True
    if required:
        raise RuntimeError(f'Failed to download required file: {url} (status={r.status_code})')
    return False


def download_piper_voice(lang: str, region: str, name: str, quality: str = 'medium', required: bool = True) -> bool:
    stem = f'{lang}_{region}-{name}-{quality}'
    onnx = f'{stem}.onnx'
    meta = f'{stem}.onnx.json'
    voice_path = f'{lang}/{lang}_{region}/{name}/{quality}'
    ok_onnx = download_file(f'{PIPER_BASE}/{voice_path}/{onnx}', MODELS_DIR / onnx, required=required)
    if ok_onnx:
        download_file(f'{PIPER_BASE}/{voice_path}/{meta}', MODELS_DIR / meta, required=False)
        print('Downloaded:', onnx)
        return True
    return False

# Required voices
_ = download_piper_voice('en', 'US', 'lessac', 'medium', required=True)
_ = download_piper_voice('hi', 'IN', 'rohan', 'medium', required=True)
_ = download_piper_voice('te', 'IN', 'maya', 'medium', required=True)

# Tamil custom model: preferred
custom_ta = Path('/content/ta_IN-iitm-female-s1-medium.onnx')
custom_ta_json = Path('/content/ta_IN-iitm-female-s1-medium.onnx.json')
if custom_ta.exists():
    shutil.copy2(custom_ta, MODELS_DIR / custom_ta.name)
    if custom_ta_json.exists():
        shutil.copy2(custom_ta_json, MODELS_DIR / custom_ta_json.name)
    print('Copied custom Tamil model into', MODELS_DIR)
else:
    print('Custom Tamil model not found in /content. Upload ta_IN-iitm-female-s1-medium.onnx for Piper Tamil.')

print('\nAvailable Piper models:')
for f in sorted(MODELS_DIR.glob('*.onnx')):
    print('-', f.name)

print('Piper binary:', shutil.which('piper'))

## 4) Build Component: Language Identification (MMS-LID)

In [None]:
LID_MODEL_NAME = 'facebook/mms-lid-256'

lid_processor = AutoFeatureExtractor.from_pretrained(LID_MODEL_NAME)
lid_model = Wav2Vec2ForSequenceClassification.from_pretrained(LID_MODEL_NAME).to(DEVICE)
lid_model.eval()
lid_id2label = lid_model.config.id2label

LANG_MAP = {
    'eng': 'en',
    'hin': 'hi',
    'tam': 'ta',
    'tel': 'te',
    'kan': 'kn',
}
TARGET_CODES = set(LANG_MAP.keys())


def detect_language_from_array(audio: np.ndarray, sr: int = 16000):
    if sr != 16000:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
    audio = audio.astype(np.float32)

    inputs = lid_processor(audio, sampling_rate=16000, return_tensors='pt')
    inputs = {k: v.to(DEVICE) for k, v in inputs.items()}

    with torch.no_grad():
        logits = lid_model(**inputs).logits
        probs = torch.softmax(logits, dim=-1)[0].detach().cpu().numpy()

    ranked = sorted(
        [(lid_id2label[i], float(probs[i])) for i in range(len(probs))],
        key=lambda x: x[1],
        reverse=True,
    )

    filtered = [(code, p) for code, p in ranked if code in TARGET_CODES]
    if not filtered:
        return 'en', 0.0, ranked[:5]

    best_code, best_prob = filtered[0]
    return LANG_MAP.get(best_code, 'en'), best_prob, filtered[:5]


def detect_language_from_file(path: Path):
    wav, sr = librosa.load(str(path), sr=16000, mono=True)
    return detect_language_from_array(wav, sr=16000)

print('LID loaded on', DEVICE)

### LID Test Cases (generated inside notebook)

This generates one short sample per language (using Edge voices), then runs MMS-LID.
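
MMS-LID scores many more languages than the five this notebook supports, so the detector keeps only the best-scoring supported label. The sketch below illustrates that filtering step with made-up labels and scores (all values are hypothetical, and it uses plain Python instead of the model's tensors):

```python
import math

# Hypothetical labels and raw scores; the real model exposes 256 labels.
labels = ['eng', 'fra', 'hin', 'tam', 'spa']
scores = [1.2, 2.5, 0.3, 2.0, 0.1]

LANG_MAP = {'eng': 'en', 'hin': 'hi', 'tam': 'ta', 'tel': 'te', 'kan': 'kn'}


def softmax(values):
    """Convert raw scores to probabilities (shifted by the max for stability)."""
    top = max(values)
    exps = [math.exp(v - top) for v in values]
    total = sum(exps)
    return [e / total for e in exps]


def pick_supported_language(labels, scores, lang_map, default='en'):
    """Rank all labels by probability, then return the best one found in lang_map."""
    ranked = sorted(zip(labels, softmax(scores)), key=lambda item: item[1], reverse=True)
    for code, prob in ranked:
        if code in lang_map:
            return lang_map[code], prob
    return default, 0.0


lang, conf = pick_supported_language(labels, scores, LANG_MAP)
print(lang, round(conf, 3))
```

Here the top label overall is `fra`, which is not supported, so the function falls through to `tam`. The reported confidence is the probability over the full label set, not a value renormalized across the five supported languages.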


In [None]:
import edge_tts

async def edge_save_wav(text: str, voice: str, out_wav: Path):
    out_wav.parent.mkdir(parents=True, exist_ok=True)
    tmp_mp3 = out_wav.with_suffix('.mp3')
    comm = edge_tts.Communicate(text=text, voice=voice)
    await comm.save(str(tmp_mp3))
    subprocess.run([
        'ffmpeg', '-y', '-loglevel', 'error', '-i', str(tmp_mp3), '-ac', '1', '-ar', '16000', str(out_wav)
    ], check=True)
    tmp_mp3.unlink(missing_ok=True)

for code, cfg in LANGUAGE_CONFIG.items():
    out = TEST_AUDIO_DIR / f'lid_{code}.wav'
    try:
        asyncio.get_event_loop().run_until_complete(edge_save_wav(cfg['test_text'], cfg['edge_voice'], out))
        print('Generated:', out.name)
    except Exception as e:
        print('Generation failed for', code, '-', e)

print('\nRunning LID predictions:')
for wav_path in sorted(TEST_AUDIO_DIR.glob('lid_*.wav')):
    pred, conf, top5 = detect_language_from_file(wav_path)
    print(f'{wav_path.name:16s} -> pred={pred}, conf={conf:.3f}, top={top5[:3]}')

## 5) Build Component: STT (Whisper)

This component uses `openai/whisper-small` to transcribe the uploaded user audio.


In [None]:
WHISPER_MODEL_NAME = 'openai/whisper-small'

whisper_processor = WhisperProcessor.from_pretrained(WHISPER_MODEL_NAME)
whisper_model = WhisperForConditionalGeneration.from_pretrained(WHISPER_MODEL_NAME).to(DEVICE)
whisper_model.eval()


def transcribe_audio_array(audio: np.ndarray, sr: int = 16000, language: str = 'en') -> str:
    if sr != 16000:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
    audio = np.clip(audio.astype(np.float32), -1.0, 1.0)

    inputs = whisper_processor(audio, sampling_rate=16000, return_tensors='pt')
    forced_ids = None
    try:
        forced_ids = whisper_processor.get_decoder_prompt_ids(language=language, task='transcribe')
    except Exception:
        forced_ids = None

    kwargs = {'forced_decoder_ids': forced_ids} if forced_ids else {}
    with torch.no_grad():
        generated = whisper_model.generate(inputs.input_features.to(DEVICE), **kwargs)
    text = whisper_processor.batch_decode(generated, skip_special_tokens=True)[0].strip()
    return text


def transcribe_audio_file(path: Path, language: str = 'en') -> str:
    wav, sr = librosa.load(str(path), sr=16000, mono=True)
    return transcribe_audio_array(wav, sr=16000, language=language)

print('Whisper loaded on', DEVICE)

In [None]:
print('STT test outputs:')
for wav_path in sorted(TEST_AUDIO_DIR.glob('lid_*.wav')):
    lang_code = wav_path.stem.split('_')[-1]
    try:
        text = transcribe_audio_file(wav_path, language=lang_code if lang_code != 'auto' else 'en')
        print(f'{wav_path.name:16s} -> {text}')
    except Exception as e:
        print(f'{wav_path.name:16s} -> ERROR: {e}')

## 6) Build Component: Purpose-Aware Voice Agent Logic

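Each agent purpose has one canned reply per language in `REPLY_LIBRARY`. The reply is selected by agent purpose and language, and a short follow-up note is appended once the session holds more than one turn.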
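
The agent keeps only the last four turns per session. One possible way to express that bound is a `deque` with `maxlen`, sketched below; `session_memory` and `remember_turn` are hypothetical names, and the notebook itself uses a plain list that it slices.

```python
from collections import defaultdict, deque

MAX_TURNS = 4  # the notebook keeps the last four turns per session

# Hypothetical alternative store: each session gets its own bounded deque.
session_memory = defaultdict(lambda: deque(maxlen=MAX_TURNS))


def remember_turn(session_id: str, user_text: str, agent_type: str, lang: str) -> bool:
    """Store one turn and report whether the session already had earlier turns."""
    history = session_memory[session_id]
    is_follow_up = len(history) > 0
    history.append({'user': user_text, 'agent_type': agent_type, 'lang': lang})
    return is_follow_up


for i in range(6):
    follow_up = remember_turn('demo', f'question {i}', 'hospital_kiosk', 'en')

print(len(session_memory['demo']))
print(session_memory['demo'][0]['user'])
```

With `maxlen` set, older turns are dropped automatically on append, so no explicit trimming step is needed.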


In [None]:
REPLY_LIBRARY = {
    'hospital_kiosk': {
        'en': 'I am the hospital kiosk assistant. Please share age, key symptom, and how long it has been present. If breathing pain or heavy bleeding is present, go to emergency desk now.',
        'hi': 'मैं अस्पताल कियोस्क सहायक हूँ। कृपया उम्र, मुख्य लक्षण और कितने समय से समस्या है बताइए। सांस की तकलीफ या ज्यादा bleeding हो तो तुरंत emergency desk पर जाएँ।',
        'ta': 'நான் மருத்துவமனை கியோஸ்க் உதவியாளர். வயது, முக்கிய அறிகுறி, எத்தனை நாளாக உள்ளது என்பதை சொல்லுங்கள். மூச்சுத்திணறல் அல்லது கடும் ரத்தப்போக்கு இருந்தால் உடனே அவசர பிரிவுக்கு செல்லுங்கள்.',
        'te': 'నేను హాస్పిటల్ కియోస్క్ అసిస్టెంట్‌ను. వయసు, ప్రధాన లక్షణం, ఎంతకాలంగా ఉందో చెప్పండి. శ్వాస ఇబ్బంది లేదా ఎక్కువ రక్తస్రావం ఉంటే వెంటనే ఎమర్జెన్సీ డెస్క్‌కు వెళ్లండి.',
        'kn': 'ನಾನು ಆಸ್ಪತ್ರೆ ಕಿಯಾಸ್ಕ್ ಸಹಾಯಕ. ವಯಸ್ಸು, ಮುಖ್ಯ ಲಕ್ಷಣ, ಎಷ್ಟು ದಿನದಿಂದ ಇದೆ ಎಂದು ಹೇಳಿ. ಉಸಿರಾಟ ಕಷ್ಟ ಅಥವಾ ಹೆಚ್ಚು ರಕ್ತಸ್ರಾವ ಇದ್ದರೆ ತಕ್ಷಣ ತುರ್ತು ವಿಭಾಗಕ್ಕೆ ಹೋಗಿ.'
    },
    'college_admission': {
        'en': 'I am the college admission assistant. I can guide eligibility, documents, fee estimate, and timeline. Please share your course and board marks.',
        'hi': 'मैं कॉलेज एडमिशन सहायक हूँ। मैं eligibility, documents, fee estimate और timeline बताऊँगा। कृपया course और marks बताइए।',
        'ta': 'நான் கல்லூரி சேர்க்கை உதவியாளர். தகுதி, ஆவணங்கள், கட்டண மதிப்பீடு மற்றும் கால அட்டவணையில் வழிகாட்டுவேன். உங்கள் பாடநெறி மற்றும் மதிப்பெண்களை சொல்லுங்கள்.',
        'te': 'నేను కాలేజ్ అడ్మిషన్ అసిస్టెంట్‌ను. అర్హత, డాక్యుమెంట్లు, ఫీజు అంచనా, టైమ్‌లైన్ గురించి సహాయం చేస్తాను. మీ కోర్సు మరియు మార్కులు చెప్పండి.',
        'kn': 'ನಾನು ಕಾಲೇಜು ಪ್ರವೇಶ ಸಹಾಯಕ. ಅರ್ಹತೆ, ದಾಖಲೆಗಳು, ಶುಲ್ಕ ಅಂದಾಜು ಮತ್ತು ಸಮಯರೇಖೆಯಲ್ಲಿ ಸಹಾಯ ಮಾಡುತ್ತೇನೆ. ನಿಮ್ಮ ಕೋರ್ಸ್ ಮತ್ತು ಅಂಕಗಳನ್ನು ತಿಳಿಸಿ.'
    },
    'laptop_support': {
        'en': 'I am laptop support assistant. Please share laptop model, operating system, and exact issue. I will give safe step-by-step troubleshooting.',
        'hi': 'मैं लैपटॉप सपोर्ट सहायक हूँ। कृपया laptop model, operating system और exact issue बताइए। मैं safe step-by-step troubleshooting दूँगा।',
        'ta': 'நான் லாப்டாப் ஆதரவு உதவியாளர். லாப்டாப் மாடல், operating system, மற்றும் சரியான பிரச்சினையை சொல்லுங்கள். பாதுகாப்பான படிப்படியான troubleshooting கொடுக்கிறேன்.',
        'te': 'నేను ల్యాప్‌టాప్ సపోర్ట్ అసిస్టెంట్‌ను. ల్యాప్‌టాప్ మోడల్, operating system, ఖచ్చితమైన సమస్య చెప్పండి. సురక్షితంగా దశలవారీ troubleshooting ఇస్తాను.',
        'kn': 'ನಾನು ಲ್ಯಾಪ್‌ಟಾಪ್ ಸಹಾಯ ಸಹಾಯಕ. ಲ್ಯಾಪ್‌ಟಾಪ್ ಮಾದರಿ, operating system, ನಿಖರ ಸಮಸ್ಯೆ ಹೇಳಿ. ಸುರಕ್ಷಿತ ಹಂತ-ಹಂತದ troubleshooting ನೀಡುತ್ತೇನೆ.'
    }
}


def generate_agent_response(user_text: str, agent_type: str, language: str, session_id: str = 'demo') -> str:
    agent_type = agent_type if agent_type in AGENT_TYPES else 'hospital_kiosk'
    language = language if language in LANGUAGE_CONFIG else 'en'

    base = REPLY_LIBRARY[agent_type].get(language, REPLY_LIBRARY[agent_type]['en'])

    history = SESSION_MEMORY[session_id]
    history.append({'user': user_text, 'agent_type': agent_type, 'lang': language})
    if len(history) > 4:
        SESSION_MEMORY[session_id] = history[-4:]

    if len(history) > 1:
        if language == 'en':
            base += ' I am continuing from your previous question in this same session.'
        elif language == 'hi':
            base += ' मैं इसी session में आपके पिछले सवाल को ध्यान में रखकर जवाब दे रहा हूँ।'
        elif language == 'ta':
            base += ' இதே session-இல் உங்கள் முந்தைய கேள்வியை வைத்து தொடர்கிறேன்.'
        elif language == 'te':
            base += ' ఇదే session‌లో మీ ముందటి ప్రశ్నను తీసుకుని కొనసాగిస్తున్నాను.'
        elif language == 'kn':
            base += ' ಇದೇ session‌ನಲ್ಲಿ ನಿಮ್ಮ ಹಿಂದಿನ ಪ್ರಶ್ನೆಯನ್ನು ಗಮನಿಸಿ ಮುಂದುವರಿಸುತ್ತಿದ್ದೇನೆ.'

    return base

agent_tests = [
    ('hospital_kiosk', 'en', 'My father has chest pain for 20 minutes.'),
    ('college_admission', 'hi', 'BTech computer science ke liye kya documents chahiye?'),
    ('laptop_support', 'ta', 'என் laptop அடிக்கடி restart ஆகிறது.'),
]
for a, l, q in agent_tests:
    print(f'[{a} | {l}] Q: {q}')
    print('A:', generate_agent_response(q, a, l, session_id='agent_test'))
    print('-' * 100)

## 7) Build Component: TTS (Piper + Edge Policy)

Policy:
- `kn` -> Edge TTS
- `en/hi/te/ta` -> Piper if model exists
- fallback to Edge TTS
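
The policy above can be read as a small pure function. This is a minimal sketch, assuming the caller has already looked up the Piper model path (or `None` when no file exists); `choose_tts_engine` is a hypothetical name used only for illustration.

```python
from pathlib import Path
from typing import Optional

PIPER_LANGS = {'en', 'hi', 'te', 'ta'}


def choose_tts_engine(lang: str, piper_model: Optional[Path]) -> str:
    """Return 'piper' only for a Piper language with a model file, else 'edge'."""
    if lang in PIPER_LANGS and piper_model is not None:
        return 'piper'
    return 'edge'


print(choose_tts_engine('en', Path('en_US-lessac-medium.onnx')))
print(choose_tts_engine('ta', None))
print(choose_tts_engine('kn', None))
```

Keeping the decision separate from synthesis makes the policy easy to check without running either TTS engine.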


In [None]:
import edge_tts


def piper_model_candidates(lang: str):
    table = {
        'en': ['en_US-lessac-medium.onnx'],
        'hi': ['hi_IN-rohan-medium.onnx'],
        'te': ['te_IN-maya-medium.onnx'],
        'ta': ['ta_IN-iitm-female-s1-medium.onnx', 'ta_IN-kani-medium.onnx', 'ta_IN-ponni-medium.onnx', 'ta_IN-tamil-medium.onnx'],
    }
    return table.get(lang, [])


def pick_piper_model(lang: str):
    for name in piper_model_candidates(lang):
        path = MODELS_DIR / name
        if path.exists():
            return path
    return None


async def edge_tts_to_wav(text: str, lang: str, out_wav: Path):
    voice = LANGUAGE_CONFIG.get(lang, LANGUAGE_CONFIG['en'])['edge_voice']
    tmp_mp3 = out_wav.with_suffix('.mp3')
    comm = edge_tts.Communicate(text=text, voice=voice)
    await comm.save(str(tmp_mp3))
    subprocess.run(['ffmpeg', '-y', '-loglevel', 'error', '-i', str(tmp_mp3), '-ac', '1', '-ar', '48000', str(out_wav)], check=True)
    tmp_mp3.unlink(missing_ok=True)


def piper_tts_to_wav(text: str, model_path: Path, out_wav: Path):
    piper_bin = shutil.which('piper')
    if not piper_bin:
        raise RuntimeError('piper binary not found')
    proc = subprocess.run(
        [piper_bin, '--model', str(model_path), '--output_file', str(out_wav)],
        input=text.encode('utf-8'),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.decode('utf-8', errors='ignore'))


def synthesize_tts(text: str, lang: str, tag: str = 'reply'):
    out_wav = OUTPUT_DIR / f'{tag}_{int(time.time() * 1000)}_{random.randint(1000,9999)}.wav'

    # Kannada has no Piper model here, so it always uses Edge TTS. The other
    # languages use Piper when a model file is present and Edge TTS otherwise.
    use_piper = lang in {'en', 'hi', 'te', 'ta'} and pick_piper_model(lang) is not None
    engine = 'piper' if use_piper else 'edge'

    if engine == 'piper':
        model = pick_piper_model(lang)
        piper_tts_to_wav(text, model, out_wav)
    else:
        asyncio.get_event_loop().run_until_complete(edge_tts_to_wav(text, lang, out_wav))

    return out_wav, engine

for lang in ['en', 'hi', 'ta', 'te', 'kn']:
    text = LANGUAGE_CONFIG[lang]['test_text']
    wav_path, engine = synthesize_tts(text, lang, tag=f'tts_{lang}')
    print(f'{lang} -> engine={engine}, file={wav_path.name}')
    display(Audio(filename=str(wav_path), autoplay=False))

## 8) Integrate End-to-End Single-Turn Pipeline
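
Before transcription, the pipeline settles on one language: an explicit selection wins if it is supported, and auto-detection is trusted only at or above a confidence of 0.35. A minimal sketch of that rule follows; `resolve_language` is a hypothetical helper, not a function defined in this notebook.

```python
SUPPORTED = {'en', 'hi', 'ta', 'te', 'kn'}
MIN_CONFIDENCE = 0.35  # threshold used by the pipeline for auto-detect


def resolve_language(requested: str, detected: str, confidence: float) -> str:
    """Pick the working language from the user's choice and the LID result."""
    if requested == 'auto':
        return detected if confidence >= MIN_CONFIDENCE else 'en'
    return requested if requested in SUPPORTED else 'en'


print(resolve_language('auto', 'ta', 0.80))
print(resolve_language('auto', 'ta', 0.20))
print(resolve_language('kn', 'ta', 0.80))
print(resolve_language('fr', 'ta', 0.80))
```

English is the fallback in both branches, which matches how the rest of the notebook treats unknown language codes.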

In [None]:
def decode_audio_bytes_to_float32(audio_bytes: bytes, target_sr: int = 16000):
    try:
        audio, sr = sf.read(io.BytesIO(audio_bytes), dtype='float32')
        if audio.ndim > 1:
            audio = np.mean(audio, axis=1)
        if sr != target_sr:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
        return audio.astype(np.float32), target_sr
    except Exception:
        container = av.open(io.BytesIO(audio_bytes))
        resampler = av.AudioResampler(format='s16', layout='mono', rate=target_sr)
        pcm = bytearray()
        for frame in container.decode(audio=0):
            for resampled in resampler.resample(frame):
                pcm += bytes(resampled.planes[0])
        if not pcm:
            raise RuntimeError('Could not decode uploaded audio bytes')
        audio = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0
        return audio, target_sr


def run_single_turn(audio_bytes: bytes, language: str = 'auto', agent_type: str = 'hospital_kiosk', session_id: str = 'demo_session'):
    audio_16k, sr = decode_audio_bytes_to_float32(audio_bytes, target_sr=16000)

    if language == 'auto':
        detected, conf, _ = detect_language_from_array(audio_16k, sr=16000)
        lang = detected if conf >= 0.35 else 'en'
    else:
        lang = language if language in LANGUAGE_CONFIG else 'en'

    transcript = transcribe_audio_array(audio_16k, sr=16000, language=lang)
    response = generate_agent_response(transcript or 'No speech captured.', agent_type, lang, session_id=session_id)
    reply_wav, tts_engine = synthesize_tts(response, lang, tag='turn')

    reply_bytes = reply_wav.read_bytes()

    return {
        'language': lang,
        'transcript': transcript,
        'response': response,
        'tts_engine': tts_engine,
        'reply_wav_path': str(reply_wav),
        'reply_audio_b64': base64.b64encode(reply_bytes).decode('ascii')
    }

# End-to-end test using generated sample input (English)
test_in = TEST_AUDIO_DIR / 'lid_en.wav'
if test_in.exists():
    result = run_single_turn(test_in.read_bytes(), language='auto', agent_type='hospital_kiosk', session_id='pipeline_demo')
    print(json.dumps({k: v for k, v in result.items() if k != 'reply_audio_b64'}, indent=2, ensure_ascii=False))
    display(Audio(filename=result['reply_wav_path'], autoplay=False))
else:
    print('No test input found at', test_in)


## 9) Build Website + API Server (Press and Hold)

This app exposes:
- `GET /` -> press-and-hold web page
- `GET /healthz`
- `GET /agent-types`
- `POST /voice-turn`
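
`POST /voice-turn` returns the reply audio as a base64 string inside the JSON body. The sketch below shows that encode and decode round trip using only the standard library; the generated tone is a stand-in for a real TTS reply, and `make_tone_wav` is a hypothetical helper.

```python
import base64
import io
import math
import struct
import wave


def make_tone_wav(n_frames: int = 1600, rate: int = 16000, freq: float = 440.0) -> bytes:
    """Build a short mono 16-bit WAV in memory (stand-in for a TTS reply)."""
    samples = [int(12000 * math.sin(2 * math.pi * freq * i / rate)) for i in range(n_frames)]
    buf = io.BytesIO()
    with wave.open(buf, 'wb') as writer:
        writer.setnchannels(1)
        writer.setsampwidth(2)
        writer.setframerate(rate)
        writer.writeframes(struct.pack(f'<{n_frames}h', *samples))
    return buf.getvalue()


wav_bytes = make_tone_wav()
audio_b64 = base64.b64encode(wav_bytes).decode('ascii')  # server side: bytes -> JSON-safe text
decoded = base64.b64decode(audio_b64)                     # client side: text -> bytes

with wave.open(io.BytesIO(decoded), 'rb') as reader:
    print(reader.getnchannels(), reader.getframerate(), reader.getnframes())
```

Base64 adds roughly a third to the payload size, which is acceptable for short single-turn replies but worth keeping in mind for longer audio.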


In [None]:
from flask import Flask, request, jsonify
from flask_cors import CORS
import threading

INDEX_HTML = r'''
<!doctype html>
<html>
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <title>Solvathon Voice Agent</title>
  <style>
    :root{--bg:#0d1b26;--card:#132737;--fg:#e9f4fb;--muted:#9cb7ca;--brand:#1f9f8f;--danger:#c94d42}
    *{box-sizing:border-box} body{margin:0;background:linear-gradient(135deg,#0b141d,#0f2333);color:var(--fg);font-family:Segoe UI,Arial,sans-serif;padding:18px}
    .wrap{max-width:900px;margin:0 auto;display:grid;gap:14px}
    .card{background:var(--card);border:1px solid rgba(255,255,255,.1);border-radius:14px;padding:16px}
    h1{margin:0 0 8px} .sub{color:var(--muted);font-size:14px}
    .grid{display:grid;grid-template-columns:1fr 1fr 1fr;gap:10px} label{font-size:12px;color:var(--muted);display:block;margin-bottom:6px}
    input,select{width:100%;padding:10px;border-radius:10px;border:1px solid rgba(255,255,255,.2);background:#0b1a26;color:var(--fg)}
    #holdBtn{width:100%;padding:16px;border:none;border-radius:12px;background:var(--brand);color:white;font-weight:700;font-size:16px;touch-action:none}
    #holdBtn.active{background:var(--danger)}
    .status{font-size:13px;color:var(--muted);margin-top:8px}
    .chat{max-height:320px;overflow:auto;border:1px solid rgba(255,255,255,.15);border-radius:10px;padding:10px;background:#0b1a26}
    .line{padding:8px;border-radius:8px;margin-bottom:8px}.u{background:rgba(31,159,143,.18)}.a{background:rgba(96,141,194,.18)}
    @media (max-width: 760px){.grid{grid-template-columns:1fr}}
  </style>
</head>
<body>
  <div class="wrap">
    <div class="card">
      <h1>Solvathon Push-To-Talk</h1>
      <div class="sub">Hold button to record. Release to upload. Server replies with voice.</div>
      <div class="grid" style="margin-top:12px">
        <div><label>Backend URL</label><input id="backend" /></div>
        <div><label>Agent Purpose</label><select id="agentType"></select></div>
        <div><label>Language</label>
          <select id="lang">
            <option value="en">English</option>
            <option value="hi">Hindi</option>
            <option value="ta">Tamil</option>
            <option value="te">Telugu</option>
            <option value="kn">Kannada</option>
            <option value="auto">Auto Detect</option>
          </select>
        </div>
      </div>
      <div style="margin-top:12px"><button id="holdBtn">Hold To Talk</button></div>
      <div class="status" id="status">Ready</div>
    </div>

    <div class="card">
      <div class="chat" id="chat"><div class="sub">No turns yet.</div></div>
    </div>
  </div>

  <audio id="player" autoplay playsinline></audio>

<script>
const backend = document.getElementById('backend');
const agentType = document.getElementById('agentType');
const lang = document.getElementById('lang');
const holdBtn = document.getElementById('holdBtn');
const statusEl = document.getElementById('status');
const chat = document.getElementById('chat');
const player = document.getElementById('player');

let stream=null, recorder=null, chunks=[], recording=false, busy=false, sessionId='';
backend.value = window.location.origin;

function setStatus(s){statusEl.textContent=s}
function line(text, cls){
  if(chat.children.length===1 && chat.textContent.includes('No turns')) chat.innerHTML='';
  const d=document.createElement('div'); d.className='line '+cls; d.textContent=text; chat.appendChild(d); chat.scrollTop=chat.scrollHeight;
}

async function loadAgentTypes(){
  try{
    const res = await fetch(`${backend.value}/agent-types`);
    const data = await res.json();
    agentType.innerHTML='';
    (data.agent_types||[]).forEach(a=>{
      const o=document.createElement('option'); o.value=a.type; o.textContent=a.label; agentType.appendChild(o);
    });
  }catch(e){
    agentType.innerHTML='<option value="hospital_kiosk">Hospital Kiosk</option><option value="college_admission">College Admission</option><option value="laptop_support">Laptop Customer Support</option>';
  }
}

function pickMime(){
  const m=['audio/webm;codecs=opus','audio/webm','audio/mp4','audio/ogg;codecs=opus'];
  for(const x of m){if(window.MediaRecorder && MediaRecorder.isTypeSupported(x)) return x;}
  return '';
}

async function ensureMic(){
  if(stream) return stream;
  stream = await navigator.mediaDevices.getUserMedia({audio:true});
  return stream;
}

async function sendTurn(blob, mime){
  busy=true; holdBtn.disabled=true; setStatus('Uploading and processing...');
  try{
    const ext = mime.includes('mp4')?'m4a':mime.includes('ogg')?'ogg':'webm';
    const form = new FormData();
    form.append('audio', blob, `turn.${ext}`);
    form.append('language', lang.value);
    form.append('auto_detect', String(lang.value==='auto'));
    form.append('agent_type', agentType.value);
    if(sessionId) form.append('session_id', sessionId);

    const res = await fetch(`${backend.value}/voice-turn`, {method:'POST', body:form});
    const data = await res.json();
    if(!res.ok) throw new Error(data.error || `HTTP ${res.status}`);

    sessionId = data.session_id || sessionId;
    if(data.transcript) line('You: '+data.transcript, 'u');
    if(data.response) line((data.agent_name||'Agent')+': '+data.response, 'a');

    if(data.audio_b64){
      const bin = atob(data.audio_b64);
      const arr = new Uint8Array(bin.length);
      for(let i=0;i<bin.length;i++) arr[i]=bin.charCodeAt(i);
      const url = URL.createObjectURL(new Blob([arr], {type:data.audio_mime || 'audio/wav'}));
      player.src = url;
      await player.play().catch(()=>{});
    }

    setStatus('Turn complete');
  }catch(e){
    setStatus('Error: '+e.message);
    alert(e.message);
  }finally{
    busy=false; holdBtn.disabled=false; holdBtn.textContent='Hold To Talk'; holdBtn.classList.remove('active');
  }
}

async function startRec(ev){
  ev.preventDefault();
  if(busy || recording) return;
  await ensureMic();
  chunks=[];
  const mime = pickMime();
  recorder = mime ? new MediaRecorder(stream,{mimeType:mime}) : new MediaRecorder(stream);
  recorder.ondataavailable = e => {if(e.data && e.data.size>0) chunks.push(e.data)};
  recorder.onstop = async ()=>{
    const useMime = recorder.mimeType || mime || 'audio/webm';
    const blob = new Blob(chunks, {type:useMime});
    chunks=[]; recorder=null;
    if(blob.size<1200){setStatus('Audio too short'); return;}
    await sendTurn(blob, useMime);
  };
  recorder.start(120);
  recording=true;
  holdBtn.classList.add('active');
  holdBtn.textContent='Release To Send';
  setStatus('Recording...');
}

function stopRec(ev){
  if(ev) ev.preventDefault();
  if(!recording || !recorder) return;
  recording=false;
  holdBtn.textContent='Uploading...';
  if(recorder.state==='recording') recorder.stop();
}

holdBtn.addEventListener('pointerdown', startRec);
holdBtn.addEventListener('pointerup', stopRec);
holdBtn.addEventListener('pointerleave', stopRec);
holdBtn.addEventListener('pointercancel', stopRec);
holdBtn.addEventListener('contextmenu', e=>e.preventDefault());

loadAgentTypes();
</script>
</body>
</html>
'''

app = Flask(__name__)
CORS(app)

@app.get('/')
def root_page():
    return INDEX_HTML

@app.get('/healthz')
def healthz():
    return jsonify({'ok': True, 'device': DEVICE})

@app.get('/agent-types')
def list_agent_types():
    payload = []
    for k, v in AGENT_TYPES.items():
        payload.append({'type': k, 'label': v['label'], 'description': v['assumption']})
    return jsonify({'agent_types': payload})

@app.post('/voice-turn')
def voice_turn_api():
    if 'audio' not in request.files:
        return jsonify({'error': "Missing 'audio' file"}), 400

    audio_bytes = request.files['audio'].read()
    if not audio_bytes:
        return jsonify({'error': 'Empty audio file'}), 400

    requested_language = (request.form.get('language') or 'auto').strip().lower()
    requested_agent_type = (request.form.get('agent_type') or 'hospital_kiosk').strip().lower()
    # time_ns() keeps generated IDs distinct when two sessions start within the same second.
    session_id = (request.form.get('session_id') or f'session_{time.time_ns()}').strip()

    try:
        result = run_single_turn(
            audio_bytes=audio_bytes,
            language=requested_language,
            agent_type=requested_agent_type,
            session_id=session_id,
        )
    except Exception as e:
        return jsonify({'error': f'Pipeline failed: {e}'}), 500

    return jsonify({
        'ok': True,
        'session_id': session_id,
        'agent_type': requested_agent_type,
        'agent_name': AGENT_TYPES.get(requested_agent_type, AGENT_TYPES['hospital_kiosk'])['label'],
        'language': result['language'],
        'transcript': result['transcript'],
        'response': result['response'],
        'audio_b64': result['reply_audio_b64'],
        'audio_mime': 'audio/wav',
        'tts_engine': result['tts_engine'],
    })


def run_server():
    app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False)

server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
time.sleep(2)
print('Server started at: http://127.0.0.1:8080')

## 10) Expose Website With ngrok

1. Create or copy your authtoken from the [ngrok dashboard](https://dashboard.ngrok.com/get-started/your-authtoken).
2. Paste the token into the cell below and run it.
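
If you would rather not paste the token into the notebook source, one option is to read it from an environment variable. This is only a sketch: the variable name `NGROK_AUTHTOKEN` and the helper `resolve_ngrok_token` are assumptions for illustration, and the notebook cell itself uses a hard-coded string.

```python
import os
from typing import Mapping

PLACEHOLDER = 'PASTE_YOUR_NGROK_TOKEN'


def resolve_ngrok_token(env: Mapping[str, str] = os.environ, var_name: str = 'NGROK_AUTHTOKEN') -> str:
    """Return the token stored under var_name, or raise if it is missing or a placeholder."""
    token = (env.get(var_name) or '').strip()
    if not token or token == PLACEHOLDER:
        raise ValueError(f'Set {var_name} before opening the tunnel.')
    return token


# Demo with a fake environment so the sketch runs without a real token.
fake_env = {'NGROK_AUTHTOKEN': '  example-token-123  '}
print(resolve_ngrok_token(fake_env))
```

The returned string could then be passed to `ngrok.set_auth_token(...)` in place of the pasted value.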


In [None]:
from pyngrok import ngrok

# Set your token once per Colab session
NGROK_AUTHTOKEN = 'PASTE_YOUR_NGROK_TOKEN'
assert NGROK_AUTHTOKEN != 'PASTE_YOUR_NGROK_TOKEN', 'Please set NGROK_AUTHTOKEN first.'

ngrok.set_auth_token(NGROK_AUTHTOKEN)
public_tunnel = ngrok.connect(8080, bind_tls=True)
print('Public URL:', public_tunnel.public_url)
print('Open this URL in browser and use Hold To Talk.')

## 11) API Self-Test (inside notebook)

This sends a local sample audio file to `/voice-turn`, prints the main response fields, and saves the returned audio for playback.
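
For a stricter check than printing, the response can be compared against the keys that the `/voice-turn` handler returns. The sketch below uses hypothetical sample payloads; in practice the dictionary would come from `resp.json()`, and `missing_fields` is an illustrative helper, not part of the notebook.

```python
from typing import Any, Dict, List

# Keys returned by the /voice-turn handler on success.
EXPECTED_FIELDS = (
    'ok', 'session_id', 'agent_type', 'agent_name', 'language',
    'transcript', 'response', 'audio_b64', 'audio_mime', 'tts_engine',
)


def missing_fields(payload: Dict[str, Any]) -> List[str]:
    """Return expected keys that are absent from the payload, in declaration order."""
    return [key for key in EXPECTED_FIELDS if key not in payload]


# Hypothetical payloads for illustration only.
complete = {key: '' for key in EXPECTED_FIELDS}
partial = {'ok': True, 'transcript': 'hello'}

print(missing_fields(complete))
print(missing_fields(partial))
```

An empty list means every expected key is present; it says nothing about whether the values themselves are sensible.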


In [None]:
import requests

sample = TEST_AUDIO_DIR / 'lid_en.wav'
if not sample.exists():
    raise FileNotFoundError(f'Missing sample file: {sample}')

with open(sample, 'rb') as f:
    files = {'audio': ('turn.wav', f, 'audio/wav')}
    data = {
        'language': 'auto',
        'agent_type': 'hospital_kiosk',
        'session_id': 'colab_self_test',
    }
    resp = requests.post('http://127.0.0.1:8080/voice-turn', files=files, data=data, timeout=300)

print('Status:', resp.status_code)
payload = resp.json()
print(json.dumps({k: payload.get(k) for k in ['ok','language','agent_name','transcript','response','tts_engine']}, indent=2, ensure_ascii=False))

if payload.get('audio_b64'):
    audio_bytes = base64.b64decode(payload['audio_b64'])
    out = OUTPUT_DIR / 'self_test_reply.wav'
    out.write_bytes(audio_bytes)
    print('Saved:', out)
    display(Audio(filename=str(out), autoplay=False))

## 12) What To Do Next

1. Open the ngrok URL in mobile/desktop browser.
2. Choose `Agent Purpose` and `Language` first.
3. Hold button to speak.
4. Release button to upload that spoken chunk.
5. Listen to server-generated voice reply.

If you want to use your repo server directly (`src/realtime/signaling_server.py`), use:
- `GET /push-to-talk`
- `POST /voice-turn` with fields: `audio`, `language`, `auto_detect`, `agent_type`, `session_id`
