In [12]:
import os
import shutil
import logging
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import warnings

import pandas as pd
import numpy as np
import cv2
from PIL import Image
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

In [13]:
def install_dependencies():
    """Install pinned third-party packages that are not importable yet.

    Each entry is probed by its *import* name, which differs from the pip
    distribution name for some packages (``opencv-python`` -> ``cv2``,
    ``scikit-learn`` -> ``sklearn``).  Missing packages are installed with the
    pip that belongs to the running interpreter, so the install lands in the
    same environment the notebook kernel imports from.

    Returns:
        None. A warning is logged for every package whose installation fails.
    """
    # Function-scope imports keep this cell self-contained.
    import importlib.util
    import subprocess
    import sys

    # pip distribution name -> pinned version
    packages = {
        'scenedetect': '0.6.2',
        'opencv-python': '4.8.1.78',
        'timm': '0.9.12',
        'scikit-learn': '1.3.2',
        'ydata-profiling': '4.6.4',
        'plotly': '5.17.0',
        'kaleido': '0.2.1',
        # Needed by the DataFrame.to_excel() calls that write the .xlsx results.
        'openpyxl': '3.1.2',
    }

    # Distributions whose importable module name is not simply the pip name
    # with dashes replaced by underscores.
    import_names = {
        'opencv-python': 'cv2',
        'scikit-learn': 'sklearn',
    }

    for package, version in packages.items():
        module_name = import_names.get(package, package.replace('-', '_'))

        try:
            already_installed = importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError):
            # find_spec can raise for half-initialised modules; treat as missing.
            already_installed = False

        if already_installed:
            continue

        # Argument list + sys.executable: no shell involved, and the package is
        # installed for the interpreter that is actually running this code.
        completed = subprocess.run(
            [sys.executable, "-m", "pip", "install", f"{package}=={version}", "--quiet"],
            check=False,
        )
        if completed.returncode != 0:
            logging.getLogger(__name__).warning(
                "pip install %s==%s failed with exit code %s",
                package, version, completed.returncode,
            )

In [14]:
# Install any missing third-party packages before the next cell imports them
# (timm, scenedetect, sklearn, ydata_profiling).
install_dependencies()

In [15]:
import timm
import scenedetect
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from sklearn.metrics.pairwise import cosine_similarity
from ydata_profiling import ProfileReport

In [16]:
# Configure logging: INFO and above go both to 'video_processing.log' (created
# in the current working directory) and to the notebook/console stream.
# Note: logging.basicConfig() does nothing if the root logger already has
# handlers, so re-running this cell in the same kernel will not reconfigure it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('video_processing.log'),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every class and function below.
logger = logging.getLogger(__name__)

# Suppress warnings
# NOTE(review): this silences *all* Python warnings process-wide, including
# deprecation notices from the libraries used here — consider narrowing it.
warnings.filterwarnings('ignore')

In [17]:
@dataclass
class ProjectConfig:
    """Settings shared by every stage of the video-analysis pipeline.

    Besides the declared fields, ``__post_init__`` attaches two derived
    attributes:

    * ``project_title``  - ``"<project_number>_<project_name>"``
    * ``project_folder`` - absolute path of the directory containing
      ``input_folder``
    """
    # Directory searched for videos, and directory receiving all outputs.
    input_folder: str = "test_video_frames"
    output_folder: str = "test_video_frames_output"
    # Combined into ``project_title``, which prefixes result/report file names.
    project_number: str = "1"
    project_name: str = "video_analysis"
    # Number of frames sampled inside each detected scene.
    n_scene_frames: int = 3
    # Upper bound (pixels) for the longer side of saved frames.
    image_max_size: int = 1024
    # File extensions (without dot) treated as videos; None selects the defaults.
    video_extensions: List[str] = None
    # Threshold handed to the scene detector.
    scene_threshold: float = 30.0

    def __post_init__(self):
        """Fill in the default extension list and compute derived attributes."""
        fallback_extensions = ['mp4', 'avi', 'mov', 'mkv', 'webm', 'flv', 'wmv']
        self.video_extensions = (
            fallback_extensions
            if self.video_extensions is None
            else self.video_extensions
        )

        self.project_title = "{}_{}".format(self.project_number, self.project_name)

        input_dir = Path(self.input_folder)
        self.project_folder = input_dir.parent.absolute()

