# 🎙️ VibeVoice Podcast Generation Server

This notebook runs the VibeVoice Realtime-0.5B model for podcast generation.

**Requirements:**
- GPU runtime (T4 or better)
- ~8GB GPU memory

**Usage:**
1. Run all cells in order
2. Copy the public URL printed by Cell 6 into your web app's Settings

In [None]:
#@title 1️⃣ Install Dependencies (exact versions required)
# Cell purpose: install the Python dependencies, fetch the VibeVoice source
# tree into /content/VibeVoice, and confirm the streaming-inference module can
# be imported.
# NOTE: lines starting with `!` are IPython shell escapes, so this cell only
# runs inside a notebook (e.g. Colab), not as a plain Python script.
import sys
import os

# CRITICAL: Install exact transformers version required by VibeVoice
!pip install -q transformers==4.51.3 accelerate==1.6.0
# The remaining packages are installed without version pins.
!pip install -q torch torchaudio diffusers fastapi uvicorn scipy

# Clone the VibeVoice repo
# (`--depth 1` fetches only the latest commit; skipped when the directory
# already exists so re-running the cell is cheap.)
if not os.path.exists('/content/VibeVoice'):
    !git clone --depth 1 https://github.com/microsoft/VibeVoice.git /content/VibeVoice
    print("‚úÖ Cloned VibeVoice repo")
else:
    print("‚úÖ VibeVoice repo already exists")

# Add repo to Python path
# (position 0 so the checkout is searched before any other location.)
sys.path.insert(0, '/content/VibeVoice')

# Verify import works
try:
    import transformers
    print(f"‚úÖ Transformers version: {transformers.__version__}")
    from vibevoice.modular.modeling_vibevoice_streaming_inference import VibeVoiceStreamingForConditionalGenerationInference
    print("‚úÖ VibeVoice module found!")
except ImportError as e:
    # Fallback: install the cloned repo as a package. The import is not
    # retried here; the model-loading cell performs the import again.
    print(f"‚ö†Ô∏è Import failed: {e}")
    print("Running pip install for VibeVoice...")
    !pip install -q /content/VibeVoice

In [None]:
#@title 2️⃣ Check GPU & Setup Path
import sys
import torch

# Keep the VibeVoice checkout importable even when the install cell was not
# run in this session (the entry is only added if it is missing).
if sys.path.count('/content/VibeVoice') == 0:
    sys.path.insert(0, '/content/VibeVoice')

# Report whether a CUDA device is visible and, when it is, its name and total
# memory; otherwise tell the user how to switch the runtime.
cuda_ok = torch.cuda.is_available()
print(f"CUDA available: {cuda_ok}")
if not cuda_ok:
    print("‚ö†Ô∏è No GPU! Go to: Runtime ‚Üí Change runtime type ‚Üí T4 GPU")
else:
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    total_bytes = torch.cuda.get_device_properties(0).total_memory
    print(f"Memory: {total_bytes / 1e9:.1f} GB")

In [None]:
#@title 3️⃣ Load VibeVoice Model
import torch
import copy
import sys

if '/content/VibeVoice' not in sys.path:
    sys.path.insert(0, '/content/VibeVoice')

from vibevoice.modular.modeling_vibevoice_streaming_inference import VibeVoiceStreamingForConditionalGenerationInference
from vibevoice.processor.vibevoice_streaming_processor import VibeVoiceStreamingProcessor

# Checkpoint identifier handed to both `from_pretrained` calls below.
MODEL_PATH = "microsoft/VibeVoice-Realtime-0.5B"

print("Loading processor...")
processor = VibeVoiceStreamingProcessor.from_pretrained(MODEL_PATH)

print("Loading model (2-3 minutes)...")
# First attempt uses Flash Attention 2; any failure of that load falls back to
# PyTorch's scaled-dot-product attention ("sdpa").
try:
    model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
        MODEL_PATH,
        torch_dtype=torch.bfloat16,
        device_map="cuda",
        attn_implementation="flash_attention_2"
    )
    print("Using Flash Attention 2")
except Exception as flash_error:
    # `except Exception` instead of a bare `except:` so that interrupting the
    # kernel (KeyboardInterrupt) or SystemExit during the long load is not
    # swallowed and answered with a second multi-minute load attempt.
    print("Flash attention not available, using sdpa...")
    # Show the real cause: the first load can also fail for reasons unrelated
    # to flash-attn (network, out-of-memory, ...), which would otherwise be
    # hidden behind the message above.
    print(f"  (reason: {type(flash_error).__name__}: {flash_error})")
    model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
        MODEL_PATH,
        torch_dtype=torch.bfloat16,
        device_map="cuda",
        attn_implementation="sdpa"
    )

