# 🎬 AI Video Generator - Optimized for Google Colab

**Features:**
- 720p Resolution Support
- 10-second video generation
- 24fps output
- Realistic human posing
- Batch processing
- Memory optimization
- Progress tracking
- Automatic environment detection

**Requirements:**
- GPU Runtime (T4 or better recommended)
- ~15GB RAM
- ~20GB Disk Space

---

## 📋 Step 1: Environment Detection & Setup

In [None]:
import os
import sys
import subprocess
import platform
import psutil
import time
from pathlib import Path
from IPython.display import display, HTML, clear_output
import warnings
warnings.filterwarnings('ignore')

# Try importing optional packages with fallbacks
try:
    import ipywidgets as widgets
    print("✅ ipywidgets imported successfully")
except ImportError:
    print("⚠️ ipywidgets not available, some UI features may be limited")
    
try:
    from tqdm.notebook import tqdm
    print("✅ tqdm imported successfully")
except ImportError:
    from tqdm import tqdm
    print("⚠️ Using console tqdm instead of notebook version")

class EnvironmentDetector:
    def __init__(self):
        self.system_info = {}
        self.requirements_met = False
        
    def detect_environment(self):
        """Detect and validate the current environment"""
        print("🔍 Detecting environment...")
        
        # Basic system info
        self.system_info['platform'] = platform.system()
        self.system_info['python_version'] = sys.version
        self.system_info['is_colab'] = 'google.colab' in sys.modules
        
        # Memory info
        memory = psutil.virtual_memory()
        self.system_info['total_ram_gb'] = round(memory.total / (1024**3), 2)
        self.system_info['available_ram_gb'] = round(memory.available / (1024**3), 2)
        
        # GPU info with fallback
        try:
            import torch
            self.system_info['cuda_available'] = torch.cuda.is_available()
            if torch.cuda.is_available():
                self.system_info['gpu_name'] = torch.cuda.get_device_name(0)
                self.system_info['gpu_memory_gb'] = round(torch.cuda.get_device_properties(0).total_memory / (1024**3), 2)
            else:
                self.system_info['gpu_name'] = 'None'
                self.system_info['gpu_memory_gb'] = 0
        except:
            self.system_info['cuda_available'] = False
            self.system_info['gpu_name'] = 'None'
            self.system_info['gpu_memory_gb'] = 0
        
        # Disk space
        disk_usage = psutil.disk_usage('/')
        self.system_info['disk_free_gb'] = round(disk_usage.free / (1024**3), 2)
        
        self._display_info()
        self._check_requirements()
        
    def _display_info(self):
        """Display system information in a nice format"""
        info_html = f"""
        <div style="background-color: #f0f2f6; padding: 15px; border-radius: 10px; margin: 10px 0;">
            <h3>🖥️ System Information</h3>
            <table style="width: 100%; border-collapse: collapse;">
                <tr><td><b>Platform:</b></td><td>{self.system_info['platform']}</td></tr>
                <tr><td><b>Google Colab:</b></td><td>{'✅ Yes' if self.system_info['is_colab'] else '❌ No'}</td></tr>
                <tr><td><b>Total RAM:</b></td><td>{self.system_info['total_ram_gb']} GB</td></tr>
                <tr><td><b>Available RAM:</b></td><td>{self.system_info['available_ram_gb']} GB</td></tr>
                <tr><td><b>GPU:</b></td><td>{self.system_info['gpu_name']}</td></tr>
                <tr><td><b>GPU Memory:</b></td><td>{self.system_info['gpu_memory_gb']} GB</td></tr>
                <tr><td><b>Free Disk Space:</b></td><td>{self.system_info['disk_free_gb']} GB</td></tr>
            </table>
        </div>
        """
        display(HTML(info_html))
        
    def _check_requirements(self):
        """Check if system meets requirements"""
        issues = []
        
        if not self.system_info['cuda_available']:
            issues.append("❌ CUDA not available - GPU required for optimal performance")
        elif self.system_info['gpu_memory_gb'] < 6:
            issues.append("⚠️ GPU memory < 6GB - may experience memory issues")
            
        if self.system_info['total_ram_gb'] < 12:
            issues.append("⚠️ RAM < 12GB - may experience memory issues")
            
        if self.system_info['disk_free_gb'] < 15:
            issues.append("⚠️ Disk space < 15GB - may not have enough space for models")
            
        if issues:
            print("\n⚠️ System Issues Detected:")
            for issue in issues:
                print(f"  {issue}")
            print("\n💡 Recommendation: Use Google Colab Pro with GPU runtime for best results")
        else:
            print("\n✅ All requirements met! Ready to proceed.")
            self.requirements_met = True
            
        return len(issues) == 0

