# 🎬 Video Dubbing Application - Google Colab

AI-powered video dubbing with WhisperX, OpenAI GPT, Coqui XTTS, and MuseTalk.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/phillip1029/video-dubbing/blob/main/Video_Dubbing_Colab.ipynb)

## Features:
- 🎤 **WhisperX**: High-quality speech recognition with alignment
- 🧠 **OpenAI GPT**: Context-aware translation (GPT-4o, GPT-4)
- 🗣️ **Coqui XTTS**: Natural multilingual text-to-speech
- 💋 **MuseTalk**: Advanced lip synchronization
- ⚡ **GPU Accelerated**: Optimized for Colab's free GPU

## 🚀 Setup Instructions

**IMPORTANT**: Enable GPU runtime:
1. Go to `Runtime` → `Change runtime type`
2. Select `T4 GPU` (free) or `A100/V100` (Pro)
3. Click `Save`


In [None]:
# Report the runtime environment: GPU, disk space and Python version
import os
import subprocess
import sys

import torch

print("🔥 GPU & System Check")
print("=" * 50)

# GPU summary (device 0 only)
cuda_ready = torch.cuda.is_available()
print(f"CUDA available: {cuda_ready}")
if not cuda_ready:
    print("⚠️ No GPU detected! Please enable GPU runtime.")
else:
    gpu_name = torch.cuda.get_device_name(0)
    # total_memory is reported in bytes; convert to GiB for display
    gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
    print(f"📱 GPU: {gpu_name}")
    print(f"💾 VRAM: {gpu_memory:.1f}GB")

# Free space on the /content volume, as reported by `df -h`
result = subprocess.run(
    ['df', '-h', '/content'],
    capture_output=True,
    text=True,
)
print("\n💽 Disk Space:")
print(result.stdout)

# Interpreter version: the first whitespace-separated token of sys.version
python_version = sys.version.split(maxsplit=1)[0]
print(f"🐍 Python: {python_version}")


In [None]:
# Install system dependencies
# Installs ffmpeg and git-lfs with apt-get. The `!` lines are IPython shell
# escapes, so this cell only runs inside a notebook environment.
print("📦 Installing system dependencies...")

# Refresh the package index before installing; -qq keeps apt output minimal.
!apt-get update -qq
# -y answers apt's confirmation prompt so the cell never blocks on input.
!apt-get install -y -qq ffmpeg git-lfs

# NOTE(review): this message is printed unconditionally; the exit status of the
# apt-get commands above is not checked.
print("✅ System dependencies installed!")


In [None]:
# Make Google Drive available under /content/drive
# (the repository checkout used by later cells lives below this mount point)
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)
print("✅ Google Drive mounted!")

In [None]:
# Clone repository and install Python dependencies
import os

if not os.path.exists('/content/drive/MyDrive/github/video-dubbing'):
    print("📥 Cloning video dubbing repository...")
    !git clone https://github.com/phillip1029/video-dubbing.git /content/drive/MyDrive/github/video-dubbing
    print("✅ Repository cloned!")
else:
    print("📁 Repository exists, updating...")
    !cd /content/drive/MyDrive/github/video-dubbing && git pull

# Change to project directory
os.chdir('/content/drive/MyDrive/github/video-dubbing')
print(f"📂 Current directory: {os.getcwd()}")

# Install Python dependencies
print("\n🐍 Installing Python dependencies...")
!pip install --upgrade pip
!pip install -q -r requirements.txt


In [None]:
# API Key Setup
# Reads the OpenAI key from Colab Secrets and selects the translation backend.
# Defines `translation_service` ("openai" or "google") for later cells and, when
# a key is available, exports it as the OPENAI_API_KEY environment variable.
import os
from google.colab import userdata

print("🔑 OpenAI API Key Setup")
print("For best translation quality, using OpenAI API is recommended.")
print("Ensure your OpenAI API key is stored in Colab Secrets named 'OPENAI_API_KEY'.")
print("If the key is not found, Google Translate will be used.")

