# core

> A standalone Python implementation of the [ByteTrack](https://arxiv.org/abs/2110.06864) multi-object tracker based on the [official implementation](https://github.com/ifzhang/ByteTrack/tree/main/yolox/tracker).

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import numpy as np
from cjm_byte_track.kalman_filter import KalmanFilter
import cjm_byte_track.matching
from cjm_byte_track.basetrack import BaseTrack, TrackState

In [None]:
#| export
class STrack(BaseTrack):
    """A Simple Track object"""
    # Shared Kalman Filter for all instances of STrack
    shared_kalman = KalmanFilter()

    def __init__(self, 
                 tlwh, # List or array-like containing four values (top-left x, top-left y, width, height).
                 score # The confidence score of the detection.
                ):
        """
        Initialize an STrack (Simple Track) object.
        """
        # Convert tlwh to numpy array of type float
        self._tlwh = np.asarray(tlwh, dtype=float)
        
        # Kalman filter instance for this track
        self.kalman_filter = None
        
        # Mean and covariance of the state
        self.mean, self.covariance = None, None
        
        # Flag to check if the track is activated
        self.is_activated = False
        
        # Score of the detection
        self.score = score
        
        # Length of the tracklet
        self.tracklet_len = 0

    def predict(self):
        """
        Predict the next state using Kalman filter.
        """
        mean_state = self.mean.copy()
        
        # If the track is not in Tracked state, set velocity to 0
        if self.state != TrackState.Tracked:
            mean_state[7] = 0
        
        # Predict the next state using the kalman filter
        self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)

    @staticmethod
    def multi_predict(stracks # List of STrack objects.
                     ):
        """
        Predict the state for multiple tracks simultaneously using the shared Kalman filter.
        """
        if not stracks:
            return

        # Extract mean and covariance for each track
        multi_states = [(st.mean.copy(), st.covariance) for st in stracks]
        
        # For each track, set velocity to 0 if it's not in Tracked state
        for i, st in enumerate(stracks):
            if st.state != TrackState.Tracked:
                multi_states[i][0][7] = 0
        
        # Use shared kalman filter for prediction
        multi_means, multi_covariances = STrack.shared_kalman.multi_predict(np.asarray(multi_means), np.asarray(multi_covariances))

        # Update each track with the predicted mean and covariance
        for i, st in enumerate(stracks):
            st.mean, st.covariance = multi_means[i], multi_covariances[i]

    def activate(self, 
                 kalman_filter, # KalmanFilter instance.
                 frame_id # ID of the current frame.
                ):
        """
        Activate the track. 
        """
        self.kalman_filter = kalman_filter
        
        # Initialize track ID
        self.track_id = self.next_id()
        
        # Initiate mean and covariance using the kalman filter
        self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))
        
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        
        # Check if track is activated in the first frame
        self.is_activated = frame_id == 1
        self.frame_id = frame_id
        self.start_frame = frame_id

    def re_activate(self, 
                    new_track, # The new STrack object with updated details.
                    frame_id, # ID of the current frame.
                    new_id=False # Flag to determine if a new ID should be assigned.
                   ):
        """
        Reactivate a track with new details.
        """
        self._update_track(new_track, frame_id, new_id)

    def update(self, new_track, frame_id):
        """
        Update the track with new details.

        :param new_track: The new STrack object with updated details.
        :param frame_id: ID of the current frame.
        """
        self._update_track(new_track, frame_id)

    def _update_track(self, 
                      new_track, # The new STrack object with updated details.
                      frame_id, # ID of the current frame.
                      new_id=False # Flag to determine if a new ID should be assigned.
                     ):
        """
        Internal method to update track details.
        """
        self.frame_id = frame_id
        self.tracklet_len += 1
        
        # Update mean and covariance using the kalman filter
        self.mean, self.covariance = self.kalman_filter.update(self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh))
        self.state = TrackState.Tracked
        self.is_activated = True
        
        # Assign new track ID if required
        if new_id:
            self.track_id = self.next_id()
        
        # Update score
        self.score = new_track.score

    @staticmethod
    def tlwh_to_xyah(tlwh # Bounding box in tlwh format.
                    ): # Bounding box in xyah format.
        """
        Convert bounding box from (top-left x, top-left y, width, height) to (center x, center y, aspect ratio, height). 
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret

    @staticmethod
    def tlwh_to_tlbr(tlwh # Bounding box in tlwh format.
                    ): # Bounding box in tlbr format.
        """
        Convert bounding box from (top-left x, top-left y, width, height) to (top-left x, top-left y, bottom-right x, bottom-right y).
        """
        ret = np.asarray(tlwh).copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def tlbr_to_tlwh(tlbr # Bounding box in tlbr format.
                    ): # Bounding box in tlwh format.
        """
        Convert bounding box from (top-left x, top-left y, bottom-right x, bottom-right y) to (top-left x, top-left y, width, height).
        """
        ret = np.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret

    @property
    def tlwh(self
            ): # Bounding box in tlwh format.
        """
        Get bounding box in (top-left x, top-left y, width, height) format.
        """
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    def tlbr(self
            ): # Bounding box in tlbr format.
        """
        Get bounding box in (top-left x, top-left y, bottom-right x, bottom-right y) format.
        """
        return self.tlwh_to_tlbr(self.tlwh)

    def __repr__(self
                ): # String representation of the track.
        """
        String representation of the STrack object.
        """
        return 'OT_{}_({}-{})'.format(self.track_id, self.start_frame, self.end_frame)

In [None]:
show_doc(STrack.__init__)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L17){target="_blank" style="float:right; font-size:smaller"}

### STrack.__init__

>      STrack.__init__ (tlwh, score)

Initialize an STrack (Simple Track) object.

|    | **Details** |
| -- | ----------- |
| tlwh | List or array-like containing four values (top-left x, top-left y, width, height). |
| score | The confidence score of the detection. |

In [None]:
show_doc(STrack.predict)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L42){target="_blank" style="float:right; font-size:smaller"}

### STrack.predict

>      STrack.predict ()

Predict the next state using Kalman filter.

In [None]:
show_doc(STrack.multi_predict)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L56){target="_blank" style="float:right; font-size:smaller"}

### STrack.multi_predict

>      STrack.multi_predict (stracks)

Predict the state for multiple tracks simultaneously using the shared Kalman filter.

|    | **Details** |
| -- | ----------- |
| stracks | List of STrack objects. |

In [None]:
show_doc(STrack.activate)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L79){target="_blank" style="float:right; font-size:smaller"}

### STrack.activate

>      STrack.activate (kalman_filter, frame_id)

Activate the track.

|    | **Details** |
| -- | ----------- |
| kalman_filter | KalmanFilter instance. |
| frame_id | ID of the current frame. |

In [None]:
show_doc(STrack.re_activate)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L102){target="_blank" style="float:right; font-size:smaller"}

### STrack.re_activate

>      STrack.re_activate (new_track, frame_id, new_id=False)

Reactivate a track with new details.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| new_track |  |  | The new STrack object with updated details. |
| frame_id |  |  | ID of the current frame. |
| new_id | bool | False | Flag to determine if a new ID should be assigned. |

In [None]:
show_doc(STrack.update)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L112){target="_blank" style="float:right; font-size:smaller"}

### STrack.update

>      STrack.update (new_track, frame_id)

Update the track with new details.

:param new_track: The new STrack object with updated details.
:param frame_id: ID of the current frame.

In [None]:
show_doc(STrack._update_track)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L121){target="_blank" style="float:right; font-size:smaller"}

### STrack._update_track

>      STrack._update_track (new_track, frame_id, new_id=False)

Internal method to update track details.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| new_track |  |  | The new STrack object with updated details. |
| frame_id |  |  | ID of the current frame. |
| new_id | bool | False | Flag to determine if a new ID should be assigned. |

In [None]:
show_doc(STrack.tlwh_to_xyah)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L145){target="_blank" style="float:right; font-size:smaller"}

### STrack.tlwh_to_xyah

>      STrack.tlwh_to_xyah (tlwh)

Convert bounding box from (top-left x, top-left y, width, height) to (center x, center y, aspect ratio, height).

|    | **Details** |
| -- | ----------- |
| tlwh | Bounding box in tlwh format. |

In [None]:
show_doc(STrack.tlwh_to_tlbr)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L156){target="_blank" style="float:right; font-size:smaller"}

### STrack.tlwh_to_tlbr

>      STrack.tlwh_to_tlbr (tlwh)

Convert bounding box from (top-left x, top-left y, width, height) to (top-left x, top-left y, bottom-right x, bottom-right y).

|    | **Details** |
| -- | ----------- |
| tlwh | Bounding box in tlwh format. |

In [None]:
show_doc(STrack.tlwh)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L176){target="_blank" style="float:right; font-size:smaller"}

### STrack.tlwh

>      STrack.tlwh ()

Get bounding box in (top-left x, top-left y, width, height) format.

In [None]:
show_doc(STrack.tlbr)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L189){target="_blank" style="float:right; font-size:smaller"}

### STrack.tlbr

>      STrack.tlbr ()

Get bounding box in (top-left x, top-left y, bottom-right x, bottom-right y) format.

In [None]:
#| export
class BYTETracker:
    def __init__(self, 
                 track_thresh:float=0.25, # Threshold value for tracking.
                 track_buffer:int=30, # Size of buffer for tracking.
                 match_thresh:float=0.8, # Threshold value for matching tracks to detections.
                 frame_rate:int=30 # Frame rate of the input video stream.
                ):
        """
        Initializes the BYTETracker.
        """

        # Thresholds for tracking and matching
        self.track_thresh = track_thresh
        self.match_thresh = match_thresh
        
        # Frame count
        self.frame_id = 0
        
        # Detection threshold, calculated based on the given track_thresh
        self.det_thresh = track_thresh + 0.1
        
        # Calculate buffer size based on given frame rate and track buffer
        self.buffer_size = int(frame_rate / 30.0 * track_buffer)
        
        # Maximum time a track is considered lost
        self.max_time_lost = self.buffer_size
        
        # Initialize Kalman filter
        self.kalman_filter = KalmanFilter()
        
        # Lists to store different kinds of tracks
        self.tracked_stracks = []
        self.lost_stracks = []
        self.removed_stracks = []
        
        BaseTrack._count = 0

    def _process_output(self, 
                        output_results # Detection results.
                       ) -> tuple: # scores and bounding boxes.
        """
        Process the output results to separate scores and bounding boxes.
        """

        # Check if output has 5 columns (indicating it contains scores only)
        if output_results.shape[1] == 5:
            scores = output_results[:, 4]
        else:
            output_results = output_results.cpu().numpy()
            # Calculate scores if output has more columns
            scores = output_results[:, 4] * output_results[:, 5]
        
        # Extract bounding boxes
        bboxes = output_results[:, :4]
        return scores, bboxes

    def _scale_bboxes(self, 
                      img_info:tuple, # Original height and width of the image.
                      img_size:tuple, # Target size.
                      bboxes:list # List of bounding boxes.
                     ) -> list: # Scaled bounding boxes.
        """
        Scale bounding boxes based on image size.
        """

        img_h, img_w = img_info
        scale = min(img_size[0] / float(img_h), img_size[1] / float(img_w))
        return bboxes / scale

    def _get_detections(self, 
                        dets:list, # List of detections.
                        scores_keep:list # Scores for the detections.
                       ) -> list: # List of STrack objects.
        """
        Convert bounding boxes to STrack objects.
        """

        return [STrack(STrack.tlbr_to_tlwh(tlbr), s) for tlbr, s in zip(dets, scores_keep)] if dets.size > 0 else []

    def _update_tracked_stracks(self
                               ) -> tuple: # List of unconfirmed and tracked tracks.
        """
        Update the list of tracked and unconfirmed tracks.
        """

        unconfirmed = [track for track in self.tracked_stracks if not track.is_activated]
        tracked_stracks = [track for track in self.tracked_stracks if track.is_activated]
        return unconfirmed, tracked_stracks

    def _match_tracks_to_detections(self, 
                                    stracks:list, # List of tracks.
                                    detections:list, # List of detections.
                                    thresh:float # IOU threshold for matching.
                                   ) -> tuple: # Matches and unmatched tracks and detections.
        """
        Match tracks to detections using IOU.
        """

        dists = matching.iou_distance(stracks, detections)
        return matching.linear_assignment(dists, thresh=thresh)

    def _update_tracks(self, 
                       stracks:list, # List of tracks.
                       detections:list, # List of detections.
                       matches:list, # Matched track-detection pairs.
                       refind_stracks:list, # List to add refind tracks.
                       activated_stracks:list # List to add activated tracks.
                      ):
        """
        Update tracks based on matches with detections.
        """

        for itracked, idet in matches:
            track = stracks[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

    def update(self, 
               output_results, # Detection results.
               img_info:tuple, # Original height and width of the image.
               img_size:tuple # Target size.
              ) -> list: # List of activated tracks.
        """
        Update the tracker based on new detections.
        """

        self.frame_id += 1
        refind_stracks, activated_stracks, lost_stracks, removed_stracks = [], [], [], []

        scores, bboxes = self._process_output(output_results)
        bboxes = self._scale_bboxes(img_info, img_size, bboxes)
        detections = self._get_detections(bboxes[scores > self.track_thresh], scores[scores > self.track_thresh])
        detections_second = self._get_detections(bboxes[np.logical_and(scores > 0.1, scores < self.track_thresh)], 
                                                 scores[np.logical_and(scores > 0.1, scores < self.track_thresh)])

        # Update tracked stracks
        unconfirmed, tracked_stracks = self._update_tracked_stracks()
        strack_pool = joint_stracks(tracked_stracks, self.lost_stracks)
        STrack.multi_predict(strack_pool)

        # Match and update tracks
        matches, u_track, u_detection = self._match_tracks_to_detections(strack_pool, detections, self.match_thresh)
        self._update_tracks(strack_pool, detections, matches, refind_stracks, activated_stracks)

        # Additional matching and track updates
        r_tracked_stracks = [strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked]
        matches, u_track, _ = self._match_tracks_to_detections(r_tracked_stracks, detections_second, thresh=0.5)
        self._update_tracks(r_tracked_stracks, detections_second, matches, refind_stracks, activated_stracks)
        for it in u_track:
            track = r_tracked_stracks[it]
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)

        # Update unconfirmed tracks
        detections = [detections[i] for i in u_detection]
        matches, u_unconfirmed, u_detection = self._match_tracks_to_detections(unconfirmed, detections, thresh=0.7)
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_stracks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)

        # Handle new tracks
        for inew in u_detection:
            track = detections[inew]
            if track.score >= self.det_thresh:
                track.activate(self.kalman_filter, self.frame_id)
                activated_stracks.append(track)

        # Handle lost and removed tracks
        for track in self.lost_stracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)

        removed_stracks.extend([track for track in self.lost_stracks if self.frame_id - track.end_frame > self.max_time_lost])
        self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked]
        self.tracked_stracks = joint_stracks(self.tracked_stracks, activated_stracks)
        self.tracked_stracks = joint_stracks(self.tracked_stracks, refind_stracks)
        self.lost_stracks = sub_stracks(self.lost_stracks, self.tracked_stracks)
        self.lost_stracks.extend(lost_stracks)
        self.lost_stracks = sub_stracks(self.lost_stracks, self.removed_stracks)
        self.removed_stracks.extend(removed_stracks)
        self.tracked_stracks, self.lost_stracks = remove_duplicate_stracks(self.tracked_stracks, self.lost_stracks)
        return [track for track in self.tracked_stracks if track.is_activated]

In [None]:
show_doc(BYTETracker.__init__)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L205){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker.__init__

>      BYTETracker.__init__ (track_thresh:float=0.25, track_buffer:int=30,
>                            match_thresh:float=0.8, frame_rate:int=30)

Initializes the BYTETracker.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| track_thresh | float | 0.25 | Threshold value for tracking. |
| track_buffer | int | 30 | Size of buffer for tracking. |
| match_thresh | float | 0.8 | Threshold value for matching tracks to detections. |
| frame_rate | int | 30 | Frame rate of the input video stream. |

In [None]:
show_doc(BYTETracker._process_output)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L241){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker._process_output

>      BYTETracker._process_output (output_results)

Process the output results to separate scores and bounding boxes.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| output_results |  | Detection results. |
| **Returns** | **tuple** | **scores and bounding boxes.** |

In [None]:
show_doc(BYTETracker._scale_bboxes)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L260){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker._scale_bboxes

>      BYTETracker._scale_bboxes (img_info:tuple, img_size:tuple, bboxes:list)

Scale bounding boxes based on image size.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| img_info | tuple | Original height and width of the image. |
| img_size | tuple | Target size. |
| bboxes | list | List of bounding boxes. |
| **Returns** | **list** | **Scaled bounding boxes.** |

In [None]:
show_doc(BYTETracker._get_detections)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L273){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker._get_detections

>      BYTETracker._get_detections (dets:list, scores_keep:list)

Convert bounding boxes to STrack objects.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| dets | list | List of detections. |
| scores_keep | list | Scores for the detections. |
| **Returns** | **list** | **List of STrack objects.** |

In [None]:
show_doc(BYTETracker._update_tracked_stracks)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L283){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker._update_tracked_stracks

>      BYTETracker._update_tracked_stracks ()

Update the list of tracked and unconfirmed tracks.

In [None]:
show_doc(BYTETracker._match_tracks_to_detections)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L293){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker._match_tracks_to_detections

>      BYTETracker._match_tracks_to_detections (stracks:list, detections:list,
>                                               thresh:float)

Match tracks to detections using IOU.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| stracks | list | List of tracks. |
| detections | list | List of detections. |
| thresh | float | IOU threshold for matching. |
| **Returns** | **tuple** | **Matches and unmatched tracks and detections.** |

In [None]:
show_doc(BYTETracker._update_tracks)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L305){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker._update_tracks

>      BYTETracker._update_tracks (stracks:list, detections:list, matches:list,
>                                  refind_stracks:list, activated_stracks:list)

Update tracks based on matches with detections.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| stracks | list | List of tracks. |
| detections | list | List of detections. |
| matches | list | Matched track-detection pairs. |
| refind_stracks | list | List to add refind tracks. |
| activated_stracks | list | List to add activated tracks. |

In [None]:
show_doc(BYTETracker.update)

---

[source](https://github.com/cj-mills/cjm-byte-track/blob/main/cjm_byte_track/core.py#L326){target="_blank" style="float:right; font-size:smaller"}

### BYTETracker.update

>      BYTETracker.update (output_results, img_info:tuple, img_size:tuple)

Update the tracker based on new detections.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| output_results |  | Detection results. |
| img_info | tuple | Original height and width of the image. |
| img_size | tuple | Target size. |
| **Returns** | **list** | **List of activated tracks.** |

In [None]:
#| export
def joint_stracks(track_list_a:list, # The first list of tracks.
                  track_list_b:list # The second list of tracks.
                 ) -> list: # A combined list of unique tracks.
    """
    Combines two lists of tracks ensuring each track is unique based on its track_id.
    """
    # Using a dictionary comprehension to ensure unique tracks based on track_id
    unique_tracks = {track.track_id: track for track in track_list_a + track_list_b}
    
    return list(unique_tracks.values())

In [None]:
#| export
def sub_stracks(track_list_a:list, # The list of tracks to subtract from.
                track_list_b:list # The list of tracks to subtract.
               ) -> list: # A list containing tracks from track_list_a that are not in track_list_b.
    """
    Subtracts the tracks in track_list_b from track_list_a based on track_id.
    """
    # Creating a set of track_ids from track_list_b for efficient look-up
    track_ids_b = {track.track_id for track in track_list_b}
    
    # Return tracks from track_list_a that are not in track_list_b based on track_id
    return [track for track in track_list_a if track.track_id not in track_ids_b]

In [None]:
#| export
def remove_duplicate_stracks(s_tracks_a:list, # The first list of tracks.
                             s_tracks_b:list # The second list of tracks.
                            ) -> tuple: # Two lists of tracks with duplicates removed.
    """
    Removes duplicate tracks from two lists based on a defined distance metric and time criteria. 
    """
    # Calculate pairwise distance between tracks in the two lists
    pairwise_distance = matching.iou_distance(s_tracks_a, s_tracks_b)
    
    # Identify pairs of tracks with distance less than 0.15 (indicating potential duplicates)
    pairs = np.where(pairwise_distance < 0.15)

    # Sets to store indexes of duplicate tracks in each list
    duplicates_a, duplicates_b = set(), set()
    
    for track_a_index, track_b_index in zip(*pairs):
        # Calculate how long each track has been in the list
        time_a = s_tracks_a[track_a_index].frame_id - s_tracks_a[track_a_index].start_frame
        time_b = s_tracks_b[track_b_index].frame_id - s_tracks_b[track_b_index].start_frame
        
        # Compare times and add the newer track to the duplicate set
        if time_a > time_b:
            duplicates_b.add(track_b_index)
        else:
            duplicates_a.add(track_a_index)

    # Filter out duplicates from the original lists
    result_a = [track for i, track in enumerate(s_tracks_a) if i not in duplicates_a]
    result_b = [track for i, track in enumerate(s_tracks_b) if i not in duplicates_b]
    
    return result_a, result_b


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()