# Initialize and run environment detection
env_detector = EnvironmentDetector()
env_detector.detect_environment()

## 📦 Step 2: Install Dependencies

In [None]:
class DependencyManager:
    def __init__(self):
        self.packages = {
            'essential': [
                'torch>=2.1.0',
                'torchvision>=0.16.0',
                'diffusers>=0.30.3',
                'transformers>=4.44.2',
                'accelerate>=0.33.0',
                'xformers>=0.0.23',
            ],
            'video': [
                'opencv-python-headless>=4.8.0',
                'imageio[ffmpeg]>=2.31.0',
                'moviepy>=1.0.3',
                'av>=10.0.0',
                'decord>=0.6.0'
            ],
            'ui': [
                'streamlit>=1.28.0',
                'gradio>=4.0.0',
                'ipywidgets>=8.0.0',
                'matplotlib>=3.7.0',
                'pillow>=10.0.0'
            ],
            'utils': [
                'huggingface-hub>=0.19.0',
                'safetensors>=0.4.0',
                'einops>=0.7.0',
                'omegaconf>=2.3.0',
                'pyngrok>=7.0.0'
            ]
        }
        
    def install_packages(self, category='all'):
        """Install packages with progress tracking"""
        categories = [category] if category != 'all' else self.packages.keys()
        
        # Special handling for PyTorch with CUDA
        if category == 'all' or category == 'essential':
            print("\n🔥 Installing PyTorch with CUDA support...")
            torch_install_cmd = [
                sys.executable, '-m', 'pip', 'install', 'torch>=2.1.0', 'torchvision>=0.16.0', 'torchaudio>=2.1.0',
                '--index-url', 'https://download.pytorch.org/whl/cu121', '--quiet'
            ]
            
            try:
                result = subprocess.run(torch_install_cmd, capture_output=True, text=True, timeout=600)
                if result.returncode == 0:
                    print("✅ PyTorch with CUDA installed successfully")
                else:
                    print(f"⚠️ PyTorch CUDA installation failed, trying CPU version...")
                    # Fallback to CPU version
                    cpu_cmd = [sys.executable, '-m', 'pip', 'install', 'torch>=2.1.0', 'torchvision>=0.16.0', 'torchaudio>=2.1.0', '--quiet']
                    subprocess.run(cpu_cmd, timeout=600)
            except subprocess.TimeoutExpired:
                print("⚠️ PyTorch installation timeout, continuing...")
            except Exception as e:
                print(f"⚠️ Error installing PyTorch: {str(e)}")
                
        for cat in categories:
            if cat == 'essential':  # Skip essential as we handled PyTorch separately
                packages = [p for p in self.packages[cat] if not p.startswith('torch')]
            else:
                packages = self.packages[cat]
                
            if not packages:
                continue
                
            print(f"\n📦 Installing {cat} packages...")
            
            progress_bar = tqdm(packages, desc=f"Installing {cat}")
            for package in progress_bar:
                try:
                    progress_bar.set_postfix_str(f"Installing {package.split('>=')[0]}")
                    result = subprocess.run(
                        [sys.executable, '-m', 'pip', 'install', package, '--quiet'],
                        capture_output=True,
                        text=True,
                        timeout=300
                    )
                    if result.returncode != 0:
                        print(f"⚠️ Warning: Failed to install {package}")
                        print(f"Error: {result.stderr}")
                except subprocess.TimeoutExpired:
                    print(f"⚠️ Timeout installing {package}")
                except Exception as e:
                    print(f"⚠️ Error installing {package}: {str(e)}")
                    
        print("\n✅ Dependencies installation completed!")
        
    def verify_installation(self):
        """Verify critical packages are installed"""
        critical_imports = {
            'torch': 'PyTorch',
            'diffusers': 'Diffusers',
            'transformers': 'Transformers',
            'cv2': 'OpenCV',
            'imageio': 'ImageIO',
            'streamlit': 'Streamlit'
        }
        
        print("\n🔍 Verifying installations...")
        for module, name in critical_imports.items():
            try:
                __import__(module)
                print(f"✅ {name} - OK")
            except ImportError:
                print(f"❌ {name} - FAILED")
                
        # Check CUDA availability
        try:
            import torch
            if torch.cuda.is_available():
                print(f"✅ CUDA - OK (Device: {torch.cuda.get_device_name(0)})")
            else:
                print("⚠️ CUDA - Not available")
        except:
            print("❌ CUDA - Error checking")

