In [None]:
# Refresh the apt package index, then install the Python 3.8 interpreter along
# with its distutils and dev packages.
!sudo apt-get update -y
!sudo apt-get install python3.8 python3.8-distutils python3.8-dev -y

# Register both 3.11 (priority 2) and 3.8 (priority 1) as alternatives for
# /usr/bin/python3, then explicitly select 3.8 regardless of priority.
# NOTE(review): this repoints the system-wide `python3`; later cells invoke
# `python3.8` by name, so confirm the switch is actually required.
!sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 2
!sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.8 1
!sudo update-alternatives --set python3 /usr/bin/python3.8
# Bootstrap pip for the 3.8 interpreter using the 3.8-specific get-pip script.
!wget https://bootstrap.pypa.io/pip/3.8/get-pip.py -O get-pip.py
!python3.8 get-pip.py

In [None]:
# Print the version of whatever `python` resolves to on PATH.
# NOTE(review): the setup cell switches the `python3` alternative (not `python`)
# and later cells call `python3.8` explicitly — confirm this checks the intended binary.
!python --version

In [None]:
# clone SadTalker
!git clone https://github.com/OpenTalker/SadTalker.git
%cd SadTalker

# install dependencies
!python3.8 -m pip install --upgrade pip setuptools wheel
!python3.8 -m pip install -r requirements.txt
!python3.8 -m pip install gfpgan realesrgan

# download pretrained checkpoints
!bash scripts/download_models.sh

In [None]:
# Install the +cu113 builds of torch 1.12.1 / torchvision 0.13.1 (and torchaudio
# 0.12.1) for python3.8, using the PyTorch cu113 wheel index as an extra index.
# NOTE(review): this runs after requirements.txt was installed in the previous
# cell, so it presumably replaces any torch build that step pulled in — confirm
# the order is intended.
!python3.8 -m pip install torch==1.12.1+cu113 torchvision==0.13.1+cu113 torchaudio==0.12.1 --extra-index-url https://download.pytorch.org/whl/cu113

In [None]:
import sys
# Install the web-server dependencies with the interpreter that runs this
# kernel (sys.executable), which may differ from the python3.8 installed above.
# NOTE(review): `--ignore-installed` is a flag for the whole command and
# `blinker` is a third package to install; presumably this works around a
# pre-installed blinker that pip cannot uninstall — confirm.
!{sys.executable} -m pip install flask pyngrok --ignore-installed blinker

In [None]:
import sys, importlib, os
print("sys.executable:", sys.executable)
print("python version:", sys.version)
!{sys.executable} -m pip show pyngrok
!{sys.executable} -m pip list | grep pyngrok || true
print("find_spec:", importlib.util.find_spec("pyngrok"))

In [None]:
import sys

# Expose packages installed for the separate python3.8 interpreter to this kernel.
# The path is appended (not inserted at index 0) so modules built for the
# kernel's own Python keep precedence and 3.8's dist-packages only acts as a
# fallback; the membership check avoids duplicate entries when the cell is re-run.
# NOTE(review): sharing site-packages across interpreter versions is fragile —
# prefer installing everything the kernel imports via `{sys.executable} -m pip`.
PY38_DIST_PACKAGES = "/usr/local/lib/python3.8/dist-packages"
if PY38_DIST_PACKAGES not in sys.path:
    sys.path.append(PY38_DIST_PACKAGES)
from pyngrok import ngrok
print("ok", ngrok)

In [None]:
!pip install fal-client asyncio sync-so

In [None]:
import asyncio
import base64
import getpass
import os
import shutil
import subprocess
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

import fal_client
from flask import Flask, request, jsonify
from pyngrok import ngrok
from sync import Sync
from sync.common import Audio, GenerationOptions, Video
from sync.core.api_error import ApiError

app = Flask(__name__)
executor = ThreadPoolExecutor(max_workers=2)

# Configure Sync.so API.
# The key is a credential, so it is read from the SYNC_API_KEY environment
# variable (falling back to an interactive prompt) instead of being stored in
# the notebook source. Any key that was previously committed in this cell
# should be rotated.
# NOTE(review): the earlier literal carried an "x-api-key:" prefix; presumably
# the SDK adds the header name itself and expects only the bare key — confirm.
SYNC_API_KEY = os.environ.get("SYNC_API_KEY") or getpass.getpass("Sync.so API key: ")
if not SYNC_API_KEY:
    raise RuntimeError(
        "No Sync.so API key provided; set the SYNC_API_KEY environment variable"
    )
sync_client = Sync(
    base_url="https://api.sync.so",
    api_key=SYNC_API_KEY
).generations