# userdata.get() raises (rather than returning None) when the secret does not
# exist or the notebook has not been granted access to it. Catch those cases so
# the Google Translate fallback announced above is actually reachable.
try:
    openai_key = userdata.get('OPENAI_API_KEY')
except (userdata.SecretNotFoundError, userdata.NotebookAccessError) as exc:
    print(f"⚠️ Could not read 'OPENAI_API_KEY' from Secrets ({type(exc).__name__}).")
    openai_key = None

# Normalise None / whitespace-only values; stray whitespace around a pasted key
# would otherwise be exported verbatim.
openai_key = (openai_key or "").strip()

if openai_key:
    os.environ['OPENAI_API_KEY'] = openai_key
    print("✅ OpenAI API key found in Secrets! Using GPT for translations.")
    translation_service = "openai"
else:
    print("📝 OpenAI API key not found in Secrets. Using Google Translate.")
    translation_service = "google"

print(f"🌐 Translation service: {translation_service}")

## Upload a video file or provide a YouTube link

In [None]:
# Upload video file or provide YouTube link
from google.colab import files
import os
import subprocess
import re

print("📤 Upload your video file OR provide a YouTube link:")
print("Supported: MP4, AVI, MOV, MKV (for upload)")
print("Recommended: < 100MB for free Colab")

# @markdown ### ⬆️ Upload Video File
# @markdown Upload a video file directly from your computer.
# @markdown **OR**
# @markdown ### 🔗 Provide YouTube Link
# @markdown Paste a YouTube video URL here to download it.
youtube_url = "https://www.youtube.com/watch?v=_nvJq1nN6Wo" #@param {type:"string"}

video_path = None
video_filename = None

# Define the base directory for the video-dubbing repository
base_dir = '/content/drive/MyDrive/github/video-dubbing'

if youtube_url:
    print(f"\nAttempting to download from YouTube: {youtube_url}")
    try:
        # Use yt-dlp to download the video
        # Install yt-dlp if not already installed
        !pip install -q yt-dlp

        # Define output template within the base_dir
        output_template = os.path.join(base_dir, '%(title)s.%(ext)s')

        # Download the video
        download_process = subprocess.run(
            ['yt-dlp', '-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', '-o', output_template, youtube_url],
            capture_output=True,
            text=True,
            check=True
        )
        print(download_process.stdout)

        # Check yt-dlp output for the downloaded file path
        # Look for lines indicating download or existence, e.g., "[download] Destination: ..." or "[download] ... has already been downloaded"
        match = re.search(r'\[download\] Destination: (.+)', download_process.stdout)
        if not match:
             match = re.search(r'\[download\] (.+) has already been downloaded', download_process.stdout)

        if match:
            video_path = match.group(1).strip()
            video_filename = os.path.basename(video_path)
            if os.path.exists(video_path):
                file_size = os.path.getsize(video_path) / (1024*1024)  # MB
                print(f"\n✅ Video found: {video_filename}")
                print(f"📏 Size: {file_size:.1f} MB")
                if file_size > 200:
                    print("⚠️ Large file - may hit memory limits")
            else:
                print(f"❌ Video path extracted but file not found: {video_path}")
                video_path = None
        else:
             print("❌ Failed to extract downloaded video path from yt-dlp output.")
             video_path = None


    except subprocess.CalledProcessError as e:
        print(f"❌ Error downloading video: {e.stderr}")
        video_path = None
    except Exception as e:
        print(f"❌ An unexpected error occurred during download: {e}")
        video_path = None

elif not youtube_url:
    # Handle file upload if no YouTube URL is provided
    print("Please upload a video file:")
    try:
        uploaded = files.upload()
        if uploaded:
            video_filename = list(uploaded.keys())[0]
            # Construct the video path within the base_dir for uploads
            video_path = os.path.join(base_dir, video_filename)

            file_size = os.path.getsize(video_path) / (1024*1024)  # MB
            print(f"\n✅ Video uploaded: {video_filename}")
            print(f"📏 Size: {file_size:.1f} MB")

            if file_size > 200:
                print("⚠️ Large file - may hit memory limits")
        else:
            print("❌ No video uploaded")
            video_path = None
    except Exception as e:
        print(f"❌ An error occurred during file upload: {e}")
        video_path = None