# Install dependencies
dep_manager = DependencyManager()
dep_manager.install_packages()
dep_manager.verify_installation()

## 🤖 Step 3: Model Download & Setup

In [None]:
import torch
import gc
from huggingface_hub import snapshot_download, hf_hub_download
from pathlib import Path
import json
import urllib.request

class ModelManager:
    def __init__(self):
        self.models_dir = Path("/content/models")
        self.models_dir.mkdir(exist_ok=True)
        
        self.model_configs = {
            'cogvideox': {
                'repo_id': 'THUDM/CogVideoX-2b',
                'local_dir': self.models_dir / 'CogVideoX-2b',
                'size_gb': 8.5,
                'description': 'Main video generation model'
            },
            'venhancer': {
                'repo_id': 'jwhejwhe/VEnhancer',
                'local_dir': self.models_dir / 'VEnhancer',
                'size_gb': 2.1,
                'description': 'Video enhancement model'
            }
        }
        
    def download_model(self, model_name, force_download=False):
        """Download a specific model with progress tracking"""
        if model_name not in self.model_configs:
            raise ValueError(f"Unknown model: {model_name}")
            
        config = self.model_configs[model_name]
        local_dir = config['local_dir']
        
        # Check if model already exists
        if local_dir.exists() and not force_download:
            print(f"✅ {model_name} already exists at {local_dir}")
            return str(local_dir)
            
        print(f"\n📥 Downloading {model_name} ({config['size_gb']:.1f}GB)...")
        print(f"📝 {config['description']}")
        
        try:
            # Create progress callback
            def progress_callback(block_num, block_size, total_size):
                if total_size > 0:
                    percent = min(100, (block_num * block_size * 100) / total_size)
                    print(f"\rProgress: {percent:.1f}%", end="")
                    
            # Download with resume capability
            local_dir_str = str(local_dir)
            snapshot_download(
                repo_id=config['repo_id'],
                local_dir=local_dir_str,
                resume_download=True,
                local_dir_use_symlinks=False,
                ignore_patterns=["*.git*", "README.md", "*.txt"]
            )
            
            print(f"\n✅ {model_name} downloaded successfully!")
            return local_dir_str
            
        except Exception as e:
            print(f"\n❌ Error downloading {model_name}: {str(e)}")
            return None
            
    def download_all_models(self):
        """Download all required models"""
        total_size = sum(config['size_gb'] for config in self.model_configs.values())
        print(f"\n📦 Downloading all models (Total: {total_size:.1f}GB)")
        
        downloaded_models = {}
        for model_name in self.model_configs.keys():
            path = self.download_model(model_name)
            if path:
                downloaded_models[model_name] = path
            else:
                print(f"⚠️ Failed to download {model_name}")
                
        return downloaded_models
        
    def get_model_info(self):
        """Get information about downloaded models"""
        info = {}
        for model_name, config in self.model_configs.items():
            local_dir = config['local_dir']
            info[model_name] = {
                'exists': local_dir.exists(),
                'path': str(local_dir),
                'size_gb': config['size_gb'],
                'description': config['description']
            }
        return info
        
    def cleanup_models(self):
        """Clean up downloaded models to free space"""
        import shutil
        if self.models_dir.exists():
            shutil.rmtree(self.models_dir)
            print("🗑️ All models cleaned up")

