In [1]:
import cv2
import numpy as np
import os
import shutil
import torch

from ultralytics import YOLO
import supervision as sv

from mmpose.apis import init_pose_model
from mmpose.datasets import DatasetInfo

from video_inference_utils import infer_keypoint, apply_smoothnet_to_2d_seq, overlay_keypoints_on_video, output_keypoints_to_json



apex is not installed
apex is not installed
apex is not installed




In [None]:
# ---- Model setup: detector, pose estimator and keypoint smoother ----

# Run on the GPU when torch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# SmoothNet (smooths the keypoint predictions between frames).
# Only the config/checkpoint paths are recorded here; they are handed to
# apply_smoothnet_to_2d_seq when a keypoint sequence is smoothed.
smoothnet_config = 'SmoothNet/configs/h36m_fcn_3D.yaml'
smoothnet_checkpoint = 'weights/checkpoint_32.pth.tar'

# YOLO: human detection
yolo_model = YOLO("weights/yolo11l.pt")

# ViTPose: keypoint detection on each detected human
cfg_file = "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py"
ckpt_file = "weights/vitpose-l.pth"
pose_model = init_pose_model(cfg_file, ckpt_file, device=device)

# Dataset type and metadata are read from the test split of the pose config
_pose_test_cfg = pose_model.cfg.data.test
dataset_name = _pose_test_cfg.type
dataset_info = DatasetInfo(_pose_test_cfg.dataset_info)


Use load_from_local loader


### Video inference

In [None]:
video_path = 'video_clips/shoplifting1.MP4' # enter the video input path
output_dir = 'video_output'

os.makedirs(output_dir,exist_ok=True)
# Build the output name with os.path instead of splitting on '/' and '.':
# this also copes with native path separators and with file names that contain dots.
output_path = os.path.join(
    output_dir,
    os.path.splitext(os.path.basename(video_path))[0] + '_track.mp4'
)

# get video information
# Only the frame size is needed from this handle (the tracker is given the
# video path directly), so the capture is closed as soon as it has been read,
# including when opening failed.
cap = cv2.VideoCapture(video_path)
try:
    if not cap.isOpened():
        # Without this check a bad path would silently yield width = height = 0.
        raise IOError('Could not open video: {}'.format(video_path))
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
finally:
    cap.release()

# configuration of the tracker
tracker_yaml=r'bytetrack.yaml'

track_sessions = {} # {tracker_id: sessions}
# where sessions = {} with key: frame_ind_start, item: (frame_ind_end, smoothened_kpts)

track_curr_start = {} # {tracker_id: frame_ind_start}; None while the track has no open session

track_kpts ={} # {tracker_id: kpts}
# where kpts = [] with shape = (T,J,2)
#  T: # frames, J: # keypoints
# '2': for 'x' and 'y' of the [x,y] coordinate

track_patience = {} # {tracker_id: patience}
# Number of consecutive frames a track may be missing before its session is closed;
# during those frames the last known keypoints are repeated.
patience0 = 4


# YOLO inference with tracking and Keypoints inference by ViTPose (Smoothened by SmoothNet)

def _close_session(tracker_id, frame_ind_end):
    """Smooth the buffered keypoints of one track and store them as a finished session.

    The buffered poses in ``track_kpts[tracker_id]`` are stacked to a (T, J, 2)
    array, passed to ``apply_smoothnet_to_2d_seq`` and saved in
    ``track_sessions[tracker_id]`` under the session's start frame as
    ``(frame_ind_end, smoothened_kpts)``.  The buffer, start frame and patience
    of the track are then reset so that a reappearing ID opens a new session.

    Args:
        tracker_id: ID of the track whose open session is closed.
        frame_ind_end: index of the last frame covered by the buffered poses.
    """
    all_kpts = np.stack(track_kpts[tracker_id], axis=0)  # (T, J, 2)

    smoothened_kpts = apply_smoothnet_to_2d_seq(
        all_kpts,
        smoothnet_config,
        smoothnet_checkpoint,
        image_shape=(height, width)
    )
    frame_ind_start = track_curr_start[tracker_id]
    track_sessions[tracker_id][frame_ind_start] = (frame_ind_end, smoothened_kpts)

    # empty the buffer and reset start frame and patience for the next session
    track_kpts[tracker_id] = []
    track_curr_start[tracker_id] = None
    track_patience[tracker_id] = patience0


