In [None]:
import numpy as np
from scipy.optimize import linear_sum_assignment
from ._base_metric import _BaseMetric
from .. import _timing
from .. import utils


class Identity(_BaseMetric):
    """Class which implements the ID metrics"""

    @staticmethod
    def get_default_config():
        """Default class config values"""
        default_config = {
            'THRESHOLD': 0.5,  # Similarity score threshold required for a IDTP match. Default 0.5.
            'PRINT_CONFIG': True,  # Whether to print the config information on init. Default: False.
        }
        return default_config

    def __init__(self, config=None):
        super().__init__()
        self.integer_fields = ['IDTP', 'IDFN', 'IDFP'] # 정수 Field - TP, FN, FP
        self.float_fields = ['IDF1', 'IDR', 'IDP'] # 실수 Field - IDF1, IDR, IDP
        self.fields = self.float_fields + self.integer_fields
        self.summary_fields = self.fields

        # Configuration options:
        self.config = utils.init_config(config, self.get_default_config(), self.get_name())
        self.threshold = float(self.config['THRESHOLD'])
        
    # 단일 시퀀스에 대한 Identity 지표 계산
    @_timing.time
    def eval_sequence(self, data):
        """Calculates ID metrics for one sequence"""
        # 초기화
        res = {}
        for field in self.fields:
            res[field] = 0
            
        # tracker 비었으면 result 즉시 return
        if data['num_tracker_dets'] == 0:
            res['IDFN'] = data['num_gt_dets']
            return res
        
        # ground truth 시퀀스가 비었으면 result 즉시 return
        if data['num_gt_dets'] == 0:
            res['IDFP'] = data['num_tracker_dets']
            return res

        # 카운팅을 위한 글로벌 변수
        
        # potential_matches_count -> 적된 객체와 실제 객체 간의 잠재적 매칭 횟수를 추적
        # data['num_gt_ids'] -> 실제 객체의 개수
        # data['num_tracker_ids'] -> 추적된 객체의 개수
        potential_matches_count = np.zeros((data['num_gt_ids'], data['num_tracker_ids']))
        
        # gt_id_count -> 실제 객체의 detection 수를 추적(실제 객체의 탐지 수)
        gt_id_count = np.zeros(data['num_gt_ids'])
        
        # tracker_id_count -> 추적된 객체의 탐지 수를 추적
        tracker_id_count = np.zeros(data['num_tracker_ids'])

        # First loop through each timestep and accumulate global track information.
        # 각 타임스텝을 처음부터 반복하며 전역 추적 정보를 누적
        for t, (gt_ids_t, tracker_ids_t) in enumerate(zip(data['gt_ids'], data['tracker_ids'])):
            # Count the potential matches between ids in each timestep
            # 타임스탬프 별로 매칭 카운팅
            # np.greater_equal -> threshold보다 큰 경우 True 작은 경우 False
            matches_mask = np.greater_equal(data['similarity_scores'][t], self.threshold) # threshold 이상인 매칭 찾기용 마스크
            # threshold 이상인 실제 객체의 인덱스, 추적된 객체의 인덱스
            match_idx_gt, match_idx_tracker = np.nonzero(matches_mask)
            
            # 실제 객체와 탐지 대상 객체가 일치하면 + 1
            potential_matches_count[gt_ids_t[match_idx_gt], tracker_ids_t[match_idx_tracker]] += 1

            # Calculate the total number of dets for each gt_id and tracker_id.
            gt_id_count[gt_ids_t] += 1 # 실제 객체의 탐지 수 + 1
            tracker_id_count[tracker_ids_t] += 1 # 추적된 객체의 탐지 수 + 1

        # Calculate optimal assignment cost matrix for ID metrics
        # ID 지표를 위한 최적 할당 비용 행렬을 계산
        
        # 유사도가 높으면 매칭 비용이 낮아짐
        # 유사도가 낮으면 매칭 비용이 높아짐
        num_gt_ids = data['num_gt_ids']
        num_tracker_ids = data['num_tracker_ids']
        # False Positive
        fp_mat = np.zeros((num_gt_ids + num_tracker_ids, num_gt_ids + num_tracker_ids))
        # False Negative
        fn_mat = np.zeros((num_gt_ids + num_tracker_ids, num_gt_ids + num_tracker_ids))
        fp_mat[num_gt_ids:, :num_tracker_ids] = 1e10 # 행렬의 좌측 하단 부분에 매우 큰 값 할당(이 영역의 매칭이 불가능하게)
        fn_mat[:num_gt_ids, num_tracker_ids:] = 1e10 # 행렬의 우측 상단 부분에 매우 큰 값 할당(이 영역의 매칭이 불가능하게)
        # 매우 큰 값을 할당하는 대신 gt_id_count 값을 할당(실제 객체의 누락에 대한 할당 비용이 증가, 실제 객체의 매칭을 정리하고 누락을 패널티로 간주)
        for gt_id in range(num_gt_ids):
            fn_mat[gt_id, :num_tracker_ids] = gt_id_count[gt_id]
            fn_mat[gt_id, num_tracker_ids + gt_id] = gt_id_count[gt_id]
            
        # 매우 큰 값을 할당하는 대신 gt_id_count 값을 할당
        for tracker_id in range(num_tracker_ids):
            fp_mat[:num_gt_ids, tracker_id] = tracker_id_count[tracker_id]
            fp_mat[tracker_id + num_gt_ids, tracker_id] = tracker_id_count[tracker_id]
        fn_mat[:num_gt_ids, :num_tracker_ids] -= potential_matches_count
        fp_mat[:num_gt_ids, :num_tracker_ids] -= potential_matches_count

        # Hungarian algorithm
        # 헝가리안 알고리즘
        match_rows, match_cols = linear_sum_assignment(fn_mat + fp_mat)

        # Accumulate basic statistics
        # IDFN, IDFP, IDTP 계산
        res['IDFN'] = fn_mat[match_rows, match_cols].sum().astype(np.int)
        res['IDFP'] = fp_mat[match_rows, match_cols].sum().astype(np.int)
        res['IDTP'] = (gt_id_count.sum() - res['IDFN']).astype(np.int)

        # Calculate final ID scores
        # 최종적인 ID 지표 계산
        res = self._compute_final_fields(res)
        return res

    def combine_classes_class_averaged(self, all_res, ignore_empty_classes=False):
        """Combines metrics across all classes by averaging over the class values.
        If 'ignore_empty_classes' is True, then it only sums over classes with at least one gt or predicted detection.
        """
        res = {}
        for field in self.integer_fields:
            if ignore_empty_classes:
                res[field] = self._combine_sum({k: v for k, v in all_res.items()
                                                if v['IDTP'] + v['IDFN'] + v['IDFP'] > 0 + np.finfo('float').eps},
                                               field)
            else:
                res[field] = self._combine_sum({k: v for k, v in all_res.items()}, field)
        for field in self.float_fields:
            if ignore_empty_classes:
                res[field] = np.mean([v[field] for v in all_res.values()
                                      if v['IDTP'] + v['IDFN'] + v['IDFP'] > 0 + np.finfo('float').eps], axis=0)
            else:
                res[field] = np.mean([v[field] for v in all_res.values()], axis=0)
        return res

    def combine_classes_det_averaged(self, all_res):
        """Combines metrics across all classes by averaging over the detection values"""
        res = {}
        for field in self.integer_fields:
            res[field] = self._combine_sum(all_res, field)
        res = self._compute_final_fields(res)
        return res

    def combine_sequences(self, all_res):
        """Combines metrics across all sequences"""
        res = {}
        for field in self.integer_fields:
            res[field] = self._combine_sum(all_res, field)
        res = self._compute_final_fields(res)
        return res

    @staticmethod
    def _compute_final_fields(res):
        """Calculate sub-metric ('field') values which only depend on other sub-metric values.
        This function is used both for both per-sequence calculation, and in combining values across sequences.
        """
        res['IDR'] = res['IDTP'] / np.maximum(1.0, res['IDTP'] + res['IDFN'])
        res['IDP'] = res['IDTP'] / np.maximum(1.0, res['IDTP'] + res['IDFP'])
        res['IDF1'] = res['IDTP'] / np.maximum(1.0, res['IDTP'] + 0.5 * res['IDFP'] + 0.5 * res['IDFN'])
        return res