# Initialize model manager and download models
model_manager = ModelManager()

# Show model info
print("📋 Model Information:")
for name, info in model_manager.get_model_info().items():
    status = "✅ Downloaded" if info['exists'] else "❌ Not downloaded"
    print(f"  {name}: {info['description']} ({info['size_gb']:.1f}GB) - {status}")

# Download all models
downloaded_models = model_manager.download_all_models()
print(f"\n🎉 Model setup complete! Downloaded {len(downloaded_models)} models.")

## 🎬 Step 4: Optimized Video Generation Pipeline

In [None]:
import sys
sys.path.append('/content/models')

import torch
import numpy as np
import cv2
from PIL import Image
import imageio
import gc
from typing import List, Optional, Union
import subprocess
import tempfile
from pathlib import Path

# Safe import of diffusers with compatibility check
try:
    from diffusers import CogVideoXPipeline, CogVideoXDPMScheduler
    print("✅ CogVideoX pipeline imported successfully")
except ImportError as e:
    print(f"❌ Error importing CogVideoX pipeline: {str(e)}")
    print("💡 This might be due to version incompatibility.")
    print("💡 Please ensure you have:")
    print("   - torch>=2.1.0")
    print("   - diffusers>=0.30.3")
    print("   - transformers>=4.44.2")
    print("💡 Try running: pip install --upgrade torch diffusers transformers")
    raise

# Version compatibility check
print(f"📊 Version Info:")
print(f"   torch: {torch.__version__}")
try:
    import diffusers
    print(f"   diffusers: {diffusers.__version__}")
except:
    print("   diffusers: version check failed")
try:
    import transformers
    print(f"   transformers: {transformers.__version__}")
except:
    print("   transformers: version check failed")