else:
    print("❌ Please provide either a YouTube link or upload a video file.")
    video_path = None

In [None]:
# Choose the language spoken in the input video
print("🎙️ Select the original language of the video:")

# Codes offered in the menu; "auto" asks for automatic detection
source_languages = dict(
    auto="Auto-Detect",
    en="English", es="Spanish", fr="French", de="German",
    it="Italian", pt="Portuguese", ru="Russian", ja="Japanese",
    ko="Korean", zh="Chinese", ar="Arabic", hi="Hindi",
    nl="Dutch", pl="Polish", tr="Turkish", cy="Welsh",
)

for lang_code, lang_name in source_languages.items():
    print(f"  {lang_code}: {lang_name}")

source_language =  'en'

# An empty selection falls back to automatic detection
source_language = source_language or "auto"

if source_language not in source_languages:
    # Codes outside the menu are accepted and forwarded unchanged
    print(f"⚠️ Custom source language code entered: {source_language}. This will be passed to the ASR service.")
else:
    print(f"✅ Source Language: {source_languages[source_language]} ({source_language})")


In [None]:
# Choose the language the video will be dubbed into
print("🌍 Select target language:")

# Codes offered as dubbing targets
languages = dict(
    es="Spanish", fr="French", de="German", it="Italian",
    pt="Portuguese", ru="Russian", ja="Japanese", ko="Korean",
    zh="Chinese", ar="Arabic", hi="Hindi", nl="Dutch",
    pl="Polish", tr="Turkish",
)

for lang_code, lang_name in languages.items():
    print(f"  {lang_code}: {lang_name}")

target_language = 'zh'

# Unknown codes are replaced by Spanish instead of being passed through
if target_language not in languages:
    print("❌ Invalid code. Using Spanish (es)")
    target_language = "es"
else:
    print(f"✅ Target: {languages[target_language]} ({target_language})")


In [None]:
# Process video dubbing
# Runs a new dubbing job, or resumes one by session ID. Reads `video_path`,
# `source_language`, `target_language`, `languages` and `translation_service`
# set by earlier cells; defines `output_path` / `output_filename`.
import sys
import time
import os
import logging

# --- Configure Logging to see pipeline details ---
# This will stream all logs from the pipeline directly to this cell's output
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
log_handler = logging.StreamHandler(sys.stdout)
log_handler.setFormatter(log_formatter)
root_logger = logging.getLogger()
# Clear previous handlers to avoid duplicate logs if the cell is run again
if root_logger.hasHandlers():
    root_logger.handlers.clear()
root_logger.addHandler(log_handler)
root_logger.setLevel(logging.INFO)

# --- Ensure Python can find the source code ---
# The imports below use the `src.` package prefix, which resolves only when the
# repository root is importable, so the root is added explicitly instead of
# relying on the current working directory. The original `src` entry is kept.
project_root = '/content/drive/MyDrive/github/video-dubbing'
src_path = os.path.join(project_root, 'src')
for import_dir in (project_root, src_path):
    if import_dir not in sys.path:
        sys.path.append(import_dir)

# @markdown ### 🔄 Resume Previous Session?
# @markdown Enter a session ID here to resume a crashed or stopped run. Leave blank for a new run.
resume_session_id = "" #@param {type:"string"}

