<a href="https://colab.research.google.com/github/samanthajmichael/ml_project/blob/main/notebooks/MTCNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
## Installing facenet-pytorch took about 5 minutes; uncomment the two lines below on first run
# %%capture
# !pip install facenet-pytorch

### Change the Colab runtime to GPU (Runtime > Change runtime type) before running the cells below

In [6]:
import torch
from facenet_pytorch import MTCNN
import cv2
import numpy as np
from PIL import Image
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from scipy.spatial.distance import cosine

In [7]:
class MovieFrameAnalyzer:
    """
    Per-frame analysis of extracted movie frames.

    Combines three independent analyses for every frame: MTCNN face detection,
    scene-cut heuristics against the previous frame, and simple composition
    metrics. `process_frames` runs all three over a directory and returns one
    DataFrame row per frame.
    """

    def __init__(self, face_confidence=0.9, scene_threshold=30.0):
        """
        Initialize detectors for both faces and scene cuts

        Args:
            face_confidence (float): Confidence threshold for face detection
                (passed as the last of the three MTCNN stage thresholds)
            scene_threshold (float): Threshold for scene cut detection; it is
                compared against mean absolute pixel differences between frames
        """
        # Initialize MTCNN for face detection; falls back to CPU without CUDA
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.face_detector = MTCNN(
            keep_all=True,
            device=self.device,
            min_face_size=20,
            thresholds=[0.6, 0.7, face_confidence]
        )

        self.scene_threshold = scene_threshold

    def detect_faces(self, image):
        """
        Detect faces in an image

        Args:
            image: Image handed unchanged to the face detector's `detect`

        Returns:
            dict: `has_faces` (bool), `num_faces` (int), `face_confidence`
                (float, highest detection probability or 0.0 when there is no
                face) and `face_locations` (list of boxes)
        """
        boxes, probs = self.face_detector.detect(image)

        if boxes is None or len(boxes) == 0:
            return {
                'has_faces': False,
                'num_faces': 0,
                'face_confidence': 0.0,
                'face_locations': []
            }

        # The detector can signal "nothing found" with None entries inside the
        # probability list, so drop them before taking the maximum; otherwise
        # the reported confidence would be None instead of a number.
        scores = [float(p) for p in probs if p is not None] if probs is not None else []

        return {
            'has_faces': True,
            'num_faces': len(boxes),
            'face_confidence': max(scores) if scores else 0.0,
            'face_locations': np.asarray(boxes).tolist()
        }

    def detect_scene_cut(self, frame1, frame2):
        """
        Detect if there's a scene cut between two frames

        Both frames are expected to be RGB arrays of identical shape, since they
        are converted with COLOR_RGB2GRAY and differenced element-wise.

        Args:
            frame1: Previous frame
            frame2: Current frame

        Returns:
            dict: Scene cut analysis with plain Python values:
                `is_scene_cut`, `confidence` (capped at 1.0), `transition_type`,
                `motion_magnitude`, `color_difference`, `histogram_correlation`
        """
        # Convert frames to grayscale
        gray1 = cv2.cvtColor(frame1, cv2.COLOR_RGB2GRAY)
        gray2 = cv2.cvtColor(frame2, cv2.COLOR_RGB2GRAY)

        # Mean absolute grayscale difference
        mean_diff = np.mean(cv2.absdiff(gray1, gray2))

        # Histogram correlation: higher means more similar intensity distributions
        hist1 = cv2.calcHist([gray1], [0], None, [256], [0, 256])
        hist2 = cv2.calcHist([gray2], [0], None, [256], [0, 256])
        hist_corr = cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)

        # Dense optical flow between the two grayscale frames
        flow = cv2.calcOpticalFlowFarneback(
            gray1, gray2, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )
        # NOTE: this averages the absolute value of every flow component, not
        # the per-pixel vector length; the 5.0 cut-off in
        # determine_transition_type applies to this quantity.
        motion_magnitude = np.mean(np.abs(flow))

        # Mean absolute difference over all color channels
        color_diff = np.mean(cv2.absdiff(frame1, frame2))

        # Any single indicator crossing its threshold marks a cut
        is_cut = (mean_diff > self.scene_threshold or
                  hist_corr < 0.5 or
                  color_diff > self.scene_threshold)

        confidence = min(1.0, max(mean_diff / self.scene_threshold,
                                  1 - hist_corr,
                                  color_diff / self.scene_threshold))

        return {
            # Cast NumPy scalars so every field is a plain Python value
            'is_scene_cut': bool(is_cut),
            'confidence': float(confidence),
            'transition_type': self.determine_transition_type(mean_diff, hist_corr, motion_magnitude),
            'motion_magnitude': float(motion_magnitude),
            'color_difference': float(color_diff),
            'histogram_correlation': float(hist_corr)
        }

    def determine_transition_type(self, diff, hist_diff, motion):
        """
        Determine the type of transition between frames

        Rules are checked in order and the first match wins, so a high motion
        value takes precedence over every difference-based label.

        Args:
            diff: Mean absolute grayscale difference between the frames
            hist_diff: Histogram correlation between the frames
            motion: Motion value computed from optical flow

        Returns:
            str: One of 'motion_blur', 'hard_cut', 'fade', 'dissolve', 'continuous'
        """
        if motion > 5.0:
            return 'motion_blur'
        elif diff > self.scene_threshold * 1.5:
            return 'hard_cut'
        elif hist_diff < 0.3:
            return 'fade'
        elif diff > self.scene_threshold:
            return 'dissolve'
        else:
            return 'continuous'

    def analyze_composition(self, frame):
        """
        Analyze frame composition

        Args:
            frame: RGB frame array

        Returns:
            dict: `composition_style` (str), `edge_density` (float, fraction of
                pixels marked as edges by Canny) and `center_weight` (float,
                mean grayscale value of the central third of the frame)
        """
        # Convert to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

        # Detect edges
        edges = cv2.Canny(gray, 100, 200)

        # Central third in both dimensions vs. overall edge fraction
        height, width = gray.shape
        center_weight = np.mean(gray[height//3:2*height//3, width//3:2*width//3])
        edge_density = np.mean(edges > 0)

        return {
            'composition_style': self.get_composition_type(center_weight, edge_density),
            'edge_density': float(edge_density),
            'center_weight': float(center_weight)
        }

    def get_composition_type(self, center_weight, edge_density):
        """
        Determine composition type based on metrics

        Args:
            center_weight: Mean grayscale value of the frame's central region
            edge_density: Fraction of pixels marked as edges

        Returns:
            str: 'centered_detailed' or 'rule_of_thirds' when edge_density is
                above 0.3, otherwise 'minimal_centered' or 'minimal_balanced';
                within each pair a center_weight above 128 selects the first
        """
        bright_center = center_weight > 128
        if edge_density > 0.3:
            return 'centered_detailed' if bright_center else 'rule_of_thirds'
        return 'minimal_centered' if bright_center else 'minimal_balanced'

    def process_frames(self, frames_dir, output_file='frame_analysis.csv'):
        """
        Process all frames in directory

        Every `*.jpg` file in `frames_dir` is analyzed in sorted filename order;
        scene-cut analysis compares each frame with the one before it.

        Args:
            frames_dir (str or Path): Directory containing the frame images
            output_file (str or Path): CSV file the results are written to

        Returns:
            pd.DataFrame: One row per frame. Empty (and no CSV is written) when
                the directory contains no `*.jpg` files.
        """
        frames_path = Path(frames_dir)
        # NOTE(review): sorting is lexicographic, so frame order is only
        # chronological if the filenames are zero-padded - confirm.
        image_paths = sorted(frames_path.glob('*.jpg'))

        if not image_paths:
            # Without this guard the summary below fails with a KeyError
            # because an empty DataFrame has no 'has_faces' column.
            print(f"No .jpg frames found in: {frames_path}")
            return pd.DataFrame()

        results = []
        previous_frame = None

        for img_path in tqdm(image_paths):
            # Load current frame; force RGB so grayscale/CMYK JPEGs do not
            # break the RGB2GRAY conversions, and close the file promptly.
            with Image.open(img_path) as img:
                rgb_image = img.convert('RGB')
            current_frame = np.array(rgb_image)

            # Analyze faces
            face_data = self.detect_faces(rgb_image)

            # Analyze scene cut if we have a previous frame
            if previous_frame is not None:
                scene_data = self.detect_scene_cut(previous_frame, current_frame)
            else:
                scene_data = {
                    'is_scene_cut': False,
                    'confidence': 0.0,
                    'transition_type': 'start',
                    'motion_magnitude': 0.0,
                    'color_difference': 0.0,
                    'histogram_correlation': 1.0
                }

            # Analyze composition
            comp_data = self.analyze_composition(current_frame)

            # Combine results
            results.append({
                'filename': img_path.name,
                **face_data,
                **scene_data,
                **comp_data
            })
            previous_frame = current_frame

        # Create DataFrame
        df = pd.DataFrame(results)

        # Save to CSV
        df.to_csv(output_file, index=False)

        # Print summary
        print("\nAnalysis Summary:")
        print(f"Total frames processed: {len(df)}")
        print(f"Frames with faces: {df['has_faces'].sum()}")
        print(f"Scene cuts detected: {df['is_scene_cut'].sum()}")
        print("\nTransition types:")
        print(df['transition_type'].value_counts())
        print("\nComposition styles:")
        print(df['composition_style'].value_counts())

        return df


In [9]:
from google.colab import drive
from pathlib import Path
import pandas as pd
import os

# Project locations on Google Drive: input frames and labeled output
DRIVE_FOLDER = "/content/drive/MyDrive/ML Project/scene_cuts"
OUTPUT_DIR = "/content/drive/MyDrive/ML Project/labeled_data"

# Attach Google Drive at /content/drive, forcing a remount on every run
drive.mount('/content/drive', force_remount=True)

def ensure_dir_exists(directory):
    """
    Make sure `directory` exists, creating it (and missing parents) if needed

    Args:
        directory (str or Path): Directory path to create when absent
    """
    if not os.path.isdir(directory):
        # exist_ok keeps this safe if the directory appears in the meantime
        os.makedirs(directory, exist_ok=True)

    message = f"Directory ready at: {directory}"
    print(message)

def save_dataframe(df, filename="scene_analysis.csv", output_dir=None):
    """
    Save DataFrame to Google Drive as CSV and print a short verification report

    Args:
        df (pd.DataFrame): DataFrame to save
        filename (str): Name of the output file
        output_dir (str or Path, optional): Destination directory. Defaults to
            the module-level OUTPUT_DIR when omitted.

    Returns:
        Path or None: Path of the written CSV on success, None when the save
            failed (diagnostics are printed instead of raising)
    """
    target_dir = Path(OUTPUT_DIR if output_dir is None else output_dir)
    output_path = target_dir / filename

    # Only the actual save is treated as "saving"; the report further down is
    # handled separately so a listing problem is not reported as a save error.
    try:
        # Ensure output directory exists
        ensure_dir_exists(target_dir)

        # Save DataFrame to CSV
        df.to_csv(output_path, index=False)
    except Exception as e:
        print(f"Error saving file: {str(e)}")
        print(f"Attempted to save to: {target_dir}")

        # Print directory existence check
        print("\nDirectory check:")
        print(f"Output directory exists: {target_dir.exists()}")
        print(f"Parent directory exists: {target_dir.parent.exists()}")
        return None

    print(f"Data saved successfully to: {output_path}")

    # Print first few rows as verification
    print("\nFirst few rows of saved data:")
    print(df.head())

    bytes_per_mb = 1024 * 1024
    try:
        # Print file size
        file_size = output_path.stat().st_size / bytes_per_mb
        print(f"\nFile size: {file_size:.2f} MB")

        # List contents of output directory
        print("\nContents of output directory:")
        for item in sorted(os.listdir(target_dir)):
            item_path = target_dir / item
            if item_path.is_file():
                size = item_path.stat().st_size / bytes_per_mb
                print(f"{item} ({size:.2f} MB)")
            else:
                print(f"{item}/ (directory)")
    except OSError as e:
        # The CSV is already written at this point; only the report failed
        print(f"Could not inspect output directory: {e}")

    return output_path

Mounted at /content/drive


In [10]:
# Example usage
# Runs the full analysis over every *.jpg frame in DRIVE_FOLDER. With the
# default output_file, process_frames also writes 'frame_analysis.csv' relative
# to the current working directory and prints a summary.
# NOTE(review): save_dataframe is not called in the visible cells, so nothing
# is written to OUTPUT_DIR here - confirm whether that is intended.
if __name__ == "__main__":
    analyzer = MovieFrameAnalyzer()
    df = analyzer.process_frames(DRIVE_FOLDER)

100%|██████████| 859/859 [13:54<00:00,  1.03it/s]


Analysis Summary:
Total frames processed: 859
Frames with faces: 619
Scene cuts detected: 739

Transition types:
transition_type
motion_blur    464
dissolve       231
continuous     117
hard_cut        36
fade            10
start            1
Name: count, dtype: int64

Composition styles:
composition_style
minimal_balanced    858
minimal_centered      1
Name: count, dtype: int64



