## 2D PIPELINE TEST

In [1]:
import os
os.chdir('/mnt/nas_siddharth/code_final/')

import cv2
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

from src.pipeline.step1_proc2d import init_all_models
from mmcv.transforms import Compose

DEVICE = 'cuda:0'

detector, tracker, pose_model, test_pipeline, id_model_all = init_all_models(DEVICE)

06/09 02:04:21 - mmengine - [4m[97mINFO[0m - [INFO] Initializing models on device: cuda:0
Loads checkpoint by local backend from path: ./model/detection/detection.pth
Loads checkpoint by local backend from path: ./model/pose/pose.pth
Loads checkpoint by local backend from path: ./model/id/id_finetuned.pth
06/09 02:04:39 - mmengine - [4m[97mINFO[0m - [INFO] All models initialized.


In [2]:
test_pipeline = Compose([
    {"type": "mmdet.LoadImageFromNDArray"},
    {"type": "Resize", "scale": (800, 800), "keep_ratio": True},
    {"type": "mmdet.LoadAnnotations", "with_bbox": True},
    {"type": "mmdet.PackDetInputs"},
])

In [3]:
video_path = '/mnt/nas_siddharth/code_final/notebooks/video/test_video.mp4'
output_path = '/mnt/nas_siddharth/code_final/notebooks/video/test_video_inference.mp4'

In [5]:
skeleton_pairs = [
    (0, 1),   # nose - left eye
    (0, 2),   # nose - right eye
    (1, 3),   # left eye - left ear
    (2, 4),   # right eye - right ear
    (3, 4),   # left ear - right ear
    #(0, 17),  # nose - neck
    (3, 17),  # left ear - neck
    (4, 17),  # right ear - neck
    (17, 5),  # neck - left shoulder
    (17, 6),  # neck - right shoulder
    (5, 6),   # left shoulder - right shoulder
    (17, 11), # neck - left hip
    (17, 12), # neck - right hip
    (11, 12), # left hip - right hip
    (5, 7),   # left shoulder - left elbow
    (7, 9),   # left elbow - left wrist
    (6, 8),   # right shoulder - right elbow
    (8, 10),  # right elbow - right wrist
    (11, 13), # left hip - left knee
    (13, 15), # left knee - left ankle
    (12, 14), # right hip - right knee
    (14, 16), # right knee - right ankle
    # Diagonals for torso
    #(5, 11),  # left shoulder - left hip
    #(6, 12),  # right shoulder - right hip
    #(5, 12),  # left shoulder - right hip
    #(6, 11),  # right shoulder - left hip
    ]

def draw_kps_cv2(img, kpts, skeleton_pairs, clr=(0,255,0)):
    for i, (x, y, score) in enumerate(kpts):
        if score > 0.2:
            cv2.circle(img, (int(x), int(y)), 2, clr, -1)
    for i1, i2 in skeleton_pairs:
        if kpts[i1][2]>0.2 and kpts[i2][2]>0.2:
            pt1 = (int(kpts[i1][0]), int(kpts[i1][1]))
            pt2 = (int(kpts[i2][0]), int(kpts[i2][1]))
            cv2.line(img, pt1, pt2, clr, 1)
    return img

def detect_and_pose_on_image(img, detector, pose_model, test_pipeline):
    from mmdet.apis import inference_detector
    det_result = inference_detector(detector, [img], test_pipeline=test_pipeline)[0]
    bboxes = det_result.pred_instances.bboxes.cpu().numpy()
    scores = det_result.pred_instances.scores.cpu().numpy()
    score_thr = 0.3
    keep = scores > score_thr
    bboxes = bboxes[keep]
    person_results = [{"bbox": bbox} for bbox in bboxes]
    from mmpose.apis import inference_topdown
    if len(person_results) > 0:
        pose_results = inference_topdown(
            pose_model, img,
            bboxes=np.array([r["bbox"] for r in person_results], dtype=np.float32),
            bbox_format="xyxy",
        )
    else:
        pose_results = []
    return pose_results

cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
nframes = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

save_out = True
if save_out:
    out_writer = cv2.VideoWriter(
        output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)
    )


In [6]:
start_sec = 44
end_sec   = 54

fps = 24
start_frame = int(start_sec * fps)
end_frame = int(end_sec * fps)
nframes = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
end_frame = min(end_frame, nframes)

In [8]:
fps = cap.get(cv2.CAP_PROP_FPS)
nframes = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"FPS: {fps}, total frames: {nframes}")
print(f"{start_sec}s to {end_sec}s (frames {start_frame} to {end_frame})")

FPS: 25.0, total frames: 10000
44s to 54s (frames 1056 to 1296)


In [None]:
sample_frames = []
frame_idxs_to_show = [0, nframes//2, nframes-1]  # first, middle, last

cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
for i in tqdm(range(start_frame, end_frame), desc="Processing selected frames"):
    ret, frame = cap.read()
    if not ret:
        break

    img_vis = frame.copy()
    pose_results = detect_and_pose_on_image(frame, detector, pose_model, test_pipeline)
    for res in pose_results:
        kpts = res.pred_instances.keypoints[0]
        scores = res.pred_instances.keypoint_scores[0]
        if not (np.isnan(kpts[5]).any() or np.isnan(kpts[6]).any()):
            neck_xy = (kpts[5] + kpts[6]) / 2.0
            neck_score = (scores[5] + scores[6]) / 2.0
        else:
            neck_xy = np.array([np.nan, np.nan])
            neck_score = 0.0
        kpts18 = np.vstack([kpts, neck_xy])
        scores18 = np.concatenate([scores, [neck_score]])
        kpts_vis = np.concatenate([kpts18, scores18[:, None]], axis=1)
        img_vis = draw_kps_cv2(img_vis, kpts_vis, skeleton_pairs)

    if save_out:
        out_writer.write(img_vis)
    if i in frame_idxs_to_show:
        sample_frames.append(cv2.cvtColor(img_vis, cv2.COLOR_BGR2RGB))

cap.release()
if save_out:
    out_writer.release()

for idx, img_rgb in zip(frame_idxs_to_show, sample_frames):
    plt.figure(figsize=(8,8))
    plt.title(f"Frame {idx}")
    plt.imshow(img_rgb)
    plt.axis('off')
    plt.show()

Processing selected frames:   0%|          | 0/240 [00:00<?, ?it/s]



Processing selected frames: 100%|██████████| 240/240 [01:08<00:00,  3.52it/s]