frame_ind = 0
for result in yolo_model.track(source=video_path, tracker=tracker_yaml, conf=0.5,iou=0.65,stream=True,device=device,verbose=False):

    frame = result.orig_img
    detections = sv.Detections.from_ultralytics(result)

    # IDs of the persons seen in this frame (stays empty when nothing is tracked)
    tracker_ids = set()

    if result.boxes.id is not None:

        detections.tracker_id=result.boxes.id.cpu().numpy().astype(int)
        detections = detections[detections.class_id == 0] # get only the person class detections

        # Skip pose estimation when only non-person objects were tracked in this frame
        if len(detections) > 0:

            pose_results = infer_keypoint(frame, detections, pose_model=pose_model, dataset_name=dataset_name, dataset_info=dataset_info)

            for ind in range(len(detections)):
                tracker_id = detections.tracker_id[ind]

                if tracker_id not in track_sessions:
                    # first time this ID is seen: create its bookkeeping entries
                    track_sessions[tracker_id] = {}
                    track_curr_start[tracker_id] = None
                    track_kpts[tracker_id] = []
                    track_patience[tracker_id] = patience0

                kpts_2d = pose_results[ind]["keypoints"][:, :2]  # (J, 2)
                track_kpts[tracker_id].append(kpts_2d)
                if track_curr_start[tracker_id] is None:
                    track_curr_start[tracker_id] = frame_ind
                # the track was seen again, so it gets its full patience back
                track_patience[tracker_id] = patience0

            tracker_ids = set(detections.tracker_id)

    # Handle tracks with an open session that were NOT seen in this frame
    for tracker_id in track_sessions:

        if tracker_id in tracker_ids or not track_kpts[tracker_id]:
            continue

        if track_patience[tracker_id]>0:
            # bridge the gap by repeating the last known pose
            track_patience[tracker_id] -= 1
            track_kpts[tracker_id].append(track_kpts[tracker_id][-1])

        elif track_patience[tracker_id]==0:
            # Patience used up: the buffer covers the frames up to the previous one.
            # NOTE(review): the closed session therefore ends with `patience0`
            # repeated poses after the last real detection - confirm this is
            # what the downstream consumers expect.
            _close_session(tracker_id, frame_ind-1)

    frame_ind+=1

# End of video: close every session that is still open
for tracker_id in track_sessions:

    if track_kpts[tracker_id]:
        _close_session(tracker_id, frame_ind-1)

cap.release()

print('Finished inference on {}, \nplease go ahead to output the inferred keypoint results and video.'.format(
    video_path.split('/')[-1]))

Finish inferece on shoplifting1.MP4, 
please go ahead to output the inferred keypoint results and video.


### Draw the output video and save the inferred keypoints

In [None]:

# ===================== Drawing the keypoints for each tracked person in the video =====================

# # To load the keypoints saved by a previous session, uncomment the following few lines...
# # (allow_pickle=True unpickles arbitrary objects: only load .npz files you created yourself)
# npz_path = 'video_output\\Normal_Videos314_x264.npz'# change the npz path here for loading the file
# loaded = np.load(npz_path,allow_pickle=True)
# video_path = loaded['video_path'].flatten()[0]
# track_sessions = loaded['track_sessions'].flatten()[0]
# output_path = loaded['output_path'].flatten()[0]

colors = [
    (0, 255, 0),       # Green
    (255, 165, 0),     # Orange
    (0, 0, 255),       # Blue
    (0, 255, 255),     # Cyan
    (255, 255, 0),     # Yellow
    (255, 0, 255),     # Magenta
    (255, 0, 0),       # Red
    (0, 0, 0),         # Black
    (255, 255, 255),   # White
    (128, 128, 128),   # Gray
] # rgb
# NOTE(review): OpenCV drawing functions expect BGR - confirm that
# overlay_keypoints_on_video converts these RGB tuples.

# {tracker_id: color}; the palette is cycled so that videos with more tracked
# persons than palette entries do not raise an IndexError.
color_map = {
    tracker_id: colors[color_i % len(colors)]
    for color_i, tracker_id in enumerate(track_sessions)
}
overlay_keypoints_on_video(
    video_path=video_path,
    track_sessions=track_sessions,
    output_path=output_path,
    color_map=color_map
)

# ===================== Save the smoothened keypoints for plotting in videos =====================

# Video name without directory and extension; os.path is used instead of
# splitting on '/' and '.' so that native path separators and dots inside the
# file name are handled correctly.
video_name = os.path.splitext(os.path.basename(video_path))[0]
npz_path = os.path.join(output_dir,video_name+'.npz')
# track_sessions is a dict, so it is stored as a pickled object array;
# reading it back requires np.load(..., allow_pickle=True).
np.savez(npz_path,video_path=video_path,track_sessions=track_sessions,output_path=output_path)

# ===================== Create the json file for the inference by STG-NF model =====================

video_name_map = {
    'Normal_Videos313_x264':'07',
    'Normal_Videos314_x264':'08',
    'shoplifting1':'09',
    'shoplifting2':'10'
} # to match the STG-NF model format

# Fail with an actionable message instead of a bare KeyError for a new video
if video_name not in video_name_map:
    raise KeyError(
        "No STG-NF id is defined for video '{}'; add an entry to video_name_map.".format(video_name)
    )

video_json_dir = os.path.join(output_dir,video_name)
# NOTE(review): presumably used as a path prefix by output_keypoints_to_json,
# since the resulting .json files are collected from video_json_dir below - verify.
json_dir = os.path.join(video_json_dir,video_name_map[video_name])
os.makedirs(video_json_dir,exist_ok=True)
output_keypoints_to_json(track_sessions, output_dir=json_dir)

# copy the json files to the STG-NF data directory
stg_nf_test_dir = os.path.join('STG_NF','data','PoseLift','pose','test')
# Make sure the destination is a directory: if it were missing, shutil.copy would
# either raise or write every json to a single file literally named 'test'.
os.makedirs(stg_nf_test_dir,exist_ok=True)
jsons = [os.path.join(video_json_dir,js) for js in os.listdir(video_json_dir) if js.endswith('.json')]
for js in jsons:
    shutil.copy(js,stg_nf_test_dir)


Saved video with smoothed keypoints to: video_output\shoplifting1_track.mp4
