In [None]:
import os
import cv2
import json
import matplotlib.pyplot as plt
import tensorflow as tf
from PIL import Image
import numpy as np
from sklearn.model_selection import train_test_split
from datasets import load_dataset
from pycocotools.coco import COCO
from scipy.optimize import linear_sum_assignment

#ds = load_dataset("chengyenhsieh/TAO-Amodal")

In [None]:
#Extract video frames
def extract_videoframes(videoPath):
    """
    Read every frame of a video into memory
    Args:
        videoPath: source handed to cv2.VideoCapture (e.g. path of a video file)
    Returns:
        frames: a list of frames in read order, empty if nothing could be read
    """
    capture = cv2.VideoCapture(videoPath)
    frames = []
    try:
        if not capture.isOpened():
            print(f"Error: cannot open video {videoPath}")
            return frames
        while capture.isOpened():
            ret, frame = capture.read()
            if not ret:
                #read() also fails at the normal end of the stream, so only report when nothing was received at all
                if not frames:
                    print("Error: cannot receive frame")
                break
            frames.append(frame)
    finally:
        #Always free the capture handle, even if reading raises
        capture.release()
    return frames
    

In [None]:
#Dataset root can be overridden with the COCO_2017_ROOT environment variable; the default keeps the original location
coco_root_dir = os.environ.get("COCO_2017_ROOT", "/Users/ivanng/dataset/coco-2017")
coco_images_dir = os.path.join(coco_root_dir, "train", "data")
coco_ann_file_caption = os.path.join(coco_root_dir, "raw", "captions_train2017.json")
coco_ann_file_detection = os.path.join(coco_root_dir, "raw", "instances_train2017.json")

#Preprocess images
def preprocessImage(image, targetSize=(128,128), paddingType='zero'):
    """
    Resize an image to fit inside targetSize (aspect ratio kept), pad it to exactly targetSize and divide by 255
    Args:
        image: input image array indexed (height, width, ...); the 'zero' canvas is 3-channel
        targetSize: (height, width) of the output image
        paddingType: 'zero' (black borders), 'mirror' (reflected borders) or 'replicate' (edge pixels repeated)
    Returns:
        imagePadded: padded image divided by 255 ([0, 1] for 8-bit input)
        scale: scale factor applied to both image width and height
        x_offset, y_offset: left and top padding offsets in pixels
    Raises:
        ValueError: if paddingType is not one of the supported names
    """
    #Reject unknown padding names up front; they used to fall through to an unassigned imagePadded
    if paddingType not in ('zero', 'mirror', 'replicate'):
        raise ValueError(f"Unsupported paddingType: {paddingType!r}")
    targetHeight, targetWidth = targetSize[0], targetSize[1]
    height, width = image.shape[:2]
    scale = min(targetHeight/height, targetWidth/width)
    #Keep at least one pixel per side so extreme aspect ratios do not ask for an empty resize
    scaledHeight, scaledWidth = max(1, int(height*scale)), max(1, int(width*scale))
    x_offset = (targetWidth-scaledWidth)//2
    y_offset = (targetHeight-scaledHeight)//2
    #Resize image (cv2.resize takes the size as (width, height))
    resizedImage = cv2.resize(image, (scaledWidth, scaledHeight))
    #Create padding for resized image
    if paddingType=='zero':
        imagePadded = np.zeros((targetHeight, targetWidth, 3), dtype=np.uint8)
        imagePadded[y_offset:y_offset+scaledHeight, x_offset:x_offset+scaledWidth] = resizedImage
    else:
        borderType = cv2.BORDER_REFLECT if paddingType=='mirror' else cv2.BORDER_REPLICATE
        #Bottom/right borders take whatever the floor division left over from top/left
        imagePadded = cv2.copyMakeBorder(resizedImage, y_offset, targetHeight-scaledHeight-y_offset, x_offset, targetWidth-scaledWidth-x_offset, borderType=borderType)
    #Normalise to [0, 1]
    return imagePadded/255, scale, x_offset, y_offset

#Preprocess bounding boxes
def preprocessBboxes(bboxes, scale, x_offset, y_offset):
    """
    Map bounding boxes from the original image onto the resized, padded image
    Args:
        bboxes: iterable of boxes given as (x, y, width, height)
        scale: scale factor that was applied to the image
        x_offset, y_offset: padding offsets added on the left and top
    Returns:
        a list of boxes given as [xStart, yStart, xEnd, yEnd]
    """
    def transform(box):
        left, top, boxWidth, boxHeight = box
        #Scale the top-left corner, then shift it by the padding
        scaledLeft = left*scale+x_offset
        scaledTop = top*scale+y_offset
        #The far corner is the shifted corner plus the scaled extent
        return [scaledLeft, scaledTop, scaledLeft+boxWidth*scale, scaledTop+boxHeight*scale]
    return [transform(box) for box in bboxes]

