In [7]:
# -*- coding: utf-8 -*-
import json
import cv2 as cv
import numpy as np

In [8]:
# part 1:

def load_obj_each_frame(data_file):
    with open(data_file, 'r') as file:
        frame_dict = json.load(file)
    return frame_dict

def draw_target_object_center(video_file, obj_centers):
    A, H, Q, R, P, x = kalman_init(obj_centers[1]) 
    cap = cv.VideoCapture(video_file)
    vidwrite = cv.VideoWriter("part_1_demo.mp4", cv.VideoWriter_fourcc(*'MP4V'), 30, (700, 500))

    observed_track = []
    predicted_track = []
    final_obj_centers = []
    
    flip = 0 
    
    for center in obj_centers:
        ok, image = cap.read()
        if not ok:
            break

        image = cv.resize(image, (700, 500))
        if center[0] != -1:  # Valid measurement
            observed_track.append(center)
            z = np.array(center)
            x_pred, P_pred = predict(A, Q, P, x)
            x, P, K, IM, IS = update(H, R, P_pred, x_pred, z)
            predicted_track.append(x[:2].tolist())
            final_obj_centers.append(center)
        else: 
            x, P = predict(A, Q, P, x)
            predicted_track.append(x[:2].tolist())
            final_obj_centers.append(x[:2].tolist())

        if flip > 0:
            for i in range(1, len(predicted_track)):
                cv.line(image, tuple(map(int, predicted_track[i-1])), tuple(map(int, predicted_track[i])), (255, 0, 0), 2, cv.LINE_AA)
            for i in range(1, len(observed_track)):
                cv.circle(image, observed_track[i], 0, (0,0,255), 2)
        else:
            for i in range(1, len(observed_track)):
                cv.circle(image, observed_track[i], 0, (0,0,255), 2)
            for i in range(1, len(predicted_track)):
                cv.line(image, tuple(map(int, predicted_track[i-1])), tuple(map(int, predicted_track[i])), (255, 0, 0), 2, cv.LINE_AA)
        
        if flip == 2:
            flip = -1
        else:
            flip = flip + 1
            
        vidwrite.write(image)
        
        

    vidwrite.release()
    with open("part_1_object_tracking.json", "w") as outfile:
        json.dump({"obj": final_obj_centers}, outfile)
def kalman_init(initial_x):
    dt = 1
    #state transition matrix
    A = np.array([[1, 0, dt, 0],
                  [0, 1, 0, dt],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]])
    # Measurement Matrix (H)
    H = np.array([[1, 0, 0, 0],
                  [0, 1, 0, 0]])

    # Process Noise Covariance (Q)
    q = 1e-6
    #precomputed Q for white noise in 4 dimensions with dt = 1 and variance = 2.35
    Q = np.array([[0.06527778, 0.19583333, 0.39166667, 0.39166667], 
                  [0.19583333, 0.5875    , 1.175     , 1.175     ], 
                  [0.39166667, 1.175     , 2.35      , 2.35      ], 
                  [0.39166667, 1.175     , 2.35      , 2.35     ]])
    Q = (Q + np.array([[dt**4/4, 0, dt**3/2, 0],
                      [0, dt**4/4, 0, dt**3/2],
                      [dt**3/2, 0, dt**2, 0],
                      [0, dt**3/2, 0, dt**2]])) * 0.5 * q

    # Measurement Noise Covariance (R)
    r = 1e-3
    R = r * np.eye(2)

    # Error Covariance (P)
    P = np.eye(4) * 0.001 #TEST

    # Initial State (x)
    x = np.array(np.append([initial_x[0],initial_x[1]], [0, 0]))  # Initial guess
    
    return A, H, Q, R, P, x

def predict(A, Q, P, x):
    x_pred = A.dot(x)
    P_pred = A.dot(P).dot(A.T) + Q
    return x_pred, P_pred

def update(H, R, P_pred, x_pred, z):
    IM = H.dot(x_pred) 
    IS = H.dot(P_pred.dot(H.T)) + R
    K = P_pred.dot((H.T).dot(np.linalg.inv(IS)))
    x_upd = x_pred + K.dot(z - IM)
    P_upd = P_pred - K.dot(IS.dot(K.T)) #new
    #P_upd = (np.eye(4) - K.dot(H)).dot(P_pred) #old
    #LH? https://arxiv.org/ftp/arxiv/papers/1204/1204.0375.pdf
    return x_upd, P_upd, K, IM, IS #TEST


