In [1]:
import time
import cv2
import pickle as pkl
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
from IPython import display as dp
from collections import deque
import matplotlib.animation as animation

In [2]:
def drawTrackingOnImage(img, bbox, track=0, line=[], colour=(0, 255, 0)):
    b, g, r = colour
    img = cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (int(b), int(g), int(r)), 3)
    img = cv2.putText(img, str(track), (bbox[0], bbox[1] - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (int(b), int(g), int(r)), 3)
    for i in range(1, len(line)):
        img = cv2.line(img, line[i - 1], line[i], (int(b), int(g), int(r)), 3)
    return img

In [3]:
def get_box_center(bbox):
    center = (int(bbox[0]) + ((int(bbox[2]) - int(bbox[0])) // 2)), (int(bbox[1]) + ((int(bbox[3]) - int(bbox[1])) // 2)) # X + Width / 2, Y + Height / 2
    return center

In [4]:
# Detection Pre-Processing 

# Remove overlaps in the same frame
def remove_overlaps(detections_pd, tolerance=0.9):
    row_to_remove = []
    for detection in detections_pd.get('detection'):
        length, _ =detections_pd.shape
        for i in range(length):
            IoU = detection.IoU(detections_pd.iloc[i]['detection'])
            if IoU > tolerance and IoU < 1:
                row_to_remove.append(i)
                
    row_to_remove = np.unique(np.array(row_to_remove))
    detections_pd = detections_pd.drop(index=row_to_remove)
        
    return detections_pd

In [5]:
# Detection to DataFrame
def get_detection_dataframe(detections):
    bboxes = []
    bsizes = []
    lines = deque(maxlen=32)
    bdetections = []
    tracks = list(range(0, len(detections)))
    colours = []
    for i in range(len(detections)):
        colours.append(tuple(np.random.choice(range(256), size=3).astype('int')))
        
    updated = [False]*len(detections)
    
    for detection in detections:
        bbox = np.array(detection.getBBox()).astype('int')
        bboxes.append(bbox)
        
        centers = []
        centers.append(get_box_center(bbox))
        lines.append(centers)

        bsize = int(detection.areaOfRec())
        bsizes.append(bsize)

        bdetections.append(detection)

    detec = {
        'track': tracks,
        'detection': bdetections,
        'bbox': bboxes,
        'size': bsizes,
        'line': lines,
        'colour': colours,
        'updated': updated
    }
    detections_pd = pd.DataFrame(detec)
    detections_pd = detections_pd.sort_values(by=['size'], ascending=False)
    detections_pd = detections_pd.reset_index(drop=True)
    
    return detections_pd

In [6]:
def update_track(detections_pd, next_detections_pd, tolerance=0.5):
    detections_pd['updated'] = False
    detections_pd = detections_pd.reset_index(drop=True)
    
    # Loop each new detection
    for index, next_detection in next_detections_pd.iterrows():
        length, _ = detections_pd.shape
        
        # Find overlaps with max IoU and update if found
        IoUlist = []
        for i in range(length):
            IoU = next_detection['detection'].IoU(detections_pd.iloc[i]['detection'])
            IoUlist.append(IoU)
            
        indexMax = IoUlist.index(max(IoUlist))
            
        if max(IoUlist) > 0.5 and detections_pd.at[indexMax,'updated'] != True:
            detections_pd.at[indexMax,'detection'] = next_detection['detection']
            detections_pd.at[indexMax,'bbox'] = next_detection['bbox']
            detections_pd.at[indexMax,'size'] = next_detection['size']
            detections_pd.at[indexMax,'line'].append(next_detection['line'][0])
            detections_pd.at[indexMax,'updated'] = True
            next_detections_pd.at[index, 'updated'] = True
    
    # Drop detections no longer exist
    detections_pd = detections_pd[detections_pd['updated'] == True]
                
    # Start tracking new detections
    counter = 0
    if any(next_detections_pd['updated'] == False):
        new_pd = next_detections_pd[next_detections_pd['updated']==False]
        new_pd = new_pd.reset_index(drop=True)
        
        # Generate new track number      
        for i in range(len(new_pd)):
            while counter in detections_pd['track'].tolist():
                counter = counter + 1
            new_pd.at[i, 'track'] = counter
            counter = counter + 1

        # Add new tracks
        detections_pd = pd.concat([detections_pd, new_pd])
            
    detections_pd = detections_pd.reset_index(drop=True)
    return detections_pd

In [7]:
# Load paths
# detection_path = 'detection_pkls/retinanet_101_detections.pkl'
detection_path = 'detection_pkls/maskRCNN_101_detections.pkl'
data_path = '../datasets/AICity_data/train/S03/c010/'

# Load detections
with open(detection_path , 'rb') as f:
    all_detections = pkl.load(f)

In [8]:
display = True
gif = False

if gif:
    fig, ax = plt.subplots()
    plt.axis('off')

ims = []
detection_history = []

# Get the First Frame
detections_pd = get_detection_dataframe(all_detections['0'])

# Pre-process the first frame if needed
detections_pd = remove_overlaps(detections_pd, 0.4)
detections_pd
detection_history.append(detections_pd)

# Load video
vidcap = cv2.VideoCapture(data_path + 'vdo.avi')
vidcap.set(1,500)
_, image = vidcap.read()
num_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))

# Set plot
if display:
    plt.ion() 
    plt.figure(figsize=(20, 12))
    plt.axis('off') 

# Iterate Frames
# skip = 1
# for frame in tqdm(range(1,num_frames//skip)):
#     for i in range(skip):
#         _, image = vidcap.read()
#     frame = frame*skip

for frame in tqdm(range(500,1200)):
    _, image = vidcap.read()
    
    next_detections_pd = get_detection_dataframe(all_detections[str(frame)])
    next_detections_pd = remove_overlaps(next_detections_pd, 0.4)
    detections_pd = update_track(detections_pd, next_detections_pd, tolerance=0.5)
    detection_history.append(detections_pd)
    
    for index, row in detections_pd.iterrows():
        image = drawTrackingOnImage(image, row['bbox'], track=row['track'], line=row['line'],colour=row['colour'])
        
    if display:
        plt.title('Frame:'+str(frame))
        plt.imshow(image)  
    
    if gif:
        im = ax.imshow(image, animated=True)
        ims.append([im])
        
    if display:
        dp.clear_output(wait=True)
        dp.display(plt.gcf())
        time.sleep(0.000001)
        plt.cla()

if gif:
    ani = animation.ArtistAnimation(fig, ims, interval=10, blit=True, repeat_delay=10000)
    ani.save('Tracking_with_IoU_overlap_700frames' + ".gif", writer=animation.PillowWriter(fps=24))
    
# Open a file and use dump()
# with open('tracking_history/tracking_history.pkl', 'wb') as file:
#     # A new file will be created
#     pkl.dump(detection_history, file)

SyntaxError: invalid syntax (2134255944.py, line 42)

# TEST

In [None]:
# with open('tracking_history/tracking_history.pkl', 'rb') as file:
#     # Call load method to deserialze
#     tracking_hist = pkl.load(file)

In [None]:
# tracking_hist[100]

In [None]:
# for index, row in detections_pd.iterrows():
#     print(row['colour'])
#     image = drawRectangleOnImage(image, row['bbox'], track=row['track'],colour=row['colour'])

In [None]:
# plt.figure(figsize=(20, 12))
# plt.imshow(image)

In [None]:
#Test......
# test_num = 396
# detections_pd = get_detection_dataframe(all_detections[str(test_num)])
# next_detections_pd = get_detection_dataframe(all_detections[str(test_num+1)])

In [None]:
# detections_pd

In [None]:
# next_detections_pd

In [None]:
# detections_pd = update_track(detections_pd, next_detections_pd, tolerance=0.5)
# detections_pd

In [None]:
# next_detections_pd