In [1]:
import numpy as np
import pandas as pd
import cv2
import os
import math
import glob
import random

from scipy.optimize import linear_sum_assignment

## Kalman Filter Class

In [2]:
class KF(object):
    def __init__(self):
        self.state = np.array([[0.],[0.]])  # previous state vector  #x[t-1]
        self.dt = 0.005  # delta time
        self.F = np.array([[1.0, self.dt], [0.0, 1.0]])

        self.P = np.array([[3., 0.],[0., 3.]])  # covar matrix
        self.H = np.array([[1, 0], [0, 1]])  # observation matrix
        self.obs = np.array([[0], [255]])  # vector of observations #centroid
        
        self.Q = np.eye(self.state.shape[0])
        self.R = np.eye(self.obs.shape[0])
        self.prev_state = np.array([[0], [0]])

    def predict(self):
        self.state = np.dot(self.F, self.state)
        self.P = np.dot(self.F, np.dot(self.P, self.F.T)) + self.Q
        
        self.prev_state = self.state
        return self.state

    def correct(self, detection, flag):
        if not flag:  # correct using prediction
            self.obs = self.prev_state
        else:  # correct using detection
            self.obs = detection
            
        K = np.dot(self.P, np.dot(self.H.T, np.linalg.inv(np.dot(self.H, np.dot(self.P, self.H.T)) + self.R)))
        z = self.obs + np.random.rand(2,1)
        
        self.state = self.state + np.dot(K, (z - np.dot(self.H, self.state)))
        self.P = self.P - np.dot(K, np.dot(self.H, self.P))
        
        self.prev_state = self.state
        return self.state

## Tracker and track classes

In [3]:

class Track(object):

    def __init__(self, pred, track_id):
        self.KF = KF()
        self.pred = np.asarray(pred)
        self.track_id = track_id

        self.skipped_frames = 0
        self.trace = [] # trace path
        
class Tracker(object):

    def __init__(self, dist_thresh, max_skip_frames, max_trace_len, track_id):

        self.dist_thresh = dist_thresh
        self.max_skip_frames = max_skip_frames
        self.max_trace_len = max_trace_len
        self.track_id = track_id
        self.tracks = []

    # Calculate cost matrix using sum of square distance between predicted vs detected centroids
    def getCentroidLoss(self, centroids):
        N = len(self.tracks)
        M = len(centroids)
        cost = np.zeros(shape=(N, M)) # cost matrix
        for i in range(len(self.tracks)):
            for j in range(len(centroids)):
                try:
                    diff = self.tracks[i].pred - centroids[j]
                    distance = np.sqrt(diff[0][0]*diff[0][0] + diff[1][0]*diff[1][0])
                    cost[i][j] = distance
                except:
                    pass

        return cost/2
    
    # Hungarian Algorithm for solving measurement assignments
    def assignMeasurements(self, cost):
        assignments = [-1 for i in range(len(self.tracks))]
        row_ind, col_ind = linear_sum_assignment(cost) # "find a complete assignment of workers to jobs of minimal cost"
        # Returns row index/tracks and col_index/detections
        
        for i in range(len(row_ind)):
            assignments[row_ind[i]] = col_ind[i] #assign given tracks to detections
        
        return assignments

    # Identify unassigned tracks
    def getUnassignedTracks(self, cost, assignments):
        unassigned_tracks = []
        for i in range(len(assignments)):
            if (assignments[i] != -1): # appears empty
                if (cost[i][assignments[i]] > self.dist_thresh):
                    assignments[i] = -1
                    unassigned_tracks.append(i)
                pass
            else:
                self.tracks[i].skipped_frames += 1

        return unassigned_tracks
    
    # Remove undetected tracks
    def pruneUndetectedTracks(self, assignments):
        to_del = []
        for i in range(len(self.tracks)):
            if (self.tracks[i].skipped_frames > self.max_skip_frames):
                to_del.append(i)
                
        if len(to_del) > 0: # there are undetected tracks
            for id in to_del:
                if id < len(self.tracks):
                    del self.tracks[id]
                    del assignments[id]

        return to_del, assignments
    
    # Detects if there are new tracks and inits them
    def initNewTracks(self, centroids, assignments):
        # If # centroids are > # tracks, there are unidentified tracks
        unassigned_objects = []
        for i in range(len(centroids)):
                if i not in assignments:
                    unassigned_objects.append(i)

        # Initialization
        if(len(unassigned_objects) != 0):
            for i in range(len(unassigned_objects)):
                track = Track(centroids[unassigned_objects[i]],self.track_id)
                self.track_id += 1
                self.tracks.append(track)
    
        return unassigned_objects
    
    # Main Tracker UPDATE func
    def update(self, centroids):
        # no tracks yet
        if (len(self.tracks) == 0):
            for i in range(len(centroids)):
                track = Track(centroids[i], self.track_id)
                self.tracks.append(track)
                self.track_id += 1

        # begin update 
        cost = self.getCentroidLoss(centroids)
        assignments = self.assignMeasurements(cost)
        unassigned_tracks = self.getUnassignedTracks(cost, assignments)
        del_tracks, assignments = self.pruneUndetectedTracks(assignments)
        unassigned_objects = self.initNewTracks(centroids, assignments)

        # update kf state, prev_state and all traces
        for i in range(len(assignments)):
            self.tracks[i].KF.predict()

            if(assignments[i] != -1):
                self.tracks[i].skipped_frames = 0
                self.tracks[i].pred = self.tracks[i].KF.correct(centroids[assignments[i]], 1)
            else:
                self.tracks[i].pred = self.tracks[i].KF.correct(np.array([[0], [0]]), 0)

            if(len(self.tracks[i].trace) > self.max_trace_len):
                for j in range(len(self.tracks[i].trace) - self.max_trace_len):
                    del self.tracks[i].trace[j]

            self.tracks[i].trace.append(self.tracks[i].pred)
            self.tracks[i].KF.prev_state = self.tracks[i].pred


