In [None]:
# Celda 0 - Título y guía rápida
from IPython.display import Markdown, display
# Render the lab title, the pipeline overview and the usage hint as Markdown.
# Any rendering problem is reported instead of aborting the notebook.
try:
    title_md, flow_md, uso_md = (
        Markdown(md_source)
        for md_source in (
            r"""# Lab 3 — ASR + LLM + TTS end-to-end
Este laboratorio guía la construcción de un pipeline completo que parte de audio en vivo, lo transcribe con **Whisper**, genera una respuesta corta con un **LLM local** y sintetiza voz clonada y voz base usando **Coqui TTS**.""",
            """**Flujo general:** `Audio → ASR → LLM → TTS (clonada & base)`""",
            """**Cómo usar:** ejecuta cada celda en orden. Si la grabación falla (permisos, navegador), cambia automáticamente al modo **Subir archivo** y continúa sin reiniciar el runtime.""",
        )
    )
    display(title_md, flow_md, uso_md)
    print("[Celda 0] Guía mostrada correctamente.")
except Exception as exc:
    print(f"[Celda 0] Error al mostrar la guía: {exc}")


In [None]:
# Celda 1 - Setup de proyecto (Drive + rutas + utils)
import os, json, subprocess, sys, time, math, shutil, pathlib, random
from datetime import datetime
from typing import Dict, Any

# numpy is optional in this cell; `np` is None when it cannot be imported.
try:
    import numpy as np
except Exception:
    np = None

# Colab is detected by trying to import its runtime package.
try:
    import google.colab  # type: ignore
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# Mount Google Drive (Colab only) so that artifacts survive the session.
DRIVE_MOUNTED = False
if IN_COLAB:
    try:
        from google.colab import drive  # type: ignore
        if not os.path.exists('/content/drive/MyDrive'):
            drive.mount('/content/drive')
        # Trust the filesystem rather than assuming the mount call worked.
        DRIVE_MOUNTED = os.path.isdir('/content/drive/MyDrive')
    except Exception as exc:
        print(f"[Celda 1] Advertencia: no fue posible montar Google Drive automáticamente: {exc}")
        DRIVE_MOUNTED = False

# Pick the project root. Writing below /content/drive without a real mount
# would create a plain local folder under the mount point, so an unmounted
# Colab session uses a local directory instead.
if IN_COLAB and DRIVE_MOUNTED:
    BASE_DIR = "/content/drive/MyDrive/Lab3_ASR_LLM_TTS"
elif IN_COLAB:
    BASE_DIR = "/content/Lab3_ASR_LLM_TTS"
    print(f"[Celda 1] Drive no está montado; se usará almacenamiento local temporal: {BASE_DIR}")
else:
    BASE_DIR = os.path.abspath('./Lab3_ASR_LLM_TTS_local')

# Working directories, created eagerly so later cells can write into them.
paths = {
    'voice_ref': os.path.join(BASE_DIR, 'data', 'voice_ref'),
    'audios': os.path.join(BASE_DIR, 'data', 'audios'),
    'outputs': os.path.join(BASE_DIR, 'outputs')
}
for p in paths.values():
    os.makedirs(p, exist_ok=True)

# Fixed artifact locations shared by every later cell.
SPEAKER_WAV = os.path.join(paths['voice_ref'], 'voz_referencia_grabada.wav')
QUESTION_RAW_WAV = os.path.join(paths['audios'], 'pregunta_raw.wav')
QUESTION_WAV = os.path.join(paths['audios'], 'pregunta_16k.wav')
CLONED_WAV = os.path.join(paths['outputs'], 'respuesta_clonada.wav')
BASE_WAV = os.path.join(paths['outputs'], 'respuesta_base.wav')
ASR_METRICS_PNG = os.path.join(paths['outputs'], 'asr_metrics.png')
LLM_METRICS_PNG = os.path.join(paths['outputs'], 'llm_metrics.png')
TTS_TIMES_PNG = os.path.join(paths['outputs'], 'tts_tiempos.png')
RESULTS_JSON = os.path.join(paths['outputs'], 'results.json')

# Accumulator that each stage fills in and save_results() persists.
RESULTS: Dict[str, Any] = {}

def ffmpeg(*args, check=True):
    """Run ``ffmpeg -y`` with the given arguments and capture its output.

    Args:
        *args: Command-line arguments appended after ``ffmpeg -y``.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        The ``subprocess.CompletedProcess`` with text ``stdout``/``stderr``.

    Raises:
        RuntimeError: If the ffmpeg executable cannot be found, or if it exits
            with a non-zero status while ``check`` is true. The message of the
            latter carries the last 500 characters of stderr.
    """
    cmd = ['ffmpeg', '-y', *args]
    try:
        # check=False on purpose: the exit status is inspected below so the
        # failure carries the tail of stderr instead of a bare
        # CalledProcessError raised by subprocess itself.
        completed = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
            text=True,
        )
    except FileNotFoundError as exc:
        raise RuntimeError('ffmpeg no está disponible en el sistema.') from exc
    if check and completed.returncode != 0:
        raise RuntimeError(f"ffmpeg falló: {completed.stderr[-500:]}")
    return completed

def ffprobe_duration(path: str) -> float:
    """Return the duration of ``path`` in seconds as printed by ffprobe.

    The result is 0.0 when the file does not exist, or when ffprobe cannot be
    run, exits with an error, or prints something that is not a number.
    """
    if os.path.exists(path):
        probe_cmd = [
            'ffprobe', '-i', path,
            '-show_entries', 'format=duration',
            '-v', 'quiet',
            '-of', 'csv=p=0',
        ]
        try:
            probe = subprocess.run(
                probe_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
            )
            return float(probe.stdout.strip())
        except Exception:
            # Best effort: every failure is reported as "no duration".
            pass
    return 0.0

def to_wav_mono_16k(src: str, dst: str, sr: int = 16000):
    """Convert ``src`` into a mono audio file at ``sr`` Hz using ffmpeg.

    Args:
        src: Path of the existing input media file.
        dst: Output path; its parent directory is created when needed.
        sr: Target sample rate in Hz (16000 by default).

    Raises:
        FileNotFoundError: If ``src`` does not exist.
        RuntimeError: Propagated from ``ffmpeg`` when the conversion fails.
    """
    if not os.path.exists(src):
        raise FileNotFoundError(f"Archivo de origen no encontrado: {src}")
    out_dir = os.path.dirname(dst)
    if out_dir:
        # dirname() is '' for a bare file name and os.makedirs('') raises.
        os.makedirs(out_dir, exist_ok=True)
    if os.path.realpath(src) == os.path.realpath(dst):
        # ffmpeg refuses to read and write the same file, so convert into a
        # sibling file and swap it into place afterwards.
        tmp_dst = dst + '.tmp.wav'
        try:
            ffmpeg('-i', src, '-ac', '1', '-ar', str(sr), tmp_dst)
            os.replace(tmp_dst, dst)
        finally:
            if os.path.exists(tmp_dst):
                os.remove(tmp_dst)
        return
    ffmpeg('-i', src, '-ac', '1', '-ar', str(sr), dst)

