# Voice-Cloned Dubbing with Qwen3-TTS

This notebook synthesizes dubbed audio from a translated transcript using Qwen3-TTS voice cloning.

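**Requirements:** GPU runtime (T4 or better). Note that FlashAttention-2 is only available on Ampere-or-newer GPUs (e.g. L4, A100), not on the T4.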

## 1. Setup Environment

In [None]:
# Install uv package manager
!curl -LsSf https://astral.sh/uv/install.sh | sh
import os
os.environ['PATH'] = f"/root/.local/bin:{os.environ['PATH']}"

In [None]:
# Clone the repository
!git clone https://github.com/pherber3/translate-dub.git
%cd translate-dub

# Clone Qwen3-TTS (dependency)
!git clone https://github.com/QwenLM/Qwen3-TTS.git

In [None]:
# Use CUDA config and sync dependencies
# The first command overwrites pyproject.toml with the repository's CUDA variant;
# the second lets uv install the dependencies that file declares.
# NOTE(review): later cells import torch / qwen_tts directly in the notebook kernel —
# confirm that the environment populated by `uv sync` is the one the kernel uses.
!cp pyproject.cuda.toml pyproject.toml
!uv sync

In [None]:
# Verify GPU is available
# Runs a one-liner through `uv run` that prints whether torch reports CUDA as
# available and, if so, the name of device 0 (otherwise None).
!uv run python -c "import torch; print(f'CUDA: {torch.cuda.is_available()}, Device: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else None}')"

## 2. Load Model and Data

In [None]:
import html
import importlib.util
import json
import time
from pathlib import Path

import librosa
import numpy as np
import soundfile as sf
import torch
from IPython.display import Audio, display, HTML

from qwen_tts import Qwen3TTSModel

# Config
# Inputs: the translated transcript and the metadata describing the reference clips
TRANSCRIPT_PATH = "data/longform_audio/french_conversation_example_speaker_clips/french_conversation_example_en.json"
CLIPS_PATH = "data/longform_audio/french_conversation_example_speaker_clips/clips_metadata.json"

# Outputs: created up front so later cells can write into them directly
OUTPUT_DIR = Path("output/dubbed")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
(OUTPUT_DIR / "segments").mkdir(exist_ok=True)

MODEL_NAME = "Qwen/Qwen3-TTS-12Hz-1.7B-Base"
DEVICE = "cuda:0"
TEMPERATURE = 0.7
USE_REF_TEXT = True  # ICL mode for better voice matching
SAMPLE_RATE = 24000

# Synthesis below samples from the model (do_sample=True), so seed torch's RNG to make
# a Restart & Run All repeatable (GPU kernels may still introduce small differences).
SEED = 42
torch.manual_seed(SEED)

In [None]:
# Load transcript and clips metadata
# Both files are opened as UTF-8 explicitly: they carry non-ASCII text, and the
# platform default encoding is not guaranteed to be UTF-8.
with open(TRANSCRIPT_PATH, encoding="utf-8") as f:
    transcript = json.load(f)

with open(CLIPS_PATH, encoding="utf-8") as f:
    clips_meta = json.load(f)

segments = transcript['segments']
# `or {}` also covers a transcript whose "translation" entry is present but null
target_lang = (transcript.get('translation') or {}).get('target_language', 'en')

# Map language codes to the language names passed to the TTS model;
# unknown codes fall back to 'Auto'
lang_map = {'en': 'English', 'fr': 'French', 'de': 'German', 'es': 'Spanish', 'it': 'Italian', 'pt': 'Portuguese', 'zh': 'Chinese', 'ja': 'Japanese', 'ko': 'Korean', 'ru': 'Russian'}
lang_name = lang_map.get(target_lang, 'Auto')

# Get best reference clip per speaker: the clip with the lowest 'rank' wins,
# and the first one seen is kept on ties
speaker_refs = {}
for clip in clips_meta['clips']:
    speaker = clip['speaker']
    if speaker not in speaker_refs or clip['rank'] < speaker_refs[speaker]['rank']:
        speaker_refs[speaker] = clip

print(f"Transcript: {len(segments)} segments")
print(f"Target language: {lang_name}")
print(f"Speakers: {list(speaker_refs.keys())}")

In [None]:
# Load TTS model
print(f"Loading model: {MODEL_NAME}")
load_start = time.perf_counter()

load_kwargs = {"device_map": DEVICE, "dtype": torch.bfloat16}

# FlashAttention-2 needs the optional `flash_attn` package and a GPU with compute
# capability >= 8.0 (Ampere or newer). A T4 is 7.5, so requesting it unconditionally
# breaks the "T4 or better" promise of this notebook. When it cannot be used, the
# argument is simply omitted and the library picks its default attention backend.
flash_attn_usable = (
    DEVICE.startswith("cuda")
    and torch.cuda.is_available()
    and torch.cuda.get_device_capability(torch.device(DEVICE))[0] >= 8
    and importlib.util.find_spec("flash_attn") is not None
)
if flash_attn_usable:
    load_kwargs["attn_implementation"] = "flash_attention_2"
else:
    print("FlashAttention-2 not usable on this runtime; using the default attention backend")

tts_model = Qwen3TTSModel.from_pretrained(MODEL_NAME, **load_kwargs)

print(f"Model loaded in {time.perf_counter() - load_start:.1f}s")

In [None]:
# Create voice clone prompts for each speaker
print("Creating voice clone prompts...")
speaker_prompts = {}

for speaker, ref in speaker_refs.items():
    ref_path = Path(ref['file'])
    if not ref_path.exists():
        # Fall back to a file of the same name next to the clips metadata
        ref_path = Path(CLIPS_PATH).parent / ref_path.name
    if not ref_path.exists():
        # Fail here with a clear message instead of deep inside the model call
        raise FileNotFoundError(
            f"Reference clip for speaker {speaker} not found "
            f"(tried {ref['file']} and {ref_path})"
        )

    # With USE_REF_TEXT the reference transcript is passed along (ICL mode);
    # otherwise only the audio is used (x-vector mode).
    ref_text = ref['text'] if USE_REF_TEXT else None
    prompt = tts_model.create_voice_clone_prompt(
        ref_audio=str(ref_path),
        ref_text=ref_text,
        x_vector_only_mode=not USE_REF_TEXT,
    )
    speaker_prompts[speaker] = prompt

    # Play the reference clip
    mode = "ICL" if USE_REF_TEXT else "x-vector"
    # Add the ellipsis only when the text was actually cut, matching the
    # previews shown in the synthesis cell
    preview = ref['text'][:60] + ('...' if len(ref['text']) > 60 else '')
    print(f"\nSpeaker {speaker} reference ({mode} mode):")
    print(f"  Text: {preview}")
    display(Audio(str(ref_path)))

## 3. Synthesize Segments

Each segment is generated and played inline so you can listen as it goes.

In [None]:
def trim_silence(audio, sr, top_db=25):
    """Strip quiet audio from the start and end of a clip.

    Delegates to ``librosa.effects.trim`` with the given ``top_db`` threshold and
    returns only the trimmed signal; the second value librosa returns is dropped.
    ``sr`` is accepted but not used by the body.
    """
    return librosa.effects.trim(audio, top_db=top_db)[0]


# Results accumulator and wall-clock start for the synthesis run
generated_segments = []
synth_start = time.perf_counter()

# Optional cap for quick test runs, e.g. 5 processes only the first 5 segments;
# any falsy value means "process everything"
LIMIT = None
process_segments = segments if not LIMIT else segments[:LIMIT]

print(
    f"Synthesizing {len(process_segments)} segments...",
    "=" * 60,
    sep="\n",
)

In [None]:
# Start from a clean slate on every run of this cell, so re-running it neither
# appends duplicate entries nor inflates the reported synthesis time.
generated_segments = []
synth_start = time.perf_counter()
total_segments = len(process_segments)

for i, seg in enumerate(process_segments):
    # `or ''` also covers keys that are present but null in the transcript
    text = seg.get('text') or ''
    original_text = seg.get('original_text') or ''
    speaker = seg.get('speaker', 0)
    progress = f"[{i+1}/{total_segments}]"
    stripped = text.strip()

    # Skip non-speech markers and empty text (checked on the stripped text so
    # that surrounding whitespace does not hide a "[marker]")
    if stripped.startswith('[') and stripped.endswith(']'):
        print(f"{progress} Skipping: {text}")
        continue
    if not stripped:
        print(f"{progress} Skipping empty segment")
        continue
    if speaker not in speaker_prompts:
        print(f"{progress} Skipping (no ref): Speaker {speaker}")
        continue

    # Generate
    seg_start = time.perf_counter()
    wavs, sr = tts_model.generate_voice_clone(
        text=text,
        language=lang_name,
        voice_clone_prompt=speaker_prompts[speaker],
        temperature=TEMPERATURE,
        do_sample=True,
        top_k=50,
        top_p=1.0,
        repetition_penalty=1.05,
    )

    # Honour the sample rate the model reports: everything downstream (files,
    # durations, the combined track) assumes SAMPLE_RATE, so convert if they differ.
    audio = wavs[0]
    if sr != SAMPLE_RATE:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=SAMPLE_RATE)

    audio = trim_silence(audio, SAMPLE_RATE)
    seg_time = time.perf_counter() - seg_start
    duration = len(audio) / SAMPLE_RATE

    # Save segment
    seg_filename = f"segment_{i:04d}_speaker_{speaker}.wav"
    seg_path = OUTPUT_DIR / "segments" / seg_filename
    sf.write(str(seg_path), audio, SAMPLE_RATE)

    generated_segments.append({
        'index': i,
        'speaker': speaker,
        'text': text,
        'original_text': original_text,
        'audio_file': str(seg_path),
        'duration': duration,
    })

    # Display with playback. Transcript text is escaped before being placed in
    # HTML so characters such as '<' or '&' are shown literally.
    original_preview = html.escape(original_text[:80]) + ('...' if len(original_text) > 80 else '')
    text_preview = html.escape(text[:80]) + ('...' if len(text) > 80 else '')
    display(HTML(f"<h4>{progress} Speaker {html.escape(str(speaker))}</h4>"))
    display(HTML(f"<b>Original:</b> {original_preview}"))
    display(HTML(f"<b>Translated:</b> {text_preview}"))
    display(HTML(f"<i>{duration:.2f}s audio generated in {seg_time:.2f}s</i>"))
    display(Audio(audio, rate=SAMPLE_RATE))
    print()

In [None]:
# Elapsed wall-clock time since synth_start, followed by a short run summary
synth_time = time.perf_counter() - synth_start
print(
    "=" * 60,
    f"Synthesis completed in {synth_time:.1f}s",
    f"Generated {len(generated_segments)} segments",
    sep="\n",
)

## 4. Combine Segments

In [None]:
# Stitch the saved segments into a single track with a short silence between them
GAP_SECONDS = 0.3
gap_samples = int(GAP_SECONDS * SAMPLE_RATE)
gap_audio = np.zeros(gap_samples, dtype=np.float32)

# Insert the gap before every clip except the first, so none trails at the end
combined_parts = []
for position, seg_info in enumerate(generated_segments):
    if position > 0:
        combined_parts.append(gap_audio)
    samples, _rate = sf.read(seg_info['audio_file'])
    combined_parts.append(samples)

# With nothing generated, fall back to an empty array
if combined_parts:
    combined_audio = np.concatenate(combined_parts)
else:
    combined_audio = np.array([])
combined_duration = len(combined_audio) / SAMPLE_RATE

# Write the combined track next to the per-segment folder
combined_path = OUTPUT_DIR / "combined_dubbed.wav"
sf.write(str(combined_path), combined_audio, SAMPLE_RATE)

print(
    f"Combined audio: {combined_path}",
    f"Total duration: {combined_duration:.1f}s",
    sep="\n",
)

In [None]:
# Show a heading followed by an inline player for the full dubbed track
display(
    HTML("<h3>Combined Dubbed Audio</h3>"),
    Audio(combined_audio, rate=SAMPLE_RATE),
)

In [None]:
# Save metadata describing this run alongside the audio
metadata = {
    'target_language': target_lang,
    'model': MODEL_NAME,
    'sample_rate': SAMPLE_RATE,
    'gap_seconds': GAP_SECONDS,
    'synth_seconds': round(synth_time, 2),
    'combined_duration': round(combined_duration, 2),
    'segments': generated_segments,
}

metadata_path = OUTPUT_DIR / "dub_metadata.json"
# ensure_ascii=False writes non-ASCII characters verbatim, so the file must be
# opened as UTF-8 explicitly; with the platform default encoding the write can
# fail or produce JSON that is not valid UTF-8.
with open(metadata_path, 'w', encoding='utf-8') as f:
    json.dump(metadata, f, indent=2, ensure_ascii=False)

print(f"Metadata saved: {metadata_path}")

## 5. Download Results

Download the combined audio and, optionally, the individual segments as a zip archive.

In [None]:
# `google.colab` only exists on Colab runtimes. Elsewhere, report where the file is
# instead of failing with an ImportError.
try:
    from google.colab import files
except ImportError:
    files = None

if files is not None:
    # Download combined audio
    files.download(str(combined_path))

    # Optionally download all segments as zip
    # !zip -r output/segments.zip output/dubbed/segments/
    # files.download('output/segments.zip')
else:
    print(f"Not running in Colab; combined audio is at {combined_path.resolve()}")