## Misc helper funcs

In [4]:
def getColor():
    color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
    return color

In [5]:
def threshold(x, img):
    # Convert into binary image using thresholding
    # Documentation for threshold: http://docs.opencv.org/modules/imgproc/doc/miscellaneous_transformations.html?highlight=threshold#threshold
    # Example of thresholding: http://docs.opencv.org/doc/tutorials/imgproc/threshold/threshold.html
    thres_output = cv2.adaptiveThreshold(img,255,cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY,17,-5)

    # Find contours
    # Documentation for finding contours: http://docs.opencv.org/modules/imgproc/doc/structural_analysis_and_shape_descriptors.html?highlight=findcontours#findcontours
    contours, hierarchy = cv2.findContours(thres_output, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    
    return contours, thres_output

In [6]:
def getBatLocs(hw_dir):
    loc_dir = os.path.join(hw_dir, "Localization")
    bat_locs = []

    for fname in sorted(os.listdir(loc_dir)):
        loc = pd.read_csv(os.path.join(loc_dir, fname), header=None)
        xlist = loc[0].to_numpy()
        ylist = loc[1].to_numpy()
        locs = np.array([np.array([[xlist[i]],[ylist[i]]]) for i in range(len(xlist))])
        bat_locs.append(locs)
        
    return bat_locs

In [7]:
## Gets centers/centroids from a given frame
def getCenters(frame):
    rad_thresh = 14
    centers = []

    contours, hierarchy = cv2.findContours(frame, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
#     print(len(contours))

    for cnt in contours:
        if cv2.contourArea(cnt) < 14 and cv2.contourArea(cnt) > 1600:
            continue

        (x, y), radius = cv2.minEnclosingCircle(cnt)
        centroid = (int(x), int(y))
        radius = int(radius)
        if (radius > rad_thresh):
#             cv2.circle(org, centroid, radius, (0, 255, 0), 2)
            c = np.array([[x],[y]])
            centers.append(c)

    return centers

In [8]:
# img_idx = 0
# thresh = 128
# max_thresh = 255

# if ftype == "gray": frames = gray_frames.copy()
# elif ftype == "false": frames = false_frames.copy()
# else: frames = cell_frames.copy()

# w_frame,h_frame,c_frame = frames[0].shape

## Putting it all together

In [9]:
hw_dir = "/Users/jasonli/Desktop/BU/Junior/Spring2021/CS585/homeworks/hw5/"
bat_gray_path = os.path.join(hw_dir, "CS585-BatImages/Gray/*.ppm")
bat_false_path = os.path.join(hw_dir, "CS585-BatImages/FalseColor/*.ppm")
cell_path = os.path.join(hw_dir, "CS585-Cells/*.tif")
bat_centers = getBatLocs(hw_dir)

filenames = glob.glob(cell_path)
filenames.sort()
images = [cv2.imread(img) for img in filenames]

In [10]:
backSub = cv2.createBackgroundSubtractorMOG2()
ekernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(5,5))
dkernel = cv2.getStructuringElement(cv2.MORPH_CROSS,(5,5)) 

tracker = Tracker(45,6,16,1) #  dist_thresh, max_skip_frames, max_trace_len, track_id
track_colors = [getColor() for i in range(14)]

count = 0
video = []
# cell_vid = []

for items in range(len(images)):
#     print(count)
    frame = images[count]
    frame = cv2.resize(frame,(500,500))
#     cell_vid.append(cv2.resize(images[count],(500,500)))
    
    ## Process/augment frame
    gray = cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)
    sub = backSub.apply(gray)
    blur = cv2.blur(sub, (3, 3))
    erode = cv2.erode(blur,ekernel,iterations = 1)
    dilate = cv2.dilate(erode,dkernel,iterations = 6)
    
    ## Draw contours
    centers = getCenters(dilate)
#     centers = bat_centers.copy()
    tracker.update(centers)
    for i in range(len(tracker.tracks)):
            if (len(tracker.tracks[i].trace) > 1):
                for j in range(len(tracker.tracks[i].trace) - 1):# draw each trace
                    x1 = tracker.tracks[i].trace[j][0][0]
                    y1 = tracker.tracks[i].trace[j][1][0]
                    x2 = tracker.tracks[i].trace[j+1][0][0]
                    y2 = tracker.tracks[i].trace[j+1][1][0]
                    color = tracker.tracks[i].track_id % 14
                    
                    cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), track_colors[color], 1)
                    cv2.putText(frame, str(len(centers)), (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                    #cv2.circle(frame,(int(x1), int(y1)),2,track_colors[color],4)

    video.append(np.uint8(frame))
#     cv2.namedWindow("img", cv2.WINDOW_AUTOSIZE)
#     cv2.imshow("img", img2)
    count = count + 1
    

# print(len(video))
video_write = cv2.VideoWriter(os.path.join(hw_dir, "cells_tracked.avi"), cv2.VideoWriter_fourcc(*'DIVX'), 7, (500, 500))

# Write video
for i in range(len(video)):
    video_write.write(video[i])
video_write.release()


In [248]:
centers[0].shape

(59, 2, 1)

In [249]:
bat_centers[0].shape

(59, 2, 1)