if ('video_path' in locals() and video_path and os.path.exists(video_path)) or resume_session_id:
    print("\n🚀 Starting video dubbing pipeline...")

    from src.utils.config import AppConfig
    from src.pipeline import VideoDubbingPipeline

    # --- Create Configuration ---
    config = AppConfig()
    config.asr.service = "openai_api"
    config.asr.api_key = os.environ.get('OPENAI_API_KEY')
    config.asr.split_long_audio = True
    config.asr.split_threshold_min = 15
    config.asr.split_chunk_duration_min = 15
    config.translation.service = translation_service
    if translation_service == "openai":
        config.translation.model_name = "gpt-4o"
        config.translation.api_key = os.environ.get('OPENAI_API_KEY')

    # Translation can fall back to Google, but ASR is fixed to the OpenAI API
    # above, so surface a missing key before the pipeline starts.
    if not config.asr.api_key:
        print("⚠️ OPENAI_API_KEY is not set, but the ASR service is 'openai_api' - transcription will likely fail.")

    # --- Create and run pipeline ---
    pipeline = VideoDubbingPipeline(
        config,
        session_id=resume_session_id if resume_session_id else None,
        resume=bool(resume_session_id),
        source_language=source_language,
    )

    print("-" * 50)
    print(f"SESSION ID: {pipeline.session_id}")
    print("-" * 50)

    # Determine input video path
    if resume_session_id:
        # A resumed session reuses the input path stored in its saved state
        input_video = pipeline.pipeline_state.get("metadata", {}).get("input_video")
        if not input_video or not os.path.exists(input_video):
            raise FileNotFoundError(f"Cannot resume: Original video file not found at path: {input_video}")
        print(f"Resuming with video: {os.path.basename(input_video)}")
    else:
        input_video = video_path

    # Define output path in the main /content/ directory for easy download.
    # Drop the input's extension so the name is "dubbed_<name>_<lang>.mp4"
    # rather than "dubbed_<name>.mp4_<lang>.mp4".
    input_stem = os.path.splitext(os.path.basename(input_video))[0]
    output_filename = f"dubbed_{input_stem}_{target_language}.mp4"
    output_path = f"/content/{output_filename}"

    print(f"📹 Input: {os.path.basename(input_video)}")
    print(f"🌍 Target Language: {languages.get(target_language, target_language)}")
    print(f"🎤 ASR Service: {config.asr.service}")
    print(f"🧠 Translation Service: {config.translation.service.upper()}")
    print("\n⏳ Processing... See detailed logs below for real-time progress.")
    print("-" * 50)

    start_time = time.time()
    try:
        result = pipeline.process_video(
            video_path=input_video,
            target_language=target_language,
            output_path=output_path,
            speaker_reference=None,
            auto_approve=True
        )
        end_time = time.time()

        if result["success"]:
            processing_time = end_time - start_time
            print("-" * 50)
            print("\n🎉 SUCCESS! Dubbing completed!")
            print(f"⏱️ Total Time: {processing_time:.1f} seconds")
            print(f"📁 Output File: {output_filename}")
        else:
            print(f"\n❌ FAILED: {result['error']}")

    except Exception as e:
        # Top-level boundary of the notebook cell: report the failure without
        # raising, but log the traceback so the point of failure is visible.
        root_logger.exception("Pipeline crashed")
        print(f"\n💥 PIPELINE CRASHED: {str(e)}")
        print("\n🔧 Troubleshooting:")
        print("1. Review the detailed logs above to find the point of failure.")
        print("2. Ensure your OpenAI API key is correct and has sufficient credits.")
        print(f"3. You can attempt to resume this run later using the session ID: {pipeline.session_id}")
else:
    print("❌ No video file to process. Upload a video in the cell above, or provide a session ID to resume.")


In [None]:
# Download results
# Sends the dubbed video to the browser, then lists the media files in /content.
# `os` is imported here so the cell does not depend on an earlier cell's import.
import os
from google.colab import files

if 'output_path' in locals() and os.path.exists(output_path):
    print(f"📥 Downloading: {output_filename}")
    files.download(output_path)
    print("✅ Download started! Check your browser downloads.")
else:
    print("❌ No output file. Run dubbing pipeline first.")

# List available files
# ("\n" rather than "\\n": the doubled backslash printed a literal backslash-n)
print("\n📁 Available files:")
media_extensions = ('.mp4', '.avi', '.mov', '.wav', '.mp3')
for file in sorted(os.listdir('/content')):
    # Compare case-insensitively so names such as "CLIP.MP4" are listed too
    if file.lower().endswith(media_extensions):
        size = os.path.getsize(f'/content/{file}') / (1024*1024)
        print(f"  {file} ({size:.1f} MB)")
