## Task 1.3: Object Tracking with Optical Flow

### Import required libraries

In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
# Some basic setup:
# Setup detectron2 logger
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()
import motmetrics

# import some common libraries
import numpy as np
import os, json, cv2, random
from PIL import Image
# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog

### Code to compute IoU

In [2]:
def compute_iou(bboxA, bboxB):
    # Code provided by teacher in M1 subject
    # compute the intersection over union of two bboxes
    
    # Format of the bboxes is [xtl, ytl, xbr, ybr, ...], where tl and br
    # indicate top-left and bottom-right corners of the bbox respectively.

    # determine the coordinates of the intersection rectangle
    xA = max(bboxA[0], bboxB[0])
    
    yA = max(bboxA[1], bboxB[1])
    xB = min(bboxA[2], bboxB[2])
    yB = min(bboxA[3], bboxB[3])
    
    # compute the area of intersection rectangle
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
 
    # compute the area of both bboxes
    bboxAArea = (bboxA[3] - bboxA[1] + 1) * (bboxA[2] - bboxA[0] + 1)
    bboxBArea = (bboxB[3] - bboxB[1] + 1) * (bboxB[2] - bboxB[0] + 1)
    
    iou = interArea / float(bboxAArea + bboxBArea - interArea)
    
    # return the intersection over union value
    return iou



### Parse annotations

In [4]:
import xml.etree.ElementTree as ET
def parse_xml(file_path):
    """
    Parses an XML file and extracts bounding box information for each frame and track.

    Args:
        file_path (str): Path to the XML file.

    Returns:
        tuple: A tuple containing two dictionaries, `tracks` and `frames`.
            `tracks` contains information for each track, with the track IDs as keys and the box information
            for each frame as values.
            `frames` contains information for each frame, with the frame numbers as keys and a list of boxes as
            values.
    """
    # Parse the XML file
    tree = ET.parse(file_path)
    root = tree.getroot()

    frames = {}

    # Iterate over the tracks and extract their bounding box information
    for track in root.findall(".//track[@label='car']"):
        track_id = track.get('id')
        for box in track.findall(".//box"):
            box_frame = int(box.get('frame'))
            xtl, ytl, xbr, ybr = map(float, [box.get('xtl'), box.get('ytl'), box.get('xbr'), box.get('ybr')])
            outside, occluded, keyframe = map(int, [box.get('outside'), box.get('occluded'), box.get('keyframe')])
            parked = box.find(".//attribute[@name='parked']").text == 'true'

            # Add the box to the list of boxes for this frame
            if box_frame not in frames:
                frames[box_frame] = []

            frames[box_frame].append({
                'xtl': xtl,
                'ytl': ytl,
                'xbr': xbr,
                'ybr': ybr,
                'track_id': track_id,
                'occluded': occluded
            })

    return frames


frames_gt = parse_xml('annotations.xml')
frames_gt = dict(sorted(frames_gt.items(), key=lambda x: x[0]))
# Open the output file for writing
output_file = "output_annotations.txt"
output_fp = open(output_file, "w")

for frame in frames_gt:
    for bbox in frames_gt[frame]:
        x, y, z = -1, -1, -1  # No information about x, y, z
        line = "{},{},{},{},{},{},{},{},{},{}\n".format(
            str(int(frame)+1),
            bbox['track_id'],
            bbox['xtl'],
            bbox['ytl'],
            bbox['xbr'] - bbox['xtl'],
            bbox['ybr'] - bbox['ytl'],
            1,
            x,
            y,
            z
        )

        output_fp.write(line)

#Release the output file
output_fp.close()

### Function for detecting objects using Detectron2

In [3]:
def detect_objects(predictor, frame):
    # Detect objects in the current frame using the pretrained model
    outputs = predictor(frame) 
    current_objects = outputs['instances']

    instances = outputs["instances"].to("cpu")
    boxes = instances.pred_boxes.tensor.numpy()
    #print("Boxes", boxes)
    classes = instances.pred_classes.numpy()
    #print("Classes", classes)
    #print(classes)
    scores = instances.scores.numpy()
    num_boxes = boxes.shape[0]

    return boxes, classes, scores, num_boxes



### Function for writing tracker output in MOT16 format

In [4]:
def write_tracker_output_MOT16(object_tracker, output_fp):
      # Write the tracker output to the output file in the MOT16 format
    
    for track, bbox in object_tracker.items():
    
        x, y, z = -1, -1, -1  # No information about x, y, z
        line = "{},{},{},{},{},{},{},{},{},{}\n".format(
            frame_num,
            track,
            bbox[0],
            bbox[1],
            bbox[2] - bbox[0],
            bbox[3] - bbox[1],
            1,
            x,
            y,
            z
        )
        output_fp.write(line)
    
    return

