# Initialize


## get_reader_writer

In [None]:
import os


In [None]:
"/workspace/frames/val/scene_042/camera09/NIA_MTMDC_s42_c09_am_sunny_fall_0000.jpg"


In [None]:
def get_reader_writer(source):
    """Placeholder that ignores ``source`` and returns ``None``.

    A working ``get_reader_writer`` is defined further down in this notebook
    and replaces this stub once that cell has been executed.
    """
    return None


In [None]:
# Map a scene name to the sorted full paths of every entry directly under that
# scene's frames directory (presumably one sub-directory per camera -- verify
# against the dataset layout).
sources = {
        # Validation split
        'scene_042': sorted([os.path.join('/workspace/frames/val/scene_042', p) for p in os.listdir('/workspace/frames/val/scene_042')]),}
sources['scene_042']  # notebook display of the collected paths


In [None]:
# src_handlers = [get_reader_writer(s) for s in sources['scene_042']]


In [None]:
# Frame directory of a single camera, used here to try out the path handling.
source = '/workspace/frames/val/scene_042/camera09'
# Sort numerically by the integer between the last "_" and the following "."
# (e.g. "..._0000.jpg" -> 0) instead of lexicographically by file name.
src_paths = sorted(os.listdir(source),  key=lambda x: int(x.split("_")[-1].split(".")[0]))
# Turn the bare file names into full paths.
src_paths = [os.path.join(source, s) for s in src_paths]
src_paths[0]  # notebook display of the first frame path


## VideoWriter

In [None]:
import cv2


In [None]:
img = cv2.imread(src_paths[0])
img.shape


## Detection model initialize

In [None]:
from ultralytics import YOLO


In [None]:
detection = YOLO('yolov8x.pt')


## Pose estimation initialize

In [None]:
from mmpose.apis import init_model


In [None]:
# pose estimation initialize
config_file = '/mmpose/configs/body_2d_keypoint/rtmpose/crowdpose/rtmpose-m_8xb64-210e_crowdpose-256x192.py'
checkpoint_file = 'https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-m_simcc-crowdpose_pt-aic-coco_210e-256x192-e6192cac_20230224.pth'
pose = init_model(config_file, checkpoint_file, device='cuda:0')


## Arguments

In [None]:
import os


In [None]:
# Tracking hyper-parameters shared by the single-camera trackers, the
# multi-camera clustering and the cluster tracker created further down.
args = {
        'max_batch_size' : 32,  # maximum input batch size of the ReID model
        'track_buffer' : 150,  # number of frames to keep lost tracks
        'with_reid' : True,  # whether to use the ReID model's output features in the first association (not read in the visible cells)
        'sct_appearance_thresh' : 0.4,  # appearance-feature cosine-distance threshold for single-camera tracking
        'sct_euclidean_thresh' : 0.1,  # euclidean-distance threshold for single-camera tracking

        'clt_appearance_thresh' : 0.35,  # appearance-feature cosine-distance threshold for multi-camera clustering
        'clt_euclidean_thresh' : 0.3,  # euclidean-distance threshold for multi-camera clustering

        'mct_appearance_thresh' : 0.4,  # appearance-feature cosine-distance threshold for cluster tracking (not important)

        'frame_rate' : 30,  # fps of your video (camera); get_reader_writer does not read this value
        'write_vid' : False,  # whether to write results to video (not read in the visible cells)
        }

# Detector thresholds, passed as `conf` and `iou` when the detection model is called.
conf_thres=0.1
iou_thres=0.45

# Scene to process; `scene` selects the frame sources and is passed to the
# multi-camera tracker, `perspective` selects the calibration files.
scenes = ['scene_042']
scene = scenes[0]
perspective=scene


In [None]:
# Scene name -> sorted full paths of the entries under that scene's frames
# directory (presumably one directory per camera -- verify).
sources = {
        'scene_042': sorted([os.path.join('/workspace/frames/val/scene_042', p) for p in os.listdir('/workspace/frames/val/scene_042')]),
}

