# **Content-Aware Video Cropping**

Content-aware video cropping runs object detection on every frame, scores the detected objects, and positions the crop window on the highest-priority subject. This notebook crops each frame to a full-height window with a 9:16 aspect ratio by default.


In [1]:
!pip install opencv-python torchvision ultralytics pyDeepInsight
import cv2
import numpy as np
import os
import time
import shutil
from moviepy.editor import VideoFileClip, AudioFileClip, ImageSequenceClip
import tempfile
from typing import Tuple, List, Dict
import warnings
from ultralytics import YOLO


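I'm using the YOLOv8l model for object detection. It is the large YOLOv8 variant, so it trades some speed for accuracy; the smaller `yolov8n.pt` or `yolov8s.pt` weights run faster if throughput matters more.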


In [2]:
def setup_yolo():

    # Load YOLOv8 model
    model = YOLO('yolov8l.pt')

    return model

The `detect_objects` function runs the model on a single frame and extracts the bounding-box coordinates, confidence score, and class of each detected object. It returns a list of detection dictionaries that the cropping logic consumes.

In [3]:
def detect_objects(frame: np.ndarray, model: YOLO) -> List[Dict]:
    # Run inference
    results = model(frame, verbose=False)[0]  # First result from the list

    # Extract detections
    detections = []
    for box in results.boxes:
        # Get box coordinates (convert to int)
        x1, y1, x2, y2 = [int(x) for x in box.xyxy[0].tolist()]

        # Get confidence and class
        conf = float(box.conf[0])
        cls = int(box.cls[0])

        detection = {
            'bbox': [x1, y1, x2, y2],
            'confidence': conf,
            'class': cls,
            'class_name': results.names[cls]
        }
        detections.append(detection)

    return detections
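The list returned by `detect_objects` is plain Python data, so it can be filtered before scoring. The sketch below is an illustration only: `filter_detections` and its `min_confidence` cutoff are hypothetical names that the notebook does not define, and the sample detections are hand-written rather than real model output.

```python
from typing import Dict, List


def filter_detections(detections: List[Dict], min_confidence: float = 0.5) -> List[Dict]:
    """Keep detections at or above a confidence cutoff (hypothetical helper)."""
    return [det for det in detections if det['confidence'] >= min_confidence]


# Hand-written sample in the same shape that detect_objects returns.
sample = [
    {'bbox': [100, 50, 300, 400], 'confidence': 0.91, 'class': 0, 'class_name': 'person'},
    {'bbox': [500, 200, 560, 260], 'confidence': 0.32, 'class': 16, 'class_name': 'dog'},
]

kept = filter_detections(sample)
print([det['class_name'] for det in kept])
```

Dropping low-confidence boxes early may reduce spurious subject switches, at the risk of losing a real subject that the model is unsure about.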

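The `SubjectTracker` class keeps the crop focused on one subject across frames instead of jumping to whichever object scores highest in each frame.

The tracker switches subjects only when a competing object's score exceeds the current subject's score, plus its persistence and duration bonuses, by more than `switch_threshold`.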

In [4]:
class SubjectTracker:

    def __init__(self,
                 switch_threshold: float = 0.3,    # Minimum score difference to switch subjects
                 persistence_bonus: float = 0.25,  # Bonus for sticking with current subject
                 history_size: int = 5):
        self.current_subject_id = None
        self.current_subject_score = 0
        self.frame_history = {}  # Track how long we've followed each subject
        self.switch_threshold = switch_threshold
        self.persistence_bonus = persistence_bonus
        self.history_size = history_size
        self.last_position = None

    def should_switch_subject(self,
                            current_score: float,
                            new_score: float,
                            frames_tracked: int) -> bool:
        """
        Determine if we should switch to a new subject based on decision rules.
        """
        # Add bonus for currently tracked subject based on duration
        duration_bonus = min(frames_tracked * 0.05, 0.2)  # Up to 0.2 bonus for 4+ frames
        adjusted_current = current_score + self.persistence_bonus + duration_bonus

        # Require significant improvement to switch
        return new_score > (adjusted_current + self.switch_threshold)

    def update_and_select(self, scored_detections: List[Dict], frame_shape: Tuple[int, int]) -> Dict:
        """
        Update tracking state and select subject based on decision rules.
        """
        if not scored_detections:
            self.current_subject_id = None
            return None

        height, width = frame_shape[:2]
        current_frame_subjects = {det['id']: det for det in scored_detections}

        # Update frame history for all detected subjects
        for subject_id in current_frame_subjects:
            if subject_id not in self.frame_history:
                self.frame_history[subject_id] = 0
            self.frame_history[subject_id] += 1

        # Clean up old subjects
        self.frame_history = {k: v for k, v in self.frame_history.items()
                            if v > 0 and k in current_frame_subjects}

        # If we have a current subject and it's still detected
        if self.current_subject_id and self.current_subject_id in current_frame_subjects:
            current_detection = current_frame_subjects[self.current_subject_id]
            current_score = current_detection['score']

            # Find best alternative subject
            other_detections = [det for det in scored_detections
                              if det['id'] != self.current_subject_id]

            if other_detections:
                best_alternative = max(other_detections, key=lambda x: x['score'])

                # Check if we should switch based on decision rules
                if not self.should_switch_subject(
                    current_score,
                    best_alternative['score'],
                    self.frame_history[self.current_subject_id]
                ):
                    return current_detection
            else:
                return current_detection

        # If we reach here, either we don't have a current subject
        # or we decided to switch subjects
        best_detection = max(scored_detections, key=lambda x: x['score'])
        self.current_subject_id = best_detection['id']
        self.current_subject_score = best_detection['score']
        return best_detection
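To get a feel for how strict the switching rule is, the arithmetic from `should_switch_subject` can be pulled out into a standalone function. This is a sketch for experimentation: `switch_margin` is a hypothetical name, and the defaults simply mirror the `SubjectTracker` defaults above.

```python
def switch_margin(current_score: float,
                  frames_tracked: int,
                  switch_threshold: float = 0.3,
                  persistence_bonus: float = 0.25) -> float:
    """Score a challenger must exceed to replace the current subject."""
    # Same duration bonus as SubjectTracker: 0.05 per frame, capped at 0.2.
    duration_bonus = min(frames_tracked * 0.05, 0.2)
    return current_score + persistence_bonus + duration_bonus + switch_threshold


# A weak subject tracked for a single frame.
required = switch_margin(0.30, 1)
print(round(required, 2))

# A moderately strong subject tracked for ten frames.
established = switch_margin(0.60, 10)
print(round(established, 2))
```

With these defaults, a subject scoring 0.60 that has been tracked for four or more frames can only be displaced by a challenger scoring above 1.35. Since the weights in `calculate_final_score` cap a score at 1.05, such a subject is effectively locked in until it leaves the frame; lowering `switch_threshold` or `persistence_bonus` loosens this.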

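The `ObjectScorer` class implements a multi-criteria scoring system for detected objects. Each criterion produces a score of at most 1.0, and the final score is a weighted sum of the five. The baseline weights were:

1.   Distance Score (30%)
2.   Size Score (25%)
3.   Time Score (20%)
4.   Class Score (15%)
5.   Movement Score (10%)

The `calculate_final_score` method below uses tuned weights instead: distance 20%, size 35%, time 15%, class 25%, and movement 10%. These sum to 1.05 rather than 1.0, so the maximum possible score is 1.05. The weight of each criterion can be changed as required.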




In [5]:
class ObjectScorer:

    def __init__(self, frame_width: int, frame_height: int):
        self.frame_width = frame_width
        self.frame_height = frame_height
        self.history = {}  # Track objects over time
        self.frame_count = 0
        self.fps = 30  # Assuming 30 fps, adjust as needed

    def calculate_distance_score(self, center_x: float) -> float:
        # Normalize distance from center to [0, 1]
        rel_distance = abs(center_x - self.frame_width/2) / (self.frame_width/2)
        if rel_distance < 0.33:
            return 1.0  # Center third
        elif rel_distance < 0.66:
            return 0.7  # Middle third
        else:
            return 0.4  # Outer third

    def calculate_size_score(self, bbox: List[float]) -> float:
        height = bbox[3] - bbox[1]
        rel_height = height / self.frame_height

        if 0.3 <= rel_height <= 0.5:
            return 1.0
        elif 0.2 <= rel_height < 0.3 or 0.5 < rel_height <= 0.6:
            return 0.7
        elif 0.1 <= rel_height < 0.2:
            return 0.4
        else:
            return 0.2

    def calculate_time_score(self, object_id: str) -> float:
        if object_id not in self.history:
            self.history[object_id] = 1
            return 0.2

        duration = self.history[object_id] / self.fps  # Convert frames to seconds

        if duration > 3:
            return 1.0
        elif duration > 2:
            return 0.7
        elif duration > 1:
            return 0.4
        else:
            return 0.2

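    def calculate_class_score(self, class_name: str) -> float:
        # The COCO label used by the pretrained YOLOv8 weights is 'airplane';
        # 'aeroplane' is kept for models trained with the older spelling.
        if class_name.lower() in ['duck', 'bird', 'dog', 'car', 'airplane', 'aeroplane', 'person']:
            return 1.0
        elif class_name.lower() in ['truck', 'bicycle']:
            return 0.5
        else:
            return 0.2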

    def calculate_movement_score(self, object_id: str, current_pos: float) -> float:
        if object_id not in self.history:
            return 0.4  # Default score for new objects

        positions = self.history.get(f"{object_id}_positions", [])
        if not positions:
            return 0.4

        # Calculate average velocity
        velocities = [abs(positions[i] - positions[i-1]) for i in range(1, len(positions))]
        if not velocities:
            return 0.7

        avg_velocity = sum(velocities) / len(velocities)
        if avg_velocity < 5:
            return 1.0  # Steady movement
        elif avg_velocity < 10:
            return 0.7  # Slow movement
        elif avg_velocity < 20:
            return 0.4  # Moderate movement
        else:
            return 0.2  # Fast/erratic movement

    def calculate_final_score(self, detection: Dict, object_id: str) -> float:
        bbox = detection['bbox']
        center_x = (bbox[0] + bbox[2]) / 2

        distance_score = self.calculate_distance_score(center_x)
        size_score = self.calculate_size_score(bbox)
        time_score = self.calculate_time_score(object_id)
        class_score = self.calculate_class_score(detection['class_name'])
        movement_score = self.calculate_movement_score(object_id, center_x)

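        # Note: these weights sum to 1.05, so the maximum score is 1.05
        final_score = (
            0.20 * distance_score +  # Decreased from 0.30
            0.35 * size_score +      # Increased from 0.25
            0.15 * time_score +      # Decreased from 0.20
            0.25 * class_score +     # Increased from 0.15
            0.10 * movement_score
        )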

        # Update history
        self.history[object_id] = self.history.get(object_id, 0) + 1
        positions = self.history.get(f"{object_id}_positions", [])
        positions.append(center_x)
        if len(positions) > 10:  # Keep last 10 positions
            positions.pop(0)
        self.history[f"{object_id}_positions"] = positions

        return final_score
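The weighted sum in `calculate_final_score` can be checked in isolation. The following sketch copies the five weights from the code above; the `WEIGHTS` dictionary, the `weighted_score` function, and its `normalize` option are hypothetical additions that show one way to rescale the result to a maximum of 1.0.

```python
from typing import Dict

# Weights copied from calculate_final_score.
WEIGHTS = {
    'distance': 0.20,
    'size': 0.35,
    'time': 0.15,
    'class': 0.25,
    'movement': 0.10,
}


def weighted_score(scores: Dict[str, float],
                   weights: Dict[str, float] = WEIGHTS,
                   normalize: bool = False) -> float:
    """Weighted sum of per-criterion scores, optionally rescaled to a 1.0 maximum."""
    total = sum(weights[name] * scores[name] for name in weights)
    if normalize:
        total /= sum(weights.values())
    return total


best = {name: 1.0 for name in WEIGHTS}
print(round(weighted_score(best), 2))
print(round(weighted_score(best, normalize=True), 2))
```

Normalizing does not change which object ranks highest within a frame, but it does change how scores compare against the fixed `switch_threshold` and `persistence_bonus` in `SubjectTracker`.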

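The `calculate_crop_window` function scores every detection, asks the `SubjectTracker` which subject to follow, and centers a full-height crop window on that subject. If nothing is detected, it falls back to a center crop.

`x1` and `x2` are the left and right horizontal boundaries of the crop window. Object IDs are assigned from the detection order within each frame (`obj_0`, `obj_1`, ...), so an ID refers to the same object only while that order stays the same between frames.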

In [6]:
def calculate_crop_window(frame_shape: Tuple[int, int],
                         detections: List[Dict],
                         object_scorer: ObjectScorer,
                         subject_tracker: SubjectTracker,
                         target_aspect_ratio: float = 9/16) -> Tuple[int, int, int, int]:

    height, width = frame_shape[:2]
    target_width = int(height * target_aspect_ratio)

    # Score all detections
    scored_detections = []
    for i, det in enumerate(detections):
        object_id = f"obj_{i}"
        score = object_scorer.calculate_final_score(det, object_id)
        scored_detections.append({
            'id': object_id,
            'score': score,
            'bbox': det['bbox']
        })

    # Apply decision rules through subject tracker
    selected_subject = subject_tracker.update_and_select(scored_detections, frame_shape)

    if not selected_subject:
        # Default to center crop if no subjects detected
        x_center = width // 2
        x1 = max(0, x_center - target_width // 2)
        x2 = min(width, x1 + target_width)
        return (x1, 0, x2, height)

    # Calculate crop window based on selected subject
    bbox = selected_subject['bbox']
    center_x = (bbox[0] + bbox[2]) / 2

    # Calculate crop coordinates
    x1 = int(max(0, min(center_x - target_width/2, width - target_width)))
    x2 = int(x1 + target_width)

    return (x1, 0, x2, height)
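The clamping step is easiest to follow with concrete numbers. This sketch repeats the arithmetic from `calculate_crop_window` for a subject near the left edge, the middle, and the right edge of the frame. The 1920x1080 frame size is an assumption chosen for illustration, and `crop_bounds` is a hypothetical helper name.

```python
from typing import Tuple


def crop_bounds(center_x: float,
                frame_width: int,
                frame_height: int,
                target_aspect_ratio: float = 9 / 16) -> Tuple[int, int]:
    """Return (x1, x2) for a full-height crop centered on center_x, kept inside the frame."""
    target_width = int(frame_height * target_aspect_ratio)
    # Clamp the left edge so the window never leaves the frame on either side.
    x1 = int(max(0, min(center_x - target_width / 2, frame_width - target_width)))
    return x1, x1 + target_width


for cx in (100, 960, 1900):
    print(cx, crop_bounds(cx, 1920, 1080))
```

For a 1080-pixel-high frame the crop is 607 pixels wide, since `int(1080 * 9 / 16)` truncates 607.5. Some encoders and players prefer even dimensions, so rounding the width to an even number may be worth considering.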

The `SmoothingBuffer` class smooths crop-window transitions so that the crop does not jitter from frame to frame.

Operations:
1. Calculates current velocity from position changes
2. Maintains velocity history for trend analysis
3. Uses weighted averaging for smooth transitions
4. Adapts smoothing strength based on motion speed

In [16]:
class SmoothingBuffer:

    def __init__(self, buffer_size: int = 15, smoothing_factor: float = 0.7, velocity_weight: float = 0.3):
        self.buffer_size = buffer_size
        self.smoothing_factor = smoothing_factor
        self.velocity_weight = velocity_weight
        self.positions = []
        self.velocities = []
        self.current_position = None
        self.last_velocity = 0

    def update(self, new_position: Tuple[int, int, int, int]) -> Tuple[int, int, int, int]:
        if self.current_position is None:
            self.current_position = new_position
            return new_position

        # Calculate current velocity
        current_velocity = new_position[0] - self.current_position[0]

        # Update velocity history
        self.velocities.append(current_velocity)
        if len(self.velocities) > self.buffer_size:
            self.velocities.pop(0)

        # Calculate smoothed velocity
        if self.velocities:
            smoothed_velocity = sum(self.velocities) / len(self.velocities)
            # Add velocity-based prediction
            predicted_x1 = new_position[0] + (smoothed_velocity * self.velocity_weight)
        else:
            predicted_x1 = new_position[0]

        # Update position history
        self.positions.append((predicted_x1, new_position[1],
                             predicted_x1 + (new_position[2] - new_position[0]), new_position[3]))
        if len(self.positions) > self.buffer_size:
            self.positions.pop(0)

        # Calculate target position with prediction
        target_x1 = sum(pos[0] for pos in self.positions) / len(self.positions)
        target_x2 = sum(pos[2] for pos in self.positions) / len(self.positions)

        # Adaptive smoothing based on velocity
        velocity_magnitude = abs(smoothed_velocity) if self.velocities else 0
        adaptive_smoothing = max(0.1, self.smoothing_factor - (velocity_magnitude * 0.001))

        # Apply smoothing to the left edge, then derive the right edge from the
        # crop width so that rounding cannot change the window size between frames
        crop_width = new_position[2] - new_position[0]
        smooth_x1 = int(adaptive_smoothing * self.current_position[0] +
                       (1 - adaptive_smoothing) * target_x1)
        smooth_x2 = smooth_x1 + crop_width

        self.current_position = (smooth_x1, new_position[1], smooth_x2, new_position[3])
        return self.current_position
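The core of the smoother is a blend between the previous position and a target, with a blend factor that drops as motion gets faster. The sketch below isolates those two steps from `SmoothingBuffer.update`. It leaves out the position and velocity buffers, so it is a simplification rather than a replacement, and the names `smooth_step` and `adaptive_factor` are hypothetical.

```python
def adaptive_factor(base: float, velocity: float) -> float:
    """Lower the smoothing factor as speed rises, with a floor of 0.1."""
    return max(0.1, base - abs(velocity) * 0.001)


def smooth_step(current: float, target: float, smoothing_factor: float = 0.7) -> float:
    """Move part of the way from current toward target (exponential smoothing)."""
    return smoothing_factor * current + (1 - smoothing_factor) * target


# The crop starts at x = 0 and the subject jumps to x = 100.
position = 0.0
trace = []
for _ in range(5):
    position = smooth_step(position, 100.0)
    trace.append(round(position, 1))
print(trace)

print(adaptive_factor(0.7, 200), adaptive_factor(0.7, 1000))
```

A higher `smoothing_factor` keeps more of the previous position, which gives a steadier but slower-reacting crop. The adaptive term lets the window catch up more quickly when the subject moves fast.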

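The main `process_video` function ties the pieces together: it crops every frame, writes the frames to a temporary directory, reassembles them into a video, and attaches the original audio track with a matching duration.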


In [21]:
def process_video(input_path: str,
                  output_path: str,
                  model,
                  target_aspect_ratio: float = 9/16,
                  buffer_size: int = 30,
                  smoothing_factor: float = 0.85,
                  switch_threshold: float = 0.3,
                  persistence_bonus: float = 0.25) -> None:

    # Create temporary directory for frame sequence
    temp_dir = tempfile.mkdtemp()
    frames_dir = os.path.join(temp_dir, "frames")
    os.makedirs(frames_dir, exist_ok=True)

    try:
        # Open input video
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise Exception("Could not open input video")

        original_fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Output dimensions
        output_width = int(height * target_aspect_ratio)
        output_height = height

        # Initialize components for object tracking and smoothing
        object_scorer = ObjectScorer(width, height)
        if original_fps > 0:
            # Use the real frame rate so time scores are measured in seconds
            object_scorer.fps = original_fps
        subject_tracker = SubjectTracker(switch_threshold=switch_threshold, persistence_bonus=persistence_bonus)
        smoother = SmoothingBuffer(buffer_size=buffer_size, smoothing_factor=smoothing_factor)

        frame_count = 0
        print("Processing frames...")

        # Process each frame
        while True:
            ret, frame = cap.read()
            if not ret or frame is None:
                break  # End of video or frame not read

            try:
                # Object detection and tracking
                detections = detect_objects(frame, model)
                raw_x1, raw_y1, raw_x2, raw_y2 = calculate_crop_window(
                    frame.shape,
                    detections,
                    object_scorer,
                    subject_tracker,
                    target_aspect_ratio
                )

                # Apply smoothing
                x1, y1, x2, y2 = smoother.update((raw_x1, raw_y1, raw_x2, raw_y2))

                # Enforce bounds on crop coordinates
                x1, y1 = max(0, int(x1)), max(0, int(y1))
                x2, y2 = min(width, int(x2)), min(height, int(y2))

                # Crop and resize the frame
                cropped_frame = frame[y1:y2, x1:x2]
                if cropped_frame.size > 0:
                    cropped_frame = cv2.resize(cropped_frame, (output_width, output_height))
                else:
                    # Handle empty frame by using a center crop as fallback
                    center = width // 2
                    x1 = max(0, center - output_width // 2)
                    x2 = min(width, x1 + output_width)
                    cropped_frame = frame[:, x1:x2]
                    cropped_frame = cv2.resize(cropped_frame, (output_width, output_height))

            except Exception as e:
                print(f"Warning: Error in frame {frame_count}, using center crop: {str(e)}")
                center = width // 2
                x1 = max(0, center - output_width // 2)
                x2 = min(width, x1 + output_width)
                cropped_frame = frame[:, x1:x2]
                cropped_frame = cv2.resize(cropped_frame, (output_width, output_height))

            # Save the processed frame
            frame_path = os.path.join(frames_dir, f"frame_{frame_count:06d}.jpg")
            cv2.imwrite(frame_path, cropped_frame)
            frame_count += 1

            if frame_count % 30 == 0:
                print(f"Processed {frame_count}/{total_frames} frames")

        cap.release()

        if frame_count == 0:
            raise Exception("No frames were processed")

        print("Frame processing complete. Creating video...")

        # Load original video to get exact audio duration
        original_video = VideoFileClip(input_path)

        # Process frames into a video
        frame_files = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')])
        processed_video = ImageSequenceClip(frame_files, fps=original_fps)

        # Add audio and ensure the duration matches
        final_video = processed_video.set_audio(original_video.audio).set_duration(original_video.duration)
        final_video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            temp_audiofile=os.path.join(temp_dir, "temp_audio.m4a"),
            remove_temp=True,
            verbose=False
        )

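    except Exception as e:
        print(f"Error during video processing: {str(e)}")

    else:
        # Only report success when no exception was raised
        print("Processing complete!")

    finally:
        # Clean up resources. A clip may be undefined if an error occurred
        # before it was created, so each one is closed independently.
        local_vars = locals()
        for name in ('final_video', 'processed_video', 'original_video'):
            clip = local_vars.get(name)
            if clip is not None:
                try:
                    clip.close()
                except Exception:
                    pass
        if 'cap' in local_vars:
            local_vars['cap'].release()
        shutil.rmtree(temp_dir, ignore_errors=True)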
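Because `process_video` sets the output duration to that of the original clip, it can be useful to check how far the processed frames and the audio actually disagree before writing the file. The sketch below is one way to do that with plain arithmetic. `duration_gap` is a hypothetical helper, and the 25 fps and 25.04-second figures are made-up inputs; only the frame count of 625 matches the run shown in this notebook.

```python
def duration_gap(frame_count: int, fps: float, audio_duration: float) -> float:
    """Seconds by which the audio is longer (positive) or shorter (negative) than the frames."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    return audio_duration - frame_count / fps


# 625 processed frames; fps and audio duration are assumed values.
gap = duration_gap(625, 25.0, 25.04)
print(f"audio minus video: {gap:.3f} s")
```

A gap of a frame or two is usually harmless. A larger gap may indicate dropped frames or an inaccurate frame rate reported by `cv2.CAP_PROP_FPS`.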


In [None]:
def main():
    print("Loading YOLOv8 model...")
    model = setup_yolo()

    input_video = "2FgBOgck_K0.mp4"  # Path to the input video
    output_video = "cropped_video10.mp4"

    print(f"Processing video: {input_video}")
    process_video(
        input_video,
        output_video,
        model,
        switch_threshold=0.4,
        persistence_bonus=0.3,
        buffer_size=20,
        smoothing_factor=0.70,
    )

In [25]:
if __name__ == "__main__":
    main()

Loading YOLOv8 model...
Processing video: 2FgBOgck_K0.mp4
Processing frames...
Processed 30/625 frames
Processed 60/625 frames
Processed 90/625 frames
Processed 120/625 frames
Processed 150/625 frames
Processed 180/625 frames
Processed 210/625 frames
Processed 240/625 frames
Processed 270/625 frames
Processed 300/625 frames
Processed 330/625 frames
Processed 360/625 frames
Processed 390/625 frames
Processed 420/625 frames
Processed 450/625 frames
Processed 480/625 frames
Processed 510/625 frames
Processed 540/625 frames
Processed 570/625 frames
Processed 600/625 frames
Frame processing complete. Creating video...
Moviepy - Building video cropped_video10.mp4.
MoviePy - Writing audio in /tmp/tmpd7wsja7p/temp_audio.m4a




MoviePy - Done.
Moviepy - Writing video cropped_video10.mp4





Moviepy - Done !
Moviepy - video ready cropped_video10.mp4
Processing complete!
