In [1]:
import cupy as cp
from scipy.optimize import linear_sum_assignment
from datetime import time
from numpy.dual import inv

  from numpy.dual import inv


# 1. 中心点の計算

In [2]:
def center_pt_batched(bboxes):
    # Ensure bboxes is a cupy array
    bboxes = cp.array(bboxes) if not isinstance(bboxes, cp.ndarray) else bboxes
    
    # Return an empty array for no bboxes
    if bboxes.size == 0:
        return cp.array([])
    
    # Ensure all points are non-negative
    bboxes = cp.clip(bboxes, 0, None)

    # If the bbox is a single box, convert it to a 2D array
    if len(bboxes.shape) == 1:
        bboxes = cp.expand_dims(bboxes, axis=0)
    
    # Calculate center points
    centers_x = (bboxes[:, 0] + bboxes[:, 2]) / 2
    centers_y = (bboxes[:, 1] + bboxes[:, 3]) / 2
    
    # Combine x and y centers into a single array
    centers = cp.stack((centers_x, centers_y), axis=-1)
    
    return centers

Unit test

In [3]:
# Corrected test cases for center_pt_batched function
assert cp.array_equal(center_pt_batched(cp.array([[0, 0, 10, 10]])), cp.array([[5, 5]])), "Test 1 failed"
assert cp.array_equal(center_pt_batched(cp.array([[-5, -5, 5, 5]])), cp.array([[2.5, 2.5]])), "Test 2 failed"
assert cp.array_equal(center_pt_batched(cp.array([[0, 0, 0, 0]])), cp.array([[0, 0]])), "Test 3 failed"
assert cp.array_equal(center_pt_batched(cp.array([[10, 10, 20, 20]])), cp.array([[15, 15]])), "Test 4 failed"

assert cp.array_equal(center_pt_batched(cp.array([[0, 0, 10, 10], [-5, -5, 5, 5]])), cp.array([[5, 5], [2.5, 2.5]])), "Test 5 failed"
assert cp.array_equal(center_pt_batched([0, 0, 10, 10]), cp.array([[5, 5]])), "Test 6 failed"

print("All tests passed")

All tests passed


# 2.二点間の距離の測定

In [4]:
def distance_pts_batched_matrix(pts1, pts2):
    # Ensure pts1 and pts2 are cupy arrays
    pts1 = cp.array(pts1) if not isinstance(pts1, cp.ndarray) else pts1
    pts2 = cp.array(pts2) if not isinstance(pts2, cp.ndarray) else pts2
    
    # Return an empty array for no points
    if pts1.size == 0 or pts2.size == 0:
        return cp.array([])
    
    # Ensure all points are non-negative
    pts1 = cp.clip(pts1, 0, None)
    pts2 = cp.clip(pts2, 0, None)
    
    # Ensure pts1 and pts2 are 2D arrays
    if len(pts1.shape) == 1:
        pts1 = cp.expand_dims(pts1, axis=0)
    if len(pts2.shape) == 1:
        pts2 = cp.expand_dims(pts2, axis=0)

    # If the size of pts1 and pts2 are not the same, pad the smaller array with zeros
    if pts1.shape[0] < pts2.shape[0]:
        pts1 = cp.vstack((pts1, cp.zeros((pts2.shape[0] - pts1.shape[0], pts1.shape[1]))))
    elif pts1.shape[0] > pts2.shape[0]:
        pts2 = cp.vstack((pts2, cp.zeros((pts1.shape[0] - pts2.shape[0], pts2.shape[1]))))
    
    # Calculate distance matrix
    diff_x = pts1[:, cp.newaxis, 0] - pts2[cp.newaxis, :, 0]
    diff_y = pts1[:, cp.newaxis, 1] - pts2[cp.newaxis, :, 1]
    dist_matrix = diff_x**2 + diff_y**2
    
    return dist_matrix

Unit test

In [5]:
pts1 = cp.array([[-1, -1], [1, 1], [2, 2]])       # horizontal axis
pts2 = cp.array([[0, 0], [1, 1], [2, 2], [3, 3]]) # vertical axis

# Corrected test cases for distance_pts_batched_matrix function
matrix = distance_pts_batched_matrix(pts1, pts2)

# Expected output
#         (0, 0) (1, 1) (2, 2) (3, 3) <- pts2
# (-1, -1) 2     1      8      18  <-- horizontal array, elements are clipped to zero because of negative values
# (1, 1)   2     0      2      8
# (2, 2)   8     2      0      2
# (0, 0)   0     2      8      18  <-- horizontal array, elements are padded with zeros because of size mismatch
# pts1

print(matrix)

[[ 0.  2.  8. 18.]
 [ 2.  0.  2.  8.]
 [ 8.  2.  0.  2.]
 [ 0.  2.  8. 18.]]


# 2. 追跡と検出

