In [3]:
import torch
from PIL import Image
import pandas as pd
import numpy as np
from tqdm import tqdm
import json
import os
import cv2


def process(current_frame, previous_frame, motion_threshold=None):
    """
    Computes the degree of motion between the current and previous frames.

    Dense optical flow is estimated with OpenCV's Farneback method and
    summarised as the mean magnitude of the resulting flow vectors.

    Args:
        current_frame (np.ndarray): Current frame in grayscale.
        previous_frame (np.ndarray or None): Previous frame in grayscale, or
            None when there is no earlier frame to compare against.
        motion_threshold (float, optional): When given, only flow magnitudes
            strictly greater than this value contribute to the score. When
            None (the default), every magnitude is averaged.

    Returns:
        dict: ``{'score': value}``. ``value`` is None when ``previous_frame``
        is None; otherwise it is a Python float holding the mean magnitude,
        or 0.0 when no magnitude exceeds ``motion_threshold``.
    """
    # The first sampled frame has nothing to be compared with.
    if previous_frame is None:
        return {'score': None}

    # Calculate dense optical flow; the last axis holds the two flow components.
    flow = cv2.calcOpticalFlowFarneback(
        prev=previous_frame,
        next=current_frame,
        flow=None,
        pyr_scale=0.5,
        levels=3,
        winsize=15,
        iterations=3,
        poly_n=5,
        poly_sigma=1.2,
        flags=0
    )

    # Only the magnitude is used; the angle output is discarded, so its unit
    # (degrees vs. radians) does not matter here.
    magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # Filter out small magnitudes (noise) using a threshold
    if motion_threshold is not None:
        significant_motion = magnitude[magnitude > motion_threshold]
    else:
        significant_motion = magnitude

    # Guard the empty case explicitly: the mean of an empty array is NaN.
    if significant_motion.size == 0:
        return {'score': 0.0}

    # Cast the NumPy scalar to a built-in float so both branches return the
    # same type and the value serialises as plain data.
    return {'score': float(significant_motion.mean())}

In [15]:
# Directory scanned for .mp4 files, and the rate (frames per second) at which
# each video is sampled for optical-flow scoring.
folder_path = "/home/cermavo3/projects/kaggle/coool/COOOL-Benchmark"
target_fps = 5
output_path = 'results/optical_flow.pkl'

# Create the output directory before the long-running loop so the final save
# cannot fail on a missing folder after all videos have been processed.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Maps each video filename to a list of {'score': ..., 'frame': ...} dicts,
# one per sampled frame.
results = {}
# os.listdir returns entries in arbitrary order; sort for a reproducible run.
for filename in tqdm(sorted(os.listdir(folder_path))):
    if not filename.endswith(".mp4"):
        continue

    video_path = os.path.join(folder_path, filename)
    cap = cv2.VideoCapture(video_path)
    try:
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        # Clamp to at least 1: when the reported FPS is below target_fps the
        # integer ratio is 0, which would make the modulo below raise
        # ZeroDivisionError. A step of 1 samples every frame instead.
        frame_skip = max(1, int(original_fps / target_fps))

        # Initialize storage for this video
        previous_frame = None
        video_results = []
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_skip == 0:
                # Convert the current frame to grayscale
                current_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

                # Score motion against the previously *sampled* frame (not the
                # immediately preceding raw frame).
                result = process(current_frame, previous_frame)
                result["frame"] = frame_count
                video_results.append(result)
                # Update the previous frame
                previous_frame = current_frame

            frame_count += 1
    finally:
        # Release the capture handle even if decoding or scoring fails.
        cap.release()

    results[filename] = video_results

# Written with torch.save, so read it back with torch.load.
torch.save(results, output_path)

100%|██████████| 200/200 [1:13:19<00:00, 22.00s/it]