### Function for visualizing result

In [5]:
def visualize_tracked_objects_in_frame(frame, object_tracker, prev_object_tracker):
    # Visualize the tracked objects in the current frame
    v = Visualizer(frame[:, :, ::-1], MetadataCatalog.get(cfg.DATASETS.TRAIN[0]), scale=1.2)
    #out = v.draw_instance_predictions(outputs["instances"].to("cpu"))
    index_color = 0
    for object_id, object_box in object_tracker.items():
        out = v.draw_text(f"{object_id}", (object_box[0], object_box[1]), font_size=8)
        out = v.draw_box(object_box, edge_color='g')
    for object_id, object_box in prev_object_tracker.items():
        out = v.draw_text(f"{object_id}", (object_box[0], object_box[1]), font_size=8)
        out = v.draw_box(object_box, alpha=0.2, edge_color='b')

    result = out.get_image()[:, :, ::-1]

    return result


### Function for computing Lucas-Kanade optical flow

In [6]:
def compute_optical_flow_lucas_kanade(prev_frame, frame, object_box):
    #Define the parameters for Lucas-Kanade optical flow
    lk_params = dict(winSize=(15, 15), maxLevel=4, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    #Define the feature points to track (Shi Tomasi corner detection)
    feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
    #Covnert prev_frame and frame to gray
    prev_frame_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    #pick bounding box
    p0 = cv2.goodFeaturesToTrack(prev_frame_gray[int(object_box[1]):int(object_box[3]), int(object_box[0]):int(object_box[2])], mask=None, **feature_params)

    if p0 is None:
        #Computing the OF for all the pixels inside detection if we do not have good points.
        height = int(object_box[3]) - int(object_box[1])
        width = int(object_box[2]) - int(object_box[0])
        p0 = np.array([[x, y] for y in range(int(object_box[1]), int(object_box[3])) for x in
                    range(int(object_box[0]), int(object_box[2]))],
                    dtype=np.float32).reshape((-1, 1, 2))
    #Calculate the optical flow 
    p1, st, err = cv2.calcOpticalFlowPyrLK(prev_frame, frame, p0, None, **lk_params)


    #print(p1)
    #print(p1.shape)

    flow = p1 - p0
    flow[st == 0] = 0
    flow[:,:,1] = - flow[:,:,1]
    flow = np.mean(flow, axis=(0,1))
    print(flow)
    return flow
    

### Function for computing Pyflow optical flow

In [6]:
def compute_optical_flow_pyflow(prev_frame, frame, object_box):
    im1 = prev_frame.astype(float) / 255.
    im2 = frame.astype(float) / 255.


    # Flow Options:
    alpha = 0.012
    ratio = 0.75
    minWidth = 20
    nOuterFPIterations = 7
    nInnerFPIterations = 1
    nSORIterations = 30
    colType = 0  # 0 or default:RGB, 1:GRAY (but pass gray image with shape (h,w,1))

    s = time.time()
    bounding_box_im1 = im1[int(object_box[1]):int(object_box[3]), int(object_box[0]):int(object_box[2])].copy(order='C')
    bounding_box_im2 = im2[int(object_box[1]):int(object_box[3]), int(object_box[0]):int(object_box[2])].copy(order='C')
    u, v, im2W = pyflow.coarse2fine_flow(
        bounding_box_im1, bounding_box_im2, alpha, ratio, minWidth, nOuterFPIterations, nInnerFPIterations,
        nSORIterations, colType)
    e = time.time()
    print('Time Taken: %.2f seconds for image of size (%d, %d, %d)' % (
        e - s, bounding_box_im1.shape[0], bounding_box_im1.shape[1], bounding_box_im1.shape[2]))
    flow = np.concatenate((u[..., None], v[..., None]), axis=2)
    flow = np.mean(flow, axis=(0,1))

    return flow

## Object tracking from previous week using optical flow

### Store detections in a pickle file

In [None]:
import pickle

# Load the video
video = cv2.VideoCapture('vdo.avi')


#Load configuration
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_X_101_32x8d_FPN_3x.yaml"))
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5  # set threshold for this model
# Find a model from detectron2's model zoo. You can use the https://dl.fbaipublicfiles... url as well
cfg.MODEL.WEIGHTS = 'model_final.pth'
predictor = DefaultPredictor(cfg)

frames = {}
frame_num = 1
while True:
    
    # Read the current frame from the video
    ret, frame = video.read()

    # Stop if there are no more frames
    if not ret:
        break

    boxes, classes, scores, num_boxes = detect_objects(predictor, frame)    
    frames[frame_num] = {}
    frames[frame_num]['boxes'] = boxes
    frames[frame_num]['classes'] = classes
    frames[frame_num]['scores'] = scores
    frames[frame_num]['num_boxes'] = num_boxes
    frame_num += 1

# save dictionary to person_data.pkl file
with open('detections.pkl', 'wb') as fp:
    pickle.dump(frames, fp)
    print('dictionary saved successfully to file')




In [None]:
# # Load the video
video = cv2.VideoCapture('vdo.avi')
import pickle

# #Load configuration
# cfg = get_cfg()
# cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_X_101_32x8d_FPN_3x.yaml"))
# cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
# cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5  # set threshold for this model
# # Find a model from detectron2's model zoo. You can use the https://dl.fbaipublicfiles... url as well
# cfg.MODEL.WEIGHTS = 'model_final.pth'
# predictor = DefaultPredictor(cfg)

# Read dictionary pkl file
with open('detections.pkl', 'rb') as fp:
    frames = pickle.load(fp)


# Initialize variables

object_tracker = {}
optical_flow = {}
current_id = 0
frame_num = 1
prev_frame = None

# Open the output file for writing
output_file = "pyflow.txt"
output_fp = open(output_file, "w")
while True:
    
    # Read the current frame from the video
    ret, frame = video.read()
    print(frame_num)

    # Stop if there are no more frames
    if not ret:
        break

    # Initialize a new dictionary to store the detected objects in the current frame
    new_object_tracker = {}
    prev_object_tracker = object_tracker.copy()
    #boxes, classes, scores, num_boxes = detect_objects(predictor, frame)
    boxes = frames[frame_num]['boxes']
    classes = frames[frame_num]['classes']
    scores = frames[frame_num]['scores']
    num_boxes = frames[frame_num]['num_boxes']

    for i in range(num_boxes):
        box = boxes[i]
        score = scores[i]
        class_object = classes[i]
        # Filter out low-scoring objects
        if score < 0.7:
            continue

        

        # Try to match the detected object with a previously tracked object based on IoU
        best_match_id = None
        best_match_iou = 0
        for object_id, object_box in object_tracker.items():
            if prev_frame.all() != None:
                #optical_flow = compute_optical_flow_pyflow(prev_frame, frame, object_box)
                optical_flow = compute_optical_flow_lucas_kanade(prev_frame, frame, object_box) #too slow
                print(optical_flow)
                object_box[0] = object_box[0] + optical_flow[0] if object_box[0] + optical_flow[0] >= 0 else object_box[0] # xtl + optical_flow x
                object_box[1] = object_box[1] + optical_flow[1] if object_box[1] + optical_flow[1] >= 0 else object_box[1]# ytl + optical_flow y
                object_box[2] = object_box[2] + optical_flow[0] if object_box[2] + optical_flow[0] >= 0 else object_box[2]# xbr + optical_flow x
                object_box[3] = object_box[3] + optical_flow[1] if object_box[3] + optical_flow[1] >= 0 else object_box[3]# ybr + optical_flow y
                prev_object_tracker[object_id] = object_box
            iou = compute_iou(box, object_box)
            if iou > best_match_iou:
                best_match_iou = iou
                best_match_id = object_id
        # If the best match has IoU > 0.4, assign the same ID to the detected object
        print(best_match_iou)
        if best_match_id is not None and best_match_iou > 0.3:
            new_object_tracker[best_match_id] = box
            del object_tracker[best_match_id]
        else:
            # Assign a new ID to each new detected object
            current_id += 1
            new_object_tracker[current_id] = box
      
    # Update the object tracker for the current frame
    object_tracker = new_object_tracker
    
    result = visualize_tracked_objects_in_frame(frame, object_tracker, prev_object_tracker)
    # Display the current frame with the tracked objects
    cv2.imwrite('frames/frame_{}.png'.format(frame_num), result)
    cv2.imshow("Object tracking", result)
    
    if cv2.waitKey(50) & 0xFF == ord('q'):
        break

    write_tracker_output_MOT16(object_tracker, output_fp)
    prev_frame = frame
    frame_num += 1

# Release the video and the output file and close all windows
video.release()
cv2.destroyAllWindows()
output_fp.close()

### Motmetrics


In [11]:
def motMetricsEnhancedCalculator(gtSource, tSource):
  # import required packages
  import motmetrics as mm
  import numpy as np
  
  # load ground truth
  gt = np.loadtxt(gtSource, delimiter=',')

  # load tracking output
  t = np.loadtxt(tSource, delimiter=',')

  # Create an accumulator that will be updated during each frame
  acc = mm.MOTAccumulator(auto_id=True)

  # Max frame number maybe different for gt and t files
  for frame in range(int(gt[:,0].max())):
    frame += 1 # detection and frame numbers begin at 1

    # select id, x, y, width, height for current frame
    # required format for distance calculation is X, Y, Width, Height \
    # We already have this format
    gt_dets = gt[gt[:,0]==frame,1:6] # select all detections in gt
    t_dets = t[t[:,0]==frame,1:6] # select all detections in t

    C = mm.distances.iou_matrix(gt_dets[:,1:], t_dets[:,1:], \
                                max_iou=0.5) # format: gt, t

    # Call update once for per frame.
    # format: gt object ids, t object ids, distance
    acc.update(gt_dets[:,0].astype('int').tolist(), \
              t_dets[:,0].astype('int').tolist(), C)

  mh = mm.metrics.create()

  summary = mh.compute(acc, metrics=['num_frames', 'idf1', 'idp', 'idr', \
                                     'recall', 'precision', 'num_objects', \
                                     'mostly_tracked', 'partially_tracked', \
                                     'mostly_lost', 'num_false_positives', \
                                     'num_misses', 'num_switches', \
                                     'num_fragmentations', 'mota', 'motp' \
                                    ], \
                      name='acc')

  strsummary = mm.io.render_summary(
      summary,
      #formatters={'mota' : '{:.2%}'.format},
      namemap={'idf1': 'IDF1', 'idp': 'IDP', 'idr': 'IDR', 'recall': 'Rcll', \
               'precision': 'Prcn', 'num_objects': 'GT', \
               'mostly_tracked' : 'MT', 'partially_tracked': 'PT', \
               'mostly_lost' : 'ML', 'num_false_positives': 'FP', \
               'num_misses': 'FN', 'num_switches' : 'IDsw', \
               'num_fragmentations' : 'FM', 'mota': 'MOTA', 'motp' : 'MOTP',  \
              }
  )
  print(strsummary)

In [12]:
motMetricsEnhancedCalculator('gt.txt', \
                             'pyflow.txt')
motMetricsEnhancedCalculator('gt.txt', \
                             'Faster-RCNN-finetuned-03.txt')

     num_frames      IDF1       IDP       IDR      Rcll      Prcn     GT  MT  PT  ML   FP   FN  IDsw  FM      MOTA      MOTP
acc        2141  0.774495  0.781302  0.767806  0.970686  0.987748  21594  53   3   0  260  633    71  52  0.955358  0.052928
     num_frames      IDF1       IDP       IDR      Rcll      Prcn     GT  MT  PT  ML   FP   FN  IDsw  FM      MOTA      MOTP
acc        2141  0.769377  0.771563  0.767204  0.973326  0.978856  21594  53   3   0  454  576    96  51  0.947856  0.053546


## Create video

In [16]:
directory_path = './frames'
num_digits = 4

# Get a list of files in the directory
file_list = os.listdir(directory_path)

# Loop through each file in the list
for filename in file_list:
    # Check if the file starts with "frame_" and ends with ".png"
    if filename.startswith('frame_') and filename.endswith('.png'):
        # Extract the frame number from the filename
        frame_num = int(filename.split('_')[1].split('.')[0])
        # Create the new filename with leading zeros
        new_filename = f"frame_{frame_num:0{num_digits}}.png"
        # Rename the file
        os.rename(os.path.join(directory_path, filename), os.path.join(directory_path, new_filename))

In [None]:
import os
import cv2

# Set up parameters
frame_rate = 15.0  # frames per second
output_file = 'output.mp4'


height, width, _ = cv2.imread('./frames/frame_0001.png').shape
size = (width, height)
# Get a list of image files
image_dir = './frames'
image_files = os.listdir(image_dir)
#
# Sort the image files
image_files = sorted(image_files)
# Create a cv2.VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_file, fourcc, frame_rate, size)

# Iterate over the image files
for image_file in image_files:
    print(image_file)
    image_path = os.path.join(image_dir, image_file)
    image = cv2.imread(image_path)


    cv2.imshow("Track", image)
    if cv2.waitKey(50) & 0xFF == ord('q'):
        break
    video_writer.write(image)

# Release the cv2.VideoWriter object
video_writer.release()

## TrackEval

In [None]:
%run TrackEval/scripts/run_mot_challenge.py --BENCHMARK lucas-kanade --DO_PREPROC False #median
%run TrackEval/scripts/run_mot_challenge.py --BENCHMARK lucas-kanade-mean --DO_PREPROC False #mean