# Inference only: switch off training-mode behaviour.
model.eval()
# NOTE(review): presumably the number of diffusion denoising steps per
# generation (fewer = faster) - confirm against the VibeVoice model code.
model.set_ddpm_inference_steps(num_steps=5)
print("‚úÖ Model loaded successfully!")

In [None]:
#@title 4️⃣ Load Voice Presets
import torch
from pathlib import Path

# Directory inside the cloned repo that holds the streaming-model voice files.
VOICES_DIR = Path("/content/VibeVoice/demo/voices/streaming_model")

# Preset name (the file stem, e.g. "en-Carter_man") -> object loaded from the
# matching .pt file. Stays empty when nothing could be loaded.
voice_presets = {}

if not VOICES_DIR.exists():
    print(f"‚ùå Voices directory not found: {VOICES_DIR}")
else:
    print(f"Loading voices from: {VOICES_DIR}")
    # Only English presets ("en-*.pt") are picked up, in sorted order.
    for preset_path in sorted(VOICES_DIR.glob("en-*.pt")):
        preset_name = preset_path.stem
        try:
            # SECURITY NOTE: weights_only=False allows torch.load to unpickle
            # arbitrary Python objects. That is tolerable here only because
            # the files come from the cloned repository; never point this at
            # untrusted .pt files.
            voice_presets[preset_name] = torch.load(
                preset_path, map_location="cuda", weights_only=False
            )
            print(f"  ‚úÖ {preset_name}")
        except Exception as load_error:
            # A single broken preset is reported and skipped, not fatal.
            print(f"  ‚ö†Ô∏è {preset_name}: {load_error}")

# Summary line for the user.
if not voice_presets:
    print("\n‚ö†Ô∏è No voices loaded - will use default")
else:
    print(f"\n‚úÖ Loaded {len(voice_presets)} voices!")

In [None]:
#@title 5️⃣ Create FastAPI Server
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import uuid
import scipy.io.wavfile as wavfile
import numpy as np
import re

# ASGI application object; the route decorators in this cell register on it.
app = FastAPI(title="VibeVoice Podcast API")

# Fully permissive CORS so a browser front end served from any origin can call
# this API: every origin, method and header is allowed, with credentials.
cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **cors_settings)

# JSON request body accepted by POST /generate-podcast.
class PodcastRequest(BaseModel):
    # Dialogue to synthesize. Only lines of the form "Speaker <number>: <text>"
    # are used (see parse_script); everything else is ignored.
    script: str
    # Used as the stem of the generated .wav file name and of the download name.
    title: Optional[str] = "podcast"
    # Voice preset name used for "Speaker 1" lines (and for any speaker
    # number other than 1 or 2).
    speaker1_voice: Optional[str] = "en-Carter_man"
    # Voice preset name used for "Speaker 2" lines.
    speaker2_voice: Optional[str] = "en-Emma_woman"

# GET / : static description of the service (status, model name, API type).
@app.get("/")
def root():
    service_info = {"status": "ok", "model": "VibeVoice-Realtime-0.5B", "type": "podcast"}
    return service_info

# GET /health : liveness probe; always answers with a fixed payload and does
# not touch the model.
@app.get("/health")
def health():
    liveness = {"status": "healthy"}
    return liveness

# GET /config : lists the loaded voice preset names and names the first one as
# the default. With no presets loaded, the placeholder "default" is reported
# for both fields.
@app.get("/config")
def config():
    if not voice_presets:
        return {"voices": ["default"], "default_voice": "default"}

    available = list(voice_presets.keys())
    return {"voices": available, "default_voice": available[0]}

def parse_script(script: str):
    """Extract ``(speaker_id, text)`` pairs from a podcast script.

    Each non-blank line is expected to look like ``Speaker <number>: <text>``
    (the word "Speaker" is matched case-insensitively). Lines that do not
    match that shape, and matching lines whose text is empty, are skipped.

    Args:
        script: The full script, one utterance per line.

    Returns:
        A list of ``(int, str)`` tuples in script order.
    """
    speaker_line = re.compile(r'^Speaker\s+(\d+)\s*:\s*(.*)$', re.IGNORECASE)

    segments = []
    for raw_line in script.strip().split("\n"):
        candidate = raw_line.strip()
        if not candidate:
            continue

        hit = speaker_line.match(candidate)
        if hit is None:
            # Not a "Speaker N: ..." line - ignored.
            continue

        speaker_no = int(hit.group(1))
        utterance = hit.group(2).strip()
        if utterance:
            segments.append((speaker_no, utterance))
    return segments