def run_async(coroutine):
    """Run an awaitable to completion from synchronous code and return its result.

    Each call executes on a fresh event loop via ``asyncio.run``, which also
    finalizes async generators, closes the loop and clears the thread's
    current-loop slot afterwards, so no closed loop is left registered on the
    calling thread.

    Args:
        coroutine: The coroutine (or other awaitable) to execute.

    Returns:
        Whatever the awaitable evaluates to.

    Raises:
        RuntimeError: If an event loop is already running in the calling thread.
        Exception: Anything raised by the awaitable propagates unchanged.
    """
    async def _await_it():
        # asyncio.run() only accepts coroutine objects; wrapping keeps support
        # for any awaitable, as loop.run_until_complete() allowed.
        return await coroutine

    return asyncio.run(_await_it())

async def upscale_video(video_path):
    """Upload a local video to fal.ai, request a 1080p upscale and return the result URL.

    Args:
        video_path: Filesystem path of the video to upscale.

    Returns:
        The ``video.url`` field when the result is a dict (``None`` if that
        field is missing or null); otherwise the raw result object.
    """
    # upload_file_async expects a filesystem path and reads the file itself;
    # handing it the raw bytes would make it treat the content as a filename.
    url = await fal_client.upload_file_async(video_path)
    handler = await fal_client.submit_async(
        "fal-ai/bytedance-upscaler/upscale/video",
        arguments={
            "video_url": url,
            "target_resolution": "1080p"
        }
    )

    # Stream progress events (including logs) until the job finishes.
    async for event in handler.iter_events(with_logs=True):
        print(f"Upscale event: {event}")

    result = await handler.get()
    if not isinstance(result, dict):
        return result
    # `video` may be absent or explicitly null in the payload.
    return (result.get('video') or {}).get('url')

def apply_wav2lip_refinement(video_url, audio_path, poll_interval=10, max_retries=60):
    """Apply Wav2Lip refinement using Sync.so.

    Uploads the audio file to obtain a URL, submits a ``lipsync-2`` generation
    for the given video and polls the job until it reaches a terminal state.

    Args:
        video_url: URL of the video to refine.
        audio_path: Filesystem path of the driving audio file.
        poll_interval: Seconds to wait between status polls (default 10).
        max_retries: Maximum number of polls before giving up (default 60,
            i.e. about 10 minutes with the default interval).

    Returns:
        The ``output_url`` of the completed generation.

    Raises:
        ValueError: If ``video_url`` is empty.
        Exception: If the job fails, is rejected, times out, or the Sync.so
            API returns an error.
    """
    if not video_url:
        raise ValueError("video_url is required for Sync.so refinement")

    try:
        # Upload audio to get URL. upload_file_async takes a filesystem path
        # and reads the file itself, so the path is passed rather than the bytes.
        audio_url = run_async(fal_client.upload_file_async(audio_path))

        print("Starting Wav2Lip refinement with Sync.so...")

        # Submit lip-sync job
        response = sync_client.create(
            input=[Video(url=video_url), Audio(url=audio_url)],
            model="lipsync-2",
            options=GenerationOptions(sync_mode="cut_off"),
            outputFileName=f"refined_{uuid.uuid4()}"
        )

        job_id = response.id
        print(f"Sync.so job submitted: {job_id}")

        # Poll for completion
        for _ in range(max_retries):
            generation = sync_client.get(job_id)
            status = generation.status

            if status == 'COMPLETED':
                print(f"Refinement completed: {generation.output_url}")
                return generation.output_url
            if status == 'FAILED':
                raise Exception(f"Sync.so job {job_id} failed")
            if status == 'REJECTED':
                # Treated as terminal so a rejected job is reported at once
                # instead of being polled until the timeout.
                raise Exception(f"Sync.so job {job_id} was rejected")

            print(f"Polling Sync.so job {job_id} - Status: {status}")
            time.sleep(poll_interval)

        raise Exception("Sync.so job timeout")

    except ApiError as e:
        # Chain the original error so the traceback keeps the API details.
        raise Exception(f"Sync.so API error: {e.status_code} - {e.body}") from e

def _find_first_mp4(search_dir):
    """Return the path of the first .mp4 found walking search_dir top-down, or None."""
    for root, _, files in os.walk(search_dir):
        for name in files:
            if name.endswith('.mp4'):
                return os.path.join(root, name)
    return None


