# ⚡ Real-Time Avatar Lip-Sync Pipeline
### Works on: Colab Pro (A100) + Local RTX 3080+

**Flow:** `User Voice → STT → LLM → TTS → MuseTalk → Display`

---
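**Before running:**
- Colab: Runtime → Change runtime type → A100 GPU
- Local: Ensure CUDA 11.8+ and an RTX 3080 or better
- Upload your avatar image as `avatar.jpg` to the notebook's working directory (Cell 4 creates a placeholder if it is missing)
- Download the MuseTalk model weights into `MuseTalk/models/` (see the MuseTalk README); without them Cell 3 falls back to simulation mode
- Provide a Groq API key (`GROQ_API_KEY` in Cell 2)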

## 📦 Cell 1 — Install Dependencies

In [None]:
# ============================================================
# CELL 1: INSTALL ALL DEPENDENCIES
# Run this once. Restart runtime after if on Colab.
# ============================================================

import subprocess, sys

def install(pkg):
    """Quietly pip-install ``pkg`` into the interpreter running this notebook.

    Invoking pip as ``sys.executable -m pip`` (rather than a bare ``pip``)
    targets the same environment as the kernel. A non-zero pip exit status
    raises ``subprocess.CalledProcessError``.
    """
    pip_cmd = [sys.executable, '-m', 'pip', 'install', '-q']
    pip_cmd.append(pkg)
    subprocess.check_call(pip_cmd)

print('Installing core dependencies...')

# STT - faster-whisper (4x faster than openai-whisper)
install('faster-whisper')

# TTS - edge-tts (free, fast Microsoft TTS)
install('edge-tts')

# Audio
install('sounddevice')
install('soundfile')
install('numpy')
install('scipy')
install('webrtcvad')        # Voice Activity Detection

# LLM - Groq (ultra fast inference API, free tier available)
install('groq')

# Video / Display
install('opencv-python-headless')
install('pillow')
install('ipywidgets')

# Async support in notebooks
install('nest_asyncio')

# MuseTalk - clone repo
import os
if not os.path.exists('MuseTalk'):
    print('Cloning MuseTalk...')
    # subprocess.check_call (not os.system) so a failed clone or a failed
    # requirements install raises instead of being silently ignored. pip runs
    # through sys.executable, like install() above, so MuseTalk's requirements
    # land in the kernel's own environment; cwd= avoids chdir round-trips.
    subprocess.check_call(['git', 'clone', 'https://github.com/TMElyralab/MuseTalk.git'])
    subprocess.check_call(
        [sys.executable, '-m', 'pip', 'install', '-q', '-r', 'requirements.txt'],
        cwd='MuseTalk',
    )
    print('MuseTalk cloned!')
else:
    print('MuseTalk already cloned.')

# NOTE(review): the git clone presumably does not include the model weights
# that Cell 3 loads from MuseTalk/models/ (musetalk/, sd-vae-ft-mse/, whisper/);
# fetch them as described in the MuseTalk README, otherwise Cell 3 falls back
# to SIMULATION mode.

print('\n‚úÖ All dependencies installed!')
print('‚ö†Ô∏è  If on Colab: Runtime ‚Üí Restart Runtime, then continue from Cell 2')

## ⚙️ Cell 2 — GPU Check & Config

In [None]:
# ============================================================
# CELL 2: GPU CHECK + GLOBAL CONFIG
# ============================================================

import torch
import os

# --- GPU Detection ---
if torch.cuda.is_available():
    gpu_name = torch.cuda.get_device_name(0)
    vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f'‚úÖ GPU: {gpu_name}')
    print(f'‚úÖ VRAM: {vram_gb:.1f} GB')
    DEVICE = 'cuda'
    DTYPE  = torch.float16   # fp16 = 2x faster on GPU
else:
    print('‚ùå No GPU found! Pipeline will be too slow for real-time.')
    print('   ‚Üí Colab: Runtime ‚Üí Change runtime type ‚Üí A100')
    DEVICE = 'cpu'
    DTYPE  = torch.float32

# ============================================================
# GLOBAL PIPELINE CONFIG — Tune these for your setup
# ============================================================
CONFIG = {
    # Audio
    'SAMPLE_RATE'       : 16000,    # Hz - standard for STT
    'CHUNK_DURATION_MS' : 200,      # ms - audio chunk size fed to MuseTalk
    'RECORD_SECONDS'    : 5,        # seconds of user voice to record

    # STT (faster-whisper)
    'WHISPER_MODEL'     : 'base',   # tiny/base/small — 'base' best speed/accuracy
    'WHISPER_LANG'      : 'en',     # set your language

    # LLM (Groq API)
    # The key is taken from the GROQ_API_KEY environment variable when set, so
    # it never has to be pasted into (and shared with) the notebook; the
    # placeholder remains the fallback. Free keys at console.groq.com.
    'GROQ_API_KEY'      : os.environ.get('GROQ_API_KEY', 'YOUR_GROQ_API_KEY_HERE'),
    # Groq deprecated 'llama3-8b-8192'; 'llama-3.1-8b-instant' is its listed
    # replacement (check console.groq.com/docs/deprecations for the current list).
    'LLM_MODEL'         : 'llama-3.1-8b-instant',
    'LLM_SYSTEM_PROMPT' : 'You are a helpful assistant. Keep answers under 3 sentences.',

    # TTS (edge-tts)
    'TTS_VOICE'         : 'en-US-JennyNeural',  # change voice here

    # Avatar
    'AVATAR_IMAGE'      : 'avatar.jpg',   # your avatar image path
    'OUTPUT_FPS'        : 25,             # frames per second
    'OUTPUT_WIDTH'      : 512,
    'OUTPUT_HEIGHT'     : 512,

    # Device
    'DEVICE'            : DEVICE,
    'DTYPE'             : DTYPE,
}

print('\n‚öôÔ∏è  Config loaded:')
for k, v in CONFIG.items():
    if k != 'GROQ_API_KEY':   # never echo the secret
        print(f'   {k}: {v}')

## 🔄 Cell 3 — Load All Models (Run Once)

In [None]:
# ============================================================
# CELL 3: LOAD ALL MODELS ONCE
# Keep these in memory ‚Äî never reload between pipeline runs!
# ============================================================

import time
import sys
import os
import nest_asyncio
import asyncio
import queue
import threading
import numpy as np
import soundfile as sf
import cv2
from PIL import Image
from IPython.display import display, clear_output, HTML
import ipywidgets as widgets
import base64
import io
import torch

# Allow asyncio in Jupyter: lets run_until_complete re-enter the kernel's
# already-running event loop (used by the TTS wrappers below).
nest_asyncio.apply()

# ---------- 1. STT Model ----------
print('Loading STT model (faster-whisper)...')
t0 = time.time()
from faster_whisper import WhisperModel

stt_model = WhisperModel(
    CONFIG['WHISPER_MODEL'],
    device=CONFIG['DEVICE'],
    compute_type='float16' if CONFIG['DEVICE'] == 'cuda' else 'int8'
)
print(f'‚úÖ STT loaded in {time.time()-t0:.2f}s')

# ---------- 2. LLM Client ----------
print('Loading LLM client (Groq)...')
from groq import Groq
if CONFIG['GROQ_API_KEY'] == 'YOUR_GROQ_API_KEY_HERE':
    # Warn now: otherwise a placeholder key is only discovered when the first
    # LLM request fails deep inside the pipeline.
    print('WARNING: GROQ_API_KEY is still the placeholder - LLM requests will fail.')
llm_client = Groq(api_key=CONFIG['GROQ_API_KEY'])
print('‚úÖ LLM client ready (Groq)')

# ---------- 3. TTS ----------
print('TTS: edge-tts (no preload needed)')
import edge_tts
print('‚úÖ TTS ready (edge-tts)')

# ---------- 4. MuseTalk ----------
print('Loading MuseTalk...')
t0 = time.time()
sys.path.insert(0, 'MuseTalk')

try:
    from musetalk.utils.utils import get_file_type, get_video_fps, datagen
    from musetalk.utils.preprocessing import get_landmark_and_bbox, read_imgs, coord_placeholder
    from musetalk.utils.blending import get_image
    from musetalk.models.unet import UNet, PositionalEncoding
    from diffusers import AutoencoderKL
    from transformers import Wav2Vec2FeatureExtractor
    import whisper  # MuseTalk uses this internally

    # Load MuseTalk UNet
    # NOTE(review): upstream MuseTalk's UNet wrapper may take
    # (unet_config, model_path, ...) and load its weights itself rather than
    # being a plain nn.Module; if so, this raises and the notebook drops to
    # SIMULATION mode. Verify against MuseTalk/musetalk/models/unet.py.
    unet = UNet(unet_config='MuseTalk/musetalk/models/musetalk/unet.json')
    unet.load_state_dict(
        torch.load('MuseTalk/models/musetalk/pytorch_model.bin', map_location=CONFIG['DEVICE'])
    )
    unet = unet.to(CONFIG['DEVICE'])
    if CONFIG['DEVICE'] == 'cuda':
        unet = unet.half()  # fp16
    unet.eval()

    # Apply torch.compile for extra speed (PyTorch 2.0+). Feature-detect it
    # instead of comparing version strings: '10.0' >= '2.0.0' is False
    # lexicographically, and pre-release suffixes compare unpredictably.
    if hasattr(torch, 'compile') and CONFIG['DEVICE'] == 'cuda':
        print('Applying torch.compile() for speed boost...')
        unet = torch.compile(unet, mode='reduce-overhead')

    # Load VAE
    vae = AutoencoderKL.from_pretrained(
        'MuseTalk/models/sd-vae-ft-mse',
        torch_dtype=CONFIG['DTYPE']
    ).to(CONFIG['DEVICE'])

    # Load audio processor
    audio_processor = Wav2Vec2FeatureExtractor.from_pretrained(
        'MuseTalk/models/whisper',
        local_files_only=True
    )

    MUSETALK_AVAILABLE = True
    print(f'‚úÖ MuseTalk loaded in {time.time()-t0:.2f}s')

except Exception as e:
    # Deliberate best-effort: any missing package/weight falls back to the
    # energy-driven simulation in run_musetalk().
    print(f'‚ö†Ô∏è  MuseTalk load error: {e}')
    print('   Running in SIMULATION mode (no actual lip-sync)')
    MUSETALK_AVAILABLE = False

print('\nüöÄ All models loaded! Ready for real-time pipeline.')

## 🎭 Cell 4 — Preprocess Avatar (Run Once Per Avatar)

In [None]:
# ============================================================
# CELL 4: PREPROCESS AVATAR IMAGE
# Run once per avatar. Saves preprocessed data for fast inference.
# ============================================================

import os
import cv2
import numpy as np
from PIL import Image

AVATAR_CACHE = 'avatar_cache'
os.makedirs(AVATAR_CACHE, exist_ok=True)

def preprocess_avatar(avatar_path):
    """Load the avatar image, resize it, cache it and show a preview.

    No face landmarks or latents are computed here: the image is resized to
    CONFIG['OUTPUT_WIDTH'] x CONFIG['OUTPUT_HEIGHT'], written to
    ``<AVATAR_CACHE>/base_frame.jpg``, previewed at 256x256 in the notebook
    and returned. If ``avatar_path`` does not exist, a simple placeholder is
    drawn and saved there first so the rest of the notebook can still run.

    Args:
        avatar_path: path of the avatar image to load (created if missing).

    Returns:
        The resized avatar as an OpenCV BGR uint8 array.

    Raises:
        ValueError: if OpenCV cannot read the image (corrupt/unsupported file,
            or the placeholder could not be written).
    """
    import base64
    import io

    print(f'Preprocessing avatar: {avatar_path}')

    if not os.path.exists(avatar_path):
        print(f'‚ùå Avatar image not found: {avatar_path}')
        print('   Creating a placeholder avatar for testing...')
        # Create a simple placeholder
        placeholder = np.zeros((512, 512, 3), dtype=np.uint8)
        placeholder[100:400, 150:350] = [200, 180, 160]  # face area
        cv2.circle(placeholder, (256, 200), 80, (220, 195, 170), -1)  # head
        cv2.imwrite(avatar_path, placeholder)
        print(f'   ‚úÖ Placeholder created at {avatar_path}')

    img = cv2.imread(avatar_path)
    if img is None:
        # cv2.imread signals unreadable/unsupported files by returning None;
        # without this check cv2.resize fails with an opaque assertion error.
        raise ValueError(f'Could not read avatar image: {avatar_path}')
    img = cv2.resize(img, (CONFIG['OUTPUT_WIDTH'], CONFIG['OUTPUT_HEIGHT']))

    cache_path = os.path.join(AVATAR_CACHE, 'base_frame.jpg')
    cv2.imwrite(cache_path, img)

    print(f'‚úÖ Avatar preprocessed ‚Üí {cache_path}')
    print(f'   Size: {img.shape[1]}x{img.shape[0]}')

    # Show avatar preview (inline base64 JPEG, downscaled for display)
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    pil_img = Image.fromarray(img_rgb)
    pil_img = pil_img.resize((256, 256))  # smaller for display

    buf = io.BytesIO()
    pil_img.save(buf, format='JPEG')
    b64 = base64.b64encode(buf.getvalue()).decode()
    display(HTML(f'<img src="data:image/jpeg;base64,{b64}" style="border:2px solid #00e5ff;border-radius:8px"/>'))
    print('\n‚úÖ Avatar ready!')
    return img

base_avatar = preprocess_avatar(CONFIG['AVATAR_IMAGE'])

## 🔧 Cell 5 — Pipeline Functions

In [None]:
# ============================================================
# CELL 5: ALL PIPELINE STAGE FUNCTIONS
# ============================================================

import asyncio
import time
import numpy as np
import sounddevice as sd
import soundfile as sf
import io
import tempfile
import os

# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
# STAGE 1: Record User Voice
# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
def record_audio(duration_sec=None, filepath='user_input.wav'):
    """Capture mono microphone audio and save it as a WAV file.

    Args:
        duration_sec: seconds to record; any falsy value (None, 0) falls back
            to CONFIG['RECORD_SECONDS'].
        filepath: destination WAV path.

    Returns:
        ``filepath``, once the recording has been written.
    """
    seconds = duration_sec if duration_sec else CONFIG['RECORD_SECONDS']
    rate = CONFIG['SAMPLE_RATE']
    print(f'üé§ Recording for {seconds}s... Speak now!')
    start = time.perf_counter()

    n_samples = int(seconds * rate)
    recording = sd.rec(n_samples, samplerate=rate, channels=1, dtype='float32')
    sd.wait()  # block until the whole buffer has been captured
    sf.write(filepath, recording, rate)

    print(f'‚úÖ Recorded {time.perf_counter() - start:.2f}s ‚Üí {filepath}')
    return filepath

def load_audio_file(filepath):
    """Load audio from file (for testing without mic) as mono float32.

    Multi-channel files are downmixed by averaging channels so the result is
    always 1-D, matching the mono recordings made by ``record_audio``. Audio
    not already at CONFIG['SAMPLE_RATE'] is resampled with
    ``scipy.signal.resample`` (FFT-based).

    Returns:
        1-D float32 numpy array at CONFIG['SAMPLE_RATE'].
    """
    audio, sr = sf.read(filepath)
    if audio.ndim > 1:
        # Downmix before resampling: mono is what STT expects, and it halves
        # (or better) the resampling work.
        audio = audio.mean(axis=1)
    target_sr = CONFIG['SAMPLE_RATE']
    if sr != target_sr:
        from scipy import signal
        audio = signal.resample(audio, int(len(audio) * target_sr / sr))
    return audio.astype(np.float32)

# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
# STAGE 2: STT ‚Äî Speech to Text
# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
def run_stt(audio_filepath):
    """Return the text spoken in ``audio_filepath``.

    Uses the preloaded faster-whisper ``stt_model`` with greedy decoding
    (beam_size=1) and its built-in VAD filter, and prints the elapsed time.
    """
    started = time.perf_counter()
    print('üî§ Running STT...')

    decode_opts = {
        'language': CONFIG['WHISPER_LANG'],
        'beam_size': 1,                                      # greedy = fastest
        'vad_filter': True,                                  # drop silent stretches
        'vad_parameters': {'min_silence_duration_ms': 300},
    }
    segments, _info = stt_model.transcribe(audio_filepath, **decode_opts)

    # faster-whisper documents ``segments`` as a lazy generator, so the actual
    # decoding happens while this join consumes it.
    pieces = (segment.text.strip() for segment in segments)
    transcript = ' '.join(pieces)

    took_ms = (time.perf_counter() - started) * 1000
    print(f'‚úÖ STT done in {took_ms:.0f}ms: "{transcript}"')
    return transcript

# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
# STAGE 3: LLM ‚Äî Streaming Response
# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
def run_llm_stream(user_text, enders='.!?,', min_chars=4):
    """Stream the LLM reply and yield it in speakable chunks as text arrives.

    The buffer is cut at the EARLIEST ender character that is followed by
    whitespace, so decimals such as "3.14" are never split mid-number.
    Fragments shorter than ``min_chars`` (e.g. "Hi.") are kept and merged with
    the following text rather than discarded. Whatever is left when the
    stream ends is yielded as a final chunk.

    Args:
        user_text: the user's utterance, sent as the single user message.
        enders: non-empty string of characters that may end a chunk. The
            default keeps comma splitting for the earliest possible TTS start.
        min_chars: minimum stripped length before a chunk is yielded.

    Yields:
        Stripped text chunks; every chunk except possibly the last ends with
        one of ``enders``.
    """
    import re

    t0 = time.perf_counter()
    print(f'üß† LLM streaming: "{user_text}"')

    stream = llm_client.chat.completions.create(
        model=CONFIG['LLM_MODEL'],
        messages=[
            {'role': 'system', 'content': CONFIG['LLM_SYSTEM_PROMPT']},
            {'role': 'user',   'content': user_text}
        ],
        stream=True,
        max_tokens=200
    )

    # An ender only counts once whitespace follows it; the final flush below
    # covers an ender at the very end of the reply.
    boundary = re.compile('[' + re.escape(enders) + r'](?=\s)')
    buffer = ''
    first_chunk = True

    for chunk in stream:
        if not chunk.choices:       # e.g. trailing usage-only chunks
            continue
        delta = chunk.choices[0].delta.content
        if not delta:
            continue

        if first_chunk:
            print(f'  First LLM token in {(time.perf_counter()-t0)*1000:.0f}ms')
            first_chunk = False

        buffer += delta

        # Emit every complete chunk now in the buffer, earliest boundary first.
        search_from = 0
        while True:
            match = boundary.search(buffer, search_from)
            if match is None:
                break
            sentence = buffer[:match.end()].strip()
            if len(sentence) >= min_chars:
                yield sentence
                buffer = buffer[match.end():]
                search_from = 0
            else:
                # Too short to speak alone: keep it and look for a later boundary.
                search_from = match.end()

    # Yield remaining
    if buffer.strip():
        yield buffer.strip()

    print(f'‚úÖ LLM done in {(time.perf_counter()-t0)*1000:.0f}ms')

# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
# STAGE 4: TTS ‚Äî Text to Speech Chunks
# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
async def run_tts_async(text, output_path='tts_output.wav'):
    """Synthesize ``text`` with edge-tts and write it to ``output_path``.

    edge-tts produces MP3 audio (its default output format), so saving it
    straight to a ``.wav`` path yields MP3 bytes under a misleading name. The
    MP3 is therefore saved next to the target, decoded with soundfile and
    re-written at ``output_path`` (format chosen from that extension). If this
    soundfile/libsndfile build cannot decode MP3, the raw MP3 is moved to
    ``output_path`` instead, which matches the previous behaviour.

    Returns:
        ``output_path``.
    """
    t0 = time.perf_counter()

    # Appending '.mp3' keeps the temp name distinct from output_path and lets
    # libsndfile recognise the container from the extension.
    mp3_path = output_path + '.mp3'
    communicate = edge_tts.Communicate(text, CONFIG['TTS_VOICE'])
    await communicate.save(mp3_path)

    try:
        audio, sr = sf.read(mp3_path)
    except RuntimeError as exc:  # soundfile's decode errors subclass RuntimeError
        print(f'  WARNING: could not decode TTS MP3 ({exc}); keeping raw MP3 bytes')
        os.replace(mp3_path, output_path)
    else:
        sf.write(output_path, audio, sr)
        os.remove(mp3_path)

    elapsed = time.perf_counter() - t0
    print(f'  üîä TTS chunk done in {elapsed*1000:.0f}ms ‚Üí {output_path}')
    return output_path

def run_tts(text, output_path='tts_output.wav'):
    """Blocking wrapper around ``run_tts_async``; returns ``output_path``.

    On the main thread (the notebook itself) the thread's default event loop
    is used. In Jupyter that loop is already running, so this relies on
    ``nest_asyncio.apply()`` (Cell 3) to allow re-entrant
    ``run_until_complete``.

    Worker threads (e.g. the TTS thread of the threaded pipeline) get a
    private loop that is closed afterwards. ``asyncio.get_event_loop()``
    either raises there or, once patched, leaves one unclosed loop behind
    per thread.
    """
    import threading

    coro = run_tts_async(text, output_path)
    if threading.current_thread() is threading.main_thread():
        return asyncio.get_event_loop().run_until_complete(coro)

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
# STAGE 5: MuseTalk Inference
# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
def _simulate_lipsync(audio_path, avatar):
    """Fake lip-sync: draw an opening/closing ellipse 'mouth' on copies of ``avatar``.

    One frame is produced per 1/OUTPUT_FPS seconds of audio (at least one),
    so playback length tracks the audio length. The mouth follows a fixed
    sine pattern whose amplitude scales with the chunk's mean absolute
    sample value.
    """
    audio, sr = sf.read(audio_path)
    duration = len(audio) / sr
    n_frames = max(1, int(duration * CONFIG['OUTPUT_FPS']))
    # Guard: mean() of an empty array is NaN, and int(NaN) below would raise.
    energy = np.abs(audio).mean() * 10 if len(audio) else 0.0

    frames = []
    for i in range(n_frames):
        frame = avatar.copy()
        # Simulate mouth movement
        t = i / max(n_frames - 1, 1)
        mouth_open = int(energy * 20 * abs(np.sin(t * np.pi * 4)))
        mouth_open = min(30, max(2, mouth_open))
        h, w = frame.shape[:2]
        cx, cy = w // 2, int(h * 0.70)
        cv2.ellipse(frame, (cx, cy), (25, mouth_open), 0, 0, 180, (80, 40, 30), -1)
        cv2.ellipse(frame, (cx, cy), (25, mouth_open), 0, 0, 180, (200, 150, 140), 2)
        frames.append(frame)
    return frames


def run_musetalk(audio_path, avatar_img=None):
    """Run MuseTalk lip-sync inference on one audio chunk.

    Args:
        audio_path: audio file readable by soundfile.
        avatar_img: BGR frame to animate; defaults to a copy of ``base_avatar``.

    Returns:
        List of frames (numpy arrays). When MuseTalk is unavailable or its
        inference raises, energy-driven simulated frames are returned, so the
        number of frames still follows the audio duration. A one-element list
        containing the static avatar is the last resort.
    """
    t0 = time.perf_counter()
    avatar = avatar_img if avatar_img is not None else base_avatar.copy()

    if not MUSETALK_AVAILABLE:
        # SIMULATION MODE: animate mouth based on audio energy
        frames = _simulate_lipsync(audio_path, avatar)
        elapsed = time.perf_counter() - t0
        print(f'  üé≠ Simulation: {len(frames)} frames in {elapsed*1000:.0f}ms')
        return frames

    # REAL MUSETALK INFERENCE
    try:
        use_amp = CONFIG['DEVICE'] == 'cuda'
        # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
        with torch.no_grad(), torch.autocast(device_type=CONFIG['DEVICE'], enabled=use_amp):
            # Prepare audio features
            audio, sr = sf.read(audio_path)
            if audio.ndim > 1:
                audio = audio.mean(axis=1)   # downmix to mono
            audio_input = audio_processor(
                audio,
                sampling_rate=sr,
                return_tensors='pt'
            ).input_values.to(CONFIG['DEVICE'])

            # NOTE(review): placeholder call ("simplified") - the actual MuseTalk
            # inference (VAE latents, positional encoding, blending) is not
            # implemented here; compare with MuseTalk's inference scripts.
            frames = unet(audio_input, avatar)

        elapsed = time.perf_counter() - t0
        print(f'  üé≠ MuseTalk: {len(frames)} frames in {elapsed*1000:.0f}ms')
        return frames

    except Exception as e:
        print(f'  ‚ö†Ô∏è  MuseTalk inference error: {e}')
        try:
            # Keep playback time consistent with the audio instead of flashing
            # a single frame.
            return _simulate_lipsync(audio_path, avatar)
        except Exception as sim_err:
            print(f'  Simulation fallback failed too: {sim_err}')
            return [avatar]  # fallback to static avatar

# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
# STAGE 6: Display Frame in Notebook
# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
# Notebook widgets shared by the pipeline cells: a JPEG image view plus two
# status lines that the pipeline updates in place.
display_widget = widgets.Image(width=400, height=400, format='jpeg')
latency_label  = widgets.Label(value='‚è± Latency: -- ms')
status_label   = widgets.Label(value='Status: Ready')

def display_frame(frame_bgr):
    """Show one OpenCV (BGR) frame in ``display_widget``.

    The frame is converted to RGB, JPEG-encoded at quality 85 and assigned to
    the widget's ``value``. This redraws the existing widget in place rather
    than emitting new cell output, which avoids flicker.
    """
    rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    with io.BytesIO() as jpeg_bytes:
        Image.fromarray(rgb).save(jpeg_bytes, format='JPEG', quality=85)
        display_widget.value = jpeg_bytes.getvalue()

def display_frames_loop(frames, fps=None):
    """Play a sequence of frames through ``display_frame`` at a steady rate.

    Args:
        frames: iterable of BGR frames.
        fps: target frames per second; falsy values fall back to
            CONFIG['OUTPUT_FPS'].

    Each frame's deadline is measured from the start of playback rather than
    from the previous frame. Sleep overshoot and loop overhead therefore do
    not accumulate into drift. If rendering falls behind, subsequent frames
    are shown without sleeping until the schedule has caught up.
    """
    target_fps = fps or CONFIG['OUTPUT_FPS']
    frame_time = 1.0 / target_fps
    start = time.perf_counter()

    for i, frame in enumerate(frames, start=1):
        display_frame(frame)
        sleep_time = start + i * frame_time - time.perf_counter()
        if sleep_time > 0:
            time.sleep(sleep_time)

print('‚úÖ All pipeline functions defined!')

## 🚀 Cell 6 — Full Real-Time Pipeline (Single Turn)

In [None]:
# ============================================================
# CELL 6: FULL REAL-TIME PIPELINE ‚Äî SINGLE TURN
# One complete: Voice ‚Üí STT ‚Üí LLM ‚Üí TTS ‚Üí MuseTalk ‚Üí Display
# ============================================================

def run_pipeline_once(audio_source='mic', test_text=None, test_audio=None):
    """
    Run one full pipeline turn: input -> STT -> LLM -> TTS -> MuseTalk -> display.

    Each sentence goes to TTS as soon as the LLM stream yields it. TTS,
    lip-sync and playback for that sentence then run in this thread before
    the next sentence is pulled from the stream, so the stages are
    interleaved rather than truly overlapped (Cell 8 runs them in parallel).
    Only video frames are shown; the synthesized speech is not played back.

    Args:
        audio_source: 'mic' | 'file'
        test_text:    Skip STT, use this text directly
        test_audio:   Path to .wav file (used when audio_source == 'file')

    Returns:
        List of every displayed frame, or None if no speech was transcribed.
    """
    def _avg(values):
        # np.mean([]) warns and yields nan when the LLM produced no sentences.
        return float(np.mean(values)) if values else 0.0

    pipeline_start = time.perf_counter()
    timings = {}
    chunk_dir = tempfile.gettempdir()  # portable replacement for a hard-coded /tmp

    # Show display widget
    display(widgets.VBox([
        display_widget,
        status_label,
        latency_label
    ]))
    display_frame(base_avatar)

    # -- STAGE 1: Input ------------------------------------------
    status_label.value = 'üé§ Listening...'

    if test_text:
        transcript = test_text
        print(f'üìù Using test text: "{transcript}"')
    else:
        if audio_source == 'file' and test_audio:
            audio_path = test_audio
        else:
            audio_path = record_audio()

        # -- STAGE 2: STT ----------------------------------------
        status_label.value = 'üî§ Transcribing...'
        t0 = time.perf_counter()
        transcript = run_stt(audio_path)
        timings['stt'] = (time.perf_counter() - t0) * 1000

    if not transcript.strip():
        status_label.value = '‚ö†Ô∏è No speech detected'
        return

    # -- STAGES 3+4+5: LLM -> TTS -> MuseTalk, interleaved per sentence --
    # TTS starts as soon as the first sentence arrives, but the LLM generator
    # is not advanced again until that sentence has been rendered and shown.
    status_label.value = 'üß† Generating response...'
    all_frames = []
    tts_timings = []
    mt_timings = []
    chunk_idx = 0

    t_llm_start = time.perf_counter()

    for sentence in run_llm_stream(transcript):
        if not sentence.strip():
            continue

        print(f'\n--- Processing chunk {chunk_idx+1}: "{sentence[:50]}..." ---')
        status_label.value = f'üîä Speaking chunk {chunk_idx+1}...'

        # TTS for this sentence
        tts_path = os.path.join(chunk_dir, f'tts_chunk_{chunk_idx}.wav')
        t0 = time.perf_counter()
        run_tts(sentence, tts_path)
        tts_timings.append((time.perf_counter() - t0) * 1000)

        # MuseTalk inference on this audio chunk
        status_label.value = 'üé≠ Rendering lip-sync...'
        t0 = time.perf_counter()
        frames = run_musetalk(tts_path)
        mt_timings.append((time.perf_counter() - t0) * 1000)

        # Display frames immediately
        display_frames_loop(frames)
        all_frames.extend(frames)
        chunk_idx += 1

    total_ms = (time.perf_counter() - pipeline_start) * 1000
    # Spans the whole response loop: LLM streaming + TTS + MuseTalk + display.
    timings['llm_total'] = (time.perf_counter() - t_llm_start) * 1000
    tts_avg = _avg(tts_timings)
    mt_avg = _avg(mt_timings)

    # Final stats
    latency_label.value = f'‚è± Total: {total_ms:.0f}ms | TTS avg: {tts_avg:.0f}ms | MT avg: {mt_avg:.0f}ms'
    status_label.value = '‚úÖ Done!'

    # Build the rule outside the f-strings: reusing the enclosing quote
    # character inside an f-string expression is a SyntaxError before 3.12.
    rule = '=' * 50
    print(f'\n{rule}')
    print('üìä PIPELINE STATS')
    print(rule)
    if 'stt' in timings:
        print(f'  STT:         {timings["stt"]:.0f} ms')
    print(f'  Response:    {timings["llm_total"]:.0f} ms (LLM + TTS + MuseTalk + display)')
    print(f'  TTS avg:     {tts_avg:.0f} ms/chunk ({len(tts_timings)} chunks)')
    print(f'  MuseTalk avg:{mt_avg:.0f} ms/chunk')
    print(f'  TOTAL:       {total_ms:.0f} ms')
    print(f'  Frames gen:  {len(all_frames)}')
    print(rule)

    return all_frames

# -- QUICK TEST (no mic needed) --
print('\nRunning quick test with sample text...')
frames = run_pipeline_once(test_text='Hello! I am your AI avatar. Nice to meet you.')
print('\n‚úÖ Pipeline test complete!')

## 🔁 Cell 7 — Continuous Conversation Loop

In [None]:
# ============================================================
# CELL 7: CONTINUOUS CONVERSATION LOOP
# Keeps running: listen ‚Üí respond ‚Üí listen ‚Üí respond...
# Press the Stop button in Jupyter to exit.
# ============================================================

import webrtcvad

def record_with_vad(max_seconds=10, silence_timeout=1.5):
    """Record from the microphone until the speaker goes quiet.

    Audio is read in 30 ms int16 blocks and classified by webrtcvad
    (aggressiveness 2). Recording stops once speech has started and
    ``silence_timeout`` seconds of consecutive non-speech follow, or after
    ``max_seconds`` in total.

    Returns:
        Path of a mono WAV file in the system temp directory, or None if no
        speech was detected.

    Raises:
        ValueError: if CONFIG['SAMPLE_RATE'] is not a rate webrtcvad accepts;
            otherwise every frame would be rejected and speech never found.
    """
    vad = webrtcvad.Vad(2)  # aggressiveness 0-3
    sample_rate = CONFIG['SAMPLE_RATE']
    if sample_rate not in (8000, 16000, 32000, 48000):
        raise ValueError(f'webrtcvad needs 8000/16000/32000/48000 Hz audio, got {sample_rate}')
    chunk_ms    = 30   # VAD works on 10/20/30ms chunks
    chunk_size  = int(sample_rate * chunk_ms / 1000)

    frames        = []
    silent_chunks = 0
    max_silent    = int(silence_timeout * 1000 / chunk_ms)
    max_chunks    = int(max_seconds * 1000 / chunk_ms)
    speech_started = False

    print('üé§ Listening... (speak now, silence stops recording)')

    with sd.InputStream(samplerate=sample_rate, channels=1,
                         dtype='int16', blocksize=chunk_size) as stream:
        for _ in range(max_chunks):
            data, _ = stream.read(chunk_size)
            frames.append(data.copy())

            pcm = data.tobytes()
            try:
                is_speech = vad.is_speech(pcm, sample_rate)
            except Exception:
                # Best effort: an unclassifiable frame counts as silence. Unlike
                # a bare except, this lets KeyboardInterrupt stop the loop.
                is_speech = False

            if is_speech:
                speech_started = True
                silent_chunks  = 0
            elif speech_started:
                silent_chunks += 1
                if silent_chunks >= max_silent:
                    print('üîá Silence detected, processing...')
                    break

    if not speech_started:
        return None

    # int16 PCM -> float32 in [-1, 1)
    audio_np  = np.concatenate(frames, axis=0).flatten().astype(np.float32) / 32768.0
    out_path  = os.path.join(tempfile.gettempdir(), 'vad_recording.wav')
    sf.write(out_path, audio_np, sample_rate)
    return out_path


def run_continuous_loop(max_turns=10):
    """Main conversation loop: listen, transcribe, respond, repeat.

    Each iteration records with VAD and transcribes the result. It then
    streams the LLM reply sentence by sentence through TTS, lip-sync and
    display. Iterations that capture no speech, or an empty transcript, still
    count toward ``max_turns``. The loop ends when ``max_turns`` is reached,
    the stop flag is set, or on KeyboardInterrupt.

    NOTE(review): ipywidgets click callbacks are normally processed only while
    the kernel is idle, so the Stop button may not take effect while this loop
    is running; interrupting the kernel is the reliable way out - confirm in
    your Jupyter/Colab frontend.
    """
    tmp_dir = tempfile.gettempdir()  # portable replacement for a hard-coded /tmp

    # Setup display
    stop_btn = widgets.Button(description='‚èπ Stop', button_style='danger')
    turn_label = widgets.Label(value='Turn 0')

    stop_flag = [False]   # list so the nested callback can mutate it
    def on_stop(b): stop_flag[0] = True
    stop_btn.on_click(on_stop)

    display(widgets.VBox([
        widgets.HBox([stop_btn, turn_label]),
        display_widget,
        status_label,
        latency_label
    ]))
    display_frame(base_avatar)

    print('\nüöÄ Continuous avatar loop started!')
    print('   Speak into your mic. Avatar will respond.')
    print('   Press Stop button or Interrupt kernel to exit.\n')

    turn = 0
    try:
        while not stop_flag[0] and turn < max_turns:
            turn += 1
            turn_label.value = f'Turn {turn}'
            status_label.value = 'üé§ Listening...'

            # Record with VAD
            audio_path = record_with_vad()

            if audio_path is None:
                print('No speech detected, listening again...')
                continue

            # STT
            status_label.value = 'üî§ Transcribing...'
            transcript = run_stt(audio_path)

            if not transcript.strip():
                print('Empty transcript, listening again...')
                continue

            print(f'üë§ You: {transcript}')

            # LLM -> TTS -> MuseTalk, one sentence at a time
            chunk_idx = 0
            for sentence in run_llm_stream(transcript):
                if stop_flag[0]: break
                if not sentence.strip(): continue

                tts_path = os.path.join(tmp_dir, f'conv_chunk_{chunk_idx}.wav')
                run_tts(sentence, tts_path)
                frames = run_musetalk(tts_path)
                display_frames_loop(frames)
                chunk_idx += 1

            # Return to idle
            display_frame(base_avatar)
            status_label.value = '‚úÖ Ready ‚Äî speak again!'

    except KeyboardInterrupt:
        pass

    status_label.value = '‚èπ Stopped'
    print('\n‚úÖ Conversation loop ended.')


# Run it!
run_continuous_loop(max_turns=20)

## ⚡ Cell 8 — Threaded Pipeline (Maximum Speed)

In [None]:
# ============================================================
# CELL 8: THREADED PIPELINE ‚Äî MAXIMUM PERFORMANCE
# Uses producer-consumer queues so TTS and MuseTalk
# run concurrently with LLM generation.
#
# LLM ‚Üí [sentence_queue] ‚Üí TTS thread ‚Üí [audio_queue] ‚Üí MuseTalk thread ‚Üí [frame_queue] ‚Üí display
# ============================================================

import threading
import queue

SENTINEL = None  # signals end of queue


def _drain_until_sentinel(q):
    """Consume and discard items from ``q`` until SENTINEL arrives.

    A failed stage must keep reading its input queue. Otherwise the bounded
    queue fills, the upstream producer blocks in ``put()`` forever, and the
    final ``join()`` calls in ``threaded_pipeline`` hang.
    """
    while q.get() is not SENTINEL:
        pass


def threaded_pipeline(user_text):
    """Full overlapped pipeline using 3 parallel threads.

    LLM, TTS and MuseTalk each run in a daemon thread, connected by bounded
    queues; the calling thread displays frame batches as they arrive. Every
    stage forwards SENTINEL downstream when it finishes, even on error. A
    failing stage drains its input so its producer never blocks. Stage errors
    are collected and printed at the end instead of being raised.
    """
    pipeline_start = time.perf_counter()
    tmp_dir = tempfile.gettempdir()  # portable replacement for a hard-coded /tmp

    # Queues connecting stages
    sentence_q = queue.Queue(maxsize=3)
    audio_q    = queue.Queue(maxsize=3)
    frame_q    = queue.Queue(maxsize=5)

    errors = []

    # -- Thread 1: LLM -> sentence_q --------------------------
    def llm_thread():
        try:
            for sentence in run_llm_stream(user_text):
                sentence_q.put(sentence)
        except Exception as e:
            errors.append(f'LLM thread: {e}')
        finally:
            sentence_q.put(SENTINEL)

    # -- Thread 2: sentence_q -> TTS -> audio_q ---------------
    def tts_thread():
        idx = 0
        try:
            while True:
                sentence = sentence_q.get()
                if sentence is SENTINEL:
                    break
                if not sentence.strip():
                    continue

                tts_path = os.path.join(tmp_dir, f'thread_tts_{idx}.wav')
                run_tts(sentence, tts_path)
                audio_q.put((tts_path, sentence))
                idx += 1
        except Exception as e:
            errors.append(f'TTS thread: {e}')
            _drain_until_sentinel(sentence_q)   # unblock llm_thread
        finally:
            audio_q.put(SENTINEL)

    # -- Thread 3: audio_q -> MuseTalk -> frame_q -------------
    def musetalk_thread():
        try:
            while True:
                item = audio_q.get()
                if item is SENTINEL:
                    break

                audio_path, _sentence = item
                frame_q.put(run_musetalk(audio_path))
        except Exception as e:
            errors.append(f'MuseTalk thread: {e}')
            _drain_until_sentinel(audio_q)      # unblock tts_thread
        finally:
            frame_q.put(SENTINEL)

    # Start all threads simultaneously
    t1 = threading.Thread(target=llm_thread,      daemon=True)
    t2 = threading.Thread(target=tts_thread,       daemon=True)
    t3 = threading.Thread(target=musetalk_thread,  daemon=True)

    t1.start(); t2.start(); t3.start()

    # -- Main thread: display frames as they arrive --
    total_frames = 0
    try:
        while True:
            frames = frame_q.get()
            if frames is SENTINEL:
                break
            display_frames_loop(frames)
            total_frames += len(frames)
    except Exception as e:
        errors.append(f'Display: {e}')
        _drain_until_sentinel(frame_q)          # unblock musetalk_thread

    t1.join(); t2.join(); t3.join()

    total_ms = (time.perf_counter() - pipeline_start) * 1000
    latency_label.value = f'‚ö° Threaded total: {total_ms:.0f}ms | {total_frames} frames'

    if errors:
        print('\n‚ö†Ô∏è  Errors:', errors)

    print(f'\n‚ö° THREADED PIPELINE: {total_ms:.0f}ms total, {total_frames} frames')


# Test the threaded pipeline
display(widgets.VBox([display_widget, status_label, latency_label]))
display_frame(base_avatar)

print('Testing threaded pipeline...')
threaded_pipeline('Tell me something interesting about artificial intelligence.')

## 📊 Cell 9 — Benchmark & Profiler

In [None]:
# ============================================================
# CELL 9: BENCHMARK EACH STAGE
# Run this to find YOUR bottleneck on YOUR hardware.
# ============================================================

import numpy as np

print('üî¨ BENCHMARKING PIPELINE STAGES')
print('='*50)

test_texts = [
    'Hello, how are you today?',
    'The weather is quite nice.',
    'Tell me about AI technology.'
]

results = {'stt': [], 'tts': [], 'musetalk': []}

# ‚îÄ‚îÄ STT Benchmark ‚îÄ‚îÄ
print('\n[1/3] Benchmarking STT (faster-whisper)...')
import edge_tts, asyncio

# Generate test audio first
async def gen_test_audio():
    c = edge_tts.Communicate(test_texts[0], CONFIG['TTS_VOICE'])
    await c.save('/tmp/bench_test.wav')

asyncio.get_event_loop().run_until_complete(gen_test_audio())

for i in range(3):
    t0 = time.perf_counter()
    run_stt('/tmp/bench_test.wav')
    results['stt'].append((time.perf_counter() - t0) * 1000)

print(f'  STT: avg={np.mean(results["stt"]):.0f}ms, min={np.min(results["stt"]):.0f}ms')

# ‚îÄ‚îÄ TTS Benchmark ‚îÄ‚îÄ
print('\n[2/3] Benchmarking TTS (edge-tts)...')

async def bench_tts_all():
    for txt in test_texts:
        t0 = time.perf_counter()
        c = edge_tts.Communicate(txt, CONFIG['TTS_VOICE'])
        await c.save(f'/tmp/bench_tts.wav')
        results['tts'].append((time.perf_counter() - t0) * 1000)

asyncio.get_event_loop().run_until_complete(bench_tts_all())
print(f'  TTS: avg={np.mean(results["tts"]):.0f}ms, min={np.min(results["tts"]):.0f}ms')

# ‚îÄ‚îÄ MuseTalk Benchmark ‚îÄ‚îÄ
print('\n[3/3] Benchmarking MuseTalk...')

for i in range(3):
    t0 = time.perf_counter()
    frames = run_musetalk('/tmp/bench_tts.wav')
    results['musetalk'].append((time.perf_counter() - t0) * 1000)

print(f'  MuseTalk: avg={np.mean(results["musetalk"]):.0f}ms, min={np.min(results["musetalk"]):.0f}ms')
print(f'  Frames generated: {len(frames)}')

# ‚îÄ‚îÄ Summary ‚îÄ‚îÄ
print('\n' + '='*50)
print('üìä BENCHMARK SUMMARY')
print('='*50)
total_avg = np.mean(results['stt']) + np.mean(results['tts']) + np.mean(results['musetalk'])
print(f'  STT avg:       {np.mean(results["stt"]):>6.0f} ms')
print(f'  TTS avg:       {np.mean(results["tts"]):>6.0f} ms')
print(f'  MuseTalk avg:  {np.mean(results["musetalk"]):>6.0f} ms')
print(f'  ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ')
print(f'  Total (seq):   {total_avg:>6.0f} ms')
print(f'  Total (threaded): ~{np.max([np.mean(results["tts"]), np.mean(results["musetalk"])]) + np.mean(results["stt"]):.0f} ms (estimated)')

if total_avg < 800:
    print('\n‚úÖ REAL-TIME CAPABLE! (<800ms threshold)')
elif total_avg < 1500:
    print('\n‚ö†Ô∏è  BORDERLINE ‚Äî use threaded mode to stay real-time')
else:
    print('\n‚ùå TOO SLOW ‚Äî upgrade GPU or reduce model sizes')