In [18]:
class ModernImageEmbedder:
    """Converts image files into feature vectors using a pretrained timm backbone.

    The backbone is created without its classification head, so a forward pass
    yields features rather than class scores.

    Attributes:
        model_name: Name of the timm model that was requested.
        model: The loaded backbone, in eval mode.
        transform: timm's resolved data *config* (a dict) for the backbone.
            The name is kept for backward compatibility; the callable
            preprocessing pipeline built from it lives in ``_preprocess``.
    """

    # Backbone used when the requested model cannot be created.
    _FALLBACK_MODEL = 'resnet50'

    def __init__(self, model_name: str = "efficientnet_b0"):
        self.model_name = model_name
        self.model = self._load_model()
        self.transform = self._get_transform()
        # Build the preprocessing pipeline once instead of once per image.
        self._preprocess = timm.data.transforms_factory.create_transform(**self.transform)
        # Width of the vectors this backbone produces; used to size the zero
        # vector returned on failure so it matches successful embeddings.
        # Falls back to the previous hard-coded 1000 if the model does not
        # expose ``num_features``.
        self._embedding_dim = int(getattr(self.model, 'num_features', 1000))

    def _load_model(self):
        """Create the requested backbone, falling back to resnet50 on failure.

        Returns:
            The timm model, switched to eval mode.
        """
        try:
            model = timm.create_model(
                self.model_name,
                pretrained=True,
                num_classes=0,  # Remove classification head
                global_pool=''  # Remove global pooling
            )
        except Exception as e:
            # Log the cause too, so a typo in the model name or a failed weight
            # download is visible.
            logger.warning(
                f"Failed to load {self.model_name} ({e}), "
                f"falling back to {self._FALLBACK_MODEL}"
            )
            model = timm.create_model(self._FALLBACK_MODEL, pretrained=True, num_classes=0)
        model.eval()
        return model

    def _get_transform(self):
        """Return timm's resolved data config (a dict) for the loaded model."""
        return timm.data.resolve_data_config({}, model=self.model)

    def embed_image(self, image_path: str) -> np.ndarray:
        """Compute a 1-D feature vector for the image at ``image_path``.

        Args:
            image_path: Path of an image file readable by PIL.

        Returns:
            A flat numpy array of features. If anything goes wrong (unreadable
            file, model error) the error is logged and an all-zero vector of
            the backbone's feature width is returned instead.
        """
        # The original code used ``torch`` without importing it anywhere, so
        # every call raised NameError inside the ``try`` and silently returned
        # zeros. Imported at function scope so this block is self-contained;
        # torch is already loaded as a dependency of timm.
        import torch

        try:
            # Context manager closes the file handle once pixels are decoded.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert('RGB')

            input_tensor = self._preprocess(image).unsqueeze(0)

            with torch.no_grad():
                features = self.model(input_tensor)
                # Global average pooling if needed
                if len(features.shape) > 2:
                    features = features.mean(dim=[2, 3])
            return features.cpu().numpy().flatten()
        except Exception as e:
            logger.error(f"Error embedding {image_path}: {e}")
            return np.zeros(self._embedding_dim)  # Return zero vector on error

