-
Notifications
You must be signed in to change notification settings - Fork 240
/
tracker.py
339 lines (299 loc) · 14.1 KB
/
tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import math
from typing import Callable, List, Optional, Sequence
import numpy as np
from rich import print
from .utils import validate_points
from .filter import FilterSetup
class Tracker:
def __init__(
self,
distance_function: Callable[["Detection", "TrackedObject"], float],
distance_threshold: float,
hit_inertia_min: int = 10,
hit_inertia_max: int = 25,
initialization_delay: Optional[int] = None,
detection_threshold: float = 0,
point_transience: int = 4,
filter_setup: "FilterSetup" = FilterSetup(),
):
self.tracked_objects: Sequence["TrackedObject"] = []
self.distance_function = distance_function
self.hit_inertia_min = hit_inertia_min
self.hit_inertia_max = hit_inertia_max
self.filter_setup = filter_setup
if initialization_delay is None:
self.initialization_delay = int(
(self.hit_inertia_max - self.hit_inertia_min) / 2
)
elif (
initialization_delay < 0
or initialization_delay > self.hit_inertia_max - self.hit_inertia_min
):
raise ValueError(
f"Argument 'initialization_delay' for 'Tracker' class should be an int between 0 and (hit_inertia_max - hit_inertia_min = {hit_inertia_max - hit_inertia_min}). The selected value is {initialization_delay}.\n"
)
else:
self.initialization_delay = initialization_delay
self.distance_threshold = distance_threshold
self.detection_threshold = detection_threshold
self.point_transience = point_transience
TrackedObject.count = 0
def update(self, detections: Optional[List["Detection"]] = None, period: int = 1):
self.period = period
# Remove stale trackers and make candidate object real if it has hit inertia
self.tracked_objects = [o for o in self.tracked_objects if o.has_inertia]
# Update tracker
for obj in self.tracked_objects:
obj.tracker_step()
# Update initialized tracked objects with detections
unmatched_detections = self.update_objects_in_place(
[o for o in self.tracked_objects if not o.is_initializing], detections
)
# Update not yet initialized tracked objects with yet unmatched detections
unmatched_detections = self.update_objects_in_place(
[o for o in self.tracked_objects if o.is_initializing], unmatched_detections
)
# Create new tracked objects from remaining unmatched detections
for detection in unmatched_detections:
self.tracked_objects.append(
TrackedObject(
detection,
self.hit_inertia_min,
self.hit_inertia_max,
self.initialization_delay,
self.detection_threshold,
self.period,
self.point_transience,
self.filter_setup,
)
)
return [p for p in self.tracked_objects if not p.is_initializing]
def update_objects_in_place(
self,
objects: Sequence["TrackedObject"],
detections: Optional[List["Detection"]],
):
if detections is not None and len(detections) > 0:
distance_matrix = np.ones((len(detections), len(objects)), dtype=np.float32)
distance_matrix *= self.distance_threshold + 1
for d, detection in enumerate(detections):
for o, obj in enumerate(objects):
distance = self.distance_function(detection, obj)
# Cap detections and objects with no chance of getting matched so we
# dont force the hungarian algorithm to minimize them and therefore
# introduce the possibility of sub optimal results.
# Note: This is probably not needed with the new distance minimizing algorithm
if distance > self.distance_threshold:
distance_matrix[d, o] = self.distance_threshold + 1
else:
distance_matrix[d, o] = distance
if np.isnan(distance_matrix).any():
print(
"\nReceived nan values from distance function, please check your distance function for errors!"
)
exit()
if np.isinf(distance_matrix).any():
print(
"\nReceived inf values from distance function, please check your distance function for errors!"
)
print(
"If you want to explicitly ignore a certain detection - tracked object pair, just"
)
print("return distance_threshold + 1 from your distance function.")
exit()
# Used just for debugging distance function
if distance_matrix.any():
for i, minimum in enumerate(distance_matrix.min(axis=0)):
objects[i].current_min_distance = (
minimum if minimum < self.distance_threshold else None
)
matched_det_indices, matched_obj_indices = self.match_dets_and_objs(
distance_matrix
)
if len(matched_det_indices) > 0:
unmatched_detections = [
d for i, d in enumerate(detections) if i not in matched_det_indices
]
# Handle matched people/detections
for (match_det_idx, match_obj_idx) in zip(
matched_det_indices, matched_obj_indices
):
match_distance = distance_matrix[match_det_idx, match_obj_idx]
matched_detection = detections[match_det_idx]
matched_object = objects[match_obj_idx]
if match_distance < self.distance_threshold:
matched_object.hit(matched_detection, period=self.period)
matched_object.last_distance = match_distance
else:
unmatched_detections.append(matched_detection)
else:
unmatched_detections = detections
else:
unmatched_detections = []
return unmatched_detections
def match_dets_and_objs(self, distance_matrix: np.array):
"""Matches detections with tracked_objects from a distance matrix
I used to match by minimizing the global distances, but found several
cases in which this was not optimal. So now I just match by starting
with the global minimum distance and matching the det-obj corresponding
to that distance, then taking the second minimum, and so on until we
reach the distance_threshold.
This avoids the the algorithm getting cute with us and matching things
that shouldn't be matching just for the sake of minimizing the global
distance, which is what used to happen
"""
# NOTE: This implementation is terribly inefficient, but it doesn't
# seem to affect the fps at all.
distance_matrix = distance_matrix.copy()
if distance_matrix.size > 0:
det_idxs = []
obj_idxs = []
current_min = distance_matrix.min()
while current_min < self.distance_threshold:
flattened_arg_min = distance_matrix.argmin()
det_idx = flattened_arg_min // distance_matrix.shape[1]
obj_idx = flattened_arg_min % distance_matrix.shape[1]
det_idxs.append(det_idx)
obj_idxs.append(obj_idx)
distance_matrix[det_idx, :] = self.distance_threshold + 1
distance_matrix[:, obj_idx] = self.distance_threshold + 1
current_min = distance_matrix.min()
return det_idxs, obj_idxs
else:
return [], []
class TrackedObject:
count = 0
initializing_count = 0
def __init__(
self,
initial_detection: "Detection",
hit_inertia_min: int,
hit_inertia_max: int,
initialization_delay: int,
detection_threshold: float,
period: int,
point_transience: int,
filter_setup: "FilterSetup",
):
try:
initial_detection_points = validate_points(initial_detection.points)
except AttributeError:
print(
f"\n[red]ERROR[/red]: The detection list fed into `tracker.update()` should be composed of {Detection} objects not {type(initial_detection)}.\n"
)
exit()
self.num_points = initial_detection_points.shape[0]
self.hit_inertia_min: int = hit_inertia_min
self.hit_inertia_max: int = hit_inertia_max
self.initialization_delay = initialization_delay
self.point_hit_inertia_min: int = math.floor(hit_inertia_min / point_transience)
self.point_hit_inertia_max: int = math.ceil(hit_inertia_max / point_transience)
if (self.point_hit_inertia_max - self.point_hit_inertia_min) < period:
self.point_hit_inertia_max = self.point_hit_inertia_min + period
self.detection_threshold: float = detection_threshold
self.initial_period: int = period
self.hit_counter: int = hit_inertia_min + period
self.point_hit_counter: np.ndarray = (
np.ones(self.num_points) * self.point_hit_inertia_min
)
self.last_distance: Optional[float] = None
self.current_min_distance: Optional[float] = None
self.last_detection: "Detection" = initial_detection
self.age: int = 0
self.is_initializing_flag: bool = True
self.id: Optional[int] = None
self.initializing_id: int = (
TrackedObject.initializing_count
) # Just for debugging
TrackedObject.initializing_count += 1
self.detected_at_least_once_points = np.array([False] * self.num_points)
# Create Kalman Filter
self.filter = filter_setup.create_filter(initial_detection_points)
self.dim_z = 2 * self.num_points
def tracker_step(self):
self.hit_counter -= 1
self.point_hit_counter -= 1
self.age += 1
# Advances the tracker's state
self.filter.predict()
@property
def is_initializing(self):
if (
self.is_initializing_flag
and self.hit_counter > self.hit_inertia_min + self.initialization_delay
):
self.is_initializing_flag = False
TrackedObject.count += 1
self.id = TrackedObject.count
return self.is_initializing_flag
@property
def has_inertia(self):
return self.hit_counter >= self.hit_inertia_min
@property
def estimate(self):
positions = self.filter.x.T.flatten()[: self.dim_z].reshape(-1, 2)
velocities = self.filter.x.T.flatten()[self.dim_z :].reshape(-1, 2)
return positions
@property
def live_points(self):
return self.point_hit_counter > self.point_hit_inertia_min
def hit(self, detection: "Detection", period: int = 1):
points = validate_points(detection.points)
self.last_detection = detection
if self.hit_counter < self.hit_inertia_max:
self.hit_counter += 2 * period
# We use a kalman filter in which we consider each coordinate on each point as a sensor.
# This is a hacky way to update only certain sensors (only x, y coordinates for
# points which were detected).
# TODO: Use keypoint confidence information to change R on each sensor instead?
if detection.scores is not None:
assert len(detection.scores.shape) == 1
points_over_threshold_mask = detection.scores > self.detection_threshold
matched_sensors_mask = np.array(
[[m, m] for m in points_over_threshold_mask]
).flatten()
H_pos = np.diag(matched_sensors_mask).astype(
float
) # We measure x, y positions
self.point_hit_counter[points_over_threshold_mask] += 2 * period
else:
points_over_threshold_mask = np.array([True] * self.num_points)
H_pos = np.identity(points.size)
self.point_hit_counter += 2 * period
self.point_hit_counter[
self.point_hit_counter >= self.point_hit_inertia_max
] = self.point_hit_inertia_max
self.point_hit_counter[self.point_hit_counter < 0] = 0
H_vel = np.zeros(H_pos.shape) # But we don't directly measure velocity
H = np.hstack([H_pos, H_vel])
self.filter.update(np.expand_dims(points.flatten(), 0).T, None, H)
# Force points being detected for the first time to have velocity = 0
# This is needed because some detectors (like OpenPose) set points with
# low confidence to coordinates (0, 0). And when they then get their first
# real detection this creates a huge velocity vector in our KalmanFilter
# and causes the tracker to start with wildly inaccurate estimations which
# eventually coverge to the real detections.
detected_at_least_once_mask = np.array(
[[m, m] for m in self.detected_at_least_once_points]
).flatten()
self.filter.x[self.dim_z :][np.logical_not(detected_at_least_once_mask)] = 0
self.detected_at_least_once_points = np.logical_or(
self.detected_at_least_once_points, points_over_threshold_mask
)
def __repr__(self):
if self.last_distance is None:
placeholder_text = "\033[1mObject_{}\033[0m(age: {}, hit_counter: {}, last_distance: {}, init_id: {})"
else:
placeholder_text = "\033[1mObject_{}\033[0m(age: {}, hit_counter: {}, last_distance: {:.2f}, init_id: {})"
return placeholder_text.format(
self.id,
self.age,
self.hit_counter,
self.last_distance,
self.initializing_id,
)
class Detection:
def __init__(self, points: np.array, scores=None, data=None):
self.points = points
self.scores = scores
self.data = data