class OptimizedVideoPipeline:
    def __init__(self, model_path: str, device: str = "cuda"):
        self.device = device
        self.model_path = model_path
        self.pipeline = None
        self.loaded = False
        
        # Optimized settings for 720p, 10s, 24fps
        self.default_settings = {
            'width': 720,
            'height': 480,  # 3:2 aspect ratio for better model performance
            'num_frames': 240,  # 10 seconds at 24fps
            'fps': 24,
            'num_inference_steps': 50,
            'guidance_scale': 6.0,
            'num_videos_per_prompt': 1
        }
        
    def load_pipeline(self):
        """Load the video generation pipeline with memory optimization"""
        if self.loaded:
            return
            
        print("🔄 Loading CogVideoX pipeline...")
        
        try:
            # Clear memory before loading
            gc.collect()
            torch.cuda.empty_cache()
            
            # Load pipeline with optimizations
            self.pipeline = CogVideoXPipeline.from_pretrained(
                self.model_path,
                torch_dtype=torch.float16,
                variant="fp16",
                use_safetensors=True,
                low_cpu_mem_usage=True
            )
            
            # Move to device
            self.pipeline = self.pipeline.to(self.device)
            
            # Enable memory efficient optimizations
            self.pipeline.enable_model_cpu_offload()
            self.pipeline.enable_vae_slicing()
            
            # Try to enable xformers if available
            try:
                self.pipeline.enable_xformers_memory_efficient_attention()
                print("✅ xFormers enabled for memory efficiency")
            except:
                print("⚠️ xFormers not available, using default attention")
                
            self.loaded = True
            print("✅ Pipeline loaded successfully!")
            
        except Exception as e:
            print(f"❌ Error loading pipeline: {str(e)}")
            raise
            
    def generate_video(
        self,
        prompt: str,
        negative_prompt: str = None,
        seed: int = None,
        **kwargs
    ):
        """Generate a single video with optimized settings"""
        if not self.loaded:
            self.load_pipeline()
            
        # Merge settings
        settings = {**self.default_settings, **kwargs}
        
        print(f"🎬 Generating video: {settings['width']}x{settings['height']}, {settings['num_frames']} frames, {settings['fps']} fps")
        print(f"📝 Prompt: {prompt}")
        
        # Set up generator for reproducibility
        generator = None
        if seed is not None:
            generator = torch.Generator(device=self.device).manual_seed(seed)
            print(f"🎲 Using seed: {seed}")
            
        try:
            # Clear memory before generation
            gc.collect()
            torch.cuda.empty_cache()
            
            # Generate video
            with torch.inference_mode():
                result = self.pipeline(
                    prompt=prompt,
                    negative_prompt=negative_prompt,
                    width=settings['width'],
                    height=settings['height'],
                    num_frames=settings['num_frames'],
                    num_inference_steps=settings['num_inference_steps'],
                    guidance_scale=settings['guidance_scale'],
                    num_videos_per_prompt=settings['num_videos_per_prompt'],
                    generator=generator,
                    output_type="pil"
                )
                
            # Extract frames
            frames = result.frames[0]
            
            # Clean up
            del result
            gc.collect()
            torch.cuda.empty_cache()
            
            print(f"✅ Generated {len(frames)} frames successfully!")
            return frames, settings
            
        except Exception as e:
            print(f"❌ Error generating video: {str(e)}")
            # Clean up on error
            gc.collect()
            torch.cuda.empty_cache()
            raise
            
    def save_video(
        self,
        frames: List[Image.Image],
        output_path: str,
        fps: int = 24,
        quality: int = 8
    ):
        """Save frames as video file"""
        print(f"💾 Saving video to {output_path}...")
        
        try:
            # Convert PIL images to numpy arrays
            video_frames = []
            for frame in frames:
                if isinstance(frame, Image.Image):
                    video_frames.append(np.array(frame))
                else:
                    video_frames.append(frame)
                    
            # Save using imageio
            with imageio.get_writer(
                output_path,
                fps=fps,
                codec='libx264',
                quality=quality,
                pixelformat='yuv420p'
            ) as writer:
                for frame in video_frames:
                    writer.append_data(frame)
                    
            print(f"✅ Video saved successfully!")
            return True
            
        except Exception as e:
            print(f"❌ Error saving video: {str(e)}")
            return False
            
    def unload_pipeline(self):
        """Unload pipeline to free memory"""
        if self.pipeline:
            del self.pipeline
            self.pipeline = None
            self.loaded = False
            gc.collect()
            torch.cuda.empty_cache()
            print("🗑️ Pipeline unloaded and memory cleared")

# Initialize the pipeline
cogvideox_path = "/content/models/CogVideoX-2b"
video_pipeline = OptimizedVideoPipeline(cogvideox_path)

print("🎬 Video pipeline initialized and ready!")

## 🎯 Step 5: Example Usage & Testing

In [None]:
from IPython.display import Video
from datetime import datetime
import os

# Create output directory
output_dir = Path("/content/outputs")
output_dir.mkdir(exist_ok=True)

