<a href="https://colab.research.google.com/github/xanny1111/Spring_2025-Plotnov-24-VMz-/blob/master/LatentSync.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**PREPARE ENVIRONMENT**
- Installing the required libraries changes some preinstalled dependencies, so Colab will prompt you to restart the session once installation finishes. Restart the session, then run this cell again.

In [None]:
# @title
!pip install diffusers mediapipe transformers huggingface-hub omegaconf einops opencv-python face-alignment decord ffmpeg-python safetensors soundfile

import os
import subprocess

if not os.path.exists("LatentSync"):
    !git clone https://github.com/Isi-dev/LatentSync
%cd LatentSync

from google.colab import files
import torch
from omegaconf import OmegaConf
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from latentsync.whisper.audio2feature import Audio2Feature
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
import ipywidgets as widgets

# Make sure both destination directories exist before downloading into them.
os.makedirs("/root/.cache/torch/hub/checkpoints", exist_ok=True)
os.makedirs("checkpoints", exist_ok=True)

# Destination path -> download URL for every file the pipeline loads locally.
model_urls = {
    "/root/.cache/torch/hub/checkpoints/s3fd-619a316812.pth":
        "https://huggingface.co/Isi99999/LatentSync/resolve/main/auxiliary/s3fd-619a316812.pth",
    "/root/.cache/torch/hub/checkpoints/2DFAN4-cd938726ad.zip":
        "https://huggingface.co/Isi99999/LatentSync/resolve/main/auxiliary/2DFAN4-cd938726ad.zip",
    "checkpoints/latentsync_unet.pt":
        "https://huggingface.co/Isi99999/LatentSync/resolve/main/latentsync_unet.pt",
    "checkpoints/tiny.pt":
        "https://huggingface.co/Isi99999/LatentSync/resolve/main/whisper/tiny.pt",
    "checkpoints/diffusion_pytorch_model.safetensors":
        "https://huggingface.co/stabilityai/sd-vae-ft-mse/resolve/main/diffusion_pytorch_model.safetensors",
    "checkpoints/config.json":
        "https://huggingface.co/stabilityai/sd-vae-ft-mse/raw/main/config.json",
}

for file_path, url in model_urls.items():
    # A zero-byte file is the residue of an interrupted download, not a usable
    # checkpoint, so it is fetched again instead of being skipped.
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        print(f"File {file_path} already exists. Skipping download.")
        continue

    print(f"Downloading {file_path} ...")
    # Download next to the destination and rename only on success, so a failed
    # or interrupted wget never leaves a truncated file at the final path.
    partial_path = file_path + ".part"
    try:
        subprocess.run(["wget", url, "-O", partial_path], check=True)
        os.replace(partial_path, file_path)
    finally:
        # Still present only when the download failed before the rename.
        if os.path.exists(partial_path):
            os.remove(partial_path)

print("Setup complete.")

def perform_inference(video_path, audio_path, seed=1247, num_inference_steps=20, guidance_scale=1.0, output_path="output_video.mp4"):
    """Run the LatentSync lip-sync pipeline on one video/audio pair.

    Every component is loaded from the local ``configs/`` and ``checkpoints/``
    directories. The audio encoder and the assembled pipeline are placed on
    the "cuda" device.

    Args:
        video_path: Path of the input video.
        audio_path: Path of the driving audio.
        seed: Value passed to ``set_seed`` right before the pipeline runs.
        num_inference_steps: Forwarded to the pipeline unchanged.
        guidance_scale: Forwarded to the pipeline unchanged.
        output_path: Path handed to the pipeline as ``video_out_path``. The
            ``video_mask_path`` is derived from it by inserting ``_mask``
            before the file extension.

    Returns:
        ``output_path``, unchanged.
    """
    config_path = "configs/unet/first_stage.yaml"
    inference_ckpt_path = "checkpoints/latentsync_unet.pt"
    whisper_model_path = "checkpoints/tiny.pt"

    config = OmegaConf.load(config_path)

    # Half precision is used only when CUDA is present and the device's major
    # compute capability is above 7; otherwise everything runs in float32.
    is_fp16_supported = torch.cuda.is_available() and torch.cuda.get_device_capability()[0] > 7
    dtype = torch.float16 if is_fp16_supported else torch.float32

    scheduler = DDIMScheduler.from_pretrained("configs")

    audio_encoder = Audio2Feature(model_path=whisper_model_path, device="cuda", num_frames=config.data.num_frames)

    vae = AutoencoderKL.from_pretrained("checkpoints", torch_dtype=dtype, local_files_only=True)
    # Override the scaling/shift factors stored in the loaded VAE config.
    vae.config.scaling_factor = 0.18215
    vae.config.shift_factor = 0

    # Load the UNet on the CPU first; the pipeline is moved to "cuda" below.
    unet, _ = UNet3DConditionModel.from_pretrained(
        OmegaConf.to_container(config.model),
        inference_ckpt_path,
        device="cpu",
    )
    unet = unet.to(dtype=dtype)

    if is_xformers_available():
        unet.enable_xformers_memory_efficient_attention()
        print('x_formers available!')

    pipeline = LipsyncPipeline(
        vae=vae,
        audio_encoder=audio_encoder,
        unet=unet,
        scheduler=scheduler,
    ).to("cuda")

    set_seed(seed)

    # Split off the real extension instead of string-replacing ".mp4": a plain
    # replace returns the output path itself when it has another extension
    # (mask and result would share one file) and rewrites every ".mp4"
    # occurrence when the name contains more than one.
    mask_root, mask_ext = os.path.splitext(output_path)
    video_mask_path = f"{mask_root}_mask{mask_ext}"

    pipeline(
        video_path=video_path,
        audio_path=audio_path,
        video_out_path=output_path,
        video_mask_path=video_mask_path,
        num_frames=config.data.num_frames,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        weight_dtype=dtype,
        width=config.data.resolution,
        height=config.data.resolution,
    )
    return output_path


