# Video dubbing from English to Assamese Language using AI

### Note: Run this code on Kaggle with the "GPU P100" accelerator enabled (select it under 'Settings'). The accelerator is only available after the Kaggle account has been verified.

In [2]:
!pip install python-multipart
!pip install uvicorn
!pip install fastapi
!pip install nest_asyncio==1.5.8
!pip install -U openai-whisper
!pip install pydub
!pip install ffmpeg
!pip install praat-parselmouth transformers torch 
!pip install parler_tts
!pip install demucs

[0m[31mERROR: Could not find a version that satisfies the requirement python-multipart (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for python-multipart[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement uvicorn (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for uvicorn[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement fastapi (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for fastapi[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement nest_asyncio==1.5.8 (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for nest_asyncio==1.5.8[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement openai-whisper (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for openai-whisper[0m[31m
[0m[31mERROR: Could not find a version that sat

In [None]:
!pip install pyngrok


In [None]:
from pyngrok import ngrok 
# Register the ngrok auth token for this session. The string below is a
# placeholder and must be replaced with a real token before running the cell.
ngrok.set_auth_token("Auth_token from ngrok website")

# Without Demucs

In [None]:
import os
import asyncio
import gc
import re
import torch
import whisper
import librosa
import soundfile as sf
import numpy as np
from pyngrok import ngrok
from pydub import AudioSegment
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from moviepy.editor import VideoFileClip, AudioFileClip
from transformers import pipeline, AutoTokenizer
from parler_tts import ParlerTTSForConditionalGeneration
import nest_asyncio

# Apply nest_asyncio to allow nested event loops
# (the server is started from inside a notebook cell).
nest_asyncio.apply()

# Initialize FastAPI app
app = FastAPI(title="Video Translation API", 
              description="Translates videos from English to Assamese with TTS")

# CORS middleware to allow frontend interactions
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# website send credentialed requests to this API - confirm this is intended,
# or restrict allow_origins to the known frontend.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global paths for working directory
# Every intermediate and output file is written under this directory.
WORKING_DIR = "/kaggle/working"
os.makedirs(WORKING_DIR, exist_ok=True)

# Determine device
# First CUDA device when available, otherwise CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Global model variables - initialized on demand
# Populated by the load_* helpers below and reset to None by free_memory().
translation_pipeline = None
tts_model = None
tts_tokenizer = None
desc_tokenizer = None
whisper_model = None

# Function to split text into sentences for better translation
def split_into_sentences(text):
    """Break *text* into a list of sentences, each ending in '.', '!' or '?'.

    Steps:
      1. A period squeezed between two letters gets a space after it.
      2. The text is cut after '.', '!' or '?' when followed by whitespace
         or a double quote (the quote itself is consumed by the split).
      3. A fragment is glued back onto the previous one when the previous
         one lacks terminal punctuation, or the fragment is a single
         character, or it starts with a lowercase letter.
      4. A '.' is appended to any sentence still missing terminal punctuation.
    """
    terminators = ".!?"

    spaced = re.sub(r'(?<=[A-Za-z])\.(?=[A-Za-z])', '. ', text)
    fragments = [
        piece.strip()
        for piece in re.split(r'(?<=[.!?])\s+|(?<=[.!?])"', spaced)
    ]
    fragments = [piece for piece in fragments if piece]

    merged = []
    for piece in fragments:
        belongs_to_previous = bool(merged) and (
            merged[-1][-1] not in terminators
            or len(piece) == 1
            or piece[0].islower()
        )
        if belongs_to_previous:
            merged[-1] = merged[-1] + " " + piece
        else:
            merged.append(piece)

    return [
        sentence if sentence[-1] in terminators else sentence + "."
        for sentence in merged
    ]

# Lazy loading functions for models
def load_whisper_model():
    """Return the shared Whisper "medium" model, loading it on first use.

    The model is cached in the module-level ``whisper_model`` and moved to
    the module-level ``device``.
    """
    global whisper_model
    if whisper_model is not None:
        return whisper_model

    print("Loading Whisper model...")
    whisper_model = whisper.load_model("medium").to(device)
    return whisper_model

def load_translation_model():
    """Return the shared NLLB translation pipeline, creating it on first use.

    The pipeline is cached in the module-level ``translation_pipeline``.
    It is placed on GPU index 0 when ``device`` names a CUDA device and on
    the CPU (-1) otherwise.
    """
    global translation_pipeline
    if translation_pipeline is not None:
        return translation_pipeline

    print("Loading translation model...")
    device_index = 0 if "cuda" in device else -1
    translation_pipeline = pipeline(
        "translation",
        model="facebook/nllb-200-distilled-600M",
        device=device_index,
    )
    return translation_pipeline

def load_tts_models():
    """Return ``(tts_model, tts_tokenizer, desc_tokenizer)``, loading on first use.

    Everything is loaded into local variables first and published to the
    module-level globals in a single step. If any load raises, the cache
    stays empty and the next call retries, rather than returning a model
    paired with ``None`` tokenizers.

    Returns:
        Tuple of the Parler-TTS model (on ``device``), the prompt tokenizer
        and the description tokenizer.
    """
    global tts_model, tts_tokenizer, desc_tokenizer

    fully_loaded = (
        tts_model is not None
        and tts_tokenizer is not None
        and desc_tokenizer is not None
    )
    if fully_loaded:
        return tts_model, tts_tokenizer, desc_tokenizer

    print("Loading TTS model...")
    tts_model_name = "ai4bharat/indic-parler-tts-pretrained"
    model = ParlerTTSForConditionalGeneration.from_pretrained(tts_model_name).to(device)
    prompt_tokenizer = AutoTokenizer.from_pretrained(tts_model_name)

    # The description tokenizer must match the model's text encoder; the
    # config attribute that names it is probed in two places.
    try:
        description_tokenizer = AutoTokenizer.from_pretrained(model.config.text_encoder_id)
    except (AttributeError, ValueError):
        try:
            description_tokenizer = AutoTokenizer.from_pretrained(
                model.config.text_encoder._name_or_path
            )
        except (AttributeError, ValueError):
            # NOTE(review): this fallback checkpoint may not match the
            # model's text encoder vocabulary - confirm it is appropriate.
            print("Falling back to default encoder path")
            description_tokenizer = AutoTokenizer.from_pretrained("facebook/mms-tts-eng")

    # Publish only after every component loaded successfully.
    tts_model = model
    tts_tokenizer = prompt_tokenizer
    desc_tokenizer = description_tokenizer

    return tts_model, tts_tokenizer, desc_tokenizer

def translate_text(text, src_lang="eng_Latn", tgt_lang="asm_Beng"):
    """Translate *text* sentence by sentence and return the translated sentences.

    Args:
        text: Source text; it is split with ``split_into_sentences``.
        src_lang: Source language code passed to the pipeline.
        tgt_lang: Target language code passed to the pipeline.

    Returns:
        List with one translated string per source sentence.
    """
    translator = load_translation_model()

    results = []
    for source_sentence in split_into_sentences(text):
        if not source_sentence:
            continue  # nothing to translate

        output = translator(
            source_sentence,
            src_lang=src_lang,
            tgt_lang=tgt_lang,
            max_length=400,
        )
        target_sentence = output[0]['translation_text']
        results.append(target_sentence)
        print(f"Translated: {source_sentence[:50]}... → {target_sentence[:50]}...")

    return results

def generate_tts_audio(sentences):
    """Synthesize speech for each sentence and join the results into one WAV.

    Each sentence is generated separately and written to
    ``audio_chunk_<idx>.wav`` in ``WORKING_DIR``. The chunks are then joined
    with 300 ms of silence between them and exported as
    ``translated_audio.wav``.

    A sentence whose generation fails is logged and skipped (best effort).
    The voice description is tokenized once, before the loop, so a failure
    there propagates to the caller instead of being repeated per sentence.

    Args:
        sentences: Sequence of target-language sentences.

    Returns:
        Path to the combined WAV file, or ``None`` when no audio was produced.
    """
    tts_model, tts_tokenizer, desc_tokenizer = load_tts_models()

    if not sentences:
        return None

    # TTS voice description
    description = (
        "Amit. "
        "Clear native speaker with moderate speed and pitch. Very high quality "
        "voice sounding natural and close up."
    )

    # The description is identical for every sentence: tokenize it and move
    # it to the device once rather than on every loop iteration.
    desc_input = desc_tokenizer(description, return_tensors="pt").to(device)
    sampling_rate = tts_model.config.sampling_rate

    audio_segments = []

    for idx, sentence in enumerate(sentences):
        try:
            print(f"Generating audio for sentence {idx+1}/{len(sentences)}: {sentence[:30]}...")

            prompt_input = tts_tokenizer(sentence, return_tensors="pt").to(device)

            with torch.no_grad():
                gen_audio = tts_model.generate(
                    input_ids=desc_input.input_ids,
                    attention_mask=desc_input.attention_mask,
                    prompt_input_ids=prompt_input.input_ids,
                    prompt_attention_mask=prompt_input.attention_mask
                )

            audio_array = gen_audio.cpu().numpy().squeeze()

            # Release the GPU tensor before touching the next sentence.
            del gen_audio
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            # A 1-D array is a single waveform; otherwise each row is a
            # variant and only the first one is used.
            first_variant = audio_array if audio_array.ndim == 1 else audio_array[0]

            output_path = os.path.join(WORKING_DIR, f"audio_chunk_{idx}.wav")
            sf.write(output_path, first_variant, sampling_rate)
            audio_segments.append(AudioSegment.from_file(output_path))

            del first_variant, audio_array

        except Exception as e:
            # Best effort: one bad sentence must not abort the whole dub.
            print(f"Error generating audio for sentence {idx}: {e}")

    if not audio_segments:
        return None

    # Combine audio segments with small pauses
    combined_audio = audio_segments[0]
    for segment in audio_segments[1:]:
        combined_audio += AudioSegment.silent(duration=300) + segment

    output_path = os.path.join(WORKING_DIR, "translated_audio.wav")
    combined_audio.export(output_path, format="wav")
    return output_path

def adjust_audio_to_video_duration(audio_path, video_duration_ms):
    """Make the audio at *audio_path* last (approximately) *video_duration_ms*.

    * Difference under 100 ms: the file is left untouched.
    * Audio too long: it is sped up when at most 10 % faster playback is
      enough, otherwise it is trimmed to the video length. The speed-up
      reinterprets the samples at a higher frame rate, so pitch rises with
      tempo.
    * Audio too short: silence is added, one quarter of the gap before the
      audio and the rest after it.

    Args:
        audio_path: Path of the audio file to adjust.
        video_duration_ms: Target duration in milliseconds.

    Returns:
        ``audio_path`` when no change was needed, otherwise the path of the
        newly written ``adjusted_audio.wav`` in ``WORKING_DIR``.
    """
    max_speedup = 1.1  # never play more than 10 % faster

    audio = AudioSegment.from_file(audio_path)
    audio_duration_ms = len(audio)

    # Calculate difference
    diff_ms = video_duration_ms - audio_duration_ms

    print(f"Audio duration: {audio_duration_ms}ms, Video duration: {video_duration_ms}ms, Difference: {diff_ms}ms")

    # If difference is minimal (less than 100ms), no adjustment needed
    if abs(diff_ms) < 100:
        print("Audio and video durations match closely. No adjustment needed.")
        return audio_path

    if diff_ms < 0:
        print(f"Audio is {-diff_ms}ms longer than video. Adjusting tempo...")

        # Playback-speed multiplier needed to fit the audio into the video.
        # It is > 1 here because the audio is the longer of the two. Playing
        # N samples at frame_rate * k lasts 1/k as long, so the frame rate
        # must be scaled by audio/video; scaling by video/audio would slow
        # the audio down and make it even longer.
        if video_duration_ms > 0:
            speedup_factor = audio_duration_ms / video_duration_ms
        else:
            speedup_factor = float("inf")

        if speedup_factor <= max_speedup:
            sped_up = audio._spawn(audio.raw_data, overrides={
                "frame_rate": int(round(audio.frame_rate * speedup_factor))
            })
            # Resample back to the original rate, then clip any few-ms
            # overshoot left by integer rounding of the frame rate.
            adjusted_audio = sped_up.set_frame_rate(audio.frame_rate)[:video_duration_ms]
        else:
            print(f"Required speedup factor {speedup_factor:.3f} is too extreme. Trimming audio instead.")
            adjusted_audio = audio[:video_duration_ms]
    else:
        # Audio is shorter than video, add silence
        print(f"Audio is {diff_ms}ms shorter than video. Adding silence...")
        lead_ms = diff_ms // 4
        silence_begin = AudioSegment.silent(duration=lead_ms)
        silence_end = AudioSegment.silent(duration=diff_ms - lead_ms)
        adjusted_audio = silence_begin + audio + silence_end

    # Save adjusted audio
    adjusted_path = os.path.join(WORKING_DIR, "adjusted_audio.wav")
    adjusted_audio.export(adjusted_path, format="wav")
    return adjusted_path

def free_memory():
    """Drop every cached model reference and reclaim memory.

    Resets the module-level translation pipeline, TTS model with both of
    its tokenizers, and Whisper model to ``None``, runs the garbage
    collector and, when CUDA is available, empties the CUDA cache.
    """
    global translation_pipeline, tts_model, tts_tokenizer, desc_tokenizer, whisper_model

    if translation_pipeline is not None:
        translation_pipeline = None

    # The tokenizers are only cleared together with the TTS model.
    if tts_model is not None:
        tts_model = tts_tokenizer = desc_tokenizer = None

    if whisper_model is not None:
        whisper_model = None

    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    print("Memory freed.")

# Define background task for processing
async def process_video_translation(input_video_path, target_language="asm_Beng"):
    """Dub the video at *input_video_path* and return the output video path.

    Pipeline: extract audio -> Whisper transcription -> translation ->
    TTS -> fit audio to the video duration -> mux into
    ``translated_video.mp4`` in ``WORKING_DIR``.

    Opened clips are closed and cached models are freed whether the run
    succeeds or fails.

    Args:
        input_video_path: Path of the source video.
        target_language: Target language code passed to ``translate_text``.

    Returns:
        Path of the written output video.

    Raises:
        ValueError: The video has no audio track.
        Exception: Any failure of an individual stage; it is logged with a
            traceback and re-raised unchanged.
    """
    video = None
    translated_audio = None
    try:
        print(f"Processing video: {input_video_path}")

        video = VideoFileClip(input_video_path)
        if video.audio is None:
            # Fail with a clear message instead of an AttributeError below.
            raise ValueError("Input video has no audio track to translate")
        video_duration_ms = int(video.duration * 1000)

        # Extract audio
        audio_path = os.path.join(WORKING_DIR, "extracted_audio.wav")
        video.audio.write_audiofile(audio_path)

        # Transcribe audio (loaded at 16 kHz before being passed to Whisper)
        whisper_model = load_whisper_model()
        audio_data, sr = librosa.load(audio_path, sr=16000)
        result = whisper_model.transcribe(audio_data)
        recognized_text = result["text"]

        print(f"Transcribed text: {recognized_text[:100]}...")

        # Translate text
        translated_sentences = translate_text(recognized_text, tgt_lang=target_language)

        # Generate TTS audio
        translated_audio_path = generate_tts_audio(translated_sentences)

        if not translated_audio_path:
            raise Exception("Failed to generate translated audio")

        # Adjust audio to match video duration
        adjusted_audio_path = adjust_audio_to_video_duration(translated_audio_path, video_duration_ms)

        # Merge audio with video
        output_path = os.path.join(WORKING_DIR, "translated_video.mp4")
        translated_audio = AudioFileClip(adjusted_audio_path)
        final_clip = video.set_audio(translated_audio)
        final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")

        return output_path

    except Exception as e:
        print(f"Error in video translation process: {e}")
        import traceback
        traceback.print_exc()
        raise  # bare raise keeps the original traceback

    finally:
        # Release file handles and model memory on success and on failure.
        if video is not None:
            video.close()
        if translated_audio is not None:
            translated_audio.close()
        free_memory()

# API Endpoints
@app.post("/translate-video/")
async def translate_video(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    target_language: str = Form("asm_Beng")
):
    """Accept an uploaded video, dub it, and return the translated video.

    The upload is written to ``input_video.mp4`` in ``WORKING_DIR`` in 1 MiB
    chunks so a large video is never held in memory as a whole. Processing
    is awaited within the request; ``background_tasks`` is accepted for
    interface compatibility but is not used.

    Returns:
        ``FileResponse`` with the dubbed MP4 on success, or a
        ``JSONResponse`` with status 500 and an ``error`` message on failure.
    """
    chunk_size = 1024 * 1024

    try:
        # Save uploaded video
        input_video_path = os.path.join(WORKING_DIR, "input_video.mp4")
        with open(input_video_path, "wb") as buffer:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                buffer.write(chunk)

        output_path = await process_video_translation(input_video_path, target_language)

        return FileResponse(
            path=output_path,
            filename="translated_video.mp4",
            media_type="video/mp4"
        )

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )

@app.post("/translate-from-path/")
async def translate_from_path(
    video_path: str = Form(...),
    target_language: str = Form("asm_Beng")
):
    """Dub a video that already exists on the server's filesystem.

    The existence check runs outside the ``try`` block so the 404 reaches
    the client; raised inside it, the ``HTTPException`` would be caught by
    the generic handler and reported as a 500.

    Returns:
        ``FileResponse`` with the dubbed MP4 on success, or a
        ``JSONResponse`` with status 500 and an ``error`` message when
        processing fails.

    Raises:
        HTTPException: 404 when *video_path* is not an existing file.
    """
    # SECURITY NOTE(review): video_path is client-supplied and used as-is,
    # so any file readable by the server can be submitted. Consider
    # restricting it to an allowed directory.
    if not os.path.isfile(video_path):
        raise HTTPException(status_code=404, detail=f"Video file not found at {video_path}")

    try:
        output_path = await process_video_translation(video_path, target_language)

        return FileResponse(
            path=output_path,
            filename="translated_video.mp4",
            media_type="video/mp4"
        )

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )

@app.get("/")
async def root():
    """Describe the API: a status message and the available endpoints."""
    endpoint_help = {
        "/translate-video/": "Upload a video for translation",
        "/translate-from-path/": "Translate a video from an existing path",
    }
    return {
        "message": "Video Translation API is running",
        "endpoints": endpoint_help,
    }

# Ngrok setup
async def setup_ngrok(port):
    """Open an ngrok tunnel to *port*.

    Returns the value given back by ``ngrok.connect``, or ``None`` when the
    tunnel could not be established (the error is printed, not raised).
    An auth token, if needed, must be registered beforehand with
    ``ngrok.set_auth_token``.
    """
    try:
        tunnel = ngrok.connect(port)
        print(f"✅ Ngrok tunnel established: {tunnel}")
        return tunnel
    except Exception as error:
        print(f"❌ Failed to establish ngrok tunnel: {error}")
        return None

# Entry point: open the public tunnel, then serve the API on the same port.
if __name__ == "__main__":
    import uvicorn
    
    # Define port
    PORT = 8000
    
    # Open the ngrok tunnel. asyncio.run() returns only after setup_ngrok()
    # has finished; a failed tunnel is reported by setup_ngrok() and does
    # not stop the server from starting.
    asyncio.run(setup_ngrok(PORT))
    
    # Run the FastAPI app
    uvicorn.run(app, host="0.0.0.0", port=PORT)

# With Demucs

In [None]:
import os
import asyncio
import subprocess
from fastapi.responses import FileResponse
import torch
import whisper
import librosa
import soundfile as sf
from pyngrok import ngrok
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip
from transformers import pipeline
from parler_tts import ParlerTTSForConditionalGeneration
from transformers import AutoTokenizer

# FastAPI application for the Demucs-based variant of the dubbing service.
app = FastAPI()

# CORS middleware to allow frontend interactions
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# website send credentialed requests to this API - confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Determine device
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Global models to be loaded once
# Unlike the "Without Demucs" cell, all models are loaded eagerly when this
# cell runs. The TTS model is moved to `device` later, inside
# generate_tts_audio(); no device is passed to whisper.load_model() here.
translation_pipe = pipeline("translation", model="facebook/nllb-200-distilled-600M", device=device)
tts_model = ParlerTTSForConditionalGeneration.from_pretrained("ai4bharat/indic-parler-tts-pretrained")
tts_tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-parler-tts-pretrained")
whisper_model = whisper.load_model("medium")

def translate_english_to_assamese(text):
    """Translate English *text* to Assamese and return it as one string.

    The text is split at sentence boundaries and translated sentence by
    sentence, so a long transcript is not cut off by the output length cap
    of a single generation call. The translated sentences are joined with
    single spaces.

    Args:
        text: English source text.

    Returns:
        The Assamese translation, or ``""`` when *text* has no content.
    """
    import re

    sentences = [
        part.strip()
        for part in re.split(r'(?<=[.!?])\s+', text)
        if part.strip()
    ]
    if not sentences:
        return ""

    results = translation_pipe(
        sentences,
        src_lang="eng_Latn",
        tgt_lang="asm_Beng",
        max_length=400,
    )
    return " ".join(item['translation_text'] for item in results)

def generate_tts_audio(text):
    """Synthesize Assamese speech for *text*.

    The description tokenizer is loaded once and cached on the function
    object instead of being re-created on every call. Generation runs under
    ``torch.no_grad()`` because no gradients are needed for inference.

    Args:
        text: Assamese text to speak.

    Returns:
        Tuple ``(audio_arr, sampling_rate)``: the generated waveform as a
        NumPy array and the model's sampling rate.
    """
    tts_model.to(device)

    description = (
        "A male speaker with an assamese accent delivers a slightly expressive "
        "and animated speech with a moderate speed and pitch."
    )

    description_tokenizer = getattr(generate_tts_audio, "_description_tokenizer", None)
    if description_tokenizer is None:
        description_tokenizer = AutoTokenizer.from_pretrained(
            tts_model.config.text_encoder._name_or_path
        )
        generate_tts_audio._description_tokenizer = description_tokenizer

    description_input_ids = description_tokenizer(description, return_tensors="pt").to(device)
    prompt_input_ids = tts_tokenizer(text, return_tensors="pt").to(device)

    with torch.no_grad():
        generation = tts_model.generate(
            input_ids=description_input_ids.input_ids,
            attention_mask=description_input_ids.attention_mask,
            prompt_input_ids=prompt_input_ids.input_ids,
            prompt_attention_mask=prompt_input_ids.attention_mask
        )

    audio_arr = generation.cpu().numpy().squeeze()
    return audio_arr, tts_model.config.sampling_rate

def separate_audio_sources(audio_path, output_dir="/kaggle/working/separated", model_name="htdemucs"):
    """Split *audio_path* into vocals and accompaniment with Demucs.

    The output directory and model name are passed to the Demucs CLI
    explicitly, so the returned paths do not depend on the current working
    directory or on the CLI's default model.

    Args:
        audio_path: Audio file to separate.
        output_dir: Root directory for Demucs output.
        model_name: Demucs model; it also names the output sub-directory.

    Returns:
        Tuple ``(vocals_path, no_vocals_path)``.

    Raises:
        subprocess.CalledProcessError: Demucs exited with a non-zero status.
    """
    os.makedirs(output_dir, exist_ok=True)

    subprocess.run([
        "demucs",
        "-n", model_name,
        "--two-stems=vocals",
        "-o", output_dir,
        audio_path
    ], check=True)

    # Strip only the final extension so "my.clip.wav" maps to "my.clip".
    track_name = os.path.splitext(os.path.basename(audio_path))[0]
    stem_dir = os.path.join(output_dir, model_name, track_name)

    vocals_path = os.path.join(stem_dir, "vocals.wav")
    no_vocals_path = os.path.join(stem_dir, "no_vocals.wav")
    return vocals_path, no_vocals_path

@app.post("/translate-video/")
async def translate_video(file: UploadFile = File(...)):
    """Dub an uploaded video while keeping its background audio.

    Steps: save the upload (in 1 MiB chunks), extract audio, separate
    vocals from the background with Demucs, transcribe the vocals,
    translate, synthesize speech, mix it over the background, and write
    ``/kaggle/working/translated_video.mp4``.

    Every clip opened here is closed before returning, on success or
    failure.

    Returns:
        ``{"message": ..., "output_path": ...}`` on success, or
        ``{"error": <message>}`` when any step fails.
    """
    chunk_size = 1024 * 1024
    opened_clips = []

    try:
        # Save uploaded video without holding it in memory as a whole
        input_video_path = "/kaggle/working/input_video.mp4"
        with open(input_video_path, "wb") as buffer:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                buffer.write(chunk)

        # Extract audio
        video = VideoFileClip(input_video_path)
        opened_clips.append(video)
        if video.audio is None:
            raise ValueError("Uploaded video has no audio track to translate")
        audio_path = "/kaggle/working/audio.wav"
        video.audio.write_audiofile(audio_path)

        # Separate vocals from background music
        vocals_path, no_vocals_path = separate_audio_sources(audio_path)

        # Transcribe vocals only
        audio_data, sr = librosa.load(vocals_path, sr=16000)
        result = whisper_model.transcribe(audio_data)
        recognized_text = result["text"]

        # Translate text
        translated_text = translate_english_to_assamese(recognized_text)

        # Generate TTS audio
        audio_arr, sample_rate = generate_tts_audio(translated_text)
        tts_audio_path = "/kaggle/working/translated_audio.wav"
        sf.write(tts_audio_path, audio_arr, sample_rate)

        # Load the background audio and translated vocals
        background_audio = AudioFileClip(no_vocals_path)
        opened_clips.append(background_audio)
        translated_vocals = AudioFileClip(tts_audio_path)
        opened_clips.append(translated_vocals)

        # Trim the longer track to the length of the shorter one.
        # NOTE(review): when the speech is shorter than the video this also
        # cuts the background short - confirm that is the desired result.
        if background_audio.duration > translated_vocals.duration:
            background_audio = background_audio.subclip(0, translated_vocals.duration)
        else:
            translated_vocals = translated_vocals.subclip(0, background_audio.duration)

        # Mix the background and translated vocals
        final_audio = CompositeAudioClip([background_audio, translated_vocals])

        # Merge audio with video
        final_clip = video.set_audio(final_audio)
        output_path = "/kaggle/working/translated_video.mp4"
        final_clip.write_videofile(output_path)

        return {"message": "Video translation completed", "output_path": output_path}

    except Exception as e:
        return {"error": str(e)}

    finally:
        # Release the underlying file readers; a close failure is logged so
        # it cannot mask the response already chosen above.
        for clip in reversed(opened_clips):
            try:
                clip.close()
            except Exception as close_error:
                print(f"Failed to close clip: {close_error}")

@app.get("/download-video/")
async def download_video():
    """Serve the most recently written translated video, if there is one.

    Returns a ``FileResponse`` for ``/kaggle/working/translated_video.mp4``
    when the file exists, otherwise an ``{"error": ...}`` dict.
    """
    translated_video = "/kaggle/working/translated_video.mp4"
    if not os.path.exists(translated_video):
        return {"error": "Translated video not found"}
    return FileResponse(translated_video, filename="translated_video.mp4")

async def start_ngrok():
    """Open an ngrok tunnel to port 8009 and print its public URL.

    The URL is read from the tunnel's ``public_url`` attribute, the same
    attribute ``main()`` reads from ``ngrok.get_tunnels()``.
    """
    listener = ngrok.connect(8009)
    print(f"Ingress established at: {listener.public_url}")

def main():
    """Open the ngrok tunnel and serve the FastAPI app on port 8009."""
    import uvicorn
    import nest_asyncio

    port = 8009

    # nest_asyncio lets the server start inside an already running event loop.
    nest_asyncio.apply()

    # Open the tunnel and report the first registered tunnel's public URL.
    ngrok.connect(port)
    first_tunnel = ngrok.get_tunnels()[0]
    print(f"Ingress established at {first_tunnel.public_url}")

    # Serve the API; this call blocks until the server stops.
    uvicorn.run(app, host="0.0.0.0", port=port)


if __name__ == "__main__":
    main()