# Tracking

### Libraries and Variables

In [None]:
# general
import os
import random
import mmcv
import numpy as np
import cv2
from tqdm import tqdm
import numpy as np

import torch
import matplotlib.pyplot as plt
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

# detection
from mmdet.apis import init_detector, inference_detector
from mmdet.utils import register_all_modules
from mmdet.apis import init_detector, inference_detector

# segmentation
import matplotlib.pyplot as plt
from scipy.optimize import linear_sum_assignment

home_dir = os.path.expanduser('~')
raw_data_dir = os.path.join(home_dir, 'repos/DaNuMa2024/data/raw_data')
output_data_dir = os.path.join(home_dir, 'repos/DaNuMa2024/data/output_data')


### Overview

In this notebook, you will implement a simple algorithm for multiple-object tracking to track pigs in videos. We will employ the so-called "tracking-by-detection" paradigm. In this paradigm, an object detector is used to generate bounding boxes for every frame. Then, some algorithm to match these bounding boxes with each other is used to obtain tracks. In this exercise, the matching rule you will implement is very simple and could be improved in many ways. However, it is still the basis of many modern mutli-object-tracking frameworks (see further reads).

### Object detector

Just as in exercise 7, we will use a pre-trained object detector for pigs. The code is already provided below. If you want to know details about the pretrained pig detection framework, take a look at the repository at https://github.com/jonaden94/PigDetect/ and the corresponding demo notebook at https://github.com/jonaden94/PigDetect/blob/main/tools/inference/inference_demo.ipynb

In [4]:
# 1. initialize the model
config_path = os.path.join(home_dir, 'repos/PigDetect/configs/co-detr/co_dino_swin.py')
checkpoint_codino_path = os.path.join(raw_data_dir, '7_instance_segmentation/pretrained/codino_swin.pth')
register_all_modules(init_default_scope=False)
model = init_detector(config_path, checkpoint_codino_path, device='cuda:0') # cuda:0 for gpu

# 2. run model inference on image
image_path = os.path.join(raw_data_dir, '7_instance_segmentation/images/danuma_1578.jpg')
image = mmcv.imread(image_path, channel_order='rgb')
result = inference_detector(model, image)

# # this is how you get scores and bboxes
# scores = result.pred_instances.scores.cpu().numpy()
# bboxes = result.pred_instances.bboxes.cpu().numpy()
# bboxes = bboxes[scores > 0.5] # filter boxes by score

09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
rpn_conv.weight - torch.Size([256, 256, 3, 3]): 
NormalInit: mean=0, std=0.01, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
rpn_conv.bias - torch.Size([256]): 
NormalInit: mean=0, std=0.01, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
rpn_cls.weight - torch.Size([9, 256, 1, 1]): 
NormalInit: mean=0, std=0.01, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
rpn_cls.bias - torch.Size([9]): 
NormalInit: mean=0, std=0.01, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
rpn_reg.weight - torch.Size([36, 256, 1, 1]): 
NormalInit: mean=0, std=0.01, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
rpn_reg.bias - torch.Size([36]): 
NormalInit: mean=0, std=0.01, bias=0 
 




09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
bbox_head.fc_cls.weight - torch.Size([2, 1024]): 
NormalInit: mean=0, std=0.01, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
bbox_head.fc_cls.bias - torch.Size([2]): 
NormalInit: mean=0, std=0.01, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
bbox_head.fc_reg.weight - torch.Size([4, 1024]): 
NormalInit: mean=0, std=0.001, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
bbox_head.fc_reg.bias - torch.Size([4]): 
NormalInit: mean=0, std=0.001, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
bbox_head.shared_fcs.0.weight - torch.Size([1024, 12544]): 
XavierInit: gain=1, distribution=uniform, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
bbox_head.shared_fcs.0.bias - torch.Size([1024]): 
XavierInit: gain=1, distribution=uniform, bias=0 
 