# NOTE: `sources` is rebound from the dict to the list for the selected scene;
# every later cell expects the list form.
sources = sources[scene]
sources  # notebook display


## trackers initialize


In [None]:
from trackers.botsort.bot_sort import BoTSORT


In [None]:
# One independent single-camera BoT-SORT tracker per entry in `sources`,
# all configured with the same single-camera-tracking thresholds.
trackers = [
    BoTSORT(
        track_buffer=args['track_buffer'],
        max_batch_size=args['max_batch_size'],
        appearance_thresh=args['sct_appearance_thresh'],
        euc_thresh=args['sct_euclidean_thresh'],
    )
    for _ in sources
]


## perspective transform initialize


In [None]:
from pathlib import Path
from perspective_transform.model import PerspectiveTransform


In [None]:
# Scene name -> sorted paths of every `calibration.json` found recursively
# under that scene's videos directory.
calibration_position = {
    # Validation split
    "scene_042": sorted([str(p) for p in Path("/workspace/videos/val/scene_042").glob("**/calibration.json")])
}

calibration_position  # notebook display


In [None]:
# Build one PerspectiveTransform per calibration file of the selected scene.
# These are later zipped positionally with the source handlers and trackers,
# so the sorted calibration order must line up with the sorted `sources`
# order -- TODO confirm for scenes with differently named directories.
calibrations = calibration_position[perspective]
perspective_transforms = [PerspectiveTransform(c) for c in calibrations]
perspective_transforms  # notebook display


## id_distributor and multi-camera tracker initialize



In [None]:
from trackers.multicam_tracker.cluster_track import MCTracker
from trackers.multicam_tracker.clustering import Clustering, ID_Distributor


In [None]:
# Cross-camera clustering, configured with the multi-camera clustering thresholds.
clustering = Clustering(appearance_thresh=args['clt_appearance_thresh'], euc_thresh=args['clt_euclidean_thresh'],
                        match_thresh=0.8)
# Multi-camera (cluster) tracker for the selected scene.
mc_tracker = MCTracker(appearance_thresh=args['mct_appearance_thresh'], match_thresh=0.8, scene=scene)
# Source of the ids handed out via `assign_id()` to each tracked single-camera track.
id_distributor = ID_Distributor()


## get source imgs, video writers


In [None]:
import cv2


In [None]:
def get_reader_writer(source, fps=30, frame_size=(1920, 1080)):
    """Collect one camera's frame paths and open a video writer for its output.

    Args:
        source: Directory holding the frames of one camera, laid out as
            ``.../<split>/<scene>/<camera>`` with "/" separators. Every file
            name must end in ``_<frame number>.<ext>``. A trailing "/" is
            tolerated.
        fps: Frame rate of the output video.
        frame_size: ``(width, height)`` of the output video in pixels.

    Returns:
        ``[src_paths, video_writer]``: the frame paths sorted by frame number,
        and a ``cv2.VideoWriter`` (mp4v codec) targeting
        ``output_videos/<scene>/<split>_<scene><camera>.mp4``. The writer is
        opened immediately; releasing it is left to the caller.

    Raises:
        ValueError: If a file name in ``source`` has no integer frame number.
    """
    def frame_number(file_name):
        # "..._0000.jpg" -> 0: the integer between the last "_" and the next ".".
        return int(file_name.split("_")[-1].split(".")[0])

    frame_names = sorted(os.listdir(source), key=frame_number)
    src_paths = [os.path.join(source, name) for name in frame_names]

    # Strip trailing slashes so the last component is the camera directory
    # rather than an empty string.
    parts = source.rstrip('/').split('/')
    split_name, scene_name, camera_name = parts[-3], parts[-2], parts[-1]

    out_dir = 'output_videos/' + scene_name
    os.makedirs(out_dir, exist_ok=True)
    dst = f"{out_dir}/{split_name}_{scene_name}{camera_name}.mp4"
    video_writer = cv2.VideoWriter(dst, cv2.VideoWriter_fourcc(*'mp4v'), fps, tuple(frame_size))
    print(f"{source}'s total frames: {len(src_paths)}")

    return [src_paths, video_writer]