def generate_speech(text: str, voice_name: str):
    """Synthesize one utterance and return it as a flat float32 NumPy array.

    Relies on the notebook-level ``processor``, ``model`` and ``voice_presets``
    objects created by earlier cells.

    Args:
        text: The text to speak.
        voice_name: Key into ``voice_presets``. An unknown name falls back to
            the first loaded preset (a notice is printed). When no presets are
            loaded at all, the processor is called without a cached voice
            prompt.

    Returns:
        1-D ``numpy.ndarray`` of float32 samples taken from
        ``outputs.speech_outputs[0]``.
    """
    voice = None
    if voice_presets:
        voice = voice_presets.get(voice_name)
        if voice is None:
            # Make the fallback visible instead of silently swapping voices.
            fallback_name = next(iter(voice_presets))
            print(f"  Voice {voice_name!r} not found, using {fallback_name!r} instead")
            voice = voice_presets[fallback_name]

    # Explicit None test: the preset is whatever torch.load returned, and
    # truthiness of such objects is unreliable (a multi-element tensor raises,
    # an empty container reads as "no voice").
    has_voice = voice is not None

    if has_voice:
        inputs = processor.process_input_with_cached_prompt(
            text=text,
            cached_prompt=voice,
            padding=True,
            return_tensors="pt",
            return_attention_mask=True,
        )
    else:
        inputs = processor(
            text=text,
            padding=True,
            return_tensors="pt",
            return_attention_mask=True,
        )

    # Move every tensor input onto the GPU the model was loaded on.
    for k, v in inputs.items():
        if torch.is_tensor(v):
            inputs[k] = v.to("cuda")

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=None,
            cfg_scale=1.5,
            tokenizer=processor.tokenizer,
            generation_config={'do_sample': False},
            verbose=False,
            # A deep copy is passed so the shared preset in `voice_presets`
            # cannot be altered by generation.
            # NOTE(review): presumably generate() mutates this state - confirm.
            all_prefilled_outputs=copy.deepcopy(voice) if has_voice else None,
        )

    # Convert bfloat16 to float32 and flatten to 1D
    audio = outputs.speech_outputs[0].cpu().float().numpy()
    return audio.flatten()

def _safe_title(title):
    """Reduce a client-supplied title to a harmless file-name stem."""
    # Anything other than word characters, spaces, dots and hyphens (notably
    # path separators) becomes "_"; leading/trailing dots and spaces are
    # dropped so the result can never be "." / ".." or escape the directory.
    cleaned = re.sub(r'[^\w .-]', '_', title or '').strip(' .')
    return cleaned or "podcast"


@app.post("/generate-podcast")
async def generate_podcast(request: PodcastRequest):
    """Render a multi-speaker script to a single WAV file and return it.

    The script is split into ``Speaker N: text`` segments, each segment is
    synthesized with the voice mapped to its speaker (speaker 1 / speaker 2;
    any other speaker number uses speaker 1's voice), a 0.3 s pause is placed
    after every segment, and the result is peak-normalized and written as
    16-bit PCM at 24 kHz under ``/tmp``.

    Raises:
        HTTPException(400): the script contains no valid speaker lines.
        HTTPException(500): any other failure during generation.
    """
    # NOTE(review): synthesis below is synchronous, so as an `async def` this
    # handler occupies the event loop for the whole generation and other
    # requests (including /health) wait until it finishes.
    sample_rate = 24000

    try:
        print(f"\nüéôÔ∏è Generating podcast ({len(request.script)} chars)...")

        segments = parse_script(request.script)
        if not segments:
            raise HTTPException(400, "No valid speaker lines. Use: Speaker 1: text")

        print(f"üìù Found {len(segments)} segments")

        voice_map = {1: request.speaker1_voice, 2: request.speaker2_voice}

        audio_segments = []
        for i, (speaker_id, text) in enumerate(segments):
            voice = voice_map.get(speaker_id, request.speaker1_voice)
            print(f"  [{i+1}/{len(segments)}] Speaker {speaker_id} ({voice}): {text[:40]}...")
            audio = generate_speech(text, voice)
            audio_segments.append(audio)
            # Add pause between segments (0.3s at 24kHz)
            audio_segments.append(np.zeros(int(sample_rate * 0.3), dtype=np.float32))

        full_audio = np.concatenate(audio_segments)

        # Normalize and convert to int16
        max_val = np.max(np.abs(full_audio))
        if max_val > 0:
            full_audio = full_audio / max_val
        full_audio = (full_audio * 32767).astype(np.int16)

        # The title comes straight from the client; sanitize it before it is
        # interpolated into a filesystem path (blocks "../" traversal and
        # copes with a null title).
        title = _safe_title(request.title)
        output_id = str(uuid.uuid4())[:8]
        output_path = f"/tmp/{title}_{output_id}.wav"
        wavfile.write(output_path, sample_rate, full_audio)

        print(f"‚úÖ Generated: {output_path} ({len(full_audio)/sample_rate:.1f}s)")

        return FileResponse(output_path, media_type="audio/wav", filename=f"{title}.wav")

    except HTTPException:
        # Deliberate HTTP errors (the 400 above) must keep their status code;
        # without this clause the generic handler below re-wrapped them as 500.
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(500, str(e)) from e

