# Distributed Video Generation with Zeroscope

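This notebook demonstrates how to use the distributed coordinator to generate a video. The coordinator splits the latent tensor along its frame axis into overlapping chunks, processes the chunks in parallel worker processes, and stitches the results back together before decoding.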

In [10]:
!pip install pynvml


Collecting pynvml
  Downloading pynvml-12.0.0-py3-none-any.whl.metadata (5.4 kB)
Collecting nvidia-ml-py<13.0.0a0,>=12.0.0 (from pynvml)
  Downloading nvidia_ml_py-12.570.86-py3-none-any.whl.metadata (8.7 kB)
Downloading pynvml-12.0.0-py3-none-any.whl (26 kB)
Downloading nvidia_ml_py-12.570.86-py3-none-any.whl (44 kB)
Installing collected packages: nvidia-ml-py, pynvml
Successfully installed nvidia-ml-py-12.570.86 pynvml-12.0.0


## 1. Setup and Imports

First, let's import all necessary libraries and set up logging.

In [11]:
import os
import torch
import pickle
import logging
import numpy as np
from pathlib import Path
from typing import List, Dict, Tuple

from diffusers import DiffusionPipeline
from diffusers.utils import export_to_video

# Configure logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

## 2. Initialise Coordinator

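Create a coordinator instance with the desired configuration. `chunk_size` and `overlap` are counted in frames: with the values below, each chunk spans 8 frames and consecutive chunks share 2.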

In [12]:
from distributed_coordinator import DistributedCoordinator

coordinator = DistributedCoordinator(
    model_id="cerspense/zeroscope_v2_576w",
    device="cuda",
    num_inference_steps=50,
    chunk_size=8,
    overlap=2,
    num_workers=2
)

## 3. Load Models

Load all required model components (VAE, text encoder, etc.).

In [4]:
coordinator.load_models()

2025-04-18 15:16:01,382 - distributed_coordinator - INFO - Loading pipeline components from cerspense/zeroscope_v2_576w
Fetching 12 files: 100%|██████████| 12/12 [01:17<00:00,  6.49s/it]
Loading pipeline components...: 100%|██████████| 5/5 [00:34<00:00,  6.91s/it]


## 4. Prepare Latents

Create the initial noise latents, laid out as `(batch, channels, frames, height, width)`, and encode the text prompt.

In [5]:
prompt = "A rocket launching into space, cinematic, detailed, 4K"
num_frames = 16

latents, text_embeddings = coordinator.prepare_latents(prompt, num_frames)
print(f"Latents shape: {latents.shape}")
print(f"Text embeddings shape: {text_embeddings.shape}")

Latents shape: torch.Size([1, 4, 16, 40, 72])
Text embeddings shape: torch.Size([1, 77, 1024])
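The latent shape printed above follows directly from the output resolution. The sketch below is a minimal illustration that assumes zeroscope_v2_576w renders at 576×320 (width × height) and uses a VAE with 4 latent channels and 8× spatial downsampling, which is consistent with the printed shape. `latent_shape` is a hypothetical helper and is not part of the coordinator. The text-embedding shape reflects CLIP's 77-token context length and a 1024-dimensional text encoder.

```python
# Hypothetical helper: derive the latent tensor shape for a given output
# resolution. Assumes a Stable-Diffusion-style VAE with 4 latent channels
# and a spatial downsampling factor of 8.
def latent_shape(batch, num_frames, height, width, channels=4, vae_scale=8):
    if height % vae_scale or width % vae_scale:
        raise ValueError("height and width must be divisible by vae_scale")
    # Layout is (batch, channels, frames, latent_height, latent_width)
    return (batch, channels, num_frames, height // vae_scale, width // vae_scale)


# zeroscope_v2_576w is assumed to target 576x320 (width x height) output
print(latent_shape(batch=1, num_frames=16, height=320, width=576))
# → (1, 4, 16, 40, 72)
```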


## 5. Split into Chunks

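Split the latent tensor along the frame axis into overlapping chunks for distributed processing. Each printed range is half-open (`start_idx` inclusive, `end_idx` exclusive). With `chunk_size=8` and `overlap=2`, a new chunk starts every 6 frames, and the last chunk is truncated to the 4 remaining frames.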

In [8]:
chunks = coordinator.split_into_chunks(latents, text_embeddings)
print(f"Number of chunks: {len(chunks)}")
for i, chunk in enumerate(chunks):
    print(f"Chunk {i}: frames {chunk['start_idx']}-{chunk['end_idx']}, shape {chunk['chunk'].shape}")

Number of chunks: 3
Chunk 0: frames 0-8, shape torch.Size([1, 4, 8, 40, 72])
Chunk 1: frames 6-14, shape torch.Size([1, 4, 8, 40, 72])
Chunk 2: frames 12-16, shape torch.Size([1, 4, 4, 40, 72])
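The boundaries above come from a sliding window with stride `chunk_size - overlap`. The sketch below is a hypothetical re-implementation that reproduces the printed ranges. The coordinator's own `split_into_chunks` is not shown, so it may differ in its details.

```python
def chunk_bounds(num_frames, chunk_size, overlap):
    """Return half-open (start, end) frame windows that cover num_frames.

    Hypothetical helper for illustration. Each window would slice the
    latents as latents[:, :, start:end] for a (B, C, F, H, W) tensor.
    """
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    stride = chunk_size - overlap  # distance between consecutive chunk starts
    bounds = []
    start = 0
    while True:
        end = min(start + chunk_size, num_frames)  # last chunk may be shorter
        bounds.append((start, end))
        if end == num_frames:
            break
        start += stride
    return bounds


print(chunk_bounds(num_frames=16, chunk_size=8, overlap=2))
# → [(0, 8), (6, 14), (12, 16)]
```

When the number of frames F exceeds the overlap O, the chunk count is ⌈(F − O)/(C − O)⌉ for chunk size C. Here that gives ⌈14/6⌉ = 3.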


## 6. Save Chunks

Save each chunk to disk for processing by workers.

In [9]:
chunk_files = []
for i, chunk in enumerate(chunks):
    chunk_file = coordinator.save_chunk(chunk, i)
    chunk_files.append(chunk_file)
    print(f"Saved chunk {i} to {chunk_file}")

Saved chunk 0 to ./output/chunks/chunk_0.pkl
Saved chunk 1 to ./output/chunks/chunk_1.pkl
Saved chunk 2 to ./output/chunks/chunk_2.pkl
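The workers and the wait loop in step 8 treat a file's existence as the signal that it is ready. A file should therefore never appear on disk half-written. The sketch below shows one common pattern, assuming chunks are plain pickles as the `.pkl` names suggest. It writes to a temporary file in the same directory and then renames it with `os.replace`. `save_chunk_atomic` is hypothetical and is not the coordinator's `save_chunk`.

```python
import os
import pickle
import tempfile


def save_chunk_atomic(chunk, index, out_dir):
    """Pickle `chunk` to <out_dir>/chunk_<index>.pkl without exposing a partial file.

    os.replace is atomic when the temporary file and the destination are on
    the same filesystem, which is why the temp file is created in out_dir.
    For real torch tensors, moving them to CPU first (e.g. chunk["chunk"].cpu())
    avoids load failures on workers that lack the original CUDA device.
    """
    os.makedirs(out_dir, exist_ok=True)
    final_path = os.path.join(out_dir, f"chunk_{index}.pkl")
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(chunk, f)
        os.replace(tmp_path, final_path)
    except BaseException:
        os.remove(tmp_path)  # do not leave a stray temp file behind
        raise
    return final_path


# Demo with a plain list standing in for the latent tensor
with tempfile.TemporaryDirectory() as out_dir:
    chunk = {"start_idx": 0, "end_idx": 8, "chunk": [0.0] * 8}
    path = save_chunk_atomic(chunk, 0, out_dir)
    with open(path, "rb") as f:
        print(pickle.load(f) == chunk)  # → True
```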


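## 6.5 Performance Evaluation Functions

Define a helper that reports GPU memory usage through NVML, using the `pynvml` bindings installed at the top of the notebook. Calling it before and after a stage shows how much VRAM that stage consumes.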

In [14]:
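import pynvml

def log_vram_usage(stage="", device_index=0):
    """Print used/total memory for one GPU as reported by NVML.

    NVML reports device-wide usage, including memory held by other processes.
    """
    pynvml.nvmlInit()
    try:
        handle = pynvml.nvmlDeviceGetHandleByIndex(device_index)
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        used = info.used // 1024**2    # bytes -> MiB
        total = info.total // 1024**2
        print(f"[{stage}] GPU {device_index} VRAM usage: {used} MiB / {total} MiB")
    finally:
        # Always release NVML, even if one of the queries above raises
        pynvml.nvmlShutdown()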
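Wall-clock time per stage complements the VRAM numbers from `log_vram_usage`. The sketch below uses a small context manager built only on the standard library. `stage_timer` and its `results` dict are hypothetical additions and are not part of the coordinator.

```python
import time
from contextlib import contextmanager


@contextmanager
def stage_timer(stage, results):
    """Record wall-clock seconds spent inside the `with` block under results[stage].

    The timing is recorded even if the block raises an exception.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        results[stage] = time.perf_counter() - start
        print(f"[{stage}] took {results[stage]:.2f} s")


timings = {}
with stage_timer("sleep", timings):
    time.sleep(0.1)
```

The same pattern wraps a notebook step, for example `with stage_timer("load_models", timings): coordinator.load_models()`, and `log_vram_usage` can be called inside the block. CUDA kernels run asynchronously, so calling `torch.cuda.synchronize()` before the block ends makes the measured time include the GPU work.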

## 7. Launch Workers

Launch one worker per chunk. In a real distributed setting, these workers would run on separate machines.

In [None]:
output_files = []
for i, chunk_file in enumerate(chunk_files):
    output_file = coordinator.launch_worker(chunk_file, i)
    output_files.append(output_file)
    print(f"Launched worker for chunk {i}, output will be saved to {output_file}")
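The loop above launches one worker per chunk (three here), while the coordinator was configured with `num_workers=2`. The notebook does not show how `launch_worker` reconciles the two. When workers run locally, a thread pool that runs each worker as a subprocess is one way to cap concurrency. The sketch below takes that approach. `run_workers` and the worker command lines are hypothetical, and the demo uses a one-line copy script in place of a real worker.

```python
import os
import subprocess
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor


def run_workers(jobs, max_workers):
    """Run one subprocess per job, at most `max_workers` at a time.

    `jobs` is a list of argument lists, e.g. [python, worker_script, chunk, out].
    Returns the exit codes in job order. Threads are sufficient because each
    thread only waits on its child process.
    """
    def run(cmd):
        return subprocess.run(cmd).returncode

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run, jobs))


# Demo: a stand-in "worker" that copies its input chunk to its output file
copy_script = "import shutil, sys; shutil.copy(sys.argv[1], sys.argv[2])"
with tempfile.TemporaryDirectory() as d:
    jobs = []
    for i in range(3):
        src = os.path.join(d, f"chunk_{i}.pkl")
        with open(src, "wb") as f:
            f.write(b"placeholder")
        jobs.append([sys.executable, "-c", copy_script, src, os.path.join(d, f"result_{i}.pkl")])
    print(run_workers(jobs, max_workers=2))  # → [0, 0, 0]
```

`launch_worker` appears to return immediately, since step 8 polls for results. `run_workers` differs: it blocks until every worker has exited, so no separate wait loop is needed.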

## 8. Wait for Workers

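Poll every 5 seconds until every expected output file exists. A real distributed setting calls for a more robust mechanism. File existence alone cannot distinguish a finished result from one that is still being written, and without a timeout the loop waits forever if a worker crashes.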

In [None]:
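import time

timeout_s = 60 * 60   # give up after an hour; adjust to your hardware
poll_interval_s = 5
deadline = time.monotonic() + timeout_s

while True:
    # Collect the outputs that have not appeared yet
    missing = [f for f in output_files if not os.path.exists(f)]
    if not missing:
        print("All workers have completed!")
        break
    if time.monotonic() > deadline:
        raise TimeoutError(f"Workers did not finish within {timeout_s} s; missing: {missing}")
    print(f"Waiting for {len(missing)} worker(s) to complete...")
    time.sleep(poll_interval_s)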

## 9. Load Results

Load the processed chunks from disk.

In [None]:
processed_chunks = coordinator.load_results(output_files)
print(f"Loaded {len(processed_chunks)} processed chunks")
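Two points about reading worker outputs are worth keeping in mind, since `load_results` itself is not shown. First, `pickle.load` can execute arbitrary code, so result files should only come from trusted workers. Second, sorting by `start_idx` makes the stitch order independent of the order in which result files are listed. The sketch below assumes each result is a dict with a `start_idx` key, like the chunk dicts printed in step 5. `load_results_sorted` is hypothetical.

```python
import pickle


def load_results_sorted(paths):
    """Unpickle each result file and return the chunks ordered by start_idx.

    Only use this on files written by trusted workers: unpickling can run
    arbitrary code.
    """
    results = []
    for path in paths:
        with open(path, "rb") as f:
            results.append(pickle.load(f))
    return sorted(results, key=lambda r: r["start_idx"])
```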

## 10. Stitch Chunks

Stitch the processed chunks back together. Frames covered by more than one chunk are averaged; with the boundaries above, these are frames 6, 7, 12 and 13.

In [None]:
stitched_latents = coordinator.stitch_chunks(processed_chunks, latents.shape)
print(f"Stitched latents shape: {stitched_latents.shape}")
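Averaging overlaps comes down to keeping a running sum and a coverage count for every frame. The sketch below is hypothetical and not the coordinator's code. It reduces each frame to a single float so the arithmetic stays visible; for latent tensors of shape `(batch, channels, frames, h, w)`, the same sums and counts apply along dim 2.

```python
def stitch_average(chunks, num_frames):
    """Average overlapping frames from a list of (start_idx, frames) chunks."""
    sums = [0.0] * num_frames
    counts = [0] * num_frames
    for start, frames in chunks:
        for offset, value in enumerate(frames):
            sums[start + offset] += value
            counts[start + offset] += 1
    if 0 in counts:
        raise ValueError("some frames are not covered by any chunk")
    return [s / c for s, c in zip(sums, counts)]


# Same boundaries as step 5, with each chunk holding a constant value
chunks = [(0, [1.0] * 8), (6, [3.0] * 8), (12, [5.0] * 4)]
print(stitch_average(chunks, num_frames=16))
# → [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 3.0, 3.0, 4.0, 4.0, 5.0, 5.0]
```

A uniform average switches abruptly between chunks at the edges of each overlap. If independently processed chunks diverge, seams may be visible there. Weighting the overlap with a linear cross-fade is a common alternative.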

## 11. Decode to Video

Decode the stitched latents into a video and save it.

In [None]:
output_path = "final_video.mp4"
coordinator.decode_to_video(stitched_latents, output_path)
print(f"Video saved to {output_path}")
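For reference, Stable-Diffusion-style text-to-video pipelines in diffusers first undo the VAE scaling factor $s$, which is stored as `vae.config.scaling_factor` and is typically 0.18215. They then decode every latent frame independently with the image VAE decoder $\mathcal{D}$. The notebook does not show whether `decode_to_video` follows exactly these steps. Assuming it does, the 8× spatial upsampling turns each 40×72 latent frame into a 320×576 RGB frame:

$$
x_f = \mathcal{D}\!\left(\frac{z_f}{s}\right), \quad f = 0, \dots, 15, \qquad
z_f \in \mathbb{R}^{4 \times 40 \times 72} \;\longmapsto\; x_f \in \mathbb{R}^{3 \times 320 \times 576}.
$$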

## 12. Cleanup

Clean up temporary files (optional).

In [None]:
coordinator.cleanup()