frame_dict = load_obj_each_frame("object_to_track.json")
video_file = "commonwealth.mp4"
draw_target_object_center(video_file,frame_dict['obj'])

OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


In [9]:
class KalmanPather:
    def __init__(self, initial_x):
        self.pvel = 0
        self.vel = 0
        self.accel = 0
        
        self.dt = 1
        #state transition matrix
        self.A = np.array([
                        [1, 0, self.dt, 0],
                        [0, 1, 0, self.dt],
                        [0, 0, 1, 0],
                        [0, 0, 0, 1]])
        # Measurement Matrix (H)
        self.H = np.array([[1, 0, 0, 0],
                      [0, 1, 0, 0]])
    
        # Process Noise Covariance (Q)
        self.q = 1e-6
        #precomputed Q for white noise in 4 dimensions with dt = 1 and variance = 2.35
        self.Q = np.array([[0.06527778, 0.19583333, 0.39166667, 0.39166667], 
              [0.19583333, 0.5875    , 1.175     , 1.175     ], 
              [0.39166667, 1.175     , 2.35      , 2.35      ], 
              [0.39166667, 1.175     , 2.35      , 2.35     ]])
        #average this with analytically derived Q
        self.Q = (self.Q + np.array([[self.dt**4/4, 0, self.dt**3/2, 0],
                          [0, self.dt**4/4, 0, self.dt**3/2],
                          [self.dt**3/2, 0, self.dt**2, 0],
                          [0, self.dt**3/2, 0, self.dt**2]])) * 0.5 * self.q
    
        # Measurement Noise Covariance (R)
        self.r = 1e-3
        self.R = self.r * np.eye(2)
    
        # Error Covariance (P)
        self.P = np.eye(4) * 0.001 #TEST

        # Initial State (x)
        #self.x = np.array(np.append([initial_x[0],initial_x[1]], [0, 0]))  # Initial guess
        self.x = np.array([0,0,0,0])
        
    def predict(self):
        self.x_pred = (self.A).dot(self.x)
        self.P_pred = (self.A).dot((self.P).dot((self.A).T)) + self.Q
        self.x = np.copy(self.x_pred) #TEST
        self.P = np.copy(self.P_pred) #TEST
        self.pvel = self.vel
        self.vel = (self.x_pred - self.x)[:2]
        self.accel = self.vel - self.pvel

    def update(self, z):
        self.IM = (self.H).dot(self.x_pred) 
        self.IS = (self.H).dot((self.P_pred).dot((self.H).T)) + self.R
        self.K = (self.P_pred).dot(((self.H).T).dot(np.linalg.inv(self.IS)))
        self.x_upd = self.x_pred + (self.K).dot(z - self.IM)
        self.P_upd = self.P_pred - (self.K).dot((self.IS).dot((self.K).T)) #new
        self.x = self.x_upd
        self.P = self.P_upd
        return self.x_upd, self.P_upd, self.K, self.IM, self.IS #TEST

In [10]:
def min_zero_row(zeroMatrix, markAsZero):
    smallestRow = [99999, -1] # finds the row needed

    for x in range(zeroMatrix.shape[0]): 
        if np.sum(zeroMatrix[x] == True) > 0 and smallestRow[0] > np.sum(zeroMatrix[x] == True):
            smallestRow = [np.sum(zeroMatrix[x] == True), x]

    # finds a specific row & column and marks it as False
    indexZero = np.where(zeroMatrix[smallestRow[1]] == True)[0][0]
    markAsZero.append((smallestRow[1], indexZero))
    zeroMatrix[smallestRow[1], :] = False
    zeroMatrix[:, indexZero] = False

def mark_matrix(mat):
    # creates a boolean matrix and iteratively identify zeros
    # Seperate rows & colums into different lists and then take the leftover unmarked
    # rows and colums and sort them as well 
    
    # Fucntion will take the matrix and change it into a boolean matrix
    # 0 = True, False o.w.
    current = mat
    boolZero = (current == 0)
    boolZeroMatrix = boolZero.copy() # makes a copy of boolZero

    markedZero = []
    while (True in boolZeroMatrix): #Recording possible answer positions by markedZero
        min_zero_row(boolZeroMatrix, markedZero)
        
    #Recording the row and column positions seperately.
    rowMarkedZero = []
    colMarkedZero = []
    
    for i in range(len(markedZero)):
        rowMarkedZero.append(markedZero[i][0])
        colMarkedZero.append(markedZero[i][1])
        
    unmarkedRows = list(set(range(current.shape[0])) - set(rowMarkedZero))
    markedColums = []
    
    checking = True
    while checking:
        checking = False
        for i in range(len(unmarkedRows)):
            rowArr = boolZero[unmarkedRows[i], :]
            for j in range(rowArr.shape[0]):
                if rowArr[j] == True and j not in markedColums:
                    markedColums.append(j)
                    checking = True

        for z, columnNum in markedZero:
            if z not in unmarkedRows and columnNum in markedColums:
                unmarkedRows.append(z)
                checking = True
    markedRows = list(set(range(mat.shape[0])) - set(unmarkedRows))
    return(markedZero, markedRows, markedColums)