In [None]:
# One [frame_paths, video_writer] pair per camera source.
src_handlers = [get_reader_writer(camera_dir) for camera_dir in sources]
# One empty list per camera source, to store tracker outputs in MOT format.
results_lists = [[] for _ in sources]


In [None]:
# Longest frame sequence among all cameras (321 was noted in the original notebook).
total_frames = max(len(handler[0]) for handler in src_handlers)
cur_frame = 1  # 1-based index of the frame being processed
stop = False


# While loop

In [None]:
import time


In [None]:
start = time.time()


## First, run each camera's tracker on its current frame independently


In [None]:
import matplotlib.pyplot as plt


In [None]:
# Per-camera bundle of (source handler, tracker, perspective transform, result list).
# `zip` returns a one-shot iterator: each `next(components)` below advances to
# the next camera, and this cell must be re-run to start over. It also stops
# at the shortest of the four inputs.
components = zip(src_handlers, trackers, perspective_transforms, results_lists)


In [None]:
imgs = []


---

In [None]:
(img_paths, writer), tracker, perspective_transform, result_list = next(components)
print(f"len img_paths: {len(img_paths)}")


In [None]:
# Take the next frame of this camera. `pop(0)` removes the path from the list
# held in `src_handlers`, so re-running this cell moves on to the following frame.
img_path = img_paths.pop(0)
img = cv2.imread(img_path)
# OpenCV loads images as BGR; convert to RGB for matplotlib.
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.show()


## run detection model

In [None]:
results = detection(img, conf=conf_thres, iou=iou_thres, classes=0)
dets = results[0].boxes.data.cpu().numpy()  # [(x1, y1, x2, y2, conf, cls), ...]