# Example prompts optimized for realistic human poses
example_prompts = [
    "A professional businesswoman walking confidently down a modern office hallway, natural lighting, 4K quality",
    "A young man doing yoga poses in a peaceful garden, morning sunlight, cinematic composition",
    "A dancer performing contemporary dance moves in an empty studio, dramatic lighting, artistic shot",
    "An elderly person reading a book in a comfortable armchair by a window, warm lighting, cozy atmosphere",
    "A chef preparing ingredients in a modern kitchen, professional cooking, dynamic movements"
]

# Display example prompts
print("🎯 Example Prompts for Realistic Human Poses:")
for i, prompt in enumerate(example_prompts, 1):
    print(f"\n{i}. {prompt}")

# Quick test function
def quick_test():
    """Quick test of the video generation pipeline"""
    print("\n🧪 Running quick test...")
    
    try:
        # Load pipeline
        video_pipeline.load_pipeline()
        
        # Generate a short test video
        test_frames, test_settings = video_pipeline.generate_video(
            prompt="A person waving hello, friendly gesture, clear background",
            width=480,
            height=320,
            num_frames=48,  # 2 seconds at 24fps
            fps=24,
            num_inference_steps=25,  # Faster for testing
            seed=42
        )
        
        # Save test video
        test_path = "/content/outputs/test_video.mp4"
        success = video_pipeline.save_video(test_frames, test_path, fps=24)
        
        if success:
            print(f"✅ Test successful! Video saved to: {test_path}")
            
            # Display test video
            display(Video(test_path, width=480, height=320))
            
            return True
        else:
            print("❌ Test failed: Could not save video")
            return False
            
    except Exception as e:
        print(f"❌ Test failed: {str(e)}")
        return False

# Generate video function
def generate_video_simple(
    prompt: str,
    negative_prompt: str = "blurry, low quality, distorted, artifacts",
    width: int = 720,
    height: int = 480,
    duration: int = 10,
    fps: int = 24,
    seed: int = 42
):
    """Simple function to generate a video"""
    print(f"🎬 Generating video with prompt: {prompt}")
    
    try:
        # Calculate frames
        num_frames = duration * fps
        
        # Generate video
        frames, settings = video_pipeline.generate_video(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            num_frames=num_frames,
            fps=fps,
            seed=seed
        )
        
        # Save video
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = f"/content/outputs/video_{timestamp}.mp4"
        
        success = video_pipeline.save_video(frames, output_path, fps=fps)
        
        if success:
            print(f"✅ Video generated successfully: {output_path}")
            
            # Display video
            display(Video(output_path, width=min(width, 640), height=min(height, 480)))
            
            return output_path
        else:
            print("❌ Failed to save video")
            return None
            
    except Exception as e:
        print(f"❌ Error generating video: {str(e)}")
        return None

# Memory status function
def check_memory_status():
    """Check current memory usage"""
    try:
        import torch
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated(0) / 1024**3
            cached = torch.cuda.memory_reserved(0) / 1024**3
            total = torch.cuda.get_device_properties(0).total_memory / 1024**3
            
            print(f"🧠 GPU Memory Status:")
            print(f"  Allocated: {allocated:.2f} GB")
            print(f"  Cached: {cached:.2f} GB")
            print(f"  Total: {total:.2f} GB")
            print(f"  Free: {total - cached:.2f} GB")
            
        # RAM status
        import psutil
        memory = psutil.virtual_memory()
        print(f"\n🧠 RAM Status:")
        print(f"  Used: {memory.used / 1024**3:.2f} GB")
        print(f"  Available: {memory.available / 1024**3:.2f} GB")
        print(f"  Total: {memory.total / 1024**3:.2f} GB")
        
    except Exception as e:
        print(f"❌ Error checking memory: {str(e)}")

# Check memory status
check_memory_status()

print("\n🎯 Ready for video generation!")
print("\n💡 Usage Examples:")
print("  quick_test()  # Run a quick test")
print("  generate_video_simple('A person dancing in the rain')  # Generate a video")
print("\n📁 All videos will be saved to: /content/outputs/")