09/26 02:43:34 - mmengine - [4m[97mINFO[0m - 
bbox_head.shared_fcs.1.weight - torch.Size([1024, 1024]): 
XavierInit: gain=1, distribution=unif



RuntimeError: No CUDA GPUs are available

### Definition of tracking functions

The ``track_objects_in_video`` function given below takes as input a directory where all frames of a video are saved ("images_dir") and a pretrained object detection model ("model"). The pretrained object detection model is used to generate detections for every frame. To match the bounding boxes of two consecutive frames with each other, the function ``match_bboxes`` function is called. It calculates for every pair of bounding boxes from the two consecutive frames how much they overlap. For example, if there are M detections in the first frame and N detections in the second frame, M $\cdot$ N overlap metrics are computed. The overlap metric that we use here is the so-called **IoU**. Your task in this exercise is to implement the function ``calculate_iou``:
* Take a look at the iou_example.svg image in the exercise folder to understand what the IoU is.
* Hint: Keep in mind that the minimum y-value of a box is at the top (see iou_example.svg)! This is because for images the y-coordinate zero is at the top. This is just a convention but it is also in line with our intuition about matrices (images are just matrices, right?) where we start counting the rows from the top. The rows represent the y-axis.

To calculate the IoU, perform the following steps:
1. get the minimum x/y and maximum x/y values for both boxes
2. from these, infer a condition when the bounding boxes do not overlap at all. In this case, immediately return 0.
3. If the bounding boxes overlap, calculate the intersection and union and return it.

The other parts of the code are already complete and the tracking algorithm should work if you correctly implemented ``calculate_iou``! :) . However, you might want to take a look at the matching algorithm in the ``match_bboxes`` function to understand how the matching is done. Basically, we calculate the IoU for every pair of bounding boxes in two consecutive frames. So, if there are for example N bounding boxes in frame 1 and M bounding boxes in frame 2, we calculate an IoU matrix of size M $\cdot$ N. Then we choose the matching that maximizes the total IoU. For this, we use the ``linear_sum_assignment`` function from scipy. The underlying algorithm is relatively complex. Google "Hungarian algorithm" if you want to know more!

In [None]:
def calculate_iou(bbox1, bbox2):
    """
    Calculate IoU between two bounding boxes.
    Args:
        bbox1, bbox2: Bounding boxes [x_min, y_min, x_max, y_max]
    Returns:
        iou: Intersection over Union
    """
    ################ YOUR CODE HERE:
    x_min1, y_min1, x_max1, y_max1 = bbox1
    x_min2, y_min2, x_max2, y_max2 = bbox2
    
    # Calculate intersection
    x_min_inter = max(x_min1, x_min2)
    y_min_inter = max(y_min1, y_min2)
    x_max_inter = min(x_max1, x_max2)
    y_max_inter = min(y_max1, y_max2)
    
    if x_max_inter < x_min_inter or y_max_inter < y_min_inter:
        return 0.0
    
    # Intersection area
    intersection_area = (x_max_inter - x_min_inter) * (y_max_inter - y_min_inter)
    
    # Areas of the bboxes
    bbox1_area = (x_max1 - x_min1) * (y_max1 - y_min1)
    bbox2_area = (x_max2 - x_min2) * (y_max2 - y_min2)
    
    # Union area
    union_area = bbox1_area + bbox2_area - intersection_area
    
    # IoU
    return intersection_area / union_area

In [None]:
def match_bboxes(bboxes_frame1, bboxes_frame2, iou_threshold):
    """
    Match bounding boxes between two frames using IoU and Hungarian Algorithm.
    Args:
        bboxes_frame1: Bounding boxes in frame 1 (list of [x_min, y_min, x_max, y_max])
        bboxes_frame2: Bounding boxes in frame 2 (list of [x_min, y_min, x_max, y_max])
    Returns:
        matches: List of tuples where each tuple is (index_in_frame1, index_in_frame2)
    """
    iou_matrix = np.zeros((len(bboxes_frame1), len(bboxes_frame2)))

    for i, bbox1 in enumerate(bboxes_frame1):
        for j, bbox2 in enumerate(bboxes_frame2):
            iou_matrix[i, j] = calculate_iou(bbox1, bbox2)
    
    # Perform Hungarian matching (maximize IoU by minimizing negative IoU)
    row_ind, col_ind = linear_sum_assignment(-iou_matrix)
    
    # Filter matches based on IoU threshold (e.g., ignore low IoU matches)
    matches = []
    for r, c in zip(row_ind, col_ind):
        if iou_matrix[r, c] > iou_threshold:  # Threshold for valid IoU match
            matches.append((r, c))
    
    return matches

In [None]:
# Tracking function for all frames
def track_objects_in_video(images_dir, model, output_track_path, iou_threshold=0.3):
    """
    Perform object detection on each frame and track objects using IoU-based matching.
    Args:
        images_dir: Directory containing images (frames)
        model: Initialized object detection model
        output_track_path: Path to save the tracking result in MOT format
        iou_threshold: IoU threshold for matching
    """
    frame_files = sorted([f for f in os.listdir(images_dir) if f.endswith('.jpg')])
    
    tracks = []
    next_track_id = 0
    active_tracks = {}
    
    for frame_idx, frame_file in tqdm(enumerate(frame_files)):
        # Load frame
        frame_path = os.path.join(images_dir, frame_file)
        frame = mmcv.imread(frame_path, channel_order='rgb')
        
        # Run detection
        result = inference_detector(model, frame)
        bboxes = result.pred_instances.bboxes.cpu().numpy()
        scores = result.pred_instances.scores.cpu().numpy()
        bboxes = bboxes[scores > 0.5]
        
        if frame_idx == 0:
            # Initialize new tracks in the first frame
            for bbox in bboxes:
                tracks.append([frame_idx + 1, next_track_id, *bbox])
                active_tracks[next_track_id] = bbox
                next_track_id += 1
        else:
            # Match current frame bboxes with previous frame tracks
            previous_bboxes = active_tracks.values()
            matches = match_bboxes(previous_bboxes, bboxes, iou_threshold)
            
            # Update existing tracks with matched bboxes
            matched_tracks = set()
            for match in matches:
                track_idx, bbox_idx = match
                track_id = list(active_tracks.keys())[track_idx]
                tracks.append([frame_idx + 1, track_id, *bboxes[bbox_idx]])
                active_tracks[track_id] = bboxes[bbox_idx]
                matched_tracks.add(track_id)

            # Remove inactive tracks
            inactive_tracks = set(active_tracks.keys()) - matched_tracks
            for track_id in inactive_tracks:
                del active_tracks[track_id]
                
            # Start new tracks for unmatched bboxes
            unmatched_bboxes = set(range(len(bboxes))) - {m[1] for m in matches}
            for bbox_idx in unmatched_bboxes:
                tracks.append([frame_idx + 1, next_track_id, *bboxes[bbox_idx]])
                active_tracks[next_track_id] = bboxes[bbox_idx]
                next_track_id += 1
            
    # Save tracks to file
    with open(output_track_path, 'w') as f:
        for track in tracks:
            f.write(','.join(map(str, track)) + '\n')

### Run tracking

We can now simply apply the tracking functionality from above.

In [None]:
# path to frames of the video
images_dir = os.path.join(raw_data_dir, '9_tracking/video1/images')

# path to save tracking results
output_dir_tracking = os.path.join(output_data_dir, '9_tracking/video1')
os.makedirs(output_dir_tracking, exist_ok=True)
tracking_results_path = os.path.join(output_dir_tracking, 'tracking_results.txt')

# call the tracking function
track_objects_in_video(images_dir, model, tracking_results_path)

### Visualize tracking results

This function is used to visualize the tracks. It just plots the bounding boxes on each image and creates a video. The color of a bounding box represents the predicted id! Inspect the visualization. 
* Where do mistakes happen and why? 
* How could we improve the tracker?

################### YOUR ANSWER HERE: \
We can see that, if the detector has problems, tracks immediately terminate since a match is not found. \
This could be alleviated by e.g. adding a patience where tracks are not lost immediately but rather keps im memory for a couple of frames. \
However, this does not alleviate the problem if objects that were not detected for a couple of frames moved or if the detector has severe problems. \
More recent frameworks (see furhter reads) try to mitigate the reliance on external object detectors by actually LEARNING the whole tracking process without external detectors.

In [None]:
# Function to visualize tracking and save both individual frames and a video
def visualize_mot(images_dir, tracks_file, output_dir, video_output_path, ids_to_visualize=None, bbox_linewidth=2, id_size=1, fps=10):
    np.random.seed(2)
    
    # Load tracking data from file
    tracks = {}
    with open(tracks_file, 'r') as f:
        for line in f:
            data = [int(i) if i.isdigit() else float(i) for i in line.split(',')]
            frame_id, obj_id, x_min, y_min, x_max, y_max = data # assuming the format is [x_min, y_min, x_max, y_max]
            if frame_id not in tracks:
                tracks[frame_id] = []
            tracks[frame_id].append((obj_id, x_min, y_min, x_max, y_max))
    
    # Get the list of image files (frames)
    frame_files = sorted([f for f in os.listdir(images_dir) if f.endswith('.jpg')])
    
    # Initialize output directory for frames and video
    frames_output_dir = os.path.join(output_dir, 'frames')
    os.makedirs(frames_output_dir, exist_ok=True)
    
    # Initialize the video writer
    first_frame = cv2.imread(os.path.join(images_dir, frame_files[0]))
    height, width, _ = first_frame.shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for .mp4
    video_writer = cv2.VideoWriter(video_output_path, fourcc, fps, (width, height))
    
    # Initialize a dictionary to store bbox colors for each obj_id
    colors = {}
    
    # Iterate over frames and draw bounding boxes
    for frame_idx, frame_file in enumerate(tqdm(frame_files)):
        frame_id = frame_idx + 1  # Assuming frame ID is based on the order of files
        frame_path = os.path.join(images_dir, frame_file)
        frame = cv2.imread(frame_path)
        
        # If the frame contains tracking data, draw the bounding boxes
        if frame_id in tracks:
            for obj_id, x_min, y_min, x_max, y_max in tracks[frame_id]:
                if ids_to_visualize is not None and obj_id not in ids_to_visualize:
                    continue
                
                # Assign a random color to the object if it's not already in the dictionary
                if obj_id not in colors:
                    colors[obj_id] = tuple(map(int, np.random.choice(range(256), size=3)))
                
                color = colors[obj_id]
                
                # Cast x_min, y_min, x_max, y_max to integers
                x_min, y_min, x_max, y_max = int(x_min), int(y_min), int(x_max), int(y_max)
                
                # Draw bounding box
                cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, bbox_linewidth)
                
                # Draw object ID label
                cv2.putText(frame, str(obj_id), (x_min, y_min - 5), cv2.FONT_HERSHEY_SIMPLEX, id_size, color, 2)
        
        # Save the output frame with bounding boxes
        output_frame_path = os.path.join(frames_output_dir, frame_file)
        cv2.imwrite(output_frame_path, frame)
        
        # Write the frame to the video
        video_writer.write(frame)
    
    # Release the video writer
    video_writer.release()

In [None]:
visualize_mot(
    images_dir=images_dir, 
    tracks_file=tracking_results_path, 
    output_dir=output_dir_tracking,
    video_output_path=os.path.join(output_dir_tracking, 'tracking_results.mp4')
)

### further reads
* ground-breaking paper that first introduced the basic tracking idea employed in this exercise: https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=7533003
* example of further developments of this method that builds more sophisticated matching rule: https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8296962
* more recent frameworks try learning tracking (not only pretrained object detector with some association rule). Examples are: https://openaccess.thecvf.com/content/CVPR2022/papers/Meinhardt_TrackFormer_Multi-Object_Tracking_With_Transformers_CVPR_2022_paper.pdf \
https://arxiv.org/pdf/2105.03247 \
https://arxiv.org/pdf/2211.09791