In [None]:
plt.imshow(cv2.cvtColor(results[0].plot(), cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.show()


## run tracker

In [None]:
online_targets, new_ratio = tracker.update(dets, img, img_path, pose)  # [bot_sort.Strack]


In [None]:
print(len(online_targets))
type(online_targets[0])


## run perspective transform


In [None]:
perspective_transform.run(tracker, new_ratio)


## assign temporal global_id to each track for multi-camera tracking

In [None]:
# Give every currently tracked single-camera track a fresh id from the
# distributor, stored on the track as `t_global_id`.
for t in tracker.tracked_stracks:
    t.t_global_id = id_distributor.assign_id()


In [None]:
imgs.append(img)


In [None]:
len(imgs)


### STrack status

In [None]:
for t in tracker.tracked_stracks:
    print(t.is_activated)


In [None]:
tracker.lost_stracks


In [None]:
t.track_id


## pose

In [None]:
from mmpose.apis import inference_topdown


In [None]:
import seaborn
import numpy as np
colors = seaborn.color_palette(n_colors= 80)
colors = np.array(colors)


In [None]:
pose_input = dets[:, :4]
pose_results = inference_topdown(pose, img, pose_input, bbox_format="xyxy")


In [None]:
import matplotlib.pyplot as plt
import cv2
import numpy as np

def visualize_pose_results(results, img):
    """Draw each result's first box and numbered keypoints on a copy of ``img`` and show it.

    Args:
        results: Iterable of objects exposing ``pred_instances.keypoints`` and
            ``pred_instances.bboxes``; only index 0 of each is drawn.
            (Presumably the list returned by mmpose ``inference_topdown`` --
            confirm.)
        img: Image array. It is copied before drawing, so the caller's array
            is not modified. It is handed to ``plt.imshow`` unchanged.

    Returns:
        None. The annotated copy is only displayed.
    """
    canvas = img.copy()
    box_color = (0, 255, 0)
    point_color = (255, 0, 0)
    label_color = (255, 0, 255)

    for result in results:
        instances = result.pred_instances
        # Fall back to empty sequences when a field is missing.
        keypoints = [] if instances.keypoints is None else instances.keypoints[0]
        bboxes = [] if instances.bboxes is None else instances.bboxes[0]

        # Only draw a box when it has the four expected coordinates.
        if len(bboxes) == 4:
            left, top, right, bottom = bboxes.astype(int)
            cv2.rectangle(canvas, (left, top), (right, bottom), box_color, 2)

        # Mark every keypoint and label it with its index.
        for index, point in enumerate(keypoints):
            px, py = point.astype(int)
            cv2.circle(canvas, (px, py), 3, point_color, -1)
            cv2.putText(canvas, f"{index}", (px, py), cv2.FONT_HERSHEY_SIMPLEX, 0.5, label_color, 1)

    plt.figure(figsize=(12, 8))
    plt.imshow(canvas)
    plt.title("Pose Visualization")
    plt.axis("off")
    plt.show()


In [None]:
import matplotlib.pyplot as plt
import cv2
import numpy as np

def visualize_pose_with_connections(results, img, draw_connect=False):
    """Draw keypoints, and optionally skeleton lines, on a copy of ``img`` and show it.

    Args:
        results: Iterable of objects exposing ``pred_instances.keypoints``;
            only index 0 of each is drawn. (Presumably the list returned by
            mmpose ``inference_topdown`` -- confirm.)
        img: Image array. It is copied before drawing, so the caller's array
            is not modified. It is handed to ``plt.imshow`` unchanged.
        draw_connect: When true, also draw the skeleton connections.

    Returns:
        None. The annotated copy is only displayed, titled with the number of
        results drawn (0 when ``results`` is empty).
    """
    # Define the keypoint connections for CrowdPose
    connections = [
        (13, 6), (6, 8), (8, 10),  # Left leg
        (13, 7), (7, 9), (9, 11),  # Right leg
        (13, 1), (1, 3), (3, 5),  # Right arm
        (13, 0), (0, 2), (2, 4),  # Left arm
        (12, 13),  # Neck to head
    ]
    img = img.copy()

    # Counted explicitly so the title below is defined even when `results`
    # is empty (the loop variable would otherwise be unbound).
    num_results = 0
    for num_results, result in enumerate(results, start=1):
        # Extract predicted keypoints
        pred_instances = result.pred_instances
        keypoints = pred_instances.keypoints[0] if pred_instances.keypoints is not None else []

        # Draw keypoints
        for kp in keypoints:
            x, y = kp[:2].astype(int)
            cv2.circle(img, (x, y), 5, (255, 0, 0), -1)

        if draw_connect:
            # Draw connections
            for start, end in connections:
                if len(keypoints) > max(start, end):  # Ensure indices are valid
                    x1, y1 = keypoints[start, :2].astype(int)
                    x2, y2 = keypoints[end, :2].astype(int)
                    cv2.line(img, (x1, y1), (x2, y2), (0, 255, 0), 2)

    # Display the image
    plt.figure(figsize=(8, 6))
    plt.imshow(img)
    plt.title(f"Pose Visualization {num_results}")
    plt.axis("off")
    plt.show()


In [None]:
def visualize(dets, img, colors, pose, pose_result, cur_frame):
    """Draw pose skeletons and labelled detection boxes onto ``img``.

    Args:
        dets: Sequence of rows ``(x0, y0, x1, y1, score, id, ...)``; column 5
            is used as the id that selects the colour and is printed in the
            label.
        img: Image array; it is drawn on in place.
        colors: Array of RGB-like colours with components in ``[0, 1]``,
            indexed by ``id % 80``.
        pose: Unused; kept so existing calls keep working.
        pose_result: Iterable of pose samples exposing
            ``pred_instances.keypoints`` and
            ``pred_instances.keypoint_scores``. Every sample is drawn.
            (Presumably the list returned by mmpose ``inference_topdown`` --
            confirm.)
        cur_frame: Unused; kept so existing calls keep working.

    Returns:
        ``img`` with the annotations drawn, or ``img`` untouched when ``dets``
        is empty.
    """
    m = 2  # scale factor applied to font size and text thickness
    if len(dets) == 0:
        return img

    # Read keypoints and scores from the `pose_result` argument (not from a
    # notebook global) and draw every sample, not only the first one.
    for sample in pose_result:
        instances = sample.pred_instances
        img = visualize_kpt(img, instances.keypoints, instances.keypoint_scores, thr=0.3)

    font = cv2.FONT_HERSHEY_SIMPLEX
    for obj in dets:
        score = obj[4]
        track_id = int(obj[5])
        len_feats = ' '  # no feature count available in this variant
        x0, y0, x1, y1 = int(obj[0]), int(obj[1]), int(obj[2]), int(obj[3])

        base_color = colors[track_id % 80]
        color = (base_color * 255).astype(np.uint8).tolist()
        text = '{} : {:.1f}% | {}'.format(track_id, score * 100, len_feats)
        # Dark text on light boxes, light text on dark boxes.
        txt_color = (0, 0, 0) if np.mean(base_color) > 0.5 else (255, 255, 255)

        txt_size = cv2.getTextSize(text, font, 0.4 * m, 1 * m)[0]
        cv2.rectangle(img, (x0, y0), (x1, y1), color, 2)

        # Filled, slightly darkened label background just above the box.
        txt_bk_color = (base_color * 255 * 0.7).astype(np.uint8).tolist()
        cv2.rectangle(
            img,
            (x0, y0 - 1),
            (x0 + txt_size[0] + 1, y0 - int(1.5 * txt_size[1])),
            txt_bk_color,
            -1
        )
        cv2.putText(img, text, (x0, y0 - txt_size[1]), font, 0.4 * m, txt_color, thickness=1 * m)

    return img

def visualize_kpt(img,
              keypoints,
              scores,
              thr=0.3) -> np.ndarray:
    """Draw keypoints and skeleton links for every person onto ``img``.

    Args:
        img: Image array; it is drawn on in place.
        keypoints: Per-person arrays of ``(x, y)`` keypoint coordinates.
        scores: Per-person sequences of keypoint scores, aligned with
            ``keypoints``.
        thr: A link is drawn only when both of its endpoints score above this.
            Keypoint dots are drawn regardless of score.

    Returns:
        The same ``img`` object, annotated.
    """
    palette = [[51, 153, 255], [0, 255, 0], [255, 128, 0], [255, 255, 255],
               [255, 153, 255], [102, 178, 255], [255, 51, 51]]
    # Palette index per keypoint; only the first 14 keypoints are drawn.
    point_color = [0, 0, 0, 0, 0, 0, 1, 1, 2, 2, 2, 2, 3, 3]
    # ((start keypoint, end keypoint), palette index) for every skeleton link.
    links = [
        ((12, 13), 3), ((13, 0), 3), ((13, 1), 3), ((0, 1), 0), ((6, 7), 1),
        ((0, 2), 0), ((2, 4), 0), ((1, 3), 0), ((3, 5), 0), ((0, 6), 0),
        ((1, 7), 0), ((6, 8), 2), ((8, 10), 2), ((7, 9), 2), ((9, 11), 2),
    ]

    # draw keypoints and skeleton
    for person_kpts, person_scores in zip(keypoints, scores):
        for point, palette_idx in zip(person_kpts, point_color):
            center = tuple(point.astype(np.int32))
            cv2.circle(img, center, 2, palette[palette_idx], 2, cv2.LINE_AA)

        for (start, end), palette_idx in links:
            both_confident = person_scores[start] > thr and person_scores[end] > thr
            if not both_confident:
                continue
            start_pt = tuple(person_kpts[start].astype(np.int32))
            end_pt = tuple(person_kpts[end].astype(np.int32))
            cv2.line(img, start_pt, end_pt, palette[palette_idx], 1, cv2.LINE_AA)

    return img


In [None]:
vis_img = visualize(dets, img, colors, pose, pose_results, cur_frame)


In [None]:
visualize_pose_with_connections(pose_results, cv2.cvtColor(vis_img, cv2.COLOR_BGR2RGB), draw_connect=True)


---


# Second, run the multi-camera tracker using the results of the trackers above


In [None]:
groups = clustering.update(trackers, cur_frame, scene)


In [None]:
from itertools import combinations

list(combinations([1,2,3], 2))


In [None]:
mc_tracker.update(trackers, groups)


In [None]:
clustering.update_using_mctracker(trackers, mc_tracker)


# third, run cluster self-refinements


In [None]:
print(cur_frame)
# Run the multi-camera tracker's cluster refinement only on every 5th frame.
if cur_frame % 5 == 0:
    mc_tracker.refinement_clusters()


# update result lists using updated trackers
        

In [None]:
def update_result_lists_testset(trackers, result_lists, frame_id, cam_ids, scene):
    """Append this frame's globally identified tracks to each camera's result list.

    ``trackers``, ``result_lists`` and ``cam_ids`` are matched by position;
    iteration stops at the shortest of the three.

    Args:
        trackers: Per-camera trackers exposing ``tracked_stracks``.
        result_lists: Per-camera lists; one dict per kept track is appended
            in place.
        frame_id: Frame number stored with every appended record.
        cam_ids: Per-camera ids; each is converted with ``int()``.
        scene: Unused; kept so existing calls keep working.

    Returns:
        None. ``result_lists`` is modified in place.
    """
    for tracker, result_list, cam_id in zip(trackers, result_lists, cam_ids):
        for track in tracker.tracked_stracks:
            # Tracks with a negative global id are left out.
            if track.global_id < 0:
                continue
            result_list.append({
                'cam_id': int(cam_id),
                'frame_id': frame_id,
                'track_id': track.global_id,
                'sct_track_id': track.track_id,
                'tlwh': [int(value) for value in track.tlwh.tolist()],
                '2d_coord': track.location[0].tolist(),
            })


In [None]:
# Camera ids written into the result records, matched by position with
# `trackers`. Hard-coded for this scene -- presumably cameras 9 and 10 of
# scene_042; verify that the order and count match `sources`.
cam_ids = [9, 10]


In [None]:
update_result_lists_testset(trackers, results_lists, cur_frame, cam_ids, scene)
        

In [None]:
print(f"video frame ({cur_frame}/{total_frames})")
cur_frame += 1


In [None]:
results_lists


In [None]:
def visualize_kpt(img,
              keypoints,
              scores,
              thr=0.3) -> np.ndarray:
    """Draw keypoints and skeleton links for every person onto ``img``.

    Args:
        img: Image array; it is drawn on in place.
        keypoints: Per-person arrays of ``(x, y)`` keypoint coordinates.
        scores: Per-person sequences of keypoint scores, aligned with
            ``keypoints``.
        thr: A link is drawn only when both of its endpoints score above this.
            Keypoint dots are drawn regardless of score.

    Returns:
        The same ``img`` object, annotated.
    """
    palette = [[51, 153, 255], [0, 255, 0], [255, 128, 0], [255, 255, 255],
               [255, 153, 255], [102, 178, 255], [255, 51, 51]]
    # Palette index per keypoint; only the first 14 keypoints are drawn.
    point_color = [0, 0, 0, 0, 0, 0, 1, 1, 2, 2, 2, 2, 3, 3]
    # ((start keypoint, end keypoint), palette index) for every skeleton link.
    links = [
        ((12, 13), 3), ((13, 0), 3), ((13, 1), 3), ((0, 1), 0), ((6, 7), 1),
        ((0, 2), 0), ((2, 4), 0), ((1, 3), 0), ((3, 5), 0), ((0, 6), 0),
        ((1, 7), 0), ((6, 8), 2), ((8, 10), 2), ((7, 9), 2), ((9, 11), 2),
    ]

    # draw keypoints and skeleton
    for person_kpts, person_scores in zip(keypoints, scores):
        for point, palette_idx in zip(person_kpts, point_color):
            center = tuple(point.astype(np.int32))
            cv2.circle(img, center, 2, palette[palette_idx], 2, cv2.LINE_AA)

        for (start, end), palette_idx in links:
            both_confident = person_scores[start] > thr and person_scores[end] > thr
            if not both_confident:
                continue
            start_pt = tuple(person_kpts[start].astype(np.int32))
            end_pt = tuple(person_kpts[end].astype(np.int32))
            cv2.line(img, start_pt, end_pt, palette[palette_idx], 1, cv2.LINE_AA)

    return img

def visualize(dets, img, colors, pose_result=None):
    """Draw labelled track boxes, and optionally pose skeletons, onto ``img``.

    Args:
        dets: Sequence of rows ``(x0, y0, x1, y1, score, id, len_feats)``.
            The id selects the colour and is printed in the label;
            ``len_feats`` is printed too unless it equals 50.
        img: Image array; it is drawn on in place.
        colors: Array of RGB-like colours with components in ``[0, 1]``,
            indexed by ``id % 80``.
        pose_result: Optional iterable of mappings whose ``'keypoints'`` entry
            is an array with ``(x, y, score)`` columns.

    Returns:
        ``img`` with the annotations drawn, or ``img`` untouched when ``dets``
        is empty.
    """
    if len(dets) == 0:
        return img

    scale = 2  # scale factor applied to font size and text thickness
    font_scale = 0.4 * scale
    thickness = 1 * scale
    font = cv2.FONT_HERSHEY_SIMPLEX

    if pose_result is not None:
        # Split every person's keypoint array into coordinates and scores.
        kpt_xy = [person['keypoints'][:, :2] for person in pose_result]
        kpt_conf = [person['keypoints'][:, 2] for person in pose_result]
        img = visualize_kpt(img, kpt_xy, kpt_conf, thr=0.3)

    for det in dets:
        score = det[4]
        track_id = int(det[5])
        if det[6] == 50:
            len_feats = ' '
        else:
            len_feats = det[6]
        left, top, right, bottom = map(int, (det[0], det[1], det[2], det[3]))

        base_color = colors[track_id % 80]
        box_color = (base_color * 255).astype(np.uint8).tolist()
        label = '{} : {:.1f}% | {}'.format(track_id, score * 100, len_feats)
        # Dark text on light boxes, light text on dark boxes.
        if np.mean(base_color) > 0.5:
            label_color = (0, 0, 0)
        else:
            label_color = (255, 255, 255)

        label_w, label_h = cv2.getTextSize(label, font, font_scale, thickness)[0]
        cv2.rectangle(img, (left, top), (right, bottom), box_color, 2)

        # Filled, slightly darkened label background just above the box.
        label_bg = (base_color * 255 * 0.7).astype(np.uint8).tolist()
        cv2.rectangle(
            img,
            (left, top - 1),
            (left + label_w + 1, top - int(1.5 * label_h)),
            label_bg,
            -1
        )
        cv2.putText(img, label, (left, top - label_h), font, font_scale, label_color, thickness=thickness)

    return img


In [None]:
# Global track id -> number of features stored on that multi-camera track,
# over both tracked and lost tracks.
gid_2_lenfeats = {}
for track in mc_tracker.tracked_mtracks + mc_tracker.lost_mtracks:
    if track.is_activated:
        gid_2_lenfeats[track.track_id] = len(track.features)
    else:
        # All non-activated tracks share the single key -2, so only the last
        # one visited is kept.
        gid_2_lenfeats[-2] = len(track.features)


In [None]:
# Render every camera's tracked boxes onto the frame collected for it.
# `trackers` and `imgs` are matched by position; `zip` stops at the shorter one.
results_imgs= []
for tracker, img in zip(trackers, imgs):
    # Row layout expected by `visualize`: box corners, score, global id, and
    # the feature count of that id (-1 when the id has no entry).
    outputs = [t.tlbr.tolist() + [t.score, t.global_id, gid_2_lenfeats.get(t.global_id, -1)] for t in tracker.tracked_stracks]
    # `visualize` draws on `img` itself, so the arrays held in `imgs` are modified too.
    img = visualize(outputs, img, colors)
    results_imgs.append(img)


In [None]:
# Stack the per-camera images vertically into one picture; this requires all
# images to share the same width and channel count.
res = np.vstack(results_imgs)
plt.figure(figsize=(6, 8))
# OpenCV images are BGR; convert to RGB for matplotlib.
plt.imshow(cv2.cvtColor(res, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.show()