In [19]:
class ModernSceneDetector:
    """Detects scene boundaries in a video and extracts representative frames.

    Scene boundaries come from PySceneDetect's ``ContentDetector``; frames are
    read with OpenCV; the representative frame of a scene is the sampled frame
    whose embedding is most similar to the mean embedding of that scene.
    """

    def __init__(self, config: "ProjectConfig"):
        self.config = config
        self.embedder = ModernImageEmbedder()

    def detect_scenes(self, video_path: str) -> List[Tuple[float, float]]:
        """Return ``(start_seconds, end_seconds)`` for every detected scene.

        Args:
            video_path: Path of the video to analyse.

        Returns:
            List of scene intervals in seconds; an empty list if detection
            fails (the error is logged).
        """
        video_manager = None
        try:
            # Create video manager
            video_manager = VideoManager([str(video_path)])
            scene_manager = SceneManager()

            # Add content detector with configurable threshold
            scene_manager.add_detector(
                ContentDetector(threshold=self.config.scene_threshold)
            )

            # Detect scenes
            video_manager.start()
            scene_manager.detect_scenes(frame_source=video_manager)

            # Get scene list
            scene_list = scene_manager.get_scene_list()
            return [(scene[0].get_seconds(), scene[1].get_seconds())
                    for scene in scene_list]

        except Exception as e:
            logger.error(f"Scene detection failed for {video_path}: {e}")
            return []
        finally:
            # Release the video even when detection raised, so file handles do
            # not leak across a long batch.
            if video_manager is not None:
                try:
                    video_manager.release()
                except Exception as release_error:
                    logger.debug(f"Could not release {video_path}: {release_error}")

    def extract_scene_frames(self, video_path: str, scenes: List[Tuple[float, float]],
                           output_dir: str) -> pd.DataFrame:
        """Save sampled frames for every scene and describe them in a DataFrame.

        For each scene, ``config.n_scene_frames`` timestamps are taken at evenly
        spaced positions strictly inside the scene. Each frame is resized, written
        as ``scene_XXX_frame_YYY.jpg`` into ``output_dir``, and the most
        representative one is selected.

        Args:
            video_path: Path of the source video.
            scenes: ``(start_seconds, end_seconds)`` intervals.
            output_dir: Directory that receives the JPEG files (created if missing).

        Returns:
            One row per scene that yielded at least one frame, with columns
            ``scene_number``, ``start_time``, ``end_time``, ``duration``,
            ``representative_frame``, ``all_frames`` and ``frame_count``.
            Empty if the video cannot be opened.
        """
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        scene_data = []
        cap = cv2.VideoCapture(str(video_path))
        try:
            if not cap.isOpened():
                logger.error(f"Could not open video for frame extraction: {video_path}")
                return pd.DataFrame(scene_data)

            fps = cap.get(cv2.CAP_PROP_FPS)
            # Without a usable frame rate, time * fps would always be 0 and every
            # scene would yield the first frame; seek by timestamp instead.
            seek_by_frame_index = fps > 0
            if not seek_by_frame_index:
                logger.warning(
                    f"Invalid FPS ({fps}) reported for {video_path}; seeking by timestamp"
                )

            for i, (start_time, end_time) in enumerate(tqdm(scenes, desc="Extracting frames")):
                duration = end_time - start_time

                # Extract frames at evenly spaced intervals within the scene
                # (the two end points themselves are dropped).
                frame_times = np.linspace(start_time, end_time, self.config.n_scene_frames + 2)[1:-1]

                scene_frames = []
                for j, frame_time in enumerate(frame_times):
                    if seek_by_frame_index:
                        cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_time * fps))
                    else:
                        cap.set(cv2.CAP_PROP_POS_MSEC, float(frame_time) * 1000.0)
                    ret, frame = cap.read()

                    if not ret:
                        continue

                    # Resize frame
                    frame = self._resize_frame(frame)

                    # Save frame
                    frame_filename = f"scene_{i:03d}_frame_{j:03d}.jpg"
                    frame_path = os.path.join(output_dir, frame_filename)
                    if not cv2.imwrite(frame_path, frame):
                        # Do not record a path that was never written.
                        logger.warning(f"Could not write frame to {frame_path}")
                        continue

                    scene_frames.append(frame_path)

                # Select best representative frame using embeddings
                if scene_frames:
                    best_frame = self._select_best_frame(scene_frames)

                    scene_data.append({
                        'scene_number': i,
                        'start_time': start_time,
                        'end_time': end_time,
                        'duration': duration,
                        'representative_frame': best_frame,
                        'all_frames': scene_frames,
                        'frame_count': len(scene_frames)
                    })
        finally:
            cap.release()

        return pd.DataFrame(scene_data)

    def _resize_frame(self, frame: np.ndarray) -> np.ndarray:
        """Shrink ``frame`` so its longer side is at most ``config.image_max_size``.

        The aspect ratio is preserved; frames already within the limit are
        returned unchanged.
        """
        height, width = frame.shape[:2]
        max_size = self.config.image_max_size

        if max(height, width) > max_size:
            # max(1, ...) keeps extreme aspect ratios from rounding the short
            # side down to 0, which cv2.resize rejects.
            if width > height:
                new_width = max_size
                new_height = max(1, int(height * max_size / width))
            else:
                new_height = max_size
                new_width = max(1, int(width * max_size / height))

            frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)

        return frame

    def _select_best_frame(self, frame_paths: List[str]) -> str:
        """Pick the frame whose embedding is closest to the mean embedding.

        Embeddings that are all-zero (the embedder's failure marker), non-finite,
        or of a different length than the first usable one are ignored. If no
        usable embedding remains, the first path is returned.

        Args:
            frame_paths: Non-empty list of image paths.

        Returns:
            One element of ``frame_paths``.

        Raises:
            ValueError: If ``frame_paths`` is empty.
        """
        if not frame_paths:
            raise ValueError("frame_paths must not be empty")
        if len(frame_paths) == 1:
            return frame_paths[0]

        # Get embeddings for all frames, remembering which path each belongs to.
        usable = []
        for index, path in enumerate(frame_paths):
            vector = np.asarray(self.embedder.embed_image(path), dtype=float).ravel()
            if vector.size and np.all(np.isfinite(vector)) and np.any(vector):
                usable.append((index, vector))

        if usable:
            expected_shape = usable[0][1].shape
            usable = [(index, vector) for index, vector in usable
                      if vector.shape == expected_shape]

        if not usable:
            return frame_paths[0]

        indices = [index for index, _ in usable]
        embeddings = np.stack([vector for _, vector in usable])

        # Find frame closest to centroid (cosine similarity, zero-norm safe).
        centroid = embeddings.mean(axis=0)
        dots = embeddings @ centroid
        norms = np.linalg.norm(embeddings, axis=1) * np.linalg.norm(centroid)
        similarities = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)
        best_idx = int(np.argmax(similarities))

        return frame_paths[indices[best_idx]]