In [17]:
def associate_detections_to_trackers_gpu(detected_boxes, tracking_boxes, dist_variance=500):

    # Ensure detected_bboxes and tracking_pts are cupy arrays
    detected_boxes = cp.array(detected_boxes) if not isinstance(detected_boxes, cp.ndarray) else detected_boxes
    tracking_boxes = cp.array(tracking_boxes) if not isinstance(tracking_boxes, cp.ndarray) else tracking_boxes

    if detected_boxes.size == 0:
        return [], [], list(range(len(tracking_boxes)))
    
    if tracking_boxes.size == 0:
        return [], list(range(len(detected_boxes))), []

    # Convert detected_bboxes to center points
    # Input shape: (num_detections, 4)
    # Output shape: (num_detections, 2)
    detected_centers = center_pt_batched(detected_boxes)

    # Convert tracking_boxes to center points
    # Input shape: (num_trackers, 4)
    # Output shape: (num_trackers, 2)
    tracking_centers = center_pt_batched(tracking_boxes)

    # Generate distance variance matrix
    # Input shape of detected_centers: (num_detections, 2)
    # Input shape of tracking_pts: (num_trackers, 2)
    # Output shape: (num_detections, num_trackers)
    dist_matrix = distance_pts_batched_matrix(detected_centers, tracking_centers)

    # Convert the CuPy array to a NumPy array before passing it to linear_sum_assignment
    row_ind, col_ind = linear_sum_assignment(dist_matrix.get())
    # row_ind is the index of the detected_centers
    # col_ind is the index of the tracking_pts

    # Debugging
    # print(row_ind, col_ind)

    matched_pairs = []
    untracked_detections = []
    untracked_trackers = []

    # Iterate through the row and column indices to find the matched pairs
    for i, j in zip(row_ind, col_ind):
        # If i or j is padded with zeros, skip the pair
        if i >= detected_centers.shape[0] or j >= tracking_centers.shape[0]:
            continue

        # Check if the distance between the detected center and the tracking point is less than the distance variance
        if dist_matrix[i, j] <= dist_variance:
            matched_pairs.append((i, j))
        else:
            untracked_detections.append(i)
            untracked_trackers.append(j)

    # Find the unmatched detections by checking if the index is in matched_pairs
    untracked_detections += [i for i in range(detected_centers.shape[0]) if i not in [pair[0] for pair in matched_pairs]]

    # Find the unmatched trackers by checking if the index is in matched_pairs
    untracked_trackers += [j for j in range(tracking_centers.shape[0]) if j not in [pair[1] for pair in matched_pairs]]
    
    return matched_pairs, untracked_detections, untracked_trackers


Unit test

In [18]:
# horizontal axis > vertical axis
detected_boxes = [[0, 0, 10, 10], [-5, -5, 5, 5], [10, 10, 20, 20], [0, 0, 0, 0], [1, 1, 15, 15]] # horizontal axis
tracking_boxes = [[1, 1, 1, 1], [2, 2, 2, 2], [3, 3, 3, 3], [4, 4, 4, 4]] # vertical axis
print(associate_detections_to_trackers_gpu(detected_boxes, tracking_boxes))

# horizontal axis < vertical axis
detected_boxes = [[0, 0, 10, 10], [1, 1, 15, 15]] # horizontal axis
tracking_boxes = [[1, 1, 1, 1], [2, 2, 2, 2], [3, 3, 3, 3], [4, 4, 4, 4], [10, 10, 20, 20]] # vertical axis
print(associate_detections_to_trackers_gpu(detected_boxes, tracking_boxes))

([(0, 1), (1, 0), (2, 3), (4, 2)], [3], [])
([(0, 3), (1, 4)], [], [0, 1, 2])


# 3. Kalman Filter

In [12]:
class KalmanFilter:
    def __init__(self, A, B, H, Q, R, x0, P0):
        self.A = cp.asarray(A)
        self.B = cp.asarray(B)
        self.H = cp.asarray(H)
        self.Q = cp.asarray(Q)
        self.R = cp.asarray(R)
        self.x = cp.asarray(x0)
        self.P = cp.asarray(P0)

    def predict(self, u=None):
        if u is None:
            u = cp.zeros((self.B.shape[1], 1))
        self.x = self.A @ self.x + self.B @ u
        self.P = self.A @ self.P @ self.A.T + self.Q

    def update(self, z):
        z = cp.asarray(z)
        y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ cp.asarray(inv(cp.asnumpy(S)))
        self.x = self.x + K @ y
        self.P = self.P - K @ self.H @ self.P

    def get_state(self):
        return cp.asnumpy(self.x)

# 4. 追跡クラス

