In [None]:
print("=" * 80)
print("INSTALLING REQUIRED PACKAGES")
print("=" * 80)

# Install all required packages
!pip install -q torch torchvision torchaudio
!pip install -q transformers accelerate
!pip install -q ultralytics
!pip install -q opencv-python-headless
!pip install -q pandas numpy
!pip install -q pytorchvideo
!pip install -q decord
!pip install -q pillow
!pip install -q av
!pip install -q huggingface-hub

print("\n✓ All packages installed successfully!\n")

In [None]:
import torch
import cv2
import numpy as np
import pandas as pd
from pathlib import Path
import json
import time
from datetime import datetime
from PIL import Image
import urllib.request
from google.colab import files
import os
from typing import List, Dict, Tuple
import warnings
warnings.filterwarnings('ignore')

# Section banner: rule, title, rule (one call, newline-separated)
print("=" * 80, "IMPORTING LIBRARIES", "=" * 80, sep="\n")

# Pick the compute device: the GPU when PyTorch can see one, the CPU otherwise
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"\n✓ Using device: {device}")

# On GPU, also report which card was found and how much memory it has
if device == "cuda":
    print(f"  GPU: {torch.cuda.get_device_name(0)}")
    print(f"  Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

In [None]:
# Section banner: blank line, rule, title, rule
print("\n" + "=" * 80, "DOWNLOADING SAMPLE VIDEOS", "=" * 80, sep="\n")

# Working directories for input videos, their annotations and benchmark outputs
Path("data/videos").mkdir(parents=True, exist_ok=True)
Path("data/annotations").mkdir(parents=True, exist_ok=True)
Path("results").mkdir(parents=True, exist_ok=True)

# For this demo, we'll use sample videos from EPIC-KITCHENS or create test videos
# You can replace these with actual EPIC-KITCHENS videos

print(
    "\nNote: For full EPIC-KITCHENS dataset, visit: https://epic-kitchens.github.io/",
    "For this demo, we'll work with sample/test videos.",
    "\nYou can upload your own videos using the code below:",
    sep="\n",
)

# Uncomment to upload your own videos
# uploaded = files.upload()
# for filename in uploaded.keys():
#     !mv {filename} data/videos/

# Empty column-oriented template for annotations: one independent list per field
sample_annotations = dict(
    video_id=[],
    start_frame=[],
    end_frame=[],
    verb=[],
    noun=[],
    action=[],
)

print("\n✓ Directory structure created")

In [None]:
# Section banner: blank line, rule, title, rule (one call, newline-separated)
print("\n" + "=" * 80, "MODEL 1: YOLOv8 + VideoMAE Pipeline", "=" * 80, sep="\n")

from ultralytics import YOLO
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification

class YOLOVideoMAEPipeline:
    """Two-stage video analysis: YOLOv8 object detection plus VideoMAE action recognition.

    Objects are detected on the middle sampled frame only; the action label is
    predicted by VideoMAE from the whole set of sampled frames.
    """

    def __init__(self, device="cuda"):
        """Load the YOLOv8-medium detector and the Kinetics-finetuned VideoMAE classifier.

        Args:
            device: Torch device string the VideoMAE model and its inputs are moved to.
        """
        self.device = device
        print("\nLoading YOLOv8 model...")
        self.yolo_model = YOLO('yolov8m.pt')  # Medium model for balance

        print("Loading VideoMAE model...")
        self.videomae_processor = VideoMAEImageProcessor.from_pretrained(
            "MCG-NJU/videomae-base-finetuned-kinetics"
        )
        self.videomae_model = VideoMAEForVideoClassification.from_pretrained(
            "MCG-NJU/videomae-base-finetuned-kinetics"
        ).to(device)
        print("✓ Models loaded successfully")

    @staticmethod
    def _resample_to_length(frames: List, target_len: int) -> List:
        """Return exactly ``target_len`` items by uniformly re-indexing ``frames``.

        Frames are repeated when there are too few and skipped when there are too
        many. An empty input, or a non-positive ``target_len``, is returned as-is.
        """
        if target_len <= 0 or not frames or len(frames) == target_len:
            return list(frames)
        picks = np.linspace(0, len(frames) - 1, target_len).round().astype(int)
        return [frames[i] for i in picks]

    def _sample_frames(self, video_path: str, sample_frames: int) -> Tuple[List, int, float]:
        """Read up to ``sample_frames`` uniformly spaced BGR frames from a video.

        Returns:
            ``(frames, total_frames, fps)``. ``frames`` is empty when the video
            cannot be opened or no frame could be decoded.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return [], 0, 0.0

            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Clamp so a zero/unknown frame count never yields negative seek positions
            last_index = max(total_frames - 1, 0)
            frame_indices = np.linspace(0, last_index, max(sample_frames, 0), dtype=int)

            frames = []
            for idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if ret:
                    frames.append(frame)
            return frames, total_frames, fps
        finally:
            # Always free the decoder handle, even if a read raised
            cap.release()

    def process_video(self, video_path: str, sample_frames: int = 16) -> Dict:
        """Process video with YOLOv8 for objects and VideoMAE for actions.

        Args:
            video_path: Path of the video file to analyse.
            sample_frames: Number of uniformly spaced frames to read from the video.

        Returns:
            Dict with ``video_path``, ``timestamp``, ``objects_detected`` (class,
            confidence and xyxy box per detection), ``actions_detected`` (top-1
            label and its softmax probability) and ``structured_output``. When no
            frame can be read, the two lists and ``structured_output`` stay empty.
        """
        results = {
            "video_path": video_path,
            "timestamp": datetime.now().isoformat(),
            "objects_detected": [],
            "actions_detected": [],
            "structured_output": {}
        }

        print(f"\nProcessing: {video_path}")
        frames, total_frames, fps = self._sample_frames(video_path, sample_frames)
        print(f"Total frames: {total_frames}, FPS: {fps}")

        if len(frames) == 0:
            print("⚠ No frames extracted")
            return results

        # YOLO Object Detection on middle frame
        middle_frame = frames[len(frames) // 2]
        yolo_results = self.yolo_model(middle_frame, verbose=False)

        objects = []
        for result in yolo_results:
            for box in result.boxes:
                objects.append({
                    "class": result.names[int(box.cls)],
                    "confidence": float(box.conf),
                    "bbox": box.xyxy[0].cpu().numpy().tolist()
                })

        results["objects_detected"] = objects
        print(f"✓ Detected {len(objects)} objects")

        # VideoMAE Action Recognition
        # The model is built for a fixed clip length (config.num_frames); a clip with
        # dropped frames or a different sample_frames would fail in the forward pass,
        # so bring the clip to the expected length first.
        expected_len = getattr(self.videomae_model.config, "num_frames", None) or len(frames)
        clip = self._resample_to_length(frames, expected_len)

        # OpenCV decodes to BGR; the processor expects RGB images
        video_frames = [
            Image.fromarray(cv2.cvtColor(f, cv2.COLOR_BGR2RGB)) for f in clip
        ]

        inputs = self.videomae_processor(video_frames, return_tensors="pt")
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            logits = self.videomae_model(**inputs).logits

        predicted_class = logits.argmax(-1).item()
        action = self.videomae_model.config.id2label[predicted_class]
        confidence = torch.softmax(logits, dim=-1).max().item()

        results["actions_detected"] = [{
            "action": action,
            "confidence": confidence
        }]

        # Structured output for downstream modules
        results["structured_output"] = {
            "timestamp": datetime.now().isoformat(),
            "duration_seconds": total_frames / fps if fps > 0 else 0,
            "primary_action": action,
            "action_confidence": confidence,
            "objects_in_scene": [obj["class"] for obj in objects],
            "high_confidence_objects": [
                obj["class"] for obj in objects if obj["confidence"] > 0.5
            ]
        }

        print(f"✓ Action detected: {action} (confidence: {confidence:.3f})")

        return results

# Build the Model 1 pipeline on the device selected in the import cell
print("\nInitializing YOLOv8 + VideoMAE Pipeline...")
model1 = YOLOVideoMAEPipeline(device)


In [None]:
# ============================================================================
# SECTION 5: MODEL 2 - LLAVA-1.5
# ============================================================================

# Section banner: blank line, rule, title, rule (one call, newline-separated)
print("\n" + "=" * 80, "MODEL 2: LLaVA-1.5 (End-to-End VLM)", "=" * 80, sep="\n")

from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration

class LLaVAPipeline:
    """Single-frame visual question answering over a video with LLaVA-1.5.

    Frames are sampled uniformly from the video, but only the middle sampled
    frame is shown to the model; each prompt is asked independently about it.
    """

    def __init__(self, device="cuda"):
        """Load the LLaVA-1.5 7B checkpoint and its processor in half precision.

        Args:
            device: Device string kept on the instance. Weight placement is
                decided by ``device_map="auto"``.
        """
        # Function-scope import keeps this class self-contained: the
        # llava-1.5 checkpoint is a `llava` model, so it must be loaded with
        # the LLaVA classes rather than the LLaVA-NeXT ones.
        from transformers import AutoProcessor, LlavaForConditionalGeneration

        self.device = device
        print("\nLoading LLaVA-1.5 model (this may take a few minutes)...")

        model_id = "llava-hf/llava-1.5-7b-hf"

        self.processor = AutoProcessor.from_pretrained(model_id)
        self.model = LlavaForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16,
            device_map="auto",
            low_cpu_mem_usage=True
        )
        print("✓ LLaVA model loaded successfully")

    @staticmethod
    def _extract_answer(decoded: str) -> str:
        """Return the text after the last ``ASSISTANT:`` marker, stripped.

        The decoded sequence contains the prompt as well as the reply; when the
        marker is absent the text is returned unchanged.
        """
        if "ASSISTANT:" in decoded:
            return decoded.split("ASSISTANT:")[-1].strip()
        return decoded

    def _sample_frames(self, video_path: str, sample_frames: int) -> Tuple[List, int, float]:
        """Read up to ``sample_frames`` uniformly spaced frames as RGB PIL images.

        Returns:
            ``(frames, total_frames, fps)``. ``frames`` is empty when the video
            cannot be opened or no frame could be decoded.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return [], 0, 0.0

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Clamp so a zero/unknown frame count never yields negative seek positions
            last_index = max(total_frames - 1, 0)
            frame_indices = np.linspace(0, last_index, max(sample_frames, 0), dtype=int)

            frames = []
            for idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if ret:
                    # OpenCV decodes to BGR; the processor expects RGB
                    frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
            return frames, total_frames, fps
        finally:
            # Always free the decoder handle, even if a read raised
            cap.release()

    def process_video(self, video_path: str, sample_frames: int = 8) -> Dict:
        """Process video with LLaVA for end-to-end understanding.

        Args:
            video_path: Path of the video file to analyse.
            sample_frames: Number of uniformly spaced frames to read; the middle
                one of those successfully read is the frame that gets analysed.

        Returns:
            Dict with ``video_path``, ``timestamp``, ``llava_analysis`` (one
            question/answer pair per prompt, keyed ``prompt_1``..``prompt_4``)
            and ``structured_output``. When no frame can be read,
            ``llava_analysis`` and ``structured_output`` are empty dicts.
        """
        results = {
            "video_path": video_path,
            "timestamp": datetime.now().isoformat(),
            "llava_analysis": {},
            # Present even on the early return, matching the YOLO+VideoMAE pipeline
            "structured_output": {}
        }

        frames, total_frames, fps = self._sample_frames(video_path, sample_frames)

        if len(frames) == 0:
            print("⚠ No frames extracted")
            return results

        # Every prompt is asked about the same frame, so pick it once
        middle_frame = frames[len(frames) // 2]

        prompts = [
            "What objects can you see in this image? List them.",
            "What action is being performed in this image?",
            "Describe what is happening in this scene in detail.",
            "Is there any safety concern in this image? What objects need attention?"
        ]

        analyses = {}

        for i, prompt in enumerate(prompts):
            conversation = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image"},
                        {"type": "text", "text": prompt}
                    ]
                }
            ]

            prompt_text = self.processor.apply_chat_template(
                conversation, add_generation_prompt=True
            )

            # Follow the model's actual placement and precision: with
            # device_map="auto" the weights may not sit on self.device, and the
            # float inputs must match the fp16 weights.
            inputs = self.processor(
                images=middle_frame,
                text=prompt_text,
                return_tensors="pt"
            ).to(self.model.device, self.model.dtype)

            with torch.no_grad():
                output = self.model.generate(
                    **inputs,
                    max_new_tokens=100,
                    do_sample=False
                )

            decoded = self.processor.decode(output[0], skip_special_tokens=True)

            analyses[f"prompt_{i+1}"] = {
                "question": prompt,
                "answer": self._extract_answer(decoded)
            }

            print(f"✓ Analyzed with prompt {i+1}/{len(prompts)}")

        results["llava_analysis"] = analyses

        # Create structured output
        results["structured_output"] = {
            "timestamp": datetime.now().isoformat(),
            "duration_seconds": total_frames / fps if fps > 0 else 0,
            "objects_mentioned": analyses.get("prompt_1", {}).get("answer", ""),
            "action_description": analyses.get("prompt_2", {}).get("answer", ""),
            "scene_description": analyses.get("prompt_3", {}).get("answer", ""),
            "safety_assessment": analyses.get("prompt_4", {}).get("answer", "")
        }

        return results