#Get entire preprocess data
def getImageAnnotations(coco_ann, coco_images_dir, targetSize=(128, 128), paddingType='zero', maxImages=None):
    """
    Preprocess MS COCO dataset
    Args:
        coco_ann: path of the json annotation file, or an already loaded COCO object
        coco_images_dir: image directory
        targetSize: size of standardised image
        paddingType: type of padding used
        maxImages: optional cap on the number of images returned (None keeps every readable image)
    Returns:
        data: a list of (image, bboxes, labels) tuples; bboxes has shape (N, 4) as [xStart, yStart, xEnd, yEnd]
              and labels has shape (N,), with N=0 for images without annotations
    """
    #Reuse a preloaded index when given one, so the annotation file is not parsed a second time
    coco = coco_ann if isinstance(coco_ann, COCO) else COCO(coco_ann)
    data = []
    for imageID in coco.getImgIds():
        if maxImages is not None and len(data)>=maxImages:
            break
        imageInfo = coco.loadImgs(imageID)[0]
        #Load image
        imagePath = os.path.join(coco_images_dir, imageInfo['file_name'])
        image = cv2.imread(imagePath)
        #Skip files that are missing or unreadable
        if image is None: continue
        #Invoke image preprocess
        preprocessedImage, scale, x_offset, y_offset = preprocessImage(image, targetSize, paddingType)
        #Extract annotations for corresponding image
        annotations = coco.loadAnns(coco.getAnnIds(imgIds=imageID))
        #Invoke bboxes preprocess on every box of the image at once
        bboxes = preprocessBboxes([ann['bbox'] for ann in annotations], scale, x_offset, y_offset)
        labels = [ann['category_id'] for ann in annotations]
        #Pin shape and dtype so an image without annotations gives (0, 4)/(0,) rather than an empty float vector
        bboxArray = np.asarray(bboxes, dtype=np.float64).reshape(-1, 4)
        labelArray = np.asarray(labels, dtype=np.int64)
        data.append((preprocessedImage, bboxArray, labelArray))
    return data

#TEMPORARY: visualisation of bboxes and labels on images
def visualise(image, bboxes, labels):
    """
    Draw bounding boxes and their labels on an image and show it with matplotlib
    Args:
        image: image scaled by 1/255 (as returned by preprocessImage); converted BGR->RGB for display
        bboxes: boxes given as (x1, y1, x2, y2)
        labels: one label per box; converted to text before drawing
    """
    #Back to 8-bit pixels; rounding avoids values such as 254.999 truncating to 254. This is a new array, so the caller's image is untouched
    image = np.rint(image*255).astype(np.uint8)
    for i, box in enumerate(bboxes):
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 1)
        #cv2.putText only accepts text, while the labels built by getImageAnnotations are integer category ids
        cv2.putText(image, str(labels[i]), (x1, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.2, (0, 255, 0), 1)
    plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    plt.axis("off")
    plt.show()

In [None]:
#Real-time detection transformer framework
def RT_DETR(numClasses, inputShape=(128,128,3), numQueries=1):
    """
    RT-DETR framework
    Args:
        numClasses: number of object classes to classify
        inputShape: input image dimensions
        numQueries: number of decoder queries, i.e. predictions made per image
    Returns:
        model: keras model with two outputs, a sigmoid bbox head (4 values per query)
               and a softmax class head (numClasses values per query)
    """
    inputs = tf.keras.Input(shape=inputShape)
    #CNN backbone
    x = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    x = tf.keras.layers.MaxPooling2D((2,2))(x)
    x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = tf.keras.layers.MaxPooling2D((2,2))(x)
    #Flatten feature map for encoder-decoder: one token per spatial position
    featureMap = tf.keras.layers.Reshape((-1, x.shape[-1]))(x)
    #Encoder (self-attention over the feature tokens)
    encoder = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=64)(featureMap, featureMap)
    #Keras spells this layer LayerNormalization; the British spelling is not an attribute of tf.keras.layers
    encoder = tf.keras.layers.LayerNormalization()(encoder)
    #Query selection
    query = tf.keras.layers.Dense(numQueries*64)(tf.keras.layers.Flatten()(featureMap))
    #MultiHeadAttention takes a (batch, sequence, dim) query, so give the flat projection a sequence axis
    query = tf.keras.layers.Reshape((numQueries, 64))(query)
    #Decoder (queries attend to the encoded feature tokens)
    decoder = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=64)(query, encoder)
    decoder = tf.keras.layers.LayerNormalization()(decoder)
    #Detection heads
    outputBbox = tf.keras.layers.Dense(4, activation='sigmoid')(decoder)
    outputClass = tf.keras.layers.Dense(numClasses, activation='softmax')(decoder)

    model = tf.keras.Model(inputs=inputs, outputs=[outputBbox, outputClass])
    return model
    
#Re-ID network
def Re_ID(inputShape=(64,64,3)):
    """
    Build the Re-Identification embedding network
    Args:
        inputShape: dimensions of cropped input images
    Returns:
        model: keras model whose output is a 128-unit relu embedding
    """
    layers = tf.keras.layers
    inputs = layers.Input(shape=inputShape)
    #Convolutional feature extractor followed by the dense embedding head, applied in order
    stack = [
        layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
        layers.MaxPooling2D((2,2)),
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
    ]
    embeddings = inputs
    for layer in stack:
        embeddings = layer(embeddings)
    return tf.keras.Model(inputs=inputs, outputs=embeddings)

