# Convert 3D Poses to Action Videos using Diffusion Models

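This notebook converts sequences of 3D joint poses into action videos: each pose is rendered as a 2D skeleton image, and one video frame per pose is generated with Stable Diffusion conditioned by an OpenPose ControlNet. Frames are generated independently of each other (no AnimateDiff-style temporal module is loaded), so some frame-to-frame flicker is to be expected.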

## 1. Installation

Install required libraries for pose-to-video generation.

In [None]:
# Install required packages
!pip install -q diffusers transformers accelerate torch torchvision opencv-python numpy pillow imageio tqdm
!pip install -q xformers  # For memory efficient attention (optional but recommended)

## 2. Imports and Setup

In [None]:
import os
import json
import numpy as np
import torch
from pathlib import Path
from typing import List, Tuple
import cv2
from PIL import Image, ImageDraw
import imageio
from tqdm import tqdm
import matplotlib.pyplot as plt

from diffusers import (
    StableDiffusionControlNetPipeline,
    ControlNetModel,
    DDIMScheduler,
)
from diffusers.utils import load_image

# Report the PyTorch build and whether a CUDA device is visible; the model
# loading cell further down picks "cuda" or "cpu" based on the same check.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only query the device name when CUDA is actually available.
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

## 3. Load Pose Data

Upload your pose files to Colab or mount Google Drive.

In [None]:
# Mount Google Drive (optional - if your data is in Drive).
# NOTE(review): `google.colab` is presumably only importable inside a Colab
# runtime; skip this cell when running elsewhere.
from google.colab import drive
# Makes the Drive contents available under /content/drive.
drive.mount('/content/drive')

In [None]:
# Alternative: Upload files directly.
# NOTE(review): this call runs whenever the cell is executed, even if the data
# already lives in Drive - skip the cell or cancel the dialog in that case.
from google.colab import files
uploaded = files.upload()

# Or specify path if data is already in Colab/Drive.
# DATA_DIR is only assigned here; nothing else in the visible notebook reads it
# (the example calls further down spell their paths out explicitly).
DATA_DIR = "/content/drive/MyDrive/dataset/violence/violence_01"  # Update this path
# DATA_DIR = "/content/pose_data"  # Or use uploaded files

In [None]:
def load_pose_data(file_path):
    """Load a pose sequence from a JSON, ``.npy`` or ``.npz`` file.

    Supported layouts:

    * JSON list: one entry per frame. A frame may be a dict with a
      ``'joints'`` or ``'pose'`` key, a plain list, or any other dict (its
      values are used in insertion order).
    * JSON dict: the first of ``'frames'``, ``'poses'``, ``'joints'`` that is
      present is used; otherwise all values of the dict are stacked.
    * ``.npy``: the stored array is returned as-is.
    * ``.npz``: the first array stored in the archive is returned.

    The file extension is matched case-insensitively.

    Args:
        file_path: Path (``str`` or ``pathlib.Path``) of the pose file.

    Returns:
        numpy.ndarray built from the file contents. The shape depends entirely
        on the file; no particular layout is enforced here.

    Raises:
        ValueError: If the extension is unsupported, the JSON top level (or a
            frame) is neither a list nor a dict, or the ``.npz`` archive
            contains no arrays.
    """
    file_path = Path(file_path)
    suffix = file_path.suffix.lower()

    def _frame_to_joints(frame):
        # Turn one JSON frame entry into an array of joints.
        if isinstance(frame, dict):
            if 'joints' in frame:
                return np.array(frame['joints'])
            if 'pose' in frame:
                return np.array(frame['pose'])
            return np.array(list(frame.values()))
        if isinstance(frame, list):
            return np.array(frame)
        raise ValueError(
            f"Unsupported frame entry of type {type(frame).__name__} in {file_path}"
        )

    if suffix == '.json':
        with open(file_path, 'r') as f:
            data = json.load(f)

        # Handle different JSON structures
        if isinstance(data, list):
            return np.array([_frame_to_joints(frame) for frame in data])
        if isinstance(data, dict):
            for key in ('frames', 'poses', 'joints'):
                if key in data:
                    return np.array(data[key])
            return np.array(list(data.values()))
        # A bare number/string/null used to fall through and return None.
        raise ValueError(
            f"Unsupported JSON structure in {file_path}: "
            f"top-level {type(data).__name__}"
        )

    if suffix == '.npy':
        return np.load(file_path)

    if suffix == '.npz':
        # NpzFile keeps the archive open; read the array, then close the file.
        with np.load(file_path) as archive:
            if not archive.files:
                raise ValueError(f"No arrays found in {file_path}")
            return archive[archive.files[0]]

    raise ValueError(f"Unsupported format: {file_path.suffix}")

# Example: Load pose file
# poses_3d = load_pose_data("path/to/pose_file.json")
# print(f"Loaded poses shape: {poses_3d.shape}")

## 4. Convert 3D Poses to OpenPose Format

Convert 3D poses to 2D OpenPose skeleton images for ControlNet.

In [None]:
class PoseToOpenPose:
    """Render pose keypoints as 2D skeleton images for use as ControlNet input.

    Each pose is projected to 2D by dropping everything but the first two
    coordinates, rescaled independently to fit the canvas, and drawn as a
    white skeleton on a black background.

    NOTE(review): the pretrained OpenPose ControlNet is, as far as I know,
    trained on colour-coded OpenPose renderings; a monochrome skeleton may
    condition it less reliably - confirm and consider colouring the limbs.
    """

    # Skeleton connections as pairs of joint indices (adjust based on your
    # joint structure). NOTE(review): the pairs look like an OpenPose BODY_25
    # style ordering - confirm against the joint order of your dataset.
    SKELETON = [
        [0, 1], [1, 2], [1, 5], [2, 3], [3, 4],
        [5, 6], [6, 7], [1, 8], [8, 9], [9, 10],
        [10, 11], [8, 12], [12, 13], [13, 14],
    ]

    def __init__(self, image_size=(512, 512)):
        """Store the output canvas size as ``(width, height)`` in pixels."""
        self.image_size = image_size

    def project_to_2d(self, pose_3d):
        """Project a pose to 2D by keeping the first two coordinates.

        Arrays whose last axis has fewer than two entries are returned
        unchanged.
        """
        if pose_3d.shape[-1] >= 2:
            return pose_3d[..., :2]
        return pose_3d

    def _to_pixel_coords(self, keypoints_2d, padding=0.1):
        """Map keypoints to pixel coordinates inside a padded canvas.

        The keypoints are shifted so their per-axis minimum is zero, divided by
        the single largest remaining value (so the aspect ratio is preserved on
        a square canvas) and then scaled into the canvas, leaving ``padding``
        (a fraction of the canvas size) free on every side.

        Joints with NaN/inf coordinates are ignored when computing the bounds
        and are reported as invalid.

        Args:
            keypoints_2d: Array-like of shape ``(num_joints, 2)``.
            padding: Fraction of the canvas kept free on each side.

        Returns:
            ``(pixels, valid)``: float pixel coordinates (NaN rows stay NaN)
            and a boolean mask marking the joints with finite coordinates.

        Raises:
            ValueError: If ``keypoints_2d`` is not a 2-D array.
        """
        keypoints = np.asarray(keypoints_2d, dtype=float)
        if keypoints.ndim != 2:
            raise ValueError(
                "Expected keypoints of shape (num_joints, 2), "
                f"got {keypoints.shape}"
            )

        valid = np.isfinite(keypoints).all(axis=1)
        if not valid.any():
            return keypoints.copy(), valid

        # Bounds come from finite joints only so one NaN cannot poison them.
        shifted = keypoints - keypoints[valid].min(axis=0)
        max_val = shifted[valid].max()
        if max_val > 0:
            shifted = shifted / max_val

        width, height = self.image_size
        scale = np.array([width * (1 - 2 * padding), height * (1 - 2 * padding)])
        offset = np.array([width * padding, height * padding])
        return shifted * scale + offset, valid

    def draw_openpose(self, keypoints_2d):
        """Draw the skeleton for one set of 2D keypoints on a black canvas.

        Args:
            keypoints_2d: Array-like of shape ``(num_joints, 2)``. An empty
                array yields a blank image. Non-finite joints are skipped,
                together with any limb that touches them.

        Returns:
            ``PIL.Image.Image`` in RGB mode, sized ``self.image_size``.

        Raises:
            ValueError: If ``keypoints_2d`` is non-empty but not 2-D.
        """
        img = Image.new('RGB', self.image_size, color='black')
        keypoints_2d = np.asarray(keypoints_2d)
        if keypoints_2d.size == 0:
            return img

        draw = ImageDraw.Draw(img)
        pixels, valid = self._to_pixel_coords(keypoints_2d)
        # Plain Python ints (truncated toward zero) for the drawing calls.
        points = [
            tuple(int(v) for v in row) if ok else None
            for row, ok in zip(pixels, valid)
        ]

        # Draw skeleton connections; limbs that reference joints this pose
        # does not have, or joints without finite coordinates, are skipped.
        for start, end in self.SKELETON:
            if start < len(points) and end < len(points):
                if points[start] is not None and points[end] is not None:
                    draw.line([points[start], points[end]], fill='white', width=3)

        # Draw keypoints as filled circles of radius 5 px.
        for point in points:
            if point is not None:
                x, y = point
                draw.ellipse([x - 5, y - 5, x + 5, y + 5], fill='white')

        return img

    def convert_sequence(self, poses_3d):
        """Convert a sequence of poses into a list of skeleton images.

        Every frame is projected and normalised on its own, so the skeleton
        always fills the canvas regardless of its position or scale in the
        original coordinates.
        """
        return [self.draw_openpose(self.project_to_2d(pose)) for pose in poses_3d]

# Initialize converter.
# This module-level instance is the one `process_pose_file` uses further down;
# 512x512 is the canvas size of the control images it produces.
pose_converter = PoseToOpenPose(image_size=(512, 512))

In [None]:
# Visualize pose conversion (example)
# Uncomment and adjust when you have pose data loaded

# if 'poses_3d' in locals():
#     # Show first frame
#     control_images = pose_converter.convert_sequence(poses_3d[:5])  # First 5 frames
#     fig, axes = plt.subplots(1, min(5, len(control_images)), figsize=(15, 3))
#     if len(control_images) == 1:
#         axes = [axes]
#     for i, img in enumerate(control_images[:5]):
#         axes[i].imshow(img)
#         axes[i].axis('off')
#         axes[i].set_title(f'Frame {i+1}')
#     plt.tight_layout()
#     plt.show()

## 5. Load Diffusion Model

Load ControlNet with OpenPose for pose-conditioned video generation.

In [None]:
# Load ControlNet with OpenPose
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Half precision only on GPU; CPU inference stays in float32.
MODEL_DTYPE = torch.float16 if device == "cuda" else torch.float32

controlnet = ControlNetModel.from_pretrained(
    "lllyasviel/sd-controlnet-openpose",
    torch_dtype=MODEL_DTYPE,
)

pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=controlnet,
    torch_dtype=MODEL_DTYPE,
    safety_checker=None,
    requires_safety_checker=False,
)

# Model CPU offload keeps sub-models on the CPU and moves them to the
# accelerator on demand, so it only makes sense when a GPU is present.
# On the CPU fallback path the pipeline is simply placed on the CPU.
if device == "cuda":
    pipe.enable_model_cpu_offload()
else:
    pipe.to(device)

# Enable memory efficient attention (best effort: xformers is optional).
# Catch Exception rather than using a bare except so Ctrl-C still interrupts.
try:
    pipe.enable_xformers_memory_efficient_attention()
    print("XFormers enabled for memory efficiency")
except Exception:
    print("XFormers not available, using default attention")

print("Model loaded successfully!")

## 6. Generate Video from Poses

Generate action video frames from the pose sequence.

In [None]:
# Configuration: module-level defaults read by `process_pose_file` below.
# PROMPT / NEGATIVE_PROMPT are passed to the pipeline for every frame.
PROMPT = "a person performing a violent action, high quality, detailed, realistic, dynamic movement, cinematic"
NEGATIVE_PROMPT = "blurry, low quality, distorted, cartoon, animation, static, low resolution"
NUM_INFERENCE_STEPS = 20  # Denoising steps per frame: more steps = better quality but slower
GUIDANCE_SCALE = 7.5  # Classifier-free guidance: how closely to follow the prompt
FPS = 8  # Frames per second for output video

def generate_video_frames(control_images, prompt, negative_prompt, num_steps=20, guidance=7.5, seed=None):
    """Generate one image per control image with the module-level ``pipe``.

    Frames are generated independently, one pipeline call per control image.

    Args:
        control_images: Sequence of skeleton images used as ControlNet input.
        prompt: Text prompt; ``", frame <i> of <n>"`` is appended per frame.
        negative_prompt: Negative text prompt, passed through unchanged.
        num_steps: Number of denoising steps per frame.
        guidance: Guidance scale passed to the pipeline.
        seed: Optional integer. When given, every frame starts from the same
            initial noise (a fresh generator seeded with this value), which
            makes runs reproducible and reduces frame-to-frame flicker. When
            ``None`` (the default) each frame uses fresh random noise.

    Returns:
        List of generated frames (the first image of each pipeline result), in
        the same order as ``control_images``.
    """
    frames = []
    total = len(control_images)

    print(f"Generating {total} frames...")
    for i, control_img in enumerate(tqdm(control_images, desc="Generating frames")):
        # Add frame context to prompt for better temporal consistency
        frame_prompt = f"{prompt}, frame {i+1} of {total}"

        # Re-create the generator per frame so every frame gets the same noise.
        # A CPU generator also works when the pipeline itself runs on the GPU.
        generator = None
        if seed is not None:
            generator = torch.Generator(device="cpu").manual_seed(seed)

        # Generate frame
        result = pipe(
            prompt=frame_prompt,
            image=control_img,
            negative_prompt=negative_prompt,
            num_inference_steps=num_steps,
            guidance_scale=guidance,
            generator=generator,
        ).images[0]

        frames.append(result)

    return frames

# Example usage (uncomment when you have pose data):
# control_images = pose_converter.convert_sequence(poses_3d)
# video_frames = generate_video_frames(
#     control_images,
#     PROMPT,
#     NEGATIVE_PROMPT,
#     NUM_INFERENCE_STEPS,
#     GUIDANCE_SCALE
# )

In [None]:
def save_video(frames, output_path, fps=8):
    """Encode a sequence of frames as an H.264 video file.

    Args:
        frames: Iterable of images convertible with ``np.array`` (e.g. PIL
            images or arrays).
        output_path: Destination file; missing parent directories are created.
        fps: Frames per second of the written video.

    Raises:
        ValueError: If ``frames`` is empty. This is checked before anything is
            created on disk, instead of failing inside the video writer.
    """
    # Convert PIL images to numpy arrays
    frame_arrays = [np.array(frame) for frame in frames]
    if not frame_arrays:
        raise ValueError("Cannot save a video without frames")

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Save as video using imageio
    imageio.mimsave(
        str(output_path),
        frame_arrays,
        fps=fps,
        codec='libx264',
        quality=8,
    )

    print(f"Video saved to: {output_path}")

# Example usage:
# save_video(video_frames, "/content/output_video.mp4", fps=FPS)

## 7. Complete Pipeline

Run the complete pipeline from pose data to video.

In [None]:
# Complete pipeline
def process_pose_file(pose_file_path, output_path, max_frames=None):
    """Complete pipeline: Load pose -> Convert to OpenPose -> Generate video -> Save.

    Uses the module-level ``pose_converter`` and the generation settings
    ``PROMPT``, ``NEGATIVE_PROMPT``, ``NUM_INFERENCE_STEPS``,
    ``GUIDANCE_SCALE`` and ``FPS``.

    Args:
        pose_file_path: Pose file understood by ``load_pose_data``.
        output_path: Destination video file.
        max_frames: If truthy, only the first ``max_frames`` frames are used
            (handy for quick tests).

    Returns:
        The list of generated frames that was written to ``output_path``.

    Raises:
        ValueError: If the loaded data has fewer than two dimensions, i.e. is
            not a sequence of per-frame joint arrays.
    """

    # 1. Load pose data
    print(f"Loading pose data from: {pose_file_path}")
    poses_3d = load_pose_data(pose_file_path)
    print(f"  Loaded poses shape: {poses_3d.shape}")

    # Limit frames if specified (for testing)
    if max_frames and len(poses_3d) > max_frames:
        poses_3d = poses_3d[:max_frames]
        print(f"  Limited to {max_frames} frames")

    # 2. Center each frame on its mean.
    # Work on a float copy: writing the centred values back into an
    # integer-typed array (e.g. integer coordinates from JSON) would truncate
    # them. No scaling happens here; the converter rescales every frame to the
    # canvas when drawing.
    poses_3d = np.asarray(poses_3d, dtype=float)
    if poses_3d.ndim < 2:
        raise ValueError(
            "Expected a sequence of per-frame joint arrays, "
            f"got shape {poses_3d.shape}"
        )
    # Averaging over axis 1 of the stack equals averaging each frame over its
    # first axis, which is what the per-frame loop did.
    poses_3d = poses_3d - poses_3d.mean(axis=1, keepdims=True)

    # 3. Convert to OpenPose format
    print("Converting to OpenPose format...")
    control_images = pose_converter.convert_sequence(poses_3d)

    # 4. Generate video frames
    print("Generating video frames...")
    video_frames = generate_video_frames(
        control_images,
        PROMPT,
        NEGATIVE_PROMPT,
        NUM_INFERENCE_STEPS,
        GUIDANCE_SCALE
    )

    # 5. Save video
    print(f"Saving video to: {output_path}")
    save_video(video_frames, output_path, fps=FPS)

    return video_frames

# Example: Process a single file
# video_frames = process_pose_file(
#     pose_file_path="/content/pose_data/pose_sequence.json",
#     output_path="/content/output_video.mp4",
#     max_frames=16  # Limit for testing
# )

In [None]:
# Process all pose files in a directory
def process_directory(input_dir, output_dir, max_frames=None):
    """Run ``process_pose_file`` on every pose file directly inside a folder.

    Files ending in ``.json``, ``.npy`` or ``.npz`` are picked up (not
    recursively) and handled in sorted order. Each result is written to
    ``<output_dir>/<stem>_generated.mp4``. A failure on one file is reported
    together with its traceback and does not stop the remaining files.

    Args:
        input_dir: Folder containing the pose files.
        output_dir: Folder for the generated videos; created if missing.
        max_frames: Forwarded to ``process_pose_file``.

    Returns:
        None.
    """
    source_root = Path(input_dir)
    target_root = Path(output_dir)
    target_root.mkdir(parents=True, exist_ok=True)

    # Collect every supported file, then order the combined list.
    candidates = sorted(
        match
        for suffix in ('.json', '.npy', '.npz')
        for match in source_root.glob(f'*{suffix}')
    )

    if len(candidates) == 0:
        print(f"No pose files found in {source_root}")
        return

    print(f"Found {len(candidates)} pose file(s)")

    for candidate in candidates:
        print(f"\nProcessing: {candidate.name}")
        destination = target_root / f"{candidate.stem}_generated.mp4"

        try:
            process_pose_file(candidate, destination, max_frames=max_frames)
        except Exception as err:
            # Best effort: report the failure and carry on with the next file.
            print(f"Error processing {candidate.name}: {err}")
            import traceback
            traceback.print_exc()

# Example: Process entire directory
# process_directory(
#     input_dir="/content/drive/MyDrive/dataset/violence/violence_01",
#     output_dir="/content/output_videos",
#     max_frames=16  # Limit for testing
# )

## 8. Visualize Results

Display generated frames.

In [None]:
# Display generated video frames
def display_frames(frames, num_frames=8):
    """Display a sample of generated frames in a single row.

    Up to ``num_frames`` frames are picked at evenly spaced positions from the
    first to the last frame; each panel is titled with its 1-based position in
    ``frames``.

    Args:
        frames: Sequence of images accepted by ``Axes.imshow``.
        num_frames: Maximum number of frames to show.

    Returns:
        None. If there is nothing to show (empty ``frames`` or
        ``num_frames <= 0``) a short message is printed and no figure is made.
    """
    num_display = min(num_frames, len(frames))
    if num_display <= 0:
        # plt.subplots cannot build a grid with zero columns.
        print("No frames to display")
        return

    indices = np.linspace(0, len(frames) - 1, num_display, dtype=int)

    # squeeze=False always yields a 2-D array of axes, even for one panel.
    fig, axes = plt.subplots(
        1, num_display, figsize=(4 * num_display, 4), squeeze=False
    )

    for ax, idx in zip(axes[0], indices):
        ax.imshow(frames[idx])
        ax.axis('off')
        ax.set_title(f'Frame {idx+1}')

    plt.tight_layout()
    plt.show()

# Example: Display frames
# if 'video_frames' in locals():
#     display_frames(video_frames, num_frames=8)

## 9. Download Results

Download the generated video.

In [None]:
# Download generated video
# from google.colab import files
# files.download('/content/output_video.mp4')