def save_results(data: Dict[str, Any], path: str = RESULTS_JSON):
    """Write ``data`` as indented UTF-8 JSON to ``path`` (best effort).

    Any failure (unserializable value, directory creation, file write) is
    printed and swallowed so that a persistence problem does not abort the
    notebook.

    Args:
        data: JSON-serializable mapping to persist.
        path: Destination file; defaults to ``RESULTS_JSON``.
    """
    try:
        # Serialize before opening the file: if a value is not serializable,
        # an existing results file is left untouched instead of truncated.
        payload = json.dumps(data, indent=2, ensure_ascii=False)
        out_dir = os.path.dirname(path)
        if out_dir:
            # dirname() is '' for a bare file name and os.makedirs('') raises.
            os.makedirs(out_dir, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(payload)
    except Exception as exc:
        print(f"[Celda 1] No fue posible guardar {path}: {exc}")

# Seed every available random number generator with the same value so that
# stochastic steps are repeatable between runs.
try:
    import torch
    torch.manual_seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(42)
except Exception as exc:
    # torch may be missing at this point; report it rather than hide it.
    print(f"[Celda 1] Aviso: no se pudo fijar la semilla de torch: {exc}")
random.seed(42)
if np is not None:
    # `np` is None when numpy could not be imported at the top of the cell.
    np.random.seed(42)

# Echo the resolved environment so the reader can confirm where files go.
summary = {
    'IN_COLAB': IN_COLAB,
    'DRIVE_MOUNTED': DRIVE_MOUNTED,
    'BASE_DIR': BASE_DIR,
    'paths': paths
}
print("[Celda 1] Setup completado:")
print(json.dumps(summary, indent=2, ensure_ascii=False))


In [None]:
# Celda 2 - Dependencias y verificación
import json, subprocess, sys, shutil, time
# Packages the lab needs; they are installed into the kernel's interpreter.
packages = [
    'openai-whisper',
    'transformers',
    'accelerate',
    'sentencepiece',
    'TTS',
    'librosa',
    'soundfile',
    'jiwer',
    'matplotlib'
]
install_report = {'status': 'skipped', 'detail': '', 'packages': packages}
try:
    start = time.time()
    # sys.executable targets the same environment the kernel is running in.
    cmd = [sys.executable, '-m', 'pip', 'install'] + packages
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    # Keep only the tail of pip's output so the report stays readable.
    install_report['stdout_tail'] = completed.stdout.splitlines()[-20:]
    install_report['stderr_tail'] = completed.stderr.splitlines()[-20:]
    install_report['returncode'] = completed.returncode
    install_report['elapsed_s'] = round(time.time() - start, 2)
    install_report['status'] = 'ok' if completed.returncode == 0 else 'error'
except Exception as exc:
    install_report['status'] = 'exception'
    install_report['detail'] = str(exc)

# Collect the first line of `-version` for each external audio tool.
versions = {}
for cmd_name in ['ffmpeg', 'ffprobe']:
    try:
        res = subprocess.run([cmd_name, '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        # splitlines() replaces a string literal that was broken across two
        # source lines; empty output maps to '' rather than an IndexError.
        versions[cmd_name] = (res.stdout.splitlines() or [''])[0]
    except Exception as exc:
        versions[cmd_name] = f'No disponible: {exc}'

try:
    import torch
    versions['torch'] = str(torch.__version__)
    versions['cuda_available'] = bool(torch.cuda.is_available())
except Exception as exc:
    versions['torch'] = f'No importado: {exc}'

try:
    import transformers
    versions['transformers'] = transformers.__version__
except Exception as exc:
    versions['transformers'] = f'No importado: {exc}'

print('[Celda 2] Resumen de instalación/verificación:')
print(json.dumps({'install': install_report, 'versions': versions}, indent=2, ensure_ascii=False))


In [None]:
# Celda 3 - Grabación robusta record_audio() (streaming + idempotente)
import os, base64, threading, tempfile, time, json, shutil
from IPython.display import Javascript, display, Audio

def record_audio(out_wav=None, sr=16000, autoplay=True, timeslice_ms=400):
    """Record microphone audio in Colab and save it as a mono WAV file.

    A small recorder widget is injected into the page. The browser sends
    base64-encoded chunks to the kernel through registered callbacks; once the
    recording stops, the chunks are concatenated and converted with ffmpeg.

    Args:
        out_wav: Destination path. Defaults to a timestamped file inside
            ``paths['audios']``.
        sr: Sample rate in Hz of the resulting WAV.
        autoplay: When true, an audio player for the result is displayed.
        timeslice_ms: Interval in milliseconds handed to ``MediaRecorder.start``.

    Returns:
        The path of the written WAV file.

    Raises:
        RuntimeError: When not running in Colab, when the browser reports an
            error, when five minutes pass without completion, or when no audio
            data arrived.
    """
    # Fail fast, before creating any temporary files, when Colab is missing.
    try:
        from google.colab import output  # type: ignore
    except Exception:
        raise RuntimeError('record_audio solo está soportado en Google Colab con acceso al navegador.')

    if out_wav is None:
        out_wav = os.path.join(paths['audios'], f'grabacion_{int(time.time())}.wav')
    chunk_lock = threading.Lock()
    chunks = []
    done_event = threading.Event()
    error_holder = {'message': None}
    start_time = time.time()
    timeout_seconds = 5 * 60

    def push_audio_chunk(b64_chunk):
        # Invoked from the browser for every recorded slice.
        try:
            data = base64.b64decode(b64_chunk)
            with chunk_lock:
                chunks.append(data)
        except Exception as exc:
            error_holder['message'] = f'Error al decodificar chunk: {exc}'
            done_event.set()

    def audio_done():
        done_event.set()

    def audio_error(msg):
        error_holder['message'] = msg
        done_event.set()

    output.register_callback('notebook.push_audio_chunk', push_audio_chunk)
    output.register_callback('notebook.audio_done', audio_done)
    output.register_callback('notebook.audio_error', audio_error)

    # Doubled braces are literal braces in this f-string; only timeslice_ms is
    # interpolated.
    js_code = f"""
    (async () => {{
      try {{
        const existing = document.getElementById('recorder-box');
        if (existing) {{ existing.remove(); }}
        const box = document.createElement('div');
        box.id = 'recorder-box';
        box.style.border = '1px solid #ddd';
        box.style.padding = '12px';
        box.style.margin = '8px 0';
        box.style.borderRadius = '8px';
        box.style.maxWidth = '320px';
        box.style.fontFamily = 'sans-serif';
        const title = document.createElement('div');
        title.textContent = 'Grabación de audio (streaming)';
        title.style.fontWeight = '600';
        title.style.marginBottom = '8px';
        const status = document.createElement('div');
        status.id = 'recorder-status';
        status.textContent = 'Listo para grabar';
        status.style.margin = '8px 0';
        const indicator = document.createElement('span');
        indicator.style.display = 'inline-block';
        indicator.style.width = '10px';
        indicator.style.height = '10px';
        indicator.style.borderRadius = '50%';
        indicator.style.marginRight = '6px';
        indicator.style.background = '#bbb';
        status.prepend(indicator);
        const buttons = document.createElement('div');
        buttons.style.display = 'flex';
        buttons.style.gap = '6px';
        const startBtn = document.createElement('button');
        startBtn.textContent = 'Grabar';
        const stopBtn = document.createElement('button');
        stopBtn.textContent = 'Parar';
        stopBtn.disabled = true;
        buttons.appendChild(startBtn);
        buttons.appendChild(stopBtn);
        const msg = document.createElement('div');
        msg.style.fontSize = '12px';
        msg.style.color = '#555';
        msg.textContent = 'Si no escuchas el preview o no inicia, revisa permisos del navegador.';
        box.appendChild(title);
        box.appendChild(status);
        box.appendChild(buttons);
        box.appendChild(msg);
        document.body.appendChild(box);

        let mediaStream = null;
        let recorder = null;
        let watchdog = null;
        let lastChunk = Date.now();
        const timeslice = {int(timeslice_ms)};
        const stopUI = (reason) => {{
          if (recorder && recorder.state !== 'inactive') {{
            try {{ recorder.stop(); }} catch (e) {{ console.warn(e); }}
          }}
          if (mediaStream) {{
            mediaStream.getTracks().forEach(t => t.stop());
          }}
          if (watchdog) {{ clearInterval(watchdog); }}
          indicator.style.background = '#bbb';
          stopBtn.disabled = true;
          startBtn.disabled = false;
          if (reason) {{ status.textContent = reason; status.prepend(indicator); }}
        }};

        const handleData = (event) => {{
          if (event.data && event.data.size) {{
            lastChunk = Date.now();
            const reader = new FileReader();
            reader.onloadend = () => {{
              const base64 = reader.result.split(',')[1];
              google.colab.kernel.invokeFunction('notebook.push_audio_chunk', [base64], {{}});
            }};
            reader.readAsDataURL(event.data);
          }}
        }};

        startBtn.onclick = async () => {{
          msg.textContent = 'Grabando... concede permisos al micrófono si aparece un diálogo.';
          status.textContent = 'Solicitando micrófono...';
          status.prepend(indicator);
          indicator.style.background = '#d33';
          startBtn.disabled = true;
          stopBtn.disabled = false;
          try {{
            mediaStream = await navigator.mediaDevices.getUserMedia({{ audio: true }});
          }} catch (err) {{
            google.colab.kernel.invokeFunction('notebook.audio_error', ['Permiso de micrófono denegado o no disponible.'], {{}});
            stopUI('Permiso denegado. Usa la opción de subir archivo.');
            return;
          }}
          let options = {{ mimeType: 'audio/webm;codecs=opus' }};
          try {{
            recorder = new MediaRecorder(mediaStream, options);
          }} catch (err) {{
            console.warn('MediaRecorder fallback', err);
            recorder = new MediaRecorder(mediaStream);
          }}
          recorder.ondataavailable = handleData;
          recorder.onerror = (event) => {{
            console.error(event);
            google.colab.kernel.invokeFunction('notebook.audio_error', ['Error de MediaRecorder: ' + event.error.message], {{}});
            stopUI('Error de MediaRecorder. Reintenta o sube un archivo.');
          }};
          recorder.onstop = () => {{
            google.colab.kernel.invokeFunction('notebook.audio_done', [], {{}});
            stopUI('Grabación finalizada.');
            setTimeout(() => {{
              const box = document.getElementById('recorder-box');
              if (box) box.remove();
            }}, 1200);
          }};
          recorder.start(timeslice);
          lastChunk = Date.now();
          watchdog = setInterval(() => {{
            const diff = Date.now() - lastChunk;
            if (diff > 10000) {{
              status.textContent = 'No se recibe audio del micro. Revisa conexión o permisos.';
              status.prepend(indicator);
              msg.textContent = 'Puedes detener y volver a intentar o usar subida de archivo.';
            }}
          }}, 2000);
          status.textContent = 'Grabando...';
          status.prepend(indicator);
        }};

        stopBtn.onclick = () => {{
          stopUI('Detenido por el usuario.');
        }};
      }} catch (err) {{
        console.error(err);
        google.colab.kernel.invokeFunction('notebook.audio_error', ['Fallo inicializando el grabador: ' + err.message], {{}});
      }}
    }})()
    """
    display(Javascript(js_code))

    # Poll once per second until the browser signals completion or an error,
    # or until the overall timeout expires.
    # NOTE(review): this loop blocks the cell while waiting; confirm that the
    # registered callbacks are actually delivered during that time.
    while True:
        if done_event.wait(timeout=1.0):
            break
        if (time.time() - start_time) > timeout_seconds:
            error_holder['message'] = 'Timeout de grabación (5 minutos).'
            break
    if error_holder['message']:
        raise RuntimeError(error_holder['message'])
    # Snapshot under the lock so a late callback cannot race with the read.
    with chunk_lock:
        audio_bytes = b''.join(chunks)
    if not audio_bytes:
        raise RuntimeError('No se recibieron datos de audio. Usa la opción de subir archivo.')

    # The temp directory exists only for the conversion and is removed even
    # when ffmpeg fails.
    tmp_dir = tempfile.mkdtemp()
    try:
        tmp_webm = os.path.join(tmp_dir, 'temp_audio.webm')
        with open(tmp_webm, 'wb') as f:
            f.write(audio_bytes)
        to_wav_mono_16k(tmp_webm, out_wav, sr=sr)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)

    duration = ffprobe_duration(out_wav)
    if autoplay:
        try:
            display(Audio(out_wav))
        except Exception:
            # The preview is optional; the recording itself already succeeded.
            pass
    print(json.dumps({'status': 'ok', 'wav_path': out_wav, 'duration_s': round(duration, 3)}, ensure_ascii=False))
    return out_wav

print('[Celda 3] Función record_audio disponible.')


In [None]:
# Celda 4 - Voz de referencia (record / upload → SPEAKER_WAV)
import json, traceback
from IPython.display import Audio
# Outcome of this cell; 'mode' switches to 'upload' when recording fails.
voice_summary = {'mode': 'record', 'success': False, 'path': None, 'duration_s': 0.0, 'error': None}
# Remove a previous reference so a failed run cannot leave a stale voice.
if os.path.exists(SPEAKER_WAV):
    try:
        os.remove(SPEAKER_WAV)
    except OSError as exc:
        print(f"[Celda 4] No se pudo eliminar la referencia previa: {exc}")

try:
    # autoplay=False: the preview is displayed once, at the end of this cell.
    voice_path = record_audio(out_wav=SPEAKER_WAV, sr=16000, autoplay=False)
    voice_summary.update({'success': True, 'path': voice_path, 'duration_s': round(ffprobe_duration(voice_path), 2)})
except Exception as exc:
    voice_summary['error'] = str(exc)
    voice_summary['mode'] = 'upload'
    print(f"[Celda 4] Grabación falló: {exc}. Cambiando a modo 'upload'.")
    try:
        from google.colab import files  # type: ignore
        uploaded = files.upload()
        if uploaded:
            tmp_name = list(uploaded.keys())[0]
            # basename() drops any directory component of the browser-supplied
            # name; the prefix keeps the raw upload from colliding with
            # SPEAKER_WAV, which would make ffmpeg read and write one file.
            tmp_path = os.path.join(paths['voice_ref'], 'upload_' + os.path.basename(tmp_name))
            with open(tmp_path, 'wb') as f:
                f.write(uploaded[tmp_name])
            to_wav_mono_16k(tmp_path, SPEAKER_WAV)
            voice_summary.update({'success': True, 'path': SPEAKER_WAV, 'duration_s': round(ffprobe_duration(SPEAKER_WAV), 2)})
        else:
            print('[Celda 4] No se subió archivo alguno.')
    except Exception as exc_upload:
        voice_summary['error'] = f"No se pudo subir archivo: {exc_upload}"

if voice_summary['success']:
    RESULTS['voice_ref'] = voice_summary
    print(json.dumps({'status': 'ok', 'path': voice_summary['path'], 'duration_s': voice_summary['duration_s']}, ensure_ascii=False))
    try:
        display(Audio(voice_summary['path']))
    except Exception:
        # The preview is optional; the reference file is already in place.
        pass
else:
    print(json.dumps({'status': 'error', 'detail': voice_summary}, ensure_ascii=False))


In [None]:
# Celda 5 - Pregunta hablada + ASR (Whisper)
import json, time
from IPython.display import Audio, display
import matplotlib.pyplot as plt

# Outcome of the capture step; 'mode' switches to 'upload' on fallback.
question_summary = {'mode': 'record', 'path': None, 'duration_s': 0.0, 'error': None}
# Drop artifacts of earlier runs so ASR can never transcribe stale audio.
for _stale_path in (QUESTION_RAW_WAV, QUESTION_WAV):
    if os.path.exists(_stale_path):
        try:
            os.remove(_stale_path)
        except OSError as exc:
            print(f"[Celda 5] No se pudo eliminar {_stale_path}: {exc}")

try:
    question_path = record_audio(out_wav=QUESTION_RAW_WAV, sr=16000, autoplay=False)
    question_summary['path'] = question_path
except Exception as exc:
    question_summary['error'] = str(exc)
    question_summary['mode'] = 'upload'
    print(f"[Celda 5] Grabación falló: {exc}. Cambiando a modo 'upload'.")
    try:
        from google.colab import files  # type: ignore
        uploaded = files.upload()
        if uploaded:
            tmp_name = list(uploaded.keys())[0]
            # basename() drops any directory component of the browser-supplied
            # name; the prefix keeps the raw upload from colliding with
            # QUESTION_WAV, which would make ffmpeg read and write one file.
            tmp_path = os.path.join(paths['audios'], 'upload_' + os.path.basename(tmp_name))
            with open(tmp_path, 'wb') as f:
                f.write(uploaded[tmp_name])
            question_summary['path'] = tmp_path
        else:
            raise RuntimeError('No se subió ningún archivo.')
    except Exception as exc_upload:
        question_summary['error'] = f"Fallo en subida: {exc_upload}"

if question_summary['path']:
    try:
        to_wav_mono_16k(question_summary['path'], QUESTION_WAV)
        question_summary['duration_s'] = round(ffprobe_duration(QUESTION_WAV), 2)
    except Exception as exc:
        question_summary['error'] = f"Conversión a 16 kHz falló: {exc}"
        print(f"[Celda 5] {question_summary['error']}")

ASR_TEXT = ''
ASR_LANG = 'desconocido'
ASR_TIME = None

if os.path.exists(QUESTION_WAV):
    try:
        import whisper
        model_name = 'small'
        whisper_model = whisper.load_model(model_name)
        start = time.time()
        # language=None: no language is forced on the transcription.
        result = whisper_model.transcribe(QUESTION_WAV, language=None)
        ASR_TIME = round(time.time() - start, 2)
        ASR_TEXT = result.get('text', '').strip()
        ASR_LANG = result.get('language', 'auto')
    except Exception as exc:
        question_summary['error'] = f"Whisper falló: {exc}"
        print(f"[Celda 5] Whisper falló: {exc}")
else:
    # Without audio there is nothing to transcribe; keep the original cause
    # in question_summary['error'] instead of masking it with a Whisper error.
    print(f"[Celda 5] Sin audio de pregunta; se omite ASR. Detalle: {question_summary['error']}")

RESULTS['asr'] = {
    'text': ASR_TEXT,
    'language': ASR_LANG,
    'time_s': ASR_TIME,
    'audio_path': QUESTION_WAV,
    'mode': question_summary['mode']
}

print(json.dumps({'status': 'ok' if ASR_TEXT else 'warning', 'text': ASR_TEXT, 'language': ASR_LANG, 'time_s': ASR_TIME}, ensure_ascii=False))

# Single-bar chart of the transcription time; 0 is plotted when ASR did not run.
try:
    plt.figure(figsize=(4,3))
    plt.bar(['ASR'], [ASR_TIME or 0], color='steelblue')
    plt.title('Tiempo de Whisper (s)')
    plt.ylabel('segundos')
    plt.tight_layout()
    plt.savefig(ASR_METRICS_PNG)
    plt.close()
except Exception as exc:
    print(f"[Celda 5] No se pudo generar gráfico ASR: {exc}")

if ASR_TEXT:
    try:
        display(Audio(QUESTION_WAV))
    except Exception:
        # The preview is optional.
        pass


In [None]:
# Celda 6 - LLM local (Qwen2.5-3B-Instruct) + alternativa Flan-T5
import json, time
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import matplotlib.pyplot as plt

DEFAULT_LLM = 'Qwen/Qwen2.5-3B-Instruct'
FALLBACK_LLM = 'google/flan-t5-base'
# 'error' keeps the primary model's failure even when the fallback succeeds.
llm_report = {'model': DEFAULT_LLM, 'fallback': False, 'time_s': None, 'response': None, 'error': None}

# Decide on the question text itself: the assembled prompt always starts with
# the instruction prefix, so testing the prompt for emptiness never triggers.
asr_question = (RESULTS.get('asr', {}).get('text') or '').strip()
if asr_question:
    prompt = f"Responde en español en 1–3 oraciones. Pregunta: {asr_question}"
else:
    prompt = 'Responde en español en 1–3 oraciones una descripción genérica del flujo ASR-LLM-TTS.'

try:
    load_kwargs = {}
    if torch.cuda.is_available():
        load_kwargs.update({'device_map': 'auto', 'torch_dtype': torch.float16})
    tokenizer = AutoTokenizer.from_pretrained(DEFAULT_LLM)
    model = AutoModelForCausalLM.from_pretrained(DEFAULT_LLM, **load_kwargs)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    start = time.time()
    # Instruct checkpoints expect the conversation wrapped in their chat
    # template; the generation prompt marks where the assistant turn begins.
    chat_text = tokenizer.apply_chat_template(
        [{'role': 'user', 'content': prompt}],
        tokenize=False,
        add_generation_prompt=True,
    )
    inputs = tokenizer([chat_text], return_tensors='pt').to(model.device)
    # do_sample=True makes temperature/top_p take effect explicitly.
    output_tokens = model.generate(
        **inputs,
        max_new_tokens=120,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        repetition_penalty=1.1,
    )
    # Keep only the newly generated tokens; slicing by input length does not
    # depend on the decoded text reproducing the prompt exactly.
    new_tokens = output_tokens[0][inputs['input_ids'].shape[1]:]
    response = tokenizer.decode(new_tokens, skip_special_tokens=True)
    llm_report['response'] = response.strip()
    llm_report['time_s'] = round(time.time() - start, 2)
except Exception as exc:
    llm_report['error'] = str(exc)
    llm_report['fallback'] = True
    llm_report['model'] = FALLBACK_LLM
    print(f"[Celda 6] Fallo con {DEFAULT_LLM}: {exc}. Intentando fallback {FALLBACK_LLM}.")
    try:
        text_gen = pipeline('text2text-generation', model=FALLBACK_LLM)
        start = time.time()
        response = text_gen(prompt, max_new_tokens=120)[0]['generated_text']
        llm_report['response'] = response.strip()
        llm_report['time_s'] = round(time.time() - start, 2)
    except Exception as exc_fb:
        llm_report['error'] = f"Fallback falló: {exc_fb}"

RESULTS['llm'] = llm_report
print(json.dumps({'model': llm_report['model'], 'time_s': llm_report['time_s'], 'fallback': llm_report['fallback']}, ensure_ascii=False))

# Single-bar chart of the generation time; 0 is plotted when no model ran.
try:
    plt.figure(figsize=(4,3))
    values = [llm_report['time_s'] or 0]
    plt.bar(['LLM'], values, color='salmon')
    plt.title('Tiempo de generación LLM (s)')
    plt.ylabel('segundos')
    plt.tight_layout()
    plt.savefig(LLM_METRICS_PNG)
    plt.close()
except Exception as exc:
    print(f"[Celda 6] No se pudo generar gráfico LLM: {exc}")


In [None]:
# Celda 7 - TTS (xTTS v2) + Comparación voz base
import json, time
from IPython.display import Audio
from TTS.api import TTS as TTS_API
import matplotlib.pyplot as plt

# Imported here so the GPU check below does not rely on an earlier cell
# having imported torch.
import torch

# Models and measurements of the two synthesis runs.
tts_summary = {
    'clone_model': 'tts_models/multilingual/multi-dataset/xtts_v2',
    'base_model': 'tts_models/es/css10/vits',
    'clone_time_s': None,
    'base_time_s': None,
    'clone_duration_s': None,
    'base_duration_s': None,
    'error': None
}

response_text = RESULTS.get('llm', {}).get('response') or 'Respuesta de prueba para síntesis.'

# Delete outputs of earlier runs: the report and the players below decide by
# file existence, so a leftover file would pass for a fresh result.
for _stale_wav in (CLONED_WAV, BASE_WAV):
    if os.path.exists(_stale_wav):
        try:
            os.remove(_stale_wav)
        except OSError as exc:
            print(f"[Celda 7] No se pudo eliminar {_stale_wav}: {exc}")

try:
    if not os.path.exists(SPEAKER_WAV):
        # Fail with a clear message instead of handing speaker_wav=None to
        # the cloning model.
        raise FileNotFoundError(f"No existe la voz de referencia: {SPEAKER_WAV}")
    tts_clone = TTS_API(tts_summary['clone_model'], gpu=torch.cuda.is_available(), progress_bar=False)
    start = time.time()
    tts_clone.tts_to_file(text=response_text, speaker_wav=SPEAKER_WAV, language='es', file_path=CLONED_WAV)
    tts_summary['clone_time_s'] = round(time.time() - start, 2)
    tts_summary['clone_duration_s'] = round(ffprobe_duration(CLONED_WAV), 2)
except Exception as exc:
    tts_summary['error'] = f"Clonado falló: {exc}"
    print(f"[Celda 7] Error en clonación: {exc}")

try:
    tts_base = TTS_API(tts_summary['base_model'], gpu=torch.cuda.is_available(), progress_bar=False)
    start = time.time()
    tts_base.tts_to_file(text=response_text, file_path=BASE_WAV)
    tts_summary['base_time_s'] = round(time.time() - start, 2)
    tts_summary['base_duration_s'] = round(ffprobe_duration(BASE_WAV), 2)
except Exception as exc:
    # Append to an existing cloning error rather than replacing it.
    if tts_summary['error']:
        tts_summary['error'] += f' | Base falló: {exc}'
    else:
        tts_summary['error'] = f'Base falló: {exc}'
    print(f"[Celda 7] Error en voz base: {exc}")

RESULTS['tts'] = {
    'clone_path': CLONED_WAV if os.path.exists(CLONED_WAV) else None,
    'base_path': BASE_WAV if os.path.exists(BASE_WAV) else None,
    'clone_time_s': tts_summary['clone_time_s'],
    'base_time_s': tts_summary['base_time_s'],
    'clone_duration_s': tts_summary['clone_duration_s'],
    'base_duration_s': tts_summary['base_duration_s'],
    'error': tts_summary['error']
}

print(json.dumps(RESULTS['tts'], indent=2, ensure_ascii=False))

# Bar chart comparing both synthesis times; a failed run is plotted as 0.
try:
    plt.figure(figsize=(5,3))
    labels = ['Clonada', 'Base']
    times = [tts_summary['clone_time_s'] or 0, tts_summary['base_time_s'] or 0]
    plt.bar(labels, times, color=['#4c72b0', '#55a868'])
    plt.ylabel('Tiempo (s)')
    plt.title('Comparación de tiempos TTS')
    plt.tight_layout()
    plt.savefig(TTS_TIMES_PNG)
    plt.close()
except Exception as exc:
    print(f"[Celda 7] No se pudo generar gráfico de tiempos TTS: {exc}")

if os.path.exists(CLONED_WAV):
    try:
        display(Audio(CLONED_WAV))
    except Exception:
        # The preview is optional.
        pass
if os.path.exists(BASE_WAV):
    try:
        display(Audio(BASE_WAV))
    except Exception:
        # The preview is optional.
        pass


In [None]:
# Celda 8 - Pipeline end-to-end (función + demo)
import json, time
from IPython.display import Audio, display

def asr_llm_tts_pipeline(question_mode='record', lang_hint='auto'):
    """Run the whole chain: question audio -> Whisper -> LLM -> cloned and base TTS.

    Args:
        question_mode: ``'record'`` to capture from the microphone (falling
            back to an upload when recording fails) or ``'upload'`` to ask
            for a file directly.
        lang_hint: Language passed to Whisper; ``'auto'`` passes ``None``.

    Returns:
        A summary dict with ``question_mode``, ``lang_hint``, a ``steps``
        mapping (``input_audio``, ``asr``, ``llm``, ``tts``) holding the
        stages that completed, ``total_time_s`` on success, and ``error``
        (``None`` on success). Exceptions are captured into ``error`` and
        printed, not raised.
    """
    summary = {'question_mode': question_mode, 'lang_hint': lang_hint, 'steps': {}, 'error': None}
    try:
        if question_mode not in ('record', 'upload'):
            # Reject unknown modes up front; otherwise the failure surfaces
            # later as an unrelated unbound-variable error.
            raise ValueError(f"question_mode inválido: {question_mode!r} (usa 'record' o 'upload').")

        question_path = None
        if question_mode == 'record':
            try:
                question_path = record_audio(out_wav=QUESTION_RAW_WAV, sr=16000, autoplay=False)
            except Exception as exc:
                print(f"[Pipeline] Grabación falló ({exc}). Cambiando a upload.")
                question_mode = 'upload'
                summary['question_mode'] = 'upload'
        if question_mode == 'upload':
            from google.colab import files  # type: ignore
            uploaded = files.upload()
            if uploaded:
                tmp_name = list(uploaded.keys())[0]
                tmp_path = os.path.join(paths['audios'], tmp_name)
                with open(tmp_path, 'wb') as f:
                    f.write(uploaded[tmp_name])
                question_path = tmp_path
            else:
                raise RuntimeError('No se subió archivo para la pregunta.')
        to_wav_mono_16k(question_path, QUESTION_WAV)
        duration = round(ffprobe_duration(QUESTION_WAV), 2)
        summary['steps']['input_audio'] = {'path': QUESTION_WAV, 'duration_s': duration}

        # --- ASR ---
        import whisper
        model = whisper.load_model('small')
        start = time.time()
        result = model.transcribe(QUESTION_WAV, language=None if lang_hint == 'auto' else lang_hint)
        asr_time = round(time.time() - start, 2)
        asr_text = result.get('text', '').strip()
        asr_lang = result.get('language', lang_hint)
        summary['steps']['asr'] = {'text': asr_text, 'language': asr_lang, 'time_s': asr_time}

        # --- LLM ---
        from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
        import torch
        prompt = f"Responde en español en 1–3 oraciones. Pregunta: {asr_text}"
        response_text = ''
        model_name = 'Qwen/Qwen2.5-3B-Instruct'
        fallback = False
        # The timer starts before loading, so llm_time includes model loading.
        start = time.time()
        try:
            load_kwargs = {}
            if torch.cuda.is_available():
                load_kwargs.update({'device_map': 'auto', 'torch_dtype': torch.float16})
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model_llm = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)
            # Instruct checkpoints expect the chat template around the prompt.
            chat_text = tokenizer.apply_chat_template(
                [{'role': 'user', 'content': prompt}],
                tokenize=False,
                add_generation_prompt=True,
            )
            inputs = tokenizer([chat_text], return_tensors='pt').to(model_llm.device)
            outputs = model_llm.generate(
                **inputs,
                max_new_tokens=120,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                repetition_penalty=1.1,
            )
            # Decode only the generated continuation, not the echoed input.
            new_tokens = outputs[0][inputs['input_ids'].shape[1]:]
            response_text = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
        except Exception as exc:
            fallback = True
            print(f"[Pipeline] LLM principal falló ({exc}). Usando fallback google/flan-t5-base.")
            pipe = pipeline('text2text-generation', model='google/flan-t5-base')
            response_text = pipe(prompt, max_new_tokens=120)[0]['generated_text']
        llm_time = round(time.time() - start, 2)
        summary['steps']['llm'] = {'text': response_text.strip(), 'time_s': llm_time, 'fallback': fallback}

        # --- TTS ---
        from TTS.api import TTS as TTS_API
        if not os.path.exists(SPEAKER_WAV):
            # Fail with a clear message instead of handing speaker_wav=None
            # to the cloning model.
            raise FileNotFoundError(f"No existe la voz de referencia: {SPEAKER_WAV}")
        tts_clone = TTS_API('tts_models/multilingual/multi-dataset/xtts_v2', gpu=torch.cuda.is_available(), progress_bar=False)
        start = time.time()
        tts_clone.tts_to_file(text=response_text, speaker_wav=SPEAKER_WAV, language='es', file_path=CLONED_WAV)
        clone_time = round(time.time() - start, 2)
        clone_dur = round(ffprobe_duration(CLONED_WAV), 2)
        tts_base = TTS_API('tts_models/es/css10/vits', gpu=torch.cuda.is_available(), progress_bar=False)
        start = time.time()
        tts_base.tts_to_file(text=response_text, file_path=BASE_WAV)
        base_time = round(time.time() - start, 2)
        base_dur = round(ffprobe_duration(BASE_WAV), 2)
        summary['steps']['tts'] = {
            'clone_path': CLONED_WAV,
            'base_path': BASE_WAV,
            'clone_time_s': clone_time,
            'base_time_s': base_time,
            'clone_duration_s': clone_dur,
            'base_duration_s': base_dur
        }

        summary['total_time_s'] = round(sum(filter(None, [clone_time, base_time, llm_time, asr_time])), 2)
    except Exception as exc:
        summary['error'] = str(exc)
        print(f"[Pipeline] Error: {exc}")
    return summary

# Demo run of the end-to-end pipeline in microphone mode.
pipeline_result = None
try:
    pipeline_result = asr_llm_tts_pipeline(question_mode='record', lang_hint='auto')
except Exception as exc:
    print(f"[Celda 8] Pipeline en modo record falló: {exc}")

if pipeline_result:
    RESULTS['pipeline'] = pipeline_result
    save_results(RESULTS, RESULTS_JSON)
    # The pipeline returns a summary even when a stage failed, so the status
    # has to come from its 'error' field rather than from mere presence.
    print(json.dumps({'status': 'error' if pipeline_result.get('error') else 'ok', 'pipeline': pipeline_result}, indent=2, ensure_ascii=False))
    if pipeline_result.get('error'):
        print('[Celda 8] Pipeline no se completó. Revisa errores previos.')
    if pipeline_result.get('steps', {}).get('tts', {}):
        if pipeline_result['steps']['tts'].get('clone_path'):
            try:
                display(Audio(pipeline_result['steps']['tts']['clone_path']))
            except Exception:
                # The preview is optional.
                pass
        if pipeline_result['steps']['tts'].get('base_path'):
            try:
                display(Audio(pipeline_result['steps']['tts']['base_path']))
            except Exception:
                # The preview is optional.
                pass
else:
    print('[Celda 8] Pipeline no se completó. Revisa errores previos.')


In [None]:
# Celda 9 - Pruebas automáticas mínimas
import json, time, math
import numpy as np
import soundfile as sf

# Every outcome recorded by report_test(), in call order.
results_tests = []

def report_test(name, status, detail):
    """Record one test outcome in ``results_tests`` and echo it as a JSON line.

    Args:
        name: Identifier of the test.
        status: Outcome label stored under ``'status'``.
        detail: Free-form explanation stored under ``'detail'``.
    """
    entry = dict(test=name, status=status, detail=detail)
    results_tests.append(entry)
    print(json.dumps(entry, ensure_ascii=False))

try:
    # Audio loopback: synthesize a 2 s, 440 Hz sine tone sampled at 16 kHz,
    # write it to disk, run it through the project's WAV converter and check
    # that the converted file reports a positive duration.
    sample_rate = 16000
    tone_seconds = 2
    timeline = np.linspace(0, tone_seconds, int(sample_rate * tone_seconds), endpoint=False)
    sine_wave = 0.1 * np.sin(2 * np.pi * 440 * timeline)

    outputs_dir = paths['outputs']
    raw_tone_wav = os.path.join(outputs_dir, 'tone.wav')
    sf.write(raw_tone_wav, sine_wave, sample_rate)

    converted_tone_wav = os.path.join(outputs_dir, 'tone_16k.wav')
    to_wav_mono_16k(raw_tone_wav, converted_tone_wav)

    loopback_seconds = ffprobe_duration(converted_tone_wav)
    if loopback_seconds > 0:
        loopback_status = 'OK'
    else:
        loopback_status = 'FAIL'
    report_test('audio_loopback', loopback_status, f'duración={loopback_seconds:.2f}s')
except Exception as exc:
    # Any failure along the way (I/O, conversion, probing) counts as FAIL.
    report_test('audio_loopback', 'FAIL', str(exc))

try:
    # ASR round-trip smoke test: synthesize a known Spanish phrase with the
    # base TTS voice, convert it to 16 kHz mono and transcribe it with the
    # smallest Whisper model. Any failure is reported as SKIP because the
    # heavy dependencies may be unavailable.
    #
    # Imports are local so the test does not depend on an earlier cell (or a
    # later block of this cell) having imported torch/whisper/TTS.
    import torch
    import whisper
    from TTS.api import TTS as TTS_API

    phrase = 'Hola, este es un test automático.'
    tts_base_path = os.path.join(paths['outputs'], 'tts_test.wav')
    # Use a dedicated file for the converted test audio. Writing to the shared
    # QUESTION_WAV path (defined outside this cell; presumably the user's
    # recorded question) would overwrite it with synthetic test audio.
    asr_test_wav = os.path.join(paths['outputs'], 'tts_test_16k.wav')
    tts_tmp = TTS_API('tts_models/es/css10/vits', gpu=torch.cuda.is_available(), progress_bar=False)
    tts_tmp.tts_to_file(text=phrase, file_path=tts_base_path)
    to_wav_mono_16k(tts_base_path, asr_test_wav)
    # Dedicated name so the generic global `model` is not rebound to a Whisper
    # instance by a mere smoke test.
    asr_test_model = whisper.load_model('tiny')
    result = asr_test_model.transcribe(asr_test_wav, language='es')
    text = result.get('text', '').strip()
    # An empty transcription is not treated as a hard failure.
    status = 'OK' if text else 'SKIP'
    detail = text if text else 'No se obtuvo transcripción, posiblemente ruido.'
    report_test('asr_dummy', status, detail)
except Exception as exc:
    report_test('asr_dummy', 'SKIP', f'Whisper/TTS no disponible: {exc}')

# LLM smoke test: ask a short question to the primary local model and check
# that the *generated* answer has a plausible length (5-200 words). If the
# primary model cannot be loaded or run, retry with a small seq2seq fallback.
question = '¿Qué es la gravedad?'
try:
    from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
    import torch
    load_kwargs = {}
    if torch.cuda.is_available():
        # Half precision and automatic device placement only when a GPU exists.
        load_kwargs.update({'device_map': 'auto', 'torch_dtype': torch.float16})
    # Dedicated names so the generic globals `tokenizer` / `model` are not
    # rebound by this smoke test.
    smoke_tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen2.5-3B-Instruct')
    smoke_model = AutoModelForCausalLM.from_pretrained('Qwen/Qwen2.5-3B-Instruct', **load_kwargs)
    inputs = smoke_tokenizer(question, return_tensors='pt').to(smoke_model.device)
    output = smoke_model.generate(**inputs, max_new_tokens=120, temperature=0.7, top_p=0.9)
    # For a causal LM, generate() returns the prompt tokens followed by the
    # continuation. Drop the prompt so the word count measures the answer only;
    # otherwise the 4-word question alone nearly satisfies the lower bound.
    prompt_len = inputs['input_ids'].shape[-1]
    response = smoke_tokenizer.decode(output[0][prompt_len:], skip_special_tokens=True)
    word_count = len(response.split())
    status = 'OK' if 5 <= word_count <= 200 else 'FAIL'
    report_test('llm_smoke', status, f'{word_count} palabras')
except Exception as exc:
    # Surface why the primary model was not used before trying the fallback.
    print(f'[Celda 9] LLM principal no disponible ({exc}); probando fallback.')
    try:
        # Re-import here: if the import inside the primary attempt failed,
        # `pipeline` would otherwise be an undefined name.
        from transformers import pipeline
        text_gen = pipeline('text2text-generation', model='google/flan-t5-base')
        response = text_gen(question)[0]['generated_text']
        word_count = len(response.split())
        status = 'OK' if 5 <= word_count <= 200 else 'FAIL'
        report_test('llm_smoke', status, f'Fallback {word_count} palabras')
    except Exception as exc_fb:
        report_test('llm_smoke', 'SKIP', f'No fue posible cargar LLM: {exc_fb}')

try:
    # TTS smoke test: synthesize a short phrase with the Spanish VITS model and
    # require the resulting file to last more than 0.2 s.
    smoke_phrase = 'Prueba de voz clonada.'
    smoke_wav = os.path.join(paths['outputs'], 'tts_smoke.wav')
    from TTS.api import TTS as TTS_API
    smoke_tts = TTS_API(
        'tts_models/es/css10/vits',
        gpu=torch.cuda.is_available(),
        progress_bar=False,
    )
    smoke_tts.tts_to_file(text=smoke_phrase, file_path=smoke_wav)
    smoke_seconds = ffprobe_duration(smoke_wav)
    if smoke_seconds > 0.2:
        smoke_status = 'OK'
    else:
        smoke_status = 'FAIL'
    report_test('tts_smoke', smoke_status, f'duración={smoke_seconds:.2f}s')
except Exception as exc:
    # Missing TTS dependencies or models are reported as SKIP, not FAIL.
    report_test('tts_smoke', 'SKIP', str(exc))

# Optional manual recording test. The outcome is classified as:
#   OK   - record_audio returned without raising;
#   SKIP - it failed after more than 30 s (treated as "user did not interact"),
#          or it failed outside Colab, where the recorder is assumed to be
#          unsupported (the reset script in this notebook uses the
#          `google.colab` JS API) - NOTE(review): confirm against record_audio;
#   FAIL - any other failure.
# A single handler is used on purpose: with a nested try/except, the outer
# "no browser support" branch could never be reached for record_audio errors
# because the inner `except Exception` caught them all.
start = time.time()
try:
    print('Iniciando test voluntario de grabación. Puedes omitirlo dejando pasar 30 s.')
    record_audio(out_wav=os.path.join(paths['audios'], 'test_record.wav'), sr=16000, autoplay=False, timeslice_ms=300)
    report_test('record_audio_manual', 'OK', 'Grabación disponible')
except Exception as exc:
    if time.time() - start > 30:
        report_test('record_audio_manual', 'SKIP', f'Sin interacción del usuario: {exc}')
    elif not IN_COLAB:
        report_test('record_audio_manual', 'SKIP', f'Entorno sin soporte de navegador: {exc}')
    else:
        report_test('record_audio_manual', 'FAIL', str(exc))

# Final recap: header line followed by every collected test record as
# pretty-printed JSON (non-ASCII characters kept as-is).
print(
    '[Celda 9] Resumen de tests:',
    json.dumps(results_tests, indent=2, ensure_ascii=False),
    sep='\n',
)


In [None]:
# Celda 10 - Troubleshooting interactivo
from IPython.display import Markdown, display, Javascript

# Markdown checklist of common causes of recording problems (microphone
# permissions, extensions, timeslice size, browser, MediaRecorder codec).
# It is rendered at the end of this cell with display(Markdown(...)).
checklist = """
### Checklist rápido
- ¿El navegador concedió permisos al micrófono?
- ¿Algún bloqueador de pop-ups o extensiones está interfiriendo?
- ¿`timeslice_ms` es muy grande? Prueba valores de 200–400 ms.
- Reintenta en Google Chrome estable si usas otro navegador.
- Si `MediaRecorder` falla con `audio/webm;codecs=opus`, el código ya intenta un fallback automático.
"""

# JavaScript executed by reset_recorder(). In order, it:
#   1. removes the DOM element with id 'recorder-box' if one exists,
#   2. invokes the kernel callback 'notebook.audio_error' with a
#      "manual reset" message,
#   3. logs a hint to the browser console.
# NOTE(review): step 2 uses the `google.colab` JS API, so this presumably only
# works inside Colab, and 'notebook.audio_error' is presumably registered by
# the recording cell - confirm both.
reset_js = """
(() => {
  const existing = document.getElementById('recorder-box');
  if (existing) existing.remove();
  google.colab.kernel.invokeFunction('notebook.audio_error', ['Reset manual solicitado.'], {});
  console.log('Recorder UI reiniciada. Vuelve a ejecutar la celda de grabación.');
})();
"""

def reset_recorder():
    """Run the recorder-reset JavaScript snippet in the notebook front end.

    Wraps the module-level ``reset_js`` source in a ``Javascript`` display
    object and displays it. Any Python-side exception raised while doing so
    is caught and printed instead of propagating.
    """
    try:
        reset_script = Javascript(reset_js)
        display(reset_script)
    except Exception as err:
        print(f'No fue posible reiniciar el grabador: {err}')

# Render the troubleshooting checklist, then point the user at the reset helper.
checklist_view = Markdown(checklist)
display(checklist_view)
print('[Celda 10] Usa reset_recorder() si necesitas reiniciar la UI del grabador.')


In [None]:
# Celda 11 - Conclusión y guía para exposición
from IPython.display import Markdown, display

# Closing notes shown to the reader: conclusions, a short script for
# presenting the demo, and suggested next steps. Kept as Markdown source so
# it renders with headings and lists.
summary_md = """
### Conclusiones
- El pipeline completo convierte audio de entrada en texto (Whisper), genera una respuesta corta (LLM) y sintetiza dos voces (clonada y base) para comparar.
- Las gráficas generadas muestran los tiempos estimados de cada módulo para facilitar el análisis de rendimiento.
- Los resultados consolidados, incluyendo rutas de archivos y métricas, se guardan en `results.json` dentro de la carpeta `outputs/`.

### Cómo exponer el demo
1. Muestra la voz de referencia y explica la clonación (escucha ambos audios).
2. Ejecuta la celda del pipeline end-to-end y enseña los tiempos en pantalla.
3. Usa las gráficas (`asr_metrics.png`, `llm_metrics.png`, `tts_tiempos.png`) para comentar diferencias.

### Próximos pasos sugeridos
- Probar LLMs más grandes o especializados para respuestas detalladas.
- Incluir diarización y supresión de ruido antes de Whisper para ambientes complejos.
- Explorar control de prosodia multilingüe en la síntesis clonada.
"""

# Render the notes, then confirm on stdout that the final cell ran.
conclusion_view = Markdown(summary_md)
display(conclusion_view)
print('[Celda 11] Conclusión mostrada. ¡Listo para presentar!')