def adjustMatrix(mat, rowsCovered, columsCovered):
    # Creates a copy of the input matrix (mat) => find the minimum value among the 
    # undiscovered/uncovered elements
    # subtracts the minimum value from uncovered elements and then add
    # the minimum value to covered elements
    # after all those task are complete it will returns a modified matrix
    
    current = mat
    element = [] #contains a non zero element

    for row in range(len(current)):
        if row not in rowsCovered:
            for i in range(len(current[row])):
                if i not in columsCovered:
                    element.append(current[row][i])
    miniNum = min(element)
    
    for row in range(len(current)):
        if row not in rowsCovered:
            for i in range(len(current[row])):
                if i not in columsCovered:
                    current[row, i] = current[row, i] - miniNum

    for row in range(len(rowsCovered)):  
        for col in range(len(columsCovered)):
            current[rowsCovered[row], columsCovered[col]] = current[rowsCovered[row], columsCovered[col]] + miniNum
    return current

def HungarianAlg(mat): 
    # Hungarian Algorithm; 
    # Takes a square matrix (mat) representing costs as input and creates a copy (current) 
    # to iterate through. Subtracts the minimum value in each row and column from respective 
    # elements to create zeros without affecting optimal output
    # Loops until a complete assignment is found & then returns a possible answer
    
    dim = mat.shape[0]
    current = mat

    for y in range(mat.shape[0]): 
        current[y] = current[y] - np.min(current[y]) 
        # will take every row & column and subtract it from internal minium
        
    for columnNum in range(mat.shape[1]): 
        current[:,columnNum] = current[:,columnNum] - np.min(current[:,columnNum])
    count = 0
    while count < dim:

        possibleAnswer, markedRows, markedColums = mark_matrix(current)
        count = len(markedRows) + len(markedColums)

        if count < dim:
            current = adjustMatrix(current, markedRows, markedColums)
    return possibleAnswer

def answerCalc(mat, pos):
    # calculates a total value based on specific positions in the a given matrix and 
    # creates an answer matrix highlighting those positions
    total = 0
    answerMatrix = np.zeros((mat.shape[0], mat.shape[1]))
    for i in range(len(pos)):
        total += mat[pos[i][0], pos[i][1]]
        answerMatrix[pos[i][0], pos[i][1]] = mat[pos[i][0], pos[i][1]]
    return total, answerMatrix

In [None]:
# part 2:


def load_obj_each_frame(file_name):
    with open(file_name, 'r') as f:
        return json.load(f)

def calculate_cost_matrix(last_frame_objs, current_frame_objs, predicts):
    cost_matrix = np.zeros((len(last_frame_objs), len(current_frame_objs)))
    for i, last_obj in enumerate(last_frame_objs):
        for j, current_obj in enumerate(current_frame_objs):
            # Calculate Euclidean distance between the centers of the bounding boxes
            last_center = (last_obj['x_min'] + last_obj['width'] / 2, last_obj['y_min'] + last_obj['height'] / 2)
            current_center = (current_obj['x_min'] + current_obj['width'] / 2, current_obj['y_min'] + current_obj['height'] / 2)
            dist = np.sqrt((last_center[0] - current_center[0]) ** 2 + (last_center[1] - current_center[1]) ** 2)
            p = predicts[last_obj['id']]
            KalmanDist = np.sqrt((p.x[:2][0] - current_center[0]) ** 2 + (p.x[:2][1] - current_center[1]) ** 2)
            cost_matrix[i, j] = KalmanDist
            
            if KalmanDist > 25:
                cost_matrix[i, j] = 9999999 # If bounding box count in consecutive frames is the same but cars different.
            v = tuple(map(lambda i, j: i - j, last_center, current_center))# vector to car
            if p.vel.any() and v != (0,0) and np.arccos(np.clip(np.dot(p.vel/np.linalg.norm(p.vel), v/np.linalg.norm(v)), -1.0, 1.0)) > 3.14159265/2:
                cost_matrix[i, j] = 9999999
                
            
    dim = max(cost_matrix.shape)
    for r in range(dim-cost_matrix.shape[0]):
        cost_matrix = np.r_[cost_matrix, [np.full(cost_matrix.shape[1], 9999999)]]
    for c in range(dim-cost_matrix.shape[1]):
        cost_matrix = np.c_[cost_matrix, np.full(cost_matrix.shape[0], 9999999)]
    return cost_matrix

