CSE5CV Assignment 2

In [None]:

STUDENT_ID = 21547932


### Importing Required Libraries  

In this part, I am bringing in all the external tools (called libraries) that I will use later in my project.  

- `copy`, `math`, `numpy`: utilities for copying objects, basic mathematical calculations, and handling arrays of numbers.  
- `cv2` (OpenCV): used to open videos, read frames, and process images.  
- `matplotlib.pyplot`: used to show images and graphs in the notebook.  
- `scipy` and `scipy.optimize`: scientific computing tools, including numerical optimization routines.  
- `torch` and `torchvision`: PyTorch libraries, used for deep learning models and computer vision tasks.  
- `torchvision.transforms.functional`: small helper functions to prepare images for models.  
- `google.colab.drive`: used to connect my Google Drive so I can read and save files (like my video and results).  


In [None]:
import copy
import math

import numpy as np
import cv2
import matplotlib.pyplot as plt
import scipy
import scipy.optimize
import torch
import torchvision
import torchvision.transforms.functional as tvtf

from google.colab import drive

### Connecting Google Drive  

In this part, I am connecting my Google Drive storage to Google Colab.

In [None]:
drive.mount('/content/drive')
%cd /content/drive/MyDrive/CSE5CV_Assignment

Mounted at /content/drive
/content/drive/MyDrive/CSE5CV_Assignment


###  Loading and Previewing the First Frame of the Video  

In this step, I check whether my video (`Task1.mp4`) is stored correctly in Google Drive and can be read inside Colab.  

- I give the full path to my video file (`/content/drive/MyDrive/CSE5CV_Assignment/Task1.mp4`).  
- `cv2.VideoCapture(filename)`: Opens the video file so we can read it frame by frame.  
- `vid.read()`: Reads the very first frame of the video.

In [None]:
filename = "/content/drive/MyDrive/CSE5CV_Assignment/Task1.mp4"

# Open the video and read the first frame
vid = cv2.VideoCapture(filename)
ok, img = vid.read()
vid.release()

if not ok or img is None:
    raise FileNotFoundError(f"Could not read first frame from {filename}")

# Convert and display
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.figure(figsize=(10, 5))
plt.imshow(img)
plt.axis("off")
plt.title("First frame preview")
plt.show()

###  Object Detection with Mask R-CNN  

In this step, I run a **pre-trained deep learning model** (Mask R-CNN) on my video to detect objects in each frame.  

- First, I define `preprocess_image()`, which converts an image into the format the model expects (a channels-first PyTorch float tensor with values in [0, 1], plus a batch dimension).  
- I load the **Mask R-CNN model with a ResNet-50 backbone**, which has already been trained on the COCO dataset (a large dataset with 80 everyday object categories).  
- I set the model to **evaluation mode** (`eval()`) since I am not training it, only using it for predictions.  
- If a GPU is available, I move the model to the GPU to make predictions faster.  
- I reset the video to the first frame and calculate how many frames the video has in total.  
- For each frame:
  - I read the frame and convert it from BGR to RGB (OpenCV loads frames in BGR order, while the model expects RGB).  
  - I pass the frame to Mask R-CNN and keep three of its outputs (it also returns segmentation masks, which I do not use here):  
    - `boxes`: the bounding box coordinates of detected objects.  
    - `labels`: the predicted object categories.  
    - `scores`: the confidence level of each detection.  
  - I store these results for every frame in lists (`all_boxes`, `all_labels`, `all_scores`).  
  - Every 20 frames, I print progress so I know how much is done.  
- Finally, I save all the detections into `.pt` files so I don’t need to run the heavy detection process again later.  
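
To make the tensor format concrete, here is a rough NumPy sketch of what `tvtf.to_tensor` followed by `unsqueeze(dim=0)` does to a frame. The function name `to_model_layout` and the tiny dummy frame are assumptions made up for illustration; the notebook itself relies on the torchvision call.

```python
import numpy as np

def to_model_layout(frame_rgb):
    """Mimic to_tensor + unsqueeze for a uint8 H x W x 3 image (illustrative only)."""
    chw = frame_rgb.transpose(2, 0, 1)         # H x W x C -> C x H x W
    scaled = chw.astype(np.float32) / 255.0    # uint8 [0, 255] -> float [0, 1]
    return scaled[np.newaxis, ...]             # add batch dimension -> 1 x C x H x W

# Hypothetical 4 x 6 all-white frame standing in for a video frame
dummy = np.full((4, 6, 3), 255, dtype=np.uint8)
batch = to_model_layout(dummy)
print(batch.shape, batch.dtype, batch.max())
```

The leading dimension of size 1 is the batch: the model takes a batch of images and returns one result per image, which is why the detection loop indexes the output with `[0]`.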

In [None]:
def preprocess_image(image):
    image = tvtf.to_tensor(image)
    image = image.unsqueeze(dim=0)
    return image

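# `pretrained=True` is deprecated in recent torchvision; `weights="DEFAULT"` loads the COCO weights
maskrcnn = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")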
maskrcnn.eval()
if torch.cuda.is_available():
    maskrcnn.cuda()

# Reopen the video (the preview cell released it) and go to the first frame
vid = cv2.VideoCapture(filename)
vid.set(cv2.CAP_PROP_POS_FRAMES, 0)

# Record how long the video is (in frames)
vid_length = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))

# For each frame, read it, give it to maskrcnn and record the detections
all_boxes = []
all_labels = []
all_scores = []
for i in range(vid_length):
    _, img = vid.read()
    if img is None:
        break
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    with torch.no_grad():
        input_image = preprocess_image(img)
        if torch.cuda.is_available():
            input_image = input_image.cuda()
        result = maskrcnn(input_image)[0]

    all_boxes.append(result['boxes'].detach().cpu().numpy())
    all_labels.append(result['labels'].detach().cpu().numpy())
    all_scores.append(result['scores'].detach().cpu().numpy())
    if i % 20 == 0:
        print(f'{i+1:0d}/{vid_length}')

torch.save(all_boxes, 'all_boxes.pt')
torch.save(all_labels, 'all_labels.pt')
torch.save(all_scores, 'all_scores.pt')




###  Reloading Saved Detections  

In this step, I am **loading the results** that I previously saved when I ran the Mask R-CNN detection.  

- `torch.load('all_boxes.pt')`, `torch.load('all_labels.pt')`, and `torch.load('all_scores.pt')` bring back the bounding box coordinates, object class labels, and confidence scores for every frame.  
- This means I don’t need to re-run the heavy detection process; I can re-use the saved results.  
- I also calculate how many frames were processed (`vid_length`) by checking the length of the loaded detections.  
- Finally, I print a message confirming how many frames of the video now have detections ready.  
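
Since everything later depends on these cached lists, a quick sanity check after loading can catch an empty or mismatched cache early. The sketch below is a hypothetical helper (`check_detections` is not part of the notebook), shown with made-up two-frame data in plain Python lists:

```python
def check_detections(all_boxes, all_labels, all_scores):
    """Return the frame count, or raise if the cached lists look wrong."""
    n = len(all_boxes)
    if not (n == len(all_labels) == len(all_scores)):
        raise ValueError("Box, label and score lists have different lengths.")
    if n == 0:
        raise ValueError("No detections loaded; re-run the detection cell.")
    return n

# Hypothetical two-frame cache: frame 0 has one detection, frame 1 has none
boxes = [[[10, 20, 50, 80]], []]
labels = [[1], []]
scores = [[0.93], []]
print(check_detections(boxes, labels, scores))
```

A frame with no detections is still a valid entry; only a cache with zero frames, or lists of different lengths, is treated as an error here.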


In [None]:
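# The saved lists hold NumPy arrays, so recent PyTorch versions need weights_only=False
# (only do this for files you created yourself)
all_boxes = torch.load('all_boxes.pt', weights_only=False)
all_labels = torch.load('all_labels.pt', weights_only=False)
all_scores = torch.load('all_scores.pt', weights_only=False)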

vid_length = len(all_boxes)

print(f'Loaded detections for {vid_length} video frames')

Loaded detections for 0 video frames


Task 2 — Filter & Visualise Detections

In this step, I am setting up the **Mask R-CNN model** that will detect objects in my video frames.  

- I import the main libraries:  
  - `cv2` for working with video frames.  
  - `torch` and `torchvision` for deep learning and pre-trained models.  
  - `torchvision.transforms.functional` for image preprocessing.  
  - `numpy` for handling arrays.  

- I load **Mask R-CNN with a ResNet-50 backbone**. This model is already trained on the **COCO dataset** (80 everyday object categories like person, bottle, cup, laptop, etc.).  
- I set the model to **evaluation mode** (`eval()`), which means it will only be used for predictions, not training.  
- I run the model on a **GPU** if one is available for speed; otherwise it runs on the CPU.  


In [None]:
import cv2
import torch
import torchvision
import torchvision.transforms.functional as T
import numpy as np

# Load pre-trained Mask R-CNN model with ResNet-50 backbone
mask_rcnn_model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")
mask_rcnn_model.eval()

# Set device to GPU if available, otherwise CPU
device_type = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
mask_rcnn_model.to(device_type)


MaskRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(in

###  Setting Up the Video and Target Classes  

In this part, I prepare my video file and decide **which objects I want the model to detect**.  

- First, I set the path to my video (`Task1.mp4`) and open it with OpenCV’s `VideoCapture`.  
- Next, I define the **COCO dataset classes** that are relevant for my assignment:  
  - `1: person`  
  - `44: bottle`  
  - `47: cup`  
  - `73: laptop`  
  - `75: remote`  
- I then put these IDs into a set (`target_classes`) so that only detections from these categories are kept instead of all 80 classes in COCO.  
- Finally, I define **confidence thresholds** for each class. This sets the minimum confidence score required for a detection to count:  
  - For example, a detection of a "person" must have at least 80% confidence, but for "remote" (which is smaller and harder to detect), I allow a lower threshold of 30%.  


In [None]:
# Video file path
video_file = 'Task1.mp4'
video_capture = cv2.VideoCapture(video_file)

# Define COCO dataset classes and the ones we are interested in
# (category IDs as used by torchvision's COCO detection models;
#  63 and 65 would be "couch" and "bed")
coco_classes = {
    1:  "person",
    44: "bottle",
    47: "cup",
    73: "laptop",
    75: "remote"
}
target_classes = {1, 44, 47, 73, 75}

# Define detection confidence thresholds per class
confidence_thresholds = {1: 0.8, 44: 0.7, 47: 0.6, 73: 0.6, 75: 0.3}
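
The per-class filtering rule can be tried out on its own before running the model. The following is a minimal sketch in plain Python; `keep_detection`, the reduced threshold table, and the `(class_id, score)` pairs are all hypothetical values chosen for illustration.

```python
def keep_detection(class_id, score, thresholds):
    """True if the class is of interest and the score meets that class's threshold."""
    return class_id in thresholds and score >= thresholds[class_id]

# Hypothetical thresholds (person, cup) and detections as (class_id, score) pairs
thresholds = {1: 0.8, 47: 0.6}
detections = [(1, 0.95), (1, 0.75), (47, 0.65), (3, 0.99)]

kept = [d for d in detections if keep_detection(d[0], d[1], thresholds)]
print(kept)
```

Here the second person detection is dropped for falling below 0.8, and the class-3 detection is dropped despite its high score because class 3 is not in the table.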


### Preprocessing Video Frames  

Before sending a video frame to the detection model, I need to **convert it into the right format**.  

- The function `preprocess_frame(frame)` takes a single frame from the video.  
- If the frame is empty (for example, if the video ended or could not be read), it raises an error so I know something went wrong.  
- `cv2.cvtColor` changes the image colors from **BGR (default in OpenCV)** to **RGB** (which most deep learning models use).  
- `T.to_tensor` converts the image into a PyTorch tensor (the data format required for the model).  
- `.unsqueeze(0)` adds a new dimension so that the frame looks like a batch of images (even though it’s just one image).  
- Finally, `.to(device_type)` moves the tensor to either **GPU or CPU** depending on what is available.  


In [None]:
# Function to preprocess video frames for model input
def preprocess_frame(frame):
    if frame is None:
        raise ValueError("Error: Captured frame is empty.")
    rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    tensor_image = T.to_tensor(rgb_image).unsqueeze(0).to(device_type)
    return tensor_image


### Running Detection on Video Frames  

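In this section, I run the object detection model on my video, frame by frame. To save time, only every 10th frame is processed. Detections of the target classes that meet their confidence thresholds are drawn as green boxes with a label and score, and each annotated frame is displayed with `cv2_imshow`.  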


In [None]:
# Initialize frame counter
frame_counter = 0

# Process the video frame-by-frame
while video_capture.isOpened():
    ret, frame = video_capture.read()
    if not ret or frame is None:
        print("End of video or failed to read frame.")
        break

    frame_counter += 1

    # Process only every 10th frame
    if frame_counter % 10 != 0:
        continue

    # Preprocess the frame and run the model
    input_tensor = preprocess_frame(frame)
    with torch.no_grad():
        outputs = mask_rcnn_model(input_tensor)[0]

    # Draw bounding boxes on the frame for selected detections
    for i in range(len(outputs['labels'])):
        class_id = outputs['labels'][i].item()
        score = outputs['scores'][i].item()

        # Filter based on selected classes and confidence thresholds
        if class_id in target_classes and score >= confidence_thresholds[class_id]:
            bbox = outputs['boxes'][i].cpu().numpy().astype(int)
            label = coco_classes[class_id]

            # Draw the bounding box and label on the frame
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
            text = f"{label}: {score:.2f}"
            cv2.putText(frame, text, (bbox[0], bbox[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display the frame with detections
    from google.colab.patches import cv2_imshow  # Colab-specific display function
    cv2_imshow(frame)


In [None]:
# Release video resources
video_capture.release()
print("Detection process completed.")


Detection process completed.


Task 3


In [None]:
import cv2
import torch
import torchvision
import torchvision.transforms.functional as T
import numpy as np
from collections import defaultdict
from scipy.spatial import distance

# Load pre-trained Mask R-CNN model with ResNet-50 backbone
tracking_model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")
tracking_model.eval()

# Set device to GPU if available, otherwise use CPU
active_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
tracking_model.to(active_device)



###  Setting Up Video Output and Tracking Structures  

In this step, I prepare everything needed to save the **tracked video** with detections drawn on it.  

- I specify the **input video** (`Task1.mp4`) and the name of the **output video** that will be created with tracking results (`Task3.mp4`).  
- Using OpenCV, I read the video dimensions (width and height) so that the output video has the same size.  
- I also set up a **video writer** with the XVID codec so the new video can be saved frame by frame at 30 fps.  
- I define the **object classes** (from the COCO dataset) that I want to track: person, bottle, cup, laptop, and remote.  
- I assign **confidence thresholds** for each class so that only detections with enough confidence are accepted.  
- Finally, I prepare data structures (`object_trackers` and `track_ids`) that will be used to store and assign unique IDs to objects across frames.  


In [None]:
# Define paths for input and output video
source_video = 'Task1.mp4'
output_tracked_video = 'Task3.mp4'
video_stream = cv2.VideoCapture(source_video)

# Get frame width and height
frame_width = int(video_stream.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video_stream.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Set up the video writer with the proper codec
video_codec = cv2.VideoWriter_fourcc(*'XVID')  # Changed from 'mp4v' to 'XVID'
output_writer = cv2.VideoWriter(output_tracked_video, video_codec, 30.0, (frame_width, frame_height))

# COCO class mapping and target classes
# (category IDs as used by torchvision's COCO detection models: laptop is 73, remote is 75)
class_map = {1: 'person', 44: 'bottle', 47: 'cup', 73: 'laptop', 75: 'remote'}
targeted_classes = {1, 44, 47, 73, 75}

# Confidence thresholds for each class
class_conf_thresholds = {1: 0.8, 44: 0.7, 47: 0.6, 73: 0.6, 75: 0.3}

# Initialize tracking-related structures
object_trackers = defaultdict(dict)
track_ids = defaultdict(int)


###  Converting a Frame into Model Input  

This function prepares each video frame so that it can be understood by the deep learning model.  


In [None]:
# Function to preprocess video frames for model input
def frame_to_tensor(frame_data):
    if frame_data is None:
        raise ValueError("Captured an empty frame.")
    rgb_image = cv2.cvtColor(frame_data, cv2.COLOR_BGR2RGB)
    tensor_image = T.to_tensor(rgb_image).unsqueeze(0).to(active_device)
    return tensor_image


### Tracking Objects Across Frames  

This section defines the helper functions that allow me to **track objects from one frame to the next**.  

1. **`compute_box_center(box_coords)`**  
   - Takes the coordinates of a bounding box (`x1, y1, x2, y2`).  
   - Returns the center point of that box as `(x, y)`.  
   - The center is useful for checking how far an object has moved between frames.  

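2. **`track_objects(detections, cls_id)`**  
   - Updates the list of tracked objects for a given class (e.g., person, bottle, laptop).  
   - For every detection in the current frame:  
     - Computes its center.  
     - Looks at existing tracked objects of the same class.  
     - Finds the closest one by measuring the **Euclidean distance** between centers.  
     - If that distance is less than 50 pixels, the detection takes over the existing track's ID; otherwise a new ID is created.  
   - Tracks that receive no detection in the current frame are dropped.  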
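
As a small worked example of the center and distance calculation, the sketch below uses made-up coordinates together with the 50-pixel cut-off from `track_objects`. The names `box_center`, `MAX_DIST`, and `previous_center` are hypothetical and only serve this illustration.

```python
import math

def box_center(box):
    """Center of an (x1, y1, x2, y2) box, truncated to whole pixels."""
    x1, y1, x2, y2 = box
    return (int((x1 + x2) / 2), int((y1 + y2) / 2))

MAX_DIST = 50                     # pixels; same cut-off as in track_objects
previous_center = (60, 90)        # hypothetical center of an existing track

center = box_center((10, 20, 50, 80))        # (30, 50)
dist = math.dist(center, previous_center)    # sqrt(30^2 + 40^2) = 50.0
same_object = dist < MAX_DIST                # 50.0 is not below 50

print(center, dist, same_object)
```

Because the comparison is strict, a detection exactly 50 pixels away starts a new track rather than continuing the old one.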

In [None]:
# Function to compute the center of the bounding box
def compute_box_center(box_coords):
    x1, y1, x2, y2 = box_coords
    return (int((x1 + x2) / 2), int((y1 + y2) / 2))

# Function to update trackers based on distance between box centers
def track_objects(detections, cls_id):
    global track_ids, object_trackers

    updated_tracker_data = {}  # Temporary dictionary to hold updated tracks

    # Loop through each detection for the current class
    for detection_data in detections:
        bbox = detection_data['bbox']
        score = detection_data['score']
        center = compute_box_center(bbox)

        # Find the closest existing tracker for the same class,
        # skipping trackers already claimed by another detection in this frame
        min_distance = float('inf')
        closest_tracker = None

        for t_id, tracking_data in object_trackers[cls_id].items():
            if t_id in updated_tracker_data:
                continue
            tracked_center = tracking_data['center']
            dist = distance.euclidean(center, tracked_center)
            if dist < min_distance:
                min_distance = dist
                closest_tracker = t_id

        # If a close match is found, update the tracker
        if min_distance < 50:  # Adjust this threshold for better accuracy
            updated_tracker_data[closest_tracker] = {'center': center, 'bbox': bbox, 'score': score}
        else:
            # Create a new track if no suitable match is found
            new_tracker_id = track_ids[cls_id]
            track_ids[cls_id] += 1
            updated_tracker_data[new_tracker_id] = {'center': center, 'bbox': bbox, 'score': score}

    # Update the global trackers for the current class
    object_trackers[cls_id] = updated_tracker_data
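
The function above matches detections one at a time, so the result can depend on the order in which detections arrive. One possible alternative, not used in this notebook, is to solve the matching for all tracks and detections at once with `scipy.optimize.linear_sum_assignment`. The sketch below assumes centers are given as plain `(x, y)` tuples; `match_centers` and its `max_dist` parameter are hypothetical names.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

def match_centers(track_centers, det_centers, max_dist=50.0):
    """One-to-one matching of tracks to detections by center distance."""
    if not track_centers or not det_centers:
        return {}, list(range(len(det_centers)))
    tracks = np.asarray(track_centers, dtype=float)
    dets = np.asarray(det_centers, dtype=float)
    # cost[i, j] = Euclidean distance between track i and detection j
    cost = np.linalg.norm(tracks[:, None, :] - dets[None, :, :], axis=2)
    rows, cols = linear_sum_assignment(cost)   # minimises the total distance
    # Keep only pairs that are close enough to be the same object
    matches = {int(r): int(c) for r, c in zip(rows, cols) if cost[r, c] < max_dist}
    matched = set(matches.values())
    unmatched = [j for j in range(len(det_centers)) if j not in matched]
    return matches, unmatched

# Hypothetical centers: two existing tracks, three new detections
matches, unmatched = match_centers([(100, 100), (300, 300)],
                                   [(305, 298), (102, 101), (600, 50)])
print(matches, unmatched)
```

In this example track 0 pairs with detection 1 and track 1 with detection 0, while detection 2 stays unmatched and would receive a new ID.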


###  Frame-by-Frame Tracking and Drawing Results  

This is the **main loop** where the video is processed one frame at a time, detections are made, and objects are tracked across frames.  

- The loop runs while the video is open. If no frame can be read, it stops.  
- Each frame is preprocessed (`frame_to_tensor`) and passed into the detection model to get predictions.  
- For every prediction, I collect:  
  - the **class ID** (e.g., person, bottle, laptop),  
  - the **confidence score**, and  
  - the **bounding box coordinates**.  
- I only keep detections that belong to my **target classes** and are above their confidence thresholds.  
- For each class, I update the **object trackers** using the `track_objects()` function, so that an object keeps its ID as long as its center moves less than 50 pixels between consecutive frames.  
- Then, for every tracked object, I draw:  
  - a **green bounding box** around it,  
  - a **red dot** at its center,  
  - and a label showing the object’s name, its unique ID, and the confidence score.  
- Finally, the processed frame (with boxes, IDs, and labels) is saved into the **output video file** using `output_writer.write()`.  


In [None]:
# Process the video frame-by-frame
while video_stream.isOpened():
    ret, current_frame = video_stream.read()
    if not ret or current_frame is None:
        print("End of video reached or failed to capture frame.")
        break

    # Preprocess the frame and perform object detection
    frame_tensor = frame_to_tensor(current_frame)
    with torch.no_grad():
        predictions = tracking_model(frame_tensor)[0]

    # Gather detections by class
    class_detections = defaultdict(list)

    for i in range(len(predictions['labels'])):
        cls_id = predictions['labels'][i].item()
        confidence = predictions['scores'][i].item()
        bbox = predictions['boxes'][i].cpu().numpy().astype(int)

        # Filter by selected classes and confidence thresholds
        if cls_id in targeted_classes and confidence >= class_conf_thresholds[cls_id]:
            class_detections[cls_id].append({'bbox': bbox, 'score': confidence})

    # Update object trackers for every target class; a class with no
    # detections in this frame gets an empty list, which clears its stale tracks
    for cls_id in targeted_classes:
        track_objects(class_detections[cls_id], cls_id)

    # Draw bounding boxes and track information on the frame
    for cls_id, tracked_objects in object_trackers.items():
        for tracker_id, track_data in tracked_objects.items():
            bbox = track_data['bbox']
            center = track_data['center']
            score = track_data['score']
            label = class_map[cls_id]

            # Draw bounding box, center point, and track ID on the frame
            cv2.rectangle(current_frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
            cv2.circle(current_frame, center, 3, (0, 0, 255), -1)
            track_text = f"{label} ID: {tracker_id} | {score:.2f}"
            cv2.putText(current_frame, track_text, (bbox[0], bbox[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

    # Write the processed frame to the output video
    output_writer.write(current_frame)


End of video reached or failed to capture frame.


In [None]:
# Release video resources
video_stream.release()
output_writer.release()  # Ensure this is called to properly close the video file


print("Object tracking completed. The result is saved as Task3.mp4.")


Object tracking completed. The result is saved as Task3.mp4.