# Only confirms that the app object and its routes were defined; the server
# itself is started by the next cell.
print("‚úÖ FastAPI server created!")

In [None]:
#@title 6️⃣ Start Server with Public URL
import subprocess
import threading
import time
import re

# Download the latest cloudflared release binary (Linux/amd64 build) to
# /tmp/cloudflared and mark it executable. The `!` lines are IPython shell
# escapes, so they only work inside a notebook.
!wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -O /tmp/cloudflared
!chmod +x /tmp/cloudflared

# Local port uvicorn listens on; the tunnel and the test cell both target it.
PORT = 8000

def run_uvicorn():
    """Serve the FastAPI ``app`` on all interfaces at ``PORT``.

    Intended as a thread target: the call does not return while the server is
    running. Only warnings and errors are logged.
    """
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=PORT, log_level="warning")

# uvicorn runs on a daemon thread so this cell returns and the notebook stays
# usable; the thread ends together with the kernel.
server_thread = threading.Thread(target=run_uvicorn, daemon=True)
server_thread.start()
time.sleep(3)  # fixed grace period for uvicorn to come up before tunnelling to it

print("\nüîó Starting Cloudflare tunnel...\n")
# stderr is merged into stdout because the URL is searched for in the combined
# log output.
process = subprocess.Popen(
    ["/tmp/cloudflared", "tunnel", "--url", f"http://localhost:{PORT}"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True
)

# Read cloudflared's log until the generated *.trycloudflare.com URL appears.
public_url = None
tunnel_log = []
for line in process.stdout:
    tunnel_log.append(line)
    match = re.search(r'https://[\w-]+\.trycloudflare\.com', line)
    if match:
        public_url = match.group(0)
        print("="*60)
        print("üéôÔ∏è VibeVoice Podcast Server is ready!")
        print("="*60)
        print(f"\nüì° PUBLIC URL: {public_url}")
        print(f"\nüëÜ Copy this to your web app Settings!")
        print("\n" + "="*60)
        break

if public_url is None:
    # The output stream ended, i.e. cloudflared exited without announcing a
    # URL. Say so (with its last log lines) instead of finishing silently.
    print(f"ERROR: cloudflared exited (code {process.wait()}) before reporting a public URL")
    print("".join(tunnel_log[-10:]))
else:
    # cloudflared keeps logging for as long as the tunnel is up. If nobody
    # reads the pipe, the OS pipe buffer eventually fills and the child blocks
    # on write, stalling the tunnel - so keep draining it in the background.
    def _drain_tunnel_output(stream=process.stdout):
        for _ in stream:
            pass

    threading.Thread(target=_drain_tunnel_output, daemon=True).start()

In [None]:
#@title 7️⃣ Test: Generate Sample Podcast
# Smoke test: send a short two-speaker script to the locally running server,
# save the returned WAV and play it inline.
import requests
from IPython.display import Audio, display

test_script = """Speaker 1: Welcome! Today we're learning about text to speech.
Speaker 2: That sounds interesting! How does it work?
Speaker 1: AI models convert written text into natural sounding speech.
Speaker 2: Wow, that's amazing technology!"""

print("Generating test podcast...")
# Goes to localhost directly (not through the tunnel); PORT is defined by the
# server cell, which must have been run first.
response = requests.post(
    f"http://localhost:{PORT}/generate-podcast",
    json={"script": test_script, "title": "test", "speaker1_voice": "en-Carter_man", "speaker2_voice": "en-Emma_woman"}
)

if response.status_code == 200:
    with open("/tmp/test.wav", "wb") as f:
        f.write(response.content)
    print("‚úÖ Success!")
    # A bare `Audio(...)` expression is only rendered when it is the last
    # top-level expression of a cell; inside an `if` body its value is
    # discarded, so the player must be shown explicitly with display().
    display(Audio("/tmp/test.wav"))
else:
    print(f"‚ùå Error: {response.text}")