In [20]:
class VideoProcessor:
    """Runs the whole pipeline: find videos, detect scenes, export results, report.

    Everything is written below ``config.output_folder``:

    * ``scenes/<video stem>/``  - extracted JPEG frames
    * ``metadata/``             - one ``<video stem>_scenes.xlsx`` per video
    * ``reports/``              - visualisation PNG and HTML profile report
    * ``<project_title>_results.xlsx`` - one row per processed video
    """

    def __init__(self, config: "ProjectConfig"):
        self.config = config
        self.scene_detector = ModernSceneDetector(config)
        self.setup_directories()

    def setup_directories(self):
        """Create the output folder and its scenes/metadata/reports subfolders."""
        directories = [
            self.config.output_folder,
            os.path.join(self.config.output_folder, "scenes"),
            os.path.join(self.config.output_folder, "metadata"),
            os.path.join(self.config.output_folder, "reports")
        ]

        for directory in directories:
            os.makedirs(directory, exist_ok=True)

    def find_videos(self) -> List[str]:
        """Find all video files in input directory

        The search is recursive and matches extensions case-insensitively
        (``clip.MP4`` is found for ``'mp4'``). Results are sorted so repeated
        runs process videos in the same order.

        Returns:
            Paths (as strings) of the matching files; empty if the input
            directory does not exist.
        """
        input_path = Path(self.config.input_folder)
        if not input_path.is_dir():
            return []

        # Normalise configured extensions to lowercase ".ext" suffixes; a
        # leading dot supplied by the caller is tolerated.
        suffixes = tuple(
            '.' + ext.lower().lstrip('.')
            for ext in self.config.video_extensions
            if ext and ext.strip('.')
        )

        video_paths = sorted(
            path for path in input_path.rglob("*")
            if path.is_file() and path.name.lower().endswith(suffixes)
        )

        return [str(path) for path in video_paths]

    def get_video_metadata(self, video_path: str) -> Dict:
        """Extract comprehensive video metadata

        Returns:
            Dict with ``path``, ``name`` (file stem), ``extension``, ``size_mb``,
            ``fps``, ``frame_count``, ``width``, ``height``, ``duration``
            (seconds, 0 when fps is unavailable), ``aspect_ratio`` (1 when height
            is unavailable) and ``format``.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            metadata = {
                'path': video_path,
                'name': Path(video_path).stem,
                'extension': Path(video_path).suffix,
                'size_mb': os.path.getsize(video_path) / (1024 * 1024),
                'fps': cap.get(cv2.CAP_PROP_FPS),
                'frame_count': int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
                'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
            }
        finally:
            # Released even if e.g. os.path.getsize raises.
            cap.release()

        metadata['duration'] = metadata['frame_count'] / metadata['fps'] if metadata['fps'] > 0 else 0
        metadata['aspect_ratio'] = metadata['width'] / metadata['height'] if metadata['height'] > 0 else 1
        metadata['format'] = self._determine_format(metadata['width'], metadata['height'])

        return metadata

    def _fallback_metadata(self, video_path: str) -> Dict:
        """Metadata placeholder with every expected key, for unreadable videos."""
        return {
            'path': video_path,
            'name': Path(video_path).stem,
            'extension': Path(video_path).suffix,
            'size_mb': 0.0,
            'fps': 0.0,
            'frame_count': 0,
            'width': 0,
            'height': 0,
            'duration': 0,
            'aspect_ratio': 1,
            'format': 'unknown',
        }

    def _determine_format(self, width: int, height: int) -> str:
        """Determine video format based on dimensions

        Returns ``"landscape"`` for width/height > 1.5, ``"portrait"`` for
        < 0.75, ``"square"`` otherwise, and ``"unknown"`` when either dimension
        is not positive (as reported for videos that could not be opened).
        """
        if width <= 0 or height <= 0:
            # Previously this divided by zero for unreadable videos.
            return "unknown"

        ratio = width / height
        if ratio > 1.5:
            return "landscape"
        elif ratio < 0.75:
            return "portrait"
        else:
            return "square"

    def process_video(self, video_path: str) -> Dict:
        """Process a single video

        Never raises: any failure is logged and reported through the ``status``
        field (``'success'``, ``'no_scenes'`` or ``'error: <message>'``).

        Returns:
            The video's metadata plus ``status`` and ``scene_count``; successful
            runs also carry ``avg_scene_duration``, ``total_extracted_frames``,
            ``scenes_per_second`` and ``scene_metadata_path``.
        """
        import json  # used only to serialise the per-scene frame lists

        logger.info(f"Processing video: {Path(video_path).name}")

        # Start from a complete placeholder so the error path below never has
        # to re-read the video (which could raise again and abort the batch).
        metadata = self._fallback_metadata(video_path)

        try:
            # Get metadata
            metadata = self.get_video_metadata(video_path)

            # Detect scenes
            scenes = self.scene_detector.detect_scenes(video_path)

            if not scenes:
                logger.warning(f"No scenes detected in {video_path}")
                return {**metadata, 'status': 'no_scenes', 'scene_count': 0}

            # Create output directory for this video
            # NOTE(review): keyed by file stem only, so two inputs sharing a
            # stem (a.mp4 / a.avi, or same name in different subfolders) write
            # into the same directory.
            video_output_dir = os.path.join(
                self.config.output_folder, "scenes", metadata['name']
            )
            os.makedirs(video_output_dir, exist_ok=True)

            # Extract scene frames
            scene_df = self.scene_detector.extract_scene_frames(
                video_path, scenes, video_output_dir
            )

            # Save scene metadata
            scene_metadata_path = os.path.join(
                self.config.output_folder, "metadata", f"{metadata['name']}_scenes.xlsx"
            )
            # Excel cells cannot hold Python lists, so the list of frame paths
            # is stored as a JSON string in the exported copy.
            export_df = scene_df.copy()
            if 'all_frames' in export_df.columns:
                export_df['all_frames'] = export_df['all_frames'].apply(json.dumps)
            export_df.to_excel(scene_metadata_path, index=False)

            # Add statistics
            has_rows = not scene_df.empty
            return {
                **metadata,
                'status': 'success',
                'scene_count': len(scenes),
                'avg_scene_duration': scene_df['duration'].mean() if has_rows else 0,
                'total_extracted_frames': scene_df['frame_count'].sum() if has_rows else 0,
                'scenes_per_second': len(scenes) / metadata['duration'] if metadata['duration'] > 0 else 0,
                'scene_metadata_path': scene_metadata_path
            }

        except Exception as e:
            logger.error(f"Error processing {video_path}: {e}")
            return {**metadata, 'status': f'error: {str(e)}', 'scene_count': 0}

    def process_all_videos(self) -> pd.DataFrame:
        """Process all videos and return results

        Also writes the results to
        ``<output_folder>/<project_title>_results.xlsx``.

        Returns:
            One row per video; an empty DataFrame if no videos were found.
        """
        video_paths = self.find_videos()
        logger.info(f"Found {len(video_paths)} videos to process")

        if not video_paths:
            logger.warning("No videos found!")
            return pd.DataFrame()

        results = [
            self.process_video(video_path)
            for video_path in tqdm(video_paths, desc="Processing videos")
        ]

        results_df = pd.DataFrame(results)

        # Save results
        results_path = os.path.join(
            self.config.output_folder,
            f"{self.config.project_title}_results.xlsx"
        )
        results_df.to_excel(results_path, index=False)

        return results_df

    def generate_report(self, results_df: pd.DataFrame):
        """Generate comprehensive analysis report

        Writes the visualisation PNG and an HTML profile report into
        ``<output_folder>/reports``. Does nothing (besides a warning) when
        ``results_df`` is empty.
        """
        if results_df.empty:
            logger.warning("No data to generate report")
            return

        # Create visualizations
        self._create_visualizations(results_df)

        # Generate profile report
        profile = ProfileReport(
            results_df,
            title=f"{self.config.project_title} - Video Analysis Report",
            explorative=True,
            dark_mode=True
        )

        report_path = os.path.join(
            self.config.output_folder,
            "reports",
            f"{self.config.project_title}_profile_report.html"
        )
        profile.to_file(report_path)
        logger.info(f"Profile report saved to: {report_path}")

    def _create_visualizations(self, df: pd.DataFrame):
        """Create custom visualizations

        Saves a 2x2 figure (scene-count histogram, duration vs. scene count,
        format pie chart, status bar chart) to
        ``<output_folder>/reports/<project_title>_visualizations.png``.
        """
        viz_path = os.path.join(
            self.config.output_folder,
            "reports",
            f"{self.config.project_title}_visualizations.png"
        )

        # style.context applies the style to this figure only, instead of
        # changing matplotlib's global style for the rest of the session.
        with plt.style.context('seaborn-v0_8'):
            fig, axes = plt.subplots(2, 2, figsize=(15, 12))
            try:
                # Scene count distribution
                df[df['scene_count'] > 0]['scene_count'].hist(bins=20, ax=axes[0,0])
                axes[0,0].set_title('Distribution of Scene Counts')
                axes[0,0].set_xlabel('Number of Scenes')
                axes[0,0].set_ylabel('Frequency')

                # Duration vs Scene count
                valid_data = df[(df['duration'] > 0) & (df['scene_count'] > 0)]
                axes[0,1].scatter(valid_data['duration'], valid_data['scene_count'], alpha=0.6)
                axes[0,1].set_title('Video Duration vs Scene Count')
                axes[0,1].set_xlabel('Duration (seconds)')
                axes[0,1].set_ylabel('Scene Count')

                # Format distribution
                format_counts = df['format'].value_counts()
                axes[1,0].pie(format_counts.values, labels=format_counts.index, autopct='%1.1f%%')
                axes[1,0].set_title('Video Format Distribution')

                # Processing status
                status_counts = df['status'].value_counts()
                axes[1,1].bar(range(len(status_counts)), status_counts.values)
                axes[1,1].set_xticks(range(len(status_counts)))
                axes[1,1].set_xticklabels(status_counts.index, rotation=45)
                axes[1,1].set_title('Processing Status Distribution')

                fig.tight_layout()
                fig.savefig(viz_path, dpi=300, bbox_inches='tight')
            finally:
                # Close this specific figure even if plotting failed.
                plt.close(fig)

In [21]:
def main():
    """Configure the pipeline, process every video, then report and summarise.

    Prints a short summary to stdout when at least one video was processed,
    otherwise a single notice that nothing was found.
    """
    # Pipeline settings for this run.
    config = ProjectConfig(
        input_folder="test_video_frames",
        output_folder="enhanced_video_analysis",
        project_name="modern_scene_extraction",
        n_scene_frames=5,       # sample five frames inside every scene
        image_max_size=1024,
        scene_threshold=25.0,   # below the 30.0 default -> more sensitive
    )

    pipeline = VideoProcessor(config)
    outcome = pipeline.process_all_videos()

    # Nothing to report: bail out early.
    if outcome.empty:
        print("No videos found or processed.")
        return

    # HTML profile report + visualisation PNG.
    pipeline.generate_report(outcome)

    # Console summary.
    statuses = outcome['status']
    scene_counts = outcome['scene_count']

    print(f"\n=== Processing Summary ===")
    print(f"Total videos processed: {len(outcome)}")
    print(f"Successful: {sum(statuses == 'success')}")
    print(f"Failed: {sum(statuses != 'success')}")
    print(f"Total scenes extracted: {scene_counts.sum()}")
    print(f"Average scenes per video: {scene_counts.mean():.2f}")
    print(f"\nResults saved to: {config.output_folder}")

In [22]:
# Run the pipeline when this cell/script is executed directly. In a notebook
# kernel __name__ is "__main__", so this cell triggers a full run.
if __name__ == "__main__":
    main()

2025-05-26 18:01:45,114 - INFO - Loading pretrained weights from Hugging Face hub (timm/efficientnet_b0.ra_in1k)
2025-05-26 18:01:45,473 - INFO - [timm/efficientnet_b0.ra_in1k] Safe alternative available for 'pytorch_model.bin' (as 'model.safetensors'). Loading weights using safetensors.
2025-05-26 18:01:45,521 - INFO - Found 0 videos to process


No videos found or processed.