In [None]:
class DeepSORT:
    def __init__(self, ReID, iou_threshold, max_age):
        """
        Initialise the tracker; tracks are matched across frames by IoU and carry a Re-Identification embedding
        Args:
            ReID: trained Re-Identification network (stored, not called by this class)
            iou_threshold: minimum IoU for a detection to be matched to a track
            max_age: maximum consecutive unmatched frames before a track is dropped
        """
        self.model = ReID
        self.iouThreshold = iou_threshold
        self.maxAge = max_age
        #Each track is a dict with keys 'id', 'bbox', 'embedding', 'age'
        self.tracks = []
        self.nextTrackID = 1
    def iou(self, box1, box2):
        """
        Compute intersection over union between two bboxes
        Args:
            box1, box2: two input bboxes given as (x1, y1, x2, y2)
        Returns:
            iou: IoU value, 0.0 when the union has no area
        """
        #Compute intersection coordinates
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        #Area of intersection (clamped so disjoint boxes give 0)
        intersects = max(0, x2-x1)*max(0, y2-y1)
        #Area of both two bboxes
        box1Area = (box1[2]-box1[0])*(box1[3]-box1[1])
        box2Area = (box2[2]-box2[0])*(box2[3]-box2[1])
        union = box1Area+box2Area-intersects
        #Degenerate (zero-area) boxes would otherwise divide by zero
        if union<=0: return 0.0
        return intersects/float(union)
    def trackMatch(self, detections):
        """
        Assign detections to existing tracks by maximising total IoU
        Args:
            detections: a list of detection bboxes
        Returns:
            matchedTracks: a list of (track index, detection index) pairs
            unmatchedTracks: a list of unmatched track indices
            unmatchedDetections: a list of unmatched detection indices
        """
        numTracks, numDetections = len(self.tracks), len(detections)
        #Nothing to assign when either side is empty: every track and every detection is unmatched
        if numTracks==0 or numDetections==0:
            return [], list(range(numTracks)), list(range(numDetections))
        #Cost matrix
        iouMat = np.zeros((numTracks, numDetections))
        for i, track in enumerate(self.tracks):
            for j, detect in enumerate(detections):
                iouMat[i, j] = self.iou(track['bbox'], detect)
        #Hungarian algorithm minimises cost, so negate the IoU to maximise it
        rowIndices, columnIndices = linear_sum_assignment(-iouMat)
        #Keep only assignments that overlap enough
        matchedTracks = [(int(rIdx), int(cIdx)) for rIdx, cIdx in zip(rowIndices, columnIndices) if iouMat[rIdx, cIdx]>=self.iouThreshold]
        matchedTrackIdx = {trackIdx for trackIdx, _ in matchedTracks}
        matchedDetectionIdx = {detectionIdx for _, detectionIdx in matchedTracks}
        #Everything not in an accepted pair is unmatched, listed exactly once
        unmatchedTracks = [i for i in range(numTracks) if i not in matchedTrackIdx]
        unmatchedDetections = [j for j in range(numDetections) if j not in matchedDetectionIdx]
        return matchedTracks, unmatchedTracks, unmatchedDetections
    def update(self, detections, embeddings):
        """
        Update tracking with detections and embeddings
        Args:
            detections: a list of detection bboxes
            embeddings: a list of Re-Identification embeddings, one per detection (stored on the track; matching itself uses IoU only)
        Returns:
            the tracker's list of current tracks
        """
        #Match detections to existing trackings
        matchedTracks, unmatchedTracks, unmatchedDetections = self.trackMatch(detections)
        #Update matched trackings
        for trackIdx, detectionIdx in matchedTracks:
            self.tracks[trackIdx]['bbox'] = detections[detectionIdx]
            self.tracks[trackIdx]['embedding'] = embeddings[detectionIdx]
            self.tracks[trackIdx]['age'] = 0
        #Age unseen trackings
        for trackIdx in unmatchedTracks:
            self.tracks[trackIdx]['age'] += 1
        #Remove trackings that stayed unmatched for more than maxAge frames
        self.tracks = [track for track in self.tracks if track['age']<=self.maxAge]
        #Add new trackings
        for detectionIdx in unmatchedDetections:
            self.tracks.append({'id':self.nextTrackID, 'bbox':detections[detectionIdx], 'embedding':embeddings[detectionIdx], 'age':0})
            self.nextTrackID+=1
        return self.tracks

In [None]:
#Temporary execution
#getImageAnnotations builds its own COCO index from the annotation file path and its third positional argument is targetSize;
#the loadCocoAnnotations helper called here before is not defined in the cells above
data = getImageAnnotations(coco_ann_file_detection, coco_images_dir)

In [None]:
#Preview a few samples only; displaying the whole list would dump every image array into the cell output
data[:3]