def draw_object_with_id(object_dict, image, color=(0, 255, 0), thickness=2, c_color=(255, 0, 0)):
    # Draw the bounding box
    x, y, width, height = object_dict['x_min'], object_dict['y_min'], object_dict['width'], object_dict['height']
    cv.rectangle(image, (x, y), (x + width, y + height), color, thickness)
    # Draw the ID
    object_id = object_dict.get('id', 'N/A')  # Use 'N/A' if ID is not assigned
    cv.putText(image, f'ID: {object_id}', (x, y - 10), cv.FONT_HERSHEY_SIMPLEX, 0.5, c_color, thickness)

def draw_objects_in_video(video_file, frame_dict):
    cap = cv.VideoCapture(video_file)
    ok, image = cap.read()
    vidwrite = cv.VideoWriter("part_2_demo.mp4", cv.VideoWriter_fourcc(*'MP4V'), 30, (700, 500))
    last_frame_objs = []
    known_frame_objs = []
    output_frame_dict = {}
    
    cnt = 0
    frame_num = 0  
    predict_paths = {} # TRY
    while ok:
        image = cv.resize(image, (700, 500))
        frame_num = str(int(cap.get(cv.CAP_PROP_POS_FRAMES)) - 1)
        current_frame_objs = frame_dict.get(frame_num, [])
        for i in range(len(current_frame_objs)): 
            if current_frame_objs[i].get('id', 'N/A') == 'N/A':
                current_frame_objs[i]['id'] = cnt
                cnt += 1
                    
        if known_frame_objs:
            cost_matrix = calculate_cost_matrix(known_frame_objs, current_frame_objs, predict_paths)
            matching = HungarianAlg(cost_matrix.copy())
            # Assign IDs based on the Hungarian algorithm's result
            for row, col in matching:
                if cost_matrix[row, col] < 999 and col < len(current_frame_objs) and row < len(known_frame_objs):  # Threshold to determine if assignment should be done
                    current_frame_objs[col]['id'] = known_frame_objs[row].get('id')
                    known_frame_objs[row] = current_frame_objs[col]
                    cnt += 1

        for obj in current_frame_objs:
            draw_object_with_id(obj, image)

        output_frame_dict[frame_num] = current_frame_objs
        
        last_frame_objs = current_frame_objs.copy() # OLD
        
        finds = [] #TRY
        for c in current_frame_objs:
            found = False
            for k in known_frame_objs:
                if c['id'] == k['id']:
                    found = True
                    finds.append(c)
            if not found: 
                known_frame_objs = known_frame_objs + [c] # NEW
                predict_paths[c['id']] = KalmanPather((c['x_min']+c['width']/2, c['y_min']+c['height']/2)) #TRY
                for t in range(50):
                    predict_paths[c['id']].predict() # Stabilize
                    predict_paths[c['id']].update((c['x_min']+c['width']/2, c['y_min']+c['height']/2)) # Stabilize
        
        for k in known_frame_objs:
            p = predict_paths[k['id']] #TRY
            p.predict() #TRY
            x = p.x
            for f in finds:
                if f['id'] == k['id']:
                    x, P, K, IM, IS = p.update((f['x_min']+f['width']/2, f['y_min']+f['height']/2)) #TRY
            #cv.circle(image, (int(x[:2][0].astype(int)), int(x[:2][1].astype(int))), 10, (0,0,255), -1) # Visualize Kalman
            #key = k['id']
            #cv.putText(image, f'ID: {str(key)}', (int(x[:2][0]), int(x[:2][1]) - 10), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 2)
        vidwrite.write(image)
        ok, image = cap.read()
        
        
    vidwrite.release()
    cap.release()

    with open('part_2_frame_dict.json', 'w') as f:
        json.dump(output_frame_dict, f)

frame_dict = load_obj_each_frame("frame_dict.json")
video_file = "commonwealth.mp4"
draw_objects_in_video(video_file, frame_dict)

OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'