# Build the Model 2 pipeline on the device selected in the import cell
print("\nInitializing LLaVA Pipeline...")
model2 = LLaVAPipeline(device)


In [None]:
# Section banner: blank line, rule, title, rule (one call, newline-separated)
print("\n" + "=" * 80, "MODEL 3: Qwen2-VL (State-of-Art VLM)", "=" * 80, sep="\n")

from transformers import Qwen2VLForConditionalGeneration, AutoProcessor

class Qwen2VLPipeline:
    """Single-frame visual question answering over a video with Qwen2-VL.

    Frames are sampled uniformly from the video, but only the middle sampled
    frame is shown to the model; each prompt is asked independently about it.
    """

    def __init__(self, device="cuda"):
        """Load the Qwen2-VL 2B Instruct checkpoint and its processor in half precision.

        Args:
            device: Device string kept on the instance. Weight placement is
                decided by ``device_map="auto"``.
        """
        self.device = device
        print("\nLoading Qwen2-VL model...")

        model_id = "Qwen/Qwen2-VL-2B-Instruct"  # Using 2B for Colab compatibility

        self.model = Qwen2VLForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        self.processor = AutoProcessor.from_pretrained(model_id)
        print("✓ Qwen2-VL model loaded successfully")

    @staticmethod
    def _strip_prompt_tokens(prompt_ids, output_ids) -> List:
        """Drop the echoed prompt from each generated sequence.

        ``generate`` returns prompt tokens followed by the new tokens; slicing
        off ``len(prompt)`` leading tokens per row leaves only the reply.
        """
        return [out[len(inp):] for inp, out in zip(prompt_ids, output_ids)]

    def _sample_frames(self, video_path: str, sample_frames: int) -> Tuple[List, int, float]:
        """Read up to ``sample_frames`` uniformly spaced frames as RGB PIL images.

        Returns:
            ``(frames, total_frames, fps)``. ``frames`` is empty when the video
            cannot be opened or no frame could be decoded.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return [], 0, 0.0

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Clamp so a zero/unknown frame count never yields negative seek positions
            last_index = max(total_frames - 1, 0)
            frame_indices = np.linspace(0, last_index, max(sample_frames, 0), dtype=int)

            frames = []
            for idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if ret:
                    # OpenCV decodes to BGR; the processor expects RGB
                    frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
            return frames, total_frames, fps
        finally:
            # Always free the decoder handle, even if a read raised
            cap.release()

    def process_video(self, video_path: str, sample_frames: int = 8) -> Dict:
        """Process video with Qwen2-VL.

        Args:
            video_path: Path of the video file to analyse.
            sample_frames: Number of uniformly spaced frames to read; the middle
                one of those successfully read is the frame that gets analysed.

        Returns:
            Dict with ``video_path``, ``timestamp``, ``qwen_analysis`` (one
            question/answer pair per prompt, keyed ``prompt_1``..``prompt_4``)
            and ``structured_output``. When no frame can be read,
            ``qwen_analysis`` and ``structured_output`` are empty dicts.
        """
        results = {
            "video_path": video_path,
            "timestamp": datetime.now().isoformat(),
            "qwen_analysis": {},
            # Present even on the early return, matching the YOLO+VideoMAE pipeline
            "structured_output": {}
        }

        frames, total_frames, fps = self._sample_frames(video_path, sample_frames)

        if len(frames) == 0:
            print("⚠ No frames extracted")
            return results

        # Every prompt is asked about the same frame, so pick it once
        middle_frame = frames[len(frames) // 2]

        # Qwen2-VL prompts for dementia care
        prompts = [
            "List all objects visible in this image.",
            "What hand-object interaction is happening? Describe the action.",
            "Describe the scene and what activity is being performed.",
            "From a safety perspective for an elderly person, what should we monitor here?"
        ]

        analyses = {}

        for i, prompt in enumerate(prompts):
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": middle_frame},
                        {"type": "text", "text": prompt}
                    ]
                }
            ]

            text = self.processor.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )

            # With device_map="auto" the weights may not sit on self.device,
            # so send the inputs wherever the model actually is
            inputs = self.processor(
                text=[text],
                images=[middle_frame],
                return_tensors="pt",
                padding=True
            ).to(self.model.device)

            with torch.no_grad():
                output_ids = self.model.generate(
                    **inputs,
                    max_new_tokens=128
                )

            # Decode only the newly generated tokens. Decoding the full sequence
            # and splitting on the prompt text leaves the chat template's
            # "assistant" role marker at the start of every answer.
            reply_ids = self._strip_prompt_tokens(inputs["input_ids"], output_ids)
            response = self.processor.batch_decode(
                reply_ids,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=False
            )[0].strip()

            analyses[f"prompt_{i+1}"] = {
                "question": prompt,
                "answer": response
            }

            print(f"✓ Analyzed with prompt {i+1}/{len(prompts)}")

        results["qwen_analysis"] = analyses

        # Structured output
        results["structured_output"] = {
            "timestamp": datetime.now().isoformat(),
            "duration_seconds": total_frames / fps if fps > 0 else 0,
            "objects_detected": analyses.get("prompt_1", {}).get("answer", ""),
            "action_detected": analyses.get("prompt_2", {}).get("answer", ""),
            "scene_context": analyses.get("prompt_3", {}).get("answer", ""),
            "safety_monitoring": analyses.get("prompt_4", {}).get("answer", "")
        }

        return results

# Build the Model 3 pipeline on the device selected in the import cell
print("\nInitializing Qwen2-VL Pipeline...")
model3 = Qwen2VLPipeline(device)

In [None]:
# ============================================================================
# SECTION 7: BENCHMARKING FRAMEWORK
# ============================================================================

# Section banner: blank line, rule, title, rule (one call, newline-separated)
print("\n" + "=" * 80, "BENCHMARKING FRAMEWORK", "=" * 80, sep="\n")

class BenchmarkRunner:
    """Runs a set of video pipelines on the same videos and tabulates the outcome.

    Each model must expose ``process_video(video_path) -> dict``. Results of
    every ``run_benchmark`` call accumulate in ``self.results``.
    """

    # Column order of the summary table; also used when there are no rows yet
    REPORT_COLUMNS = ["video", "model", "inference_time", "success"]

    def __init__(self, models: Dict):
        """Store the models to benchmark.

        Args:
            models: Mapping of display name to pipeline object.
        """
        self.models = models
        self.results = []

    def run_benchmark(self, video_path: str) -> Dict:
        """Run all models on a video and collect results.

        A model that raises does not abort the run: its entry records the error
        message and the time spent before the failure.

        Args:
            video_path: Path of the video handed to every model.

        Returns:
            Dict with ``video``, ``timestamp`` and ``model_results`` (one entry
            per model name). The same dict is appended to ``self.results``.
        """
        print(f"\n{'=' * 80}")
        print(f"BENCHMARKING VIDEO: {video_path}")
        print(f"{'=' * 80}")

        benchmark_results = {
            "video": video_path,
            "timestamp": datetime.now().isoformat(),
            "model_results": {}
        }

        for model_name, model in self.models.items():
            print(f"\n--- Running {model_name} ---")
            # perf_counter is monotonic, so intervals are immune to wall-clock adjustments
            start_time = time.perf_counter()

            try:
                result = model.process_video(video_path)
                elapsed_time = time.perf_counter() - start_time

                result["inference_time"] = elapsed_time
                benchmark_results["model_results"][model_name] = result

                print(f"✓ {model_name} completed in {elapsed_time:.2f}s")

            except Exception as e:
                elapsed_time = time.perf_counter() - start_time
                print(f"✗ {model_name} failed: {str(e)}")
                benchmark_results["model_results"][model_name] = {
                    "error": str(e),
                    "inference_time": elapsed_time
                }

        self.results.append(benchmark_results)
        return benchmark_results

    def save_results(self, output_path: str = "results/benchmark_results.json"):
        """Save all benchmark results to JSON.

        The parent directory is created if missing. Values the JSON encoder does
        not know are written as their ``str()`` form, so one odd value cannot
        discard the whole run.

        Args:
            output_path: Destination file path.
        """
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w') as f:
            json.dump(self.results, f, indent=2, default=str)
        print(f"\n✓ Results saved to {output_path}")

    def generate_report(self) -> pd.DataFrame:
        """Generate a summary report.

        Returns:
            DataFrame with one row per (video, model) and the columns in
            ``REPORT_COLUMNS``. ``success`` is True when the model's entry has no
            ``error`` key. The columns are present even when there are no rows.
        """
        report_data = [
            {
                "video": result["video"],
                "model": model_name,
                "inference_time": model_result.get("inference_time", None),
                "success": "error" not in model_result
            }
            for result in self.results
            for model_name, model_result in result["model_results"].items()
        ]

        # Explicit columns keep downstream groupby/column access valid on an empty report
        return pd.DataFrame(report_data, columns=self.REPORT_COLUMNS)

# Registry of the three loaded pipelines, keyed by the name shown in the report
models_dict = dict(
    [
        ("YOLOv8_VideoMAE", model1),
        ("LLaVA-1.5", model2),
        ("Qwen2-VL", model3),
    ]
)

# Runner that will execute every registered pipeline on each video
benchmark = BenchmarkRunner(models=models_dict)

In [None]:
# ============================================================================
# SECTION 8: RUN BENCHMARK ON UCF101
# ============================================================================

print("\n" + "=" * 80)
print("RUNNING BENCHMARK ON UCF101 VIDEOS")
print("=" * 80)

# `test_videos` is not defined by any cell visible above, so a fresh
# "Restart & Run All" would stop here with a NameError. If an earlier cell did
# define it, that list is used unchanged; otherwise fall back to every video
# file found under data/videos (the directory created earlier in the notebook).
if "test_videos" not in globals():
    VIDEO_EXTENSIONS = {".avi", ".mp4", ".mov", ".mkv"}
    test_videos = sorted(
        str(path)
        for path in Path("data/videos").rglob("*")
        if path.suffix.lower() in VIDEO_EXTENSIONS
    )
    print(f"\n`test_videos` was not defined; found {len(test_videos)} video(s) under data/videos")

if not test_videos:
    print("\n⚠ No videos to benchmark. Add files to data/videos or define `test_videos`.")

# Run benchmark on all selected videos
for i, video_path in enumerate(test_videos, 1):
    print(f"\n[{i}/{len(test_videos)}] Processing: {Path(video_path).parent.name}/{Path(video_path).name}")
    try:
        results = benchmark.run_benchmark(video_path)
    except Exception as e:
        # run_benchmark already isolates per-model failures; this guards the loop itself
        print(f"✗ Error processing {video_path}: {e}")
        continue

# Save results
benchmark.save_results()

# Generate report
report = benchmark.generate_report()

print("\n" + "=" * 80)
print("BENCHMARK REPORT")
print("=" * 80)
print(report)

# Summary statistics
print("\n" + "=" * 80)
print("SUMMARY STATISTICS")
print("=" * 80)

if report.empty:
    # Nothing was benchmarked; grouping an empty report would raise instead of summarising
    print("No benchmark results to summarise.")
else:
    summary = report.groupby('model').agg({
        'inference_time': ['mean', 'std', 'min', 'max'],
        'success': 'sum'
    }).round(2)

    print(summary)

# Save report to CSV
report.to_csv('results/benchmark_report.csv', index=False)
print("\n✓ Detailed report saved to results/benchmark_report.csv")