In [19]:
class Tracker:

    """
    カルマンフィルタを付けている追跡クラス

    Args:
        kalman (KalmanFilter): カルマンフィルタ
        pts_to_track (int, optional): 同時に追跡する点の数. Defaultsは1個
        max (int, optional): 追跡の歴史の最大数. Defaultsは100回
    """

    def __init__(self, kalman, pts_to_track=1, max=100) -> None:
        self.history = []
        self.max = max
        self.kalman = kalman
        self.pts_to_track = pts_to_track


    def to_cupy(self, alignment=False):
        if not alignment:
            return cp.array(self.history)
        else:
            ndata = cp.array(self.history)

            # 歴史の数が最大数よりも少ない場合、ゼロで埋める
            if ndata.shape[0] < self.max:
                ndata = cp.vstack([cp.zeros((self.max - ndata.shape[0], ndata.shape[1])), ndata])

            return ndata


    def predict(self, position):

        # 添加新的位置到历史记录中
        if len(self.history) >= self.max:
            self.history.pop(0)
            self.history.append(position)

        # 更新卡尔曼滤波器，预测下一个位置
        for pos in self.history:
            self.kalman.predict()
            self.kalman.update(pos)

        # 获得最近预测的位置
        return self.kalman.get_state()

    @property
    def last_position(self):
        return self.history[-1]

    def get(self,index):
        return self.history[index]

In [21]:
# テスト

# カルマンフィルタの初期化
A = cp.array([[1, 0], [0, 1]])
B = cp.array([[0, 0], [0, 0]])
H = cp.array([[1, 0], [0, 1]])
Q = cp.array([[1, 0], [0, 1]])
R = cp.array([[1, 0], [0, 1]])
x0 = cp.array([[0], [0]])
P0 = cp.array([[1, 0], [0, 1]])

kalman = KalmanFilter(A, B, H, Q, R, x0, P0)

# 追跡の初期化
tracker = Tracker(kalman, pts_to_track=2, max=5)

# 追跡のテスト
print(tracker.predict(cp.array([[1], [1]])))
print(tracker.predict(cp.array([[2], [2]])))
print(tracker.predict(cp.array([[3], [3]])))
print(tracker.predict(cp.array([[4], [4]])))
print(tracker.predict(cp.array([[5], [5]])))
print(tracker.predict(cp.array([[6], [6]])))

print(tracker.to_cupy())
      

[[0]
 [0]]
[[0]
 [0]]
[[0]
 [0]]
[[0]
 [0]]
[[0]
 [0]]
[[0]
 [0]]
[]


In [None]:

class Sort:

    """
    kalman_filter_class: Kalman filter class to use
    max_age: Maximum time to keep a track alive without an update
    variance_threshold: Maximum distance between a detection and a track to consider it as a match
    max_depth: the number of records to keep in the tracker, used for kalman filter
    kalman_filter_params: Parameters to pass to the Kalman filter
    """

    def __init__(self, kalman_filter_class, max_age=1, variance_threshold=500, max_depth=100, kalman_filter_params=None):
        self.kalman_filter_class = kalman_filter_class
        self.max_age = max_age
        self.variance_threshold = variance_threshold
        self.trackers = []
        self.max_depth = max_depth

        if kalman_filter_params is not None:
            self.kalman_filter_params = kalman_filter_params
        else:
            self.kalman_filter_params = {
                "A": cp.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]]),
                "B": cp.zeros((4, 1)),
                "H": cp.array([[1, 0, 0, 0], [0, 1, 0, 0]]),
                "Q": cp.eye(4) * 0.01,
                "R": cp.eye(2) * 1,
                "x0": cp.zeros((4, 1)),
                "P0": cp.eye(4)
            }

    @property
    def centered_trackers(self):
        """
        将跟踪的边界框全部转换为中心点
        """
        trackers = cp.array(self.trackers)
        return center_pt_batched(trackers)
    
    @property
    def last_updated_centered_trackers(self):
        """
        获取最后更新的跟踪器的中心点
        """
        trackers = cp.array([tracker[-1] for tracker in self.trackers])
        return center_pt_batched(trackers)
    

    def update(self, detected_bboxes):

        # Shape of tracker (batch, points, xy) 
        # batch is the number of trackers
        # points is the number of points in each tracker
        # xy is the x and y coordinates of the points
        # If the tracker is not empty, and the shape of the tracker is not empty
        # Use the track_objects_batch function to get the updated tracker
        if self.trackers is not None and len(self.trackers) > 0:
            predicted = track_objects_batch(self.kalman_filter_class,
                                            self.centered_trackers, 
                                            **self.kalman_filter_params)

            # The output of the predicted is (batch, points, xyaxay)
            # ax is the acceleration in the x direction
            # ay is the acceleration in the y direction

            # From the predicted output, get the last updated x and y coordinates
            predicted = predicted[:, -1, :4]
            # The output of the predicted is (batch, xy)

            # Use the associate_detections_to_trackers_gpu function to get the matched pairs
            matched_pairs, unmatched_detections, unmatched_trackers = associate_detections_to_trackers_gpu(detected_bboxes, predicted, self.variance_threshold)

            # Iterate through the matched pairs
            for det_idx, trk_idx in matched_pairs:
                # Convert the detected_bboxes
                self.trackers[trk_idx].append(detected_bboxes[det_idx])