@app.route('/lipsync', methods=['POST'])
def create_lipsync():
    """Generate a lip-synced video from a base64 image and base64 audio.

    Expects a JSON object with ``image`` and ``audio`` keys holding
    base64-encoded file contents. The request runs three steps: SadTalker
    inference, video upscaling, then Sync.so refinement.

    Returns:
        A JSON response. On success: ``success``, ``video_url``,
        ``upscaled_url`` and ``session_id``. On failure: ``success`` False and
        an ``error`` message, with status 400 for invalid input and 500 for
        processing errors.
    """
    temp_dir = None
    try:
        # silent=True yields None for a missing or malformed JSON body instead
        # of raising an HTTP error that the generic handler below would report
        # as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'image' not in data or 'audio' not in data:
            return jsonify({'success': False, 'error': 'Missing image or audio data'}), 400

        # Decode base64 files; malformed payloads are a client error, not a 500.
        try:
            image_data = base64.b64decode(data['image'])
            audio_data = base64.b64decode(data['audio'])
        except (ValueError, TypeError):
            return jsonify({
                'success': False,
                'error': 'image and audio must be base64-encoded strings'
            }), 400

        # Create temp directory
        session_id = str(uuid.uuid4())
        temp_dir = f"/content/temp_{session_id}"
        os.makedirs(temp_dir, exist_ok=True)

        # Save files
        image_path = os.path.join(temp_dir, "source.jpg")
        audio_path = os.path.join(temp_dir, "audio.wav")
        results_dir = os.path.join(temp_dir, "results")

        with open(image_path, 'wb') as f:
            f.write(image_data)
        with open(audio_path, 'wb') as f:
            f.write(audio_data)

        print("Step 1: Running SadTalker...")
        # Run SadTalker
        cmd = [
            'python3.8', 'inference.py',
            '--driven_audio', audio_path,
            '--source_image', image_path,
            '--result_dir', results_dir,
            '--preprocess', 'full',
            '--enhancer', 'gfpgan',
            '--background_enhancer', 'realesrgan',
            '--expression_scale', '1.2',
        ]

        # check=True raises CalledProcessError on a non-zero exit; the captured
        # stderr is reported to the caller by the handler below.
        subprocess.run(
            cmd,
            cwd='/content/SadTalker',
            check=True,
            capture_output=True,
            text=True
        )

        # Find output video
        output_path = _find_first_mp4(results_dir)
        if not output_path:
            return jsonify({'success': False, 'error': 'No output generated'}), 500

        print("Step 2: Upscaling video...")
        # Upscale video
        upscaled_url = run_async(upscale_video(output_path))
        if not upscaled_url:
            # Without a URL there is nothing to hand to the refinement step.
            return jsonify({'success': False, 'error': 'Upscaling returned no video URL'}), 500

        print("Step 3: Applying Wav2Lip refinement...")
        # Apply Wav2Lip refinement
        final_video_url = apply_wav2lip_refinement(upscaled_url, audio_path)

        return jsonify({
            'success': True,
            'video_url': final_video_url,
            'upscaled_url': upscaled_url,
            'session_id': session_id
        })

    except subprocess.CalledProcessError as e:
        return jsonify({
            'success': False,
            'error': f'SadTalker failed: {e.stderr}'
        }), 500
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Cleanup: runs on every exit path, including the early returns above.
        if temp_dir and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)

@app.route('/health', methods=['GET'])
def health():
    """Liveness endpoint: always answers with a fixed JSON status payload."""
    return jsonify(status='healthy', service='sadtalker-sync')

In [None]:
import json
import os

from pyngrok import ngrok

# The ngrok auth token is a credential: read it from the NGROK_AUTH_TOKEN
# environment variable instead of storing it in the notebook (get one from
# ngrok.com). A token that was previously hardcoded here should be revoked.
ngrok_token = os.environ.get("NGROK_AUTH_TOKEN")
if ngrok_token:
    ngrok.set_auth_token(ngrok_token)
else:
    print("NGROK_AUTH_TOKEN is not set; relying on any token already in the ngrok config")

# Start tunnel
tunnel = ngrok.connect(8003)
# connect() returns a tunnel object whose string form is a description, not the
# URL; use its public_url attribute (falling back to the value itself for
# pyngrok versions that return a plain string).
public_url = str(getattr(tunnel, "public_url", tunnel))
print(f"Public URL: {public_url}")
print(f"API Endpoint: {public_url}/lipsync")

# Save URL to file for easy access
with open('/content/api_url.txt', 'w') as f:
    f.write(public_url)

In [None]:
# Start the Flask server on port 8003 — the same port passed to
# ngrok.connect(8003) in the tunnel cell, so that cell should be run first.
app.run(port=8003)