**RUN IMAGE TO VIDEO**

In [None]:
# Names of the files you uploaded to the LatentSync/checkpoints folder
image_file_name = "head1111.png"  # <-- Change to the name of your image file
audio_file_name = "1 part.WAV"  # <-- Change to the name of your audio file

# --- Do not modify the code below unless you know what you are doing ---

import gc
import os
import subprocess
import traceback

import cv2
import torchaudio
from IPython.display import Video, display

# Definitions of helper functions (moved from cell t0fOMU7dZ4Qx)
def convert_video_fps(input_path, target_fps):
    """Re-encode a video at ``target_fps`` using ffmpeg.

    The result is always written to ``converted_{target_fps}fps.mp4`` in the
    current working directory. An audio stream, when ffprobe reports one, is
    re-encoded as AAC; otherwise the output is written without audio.

    Args:
        input_path: Path of the video to convert.
        target_fps: Frame rate given to ffmpeg's ``fps`` video filter.

    Returns:
        The output path, or ``None`` when the input is missing or empty.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    input_usable = os.path.exists(input_path) and os.path.getsize(input_path) != 0
    if not input_usable:
        print(f"Error: The video file {input_path} is missing or empty.")
        return None

    output_path = f"converted_{target_fps}fps.mp4"

    # ffprobe prints stream details only when an audio stream is selected, so
    # any non-blank stdout means the input carries audio.
    probe = subprocess.run(
        [
            "ffprobe", "-i", input_path, "-show_streams", "-select_streams", "a",
            "-loglevel", "error"
        ],
        capture_output=True,
        text=True,
    )
    has_audio = bool(probe.stdout.strip())

    audio_args = ["-c:a", "aac", "-b:a", "192k"] if has_audio else ["-an"]
    ffmpeg_cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-filter:v", f"fps={target_fps}",
        "-c:v", "libx264", "-preset", "fast", "-crf", "18",
        *audio_args,
        output_path,
    ]

    subprocess.run(ffmpeg_cmd, check=True)
    print(f"Converted video saved as {output_path}")
    return output_path

def pad_audio_to_multiple_of_16(audio_path, target_fps=25):
    """Pad an audio file with silence so it spans a multiple of 16 video frames.

    The frame count is the number of whole frames at ``target_fps`` that fit
    in the audio. When that count is not a multiple of 16, trailing silence is
    appended up to the next multiple and the result is saved as
    ``padded_audio.wav`` in the current working directory.

    All frame/sample conversions use exact rational arithmetic. Going through
    a float duration and truncating with ``int()`` can drop a sample or a
    frame, which would leave the padded audio one frame short of the multiple.

    Args:
        audio_path: Path of the audio file to load with ``torchaudio``.
        target_fps: Video frame rate used to convert samples to frames.

    Returns:
        ``(path, num_frames)``. ``path`` is ``audio_path`` itself when no
        padding was needed (no new file is written), otherwise
        ``"padded_audio.wav"``. ``num_frames`` is the frame count after padding.
    """
    # Local imports keep this helper self-contained within the notebook cell.
    import math
    from fractions import Fraction

    waveform, sample_rate = torchaudio.load(audio_path)
    num_samples = waveform.shape[1]
    fps = Fraction(target_fps)

    # Whole frames covered by the audio: floor(samples / sample_rate * fps).
    num_frames = math.floor(num_samples * fps / sample_rate)

    remainder = num_frames % 16
    if remainder == 0:
        return audio_path, num_frames  # No padding needed

    target_frames = num_frames + (16 - remainder)
    # Smallest sample count whose duration reaches target_frames frames.
    target_samples = math.ceil(target_frames * sample_rate / fps)
    pad_samples = target_samples - num_samples

    # Silence padding, created with the waveform's dtype so concatenation does
    # not change the sample type.
    silence = torch.zeros((waveform.shape[0], pad_samples), dtype=waveform.dtype)
    waveform = torch.cat((waveform, silence), dim=1)

    padded_audio_path = "padded_audio.wav"
    torchaudio.save(padded_audio_path, waveform, sample_rate)

    return padded_audio_path, target_frames

def create_video_from_image(image_path, output_video_path, num_frames, fps=25):
    """Write a video that repeats one still image for ``num_frames`` frames.

    Args:
        image_path: Path of the image to read with ``cv2.imread``.
        output_video_path: Destination of the video, written with the
            ``mp4v`` fourcc.
        num_frames: Number of times the image is written as a frame.
        fps: Frame rate given to the video writer.

    Returns:
        ``output_video_path`` on success, or ``None`` when the image cannot be
        read or the video writer cannot be opened.
    """
    img = cv2.imread(image_path)
    if img is None:
        print("Error: Unable to read the image.")
        return None

    height, width = img.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    try:
        # Without this check a writer that failed to open would be written to
        # anyway and success would be reported for a video that was not created.
        if not video_writer.isOpened():
            print(f"Error: Unable to open video writer for {output_video_path}.")
            return None

        for _ in range(num_frames):
            video_writer.write(img)
    finally:
        # Release the writer even when opening or writing fails.
        video_writer.release()

    print(f"Created video {output_video_path} with {num_frames} frames ({num_frames / fps:.2f} seconds).")
    return output_video_path


# Free cached CUDA memory before starting inference.
if torch.cuda.is_available():
    gc.collect()
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()

# Output area for every message of this run. It is displayed before the run
# starts, following the usual ipywidgets pattern of showing the widget first.
output_display = widgets.Output()
display(output_display)

with output_display:
    print("Starting inference with local files...")

    # Temporary artifacts; each stays None until the corresponding step has run.
    video_path_base = None
    audio_path_padded = None
    output_path_inference = None
    # Initialised up front so the cleanup step can always compare against it.
    audio_path_final = None

    try:
        # Resolve the input files inside the 'checkpoints' folder.
        image_path_final = os.path.join("checkpoints", image_file_name)
        audio_path_final = os.path.join("checkpoints", audio_file_name)

        if not os.path.exists(image_path_final):
            print(f"Error: Image file not found at {image_path_final}")
        if not os.path.exists(audio_path_final):
            print(f"Error: Audio file not found at {audio_path_final}")

        if not os.path.exists(image_path_final) or not os.path.exists(audio_path_final):
            print("Please ensure both image and audio files are in the 'checkpoints' folder and filenames are correct.")
        else:
            print(f"Using image: {image_path_final}")
            print(f"Using audio: {audio_path_final}")

            # Fixed run parameters.
            seed_value = 1247
            num_steps_value = 20
            guidance_scale_value = 1.0
            video_scale_value = 0.5  # Scale applied to the image size for the preview
            output_fps_value = 25

            # Pad the audio and get the number of video frames it spans.
            print("Processing audio...")
            audio_path_padded, num_frames = pad_audio_to_multiple_of_16(audio_path_final, target_fps=25)
            print(f"Audio padded. Total frames for video: {num_frames}")

            # Build the still-image video that the pipeline will lip-sync.
            print("Creating base video from image...")
            video_path_base = "generated_video.mp4"
            video_path_base = create_video_from_image(image_path_final, video_path_base, num_frames)

            if video_path_base is None:
                print("Error: Failed to create video from image.")
            else:
                print("Running inference...")
                output_path_inference = "output_video.mp4"
                perform_inference(video_path_base, audio_path_padded, seed_value, num_steps_value, guidance_scale_value, output_path_inference)

                print("Converting video FPS...")
                final_output_video_path = convert_video_fps(output_path_inference, output_fps_value)

                if final_output_video_path is None:
                    print("Error: Failed to convert video FPS.")
                else:
                    # Size the preview from the original image dimensions.
                    img_display = cv2.imread(image_path_final)
                    if img_display is not None:
                        height_display, width_display, _ = img_display.shape
                    else:
                        height_display, width_display = 720, 1280  # Default if image read fails

                    print("Inference complete. Displaying output video:")
                    display(Video(final_output_video_path, embed=True, width=int(width_display * video_scale_value), height=int(height_display * video_scale_value)))
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        # The message alone is often not enough to locate the failure.
        traceback.print_exc()
    finally:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.ipc_collect()
            gc.collect()

        # Clean up temporary files created during this run.
        temp_paths = [video_path_base, output_path_inference]
        # pad_audio_to_multiple_of_16 returns the input path itself when no
        # padding was needed; in that case the "padded" file is the user's own
        # audio and must not be deleted.
        if audio_path_padded != audio_path_final:
            temp_paths.append(audio_path_padded)
        for path in temp_paths:
            if path and os.path.exists(path):
                os.remove(path)
        print("Temporary files removed.")
        print("Cleanup complete.")

In [5]:
from google.colab import files

# Name under which convert_video_fps saves its result for a 25 fps target.
output_filename = 'converted_25fps.mp4'

# Report a missing file instead of attempting the download.
if not os.path.exists(output_filename):
    print(f"Файл {output_filename} не найден. Убедитесь, что он был успешно сгенерирован.")
else:
    files.download(output_filename)
    print(f"Файл {output_filename} был скачан.")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Файл converted_25fps.mp4 был скачан.
