# Pose estimation

This is a slight modification of the earlier pose estimation code to run on ShanghaiTech - it saves the output for each video as it goes, in case the kernel crashes on the larger dataset.

For the written work this was run in Google Colab, and it is advised to run it there as well if you would like to verify the pose estimation.

The pose estimation can be skipped by using the pre-extracted poses as outlined in the README.

In [None]:
from glob import glob

# In case of kernel crashes when running the pose estimator: record which
# per-video CSVs already exist on disk so an interrupted run can be resumed.
# (Previously this list was immediately overwritten with `done = []`, which
# discarded it; assign `done = []` by hand only to force a full re-run.)
done = glob('data/shanghaitech/tracked_poses_l_shanghai_*.csv')

In [None]:
# Reduce every finished CSV path to the video name embedded in it, i.e. the
# text between the "tracked_poses_l_shanghai_" prefix and the ".csv" suffix.
# This must be a flat collection of strings: the previous double brackets
# produced a list holding a single list, so `video_name in done` never matched.
done = {
    d.rsplit("tracked_poses_l_shanghai_", 1)[-1][:-len(".csv")]
    for d in done
}

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm

from ultralytics import YOLO
from src.sortalg import Sort

# Load the YOLO11-large pose weights once; the model is reused for all videos.
model = YOLO("yolo11l-pose.pt")

video_dir = "../shanghaitech/training/videos"

for video_name in tqdm(os.listdir(video_dir)):
    # Only handle "*_frames" directories that have not been written out yet.
    # The previous condition was the bare string '_frames', which is always
    # truthy, so nothing was filtered and `done` was never consulted.
    if "_frames" not in video_name or video_name in done:
        continue

    print(video_name)

    frame_dir = os.path.join(video_dir, video_name)
    if not os.path.isdir(frame_dir):
        continue

    # One row per tracked person per frame, for this video only.
    results = []
    # Fresh tracker per video so person IDs are not carried across videos.
    tracker = Sort(max_age=10)

    for frame_idx, frame_file in enumerate(sorted(os.listdir(frame_dir))):
        frame = cv2.imread(os.path.join(frame_dir, frame_file))
        if frame is None:
            # cv2.imread returns None when a file cannot be decoded; skip it
            # rather than hand None to the model.
            continue

        pred = model.predict(source=frame, conf=0.1, save=False, verbose=False)
        keypoints = pred[0].keypoints
        if keypoints is None or len(keypoints) == 0:
            # NOTE(review): the tracker is not updated on frames without
            # detections. The reference SORT implementation asks for update()
            # on every frame (with an empty (0, 5) array) - confirm whether
            # src.sortalg expects the same. Left as-is so that re-extracted
            # poses match the pre-extracted ones.
            continue

        boxes = pred[0].boxes.xyxy.cpu().numpy()
        scores = pred[0].boxes.conf.cpu().numpy()

        # Rows of [x1, y1, x2, y2, score], as passed to the tracker.
        detections = np.column_stack((boxes, scores))
        tracked = tracker.update(detections)

        # Detection-box centres are the same for every track in this frame,
        # so compute them once.
        box_centres = np.column_stack((
            (boxes[:, 0] + boxes[:, 2]) / 2,
            (boxes[:, 1] + boxes[:, 3]) / 2,
        ))

        for x1, y1, x2, y2, track_id in tracked:
            # The tracker returns boxes without a link back to the detection,
            # so take the keypoints of the detection whose box centre is
            # nearest to the tracked box centre.
            track_centre = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
            matched_idx = np.argmin(
                np.linalg.norm(box_centres - track_centre, axis=1)
            )

            kp = keypoints.data[matched_idx].cpu().numpy()

            row = {
                "video": video_name,
                "frameID": frame_idx,
                "personID": int(track_id),
                "bbox_x1": x1,
                "bbox_y1": y1,
                "bbox_x2": x2,
                "bbox_y2": y2,
            }
            # Keep only the first two columns (x, y) of each of the 17 joints.
            for j in range(17):
                row[f"joint{j+1}x"] = kp[j, 0]
                row[f"joint{j+1}y"] = kp[j, 1]
            results.append(row)

    # Save this video's poses straight away so a later crash does not lose them.
    df = pd.DataFrame(results)
    print(df)
    df.to_csv(f"data/shanghaitech/tracked_poses_l_shanghai_{video_name}.csv", index=False)

# Test

In [None]:
from glob import glob

test_frames_dir = "data/shanghaitech/testing/frames/"

# Names of test videos whose pose CSV already exists: characters -11..-4 of
# each matching path, i.e. the seven characters just before ".csv".
done = [
    csv_path[-11:-4]
    for csv_path in glob('data/shanghaitech/test_poses_shanghai_l_*.csv')
]
done

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm

from ultralytics import YOLO
from src.sortalg import Sort

# Load the YOLO11-large pose weights once; the model is reused for all videos.
model = YOLO("yolo11l-pose.pt")

video_dir = test_frames_dir
for video_name in tqdm(os.listdir(video_dir)):
    # Skip videos whose CSV was already written by an earlier (crashed) run.
    if video_name in done:
        continue

    print(video_name)
    frame_dir = os.path.join(video_dir, video_name)
    if not os.path.isdir(frame_dir):
        continue

    # One row per tracked person per frame, for this video only.
    results = []
    # Fresh tracker per video so person IDs are not carried across videos.
    tracker = Sort(max_age=10)

    for frame_idx, frame_file in enumerate(sorted(os.listdir(frame_dir))):
        frame = cv2.imread(os.path.join(frame_dir, frame_file))
        if frame is None:
            # cv2.imread returns None when a file cannot be decoded; skip it
            # rather than hand None to the model.
            continue

        pred = model.predict(source=frame, conf=0.1, save=False, verbose=False)
        keypoints = pred[0].keypoints
        if keypoints is None or len(keypoints) == 0:
            # NOTE(review): the tracker is not updated on frames without
            # detections. The reference SORT implementation asks for update()
            # on every frame (with an empty (0, 5) array) - confirm whether
            # src.sortalg expects the same. Left as-is so that re-extracted
            # poses match the pre-extracted ones.
            continue

        boxes = pred[0].boxes.xyxy.cpu().numpy()
        scores = pred[0].boxes.conf.cpu().numpy()

        # Rows of [x1, y1, x2, y2, score], as passed to the tracker.
        detections = np.column_stack((boxes, scores))
        tracked = tracker.update(detections)

        # Detection-box centres are the same for every track in this frame,
        # so compute them once.
        box_centres = np.column_stack((
            (boxes[:, 0] + boxes[:, 2]) / 2,
            (boxes[:, 1] + boxes[:, 3]) / 2,
        ))

        for x1, y1, x2, y2, track_id in tracked:
            # The tracker returns boxes without a link back to the detection,
            # so take the keypoints of the detection whose box centre is
            # nearest to the tracked box centre.
            track_centre = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
            matched_idx = np.argmin(
                np.linalg.norm(box_centres - track_centre, axis=1)
            )

            kp = keypoints.data[matched_idx].cpu().numpy()

            row = {
                "video": video_name,
                "frameID": frame_idx,
                "personID": int(track_id),
                "bbox_x1": x1,
                "bbox_y1": y1,
                "bbox_x2": x2,
                "bbox_y2": y2,
            }
            # Keep only the first two columns (x, y) of each of the 17 joints.
            for j in range(17):
                row[f"joint{j+1}x"] = kp[j, 0]
                row[f"joint{j+1}y"] = kp[j, 1]
            results.append(row)

    # Save this video's poses straight away so a later crash does not lose them.
    # Written under data/shanghaitech/ because that is where both the resume
    # check and the combine step glob for test_poses_shanghai_l_*.csv; the old
    # Google Drive path meant finished videos were never found by either.
    df = pd.DataFrame(results)
    df.to_csv(f"data/shanghaitech/test_poses_shanghai_l_{video_name}.csv", index=False)

# Combine extracted poses

The code below combines the extracted poses into single CSVs like the ones provided by default in the repo.

In [None]:
# Read every per-video training CSV and stack them into one table.
dfs = []
for f in glob('data/shanghaitech/tracked_poses_l*_frames.csv'):
    try:
        dfs.append(pd.read_csv(f))
    except Exception as exc:
        # Best effort: report a file that cannot be read (for example an
        # empty CSV) and carry on. Unlike a bare `except:`, this lets
        # KeyboardInterrupt / SystemExit through so the cell can be stopped.
        print(f"{f}: {exc}")

df = pd.concat(dfs)
del dfs  # release the per-file frames before writing the combined CSV

df.to_csv('./data/stc-train_tracked_poses_l.csv')

In [None]:
# Read every per-video test CSV and stack them into one table.
dfs = []
for f in glob('data/shanghaitech/test_poses_shanghai_l*.csv'):
    try:
        dfs.append(pd.read_csv(f))
    except Exception as exc:
        # Best effort: report a file that cannot be read (for example an
        # empty CSV) and carry on. Unlike a bare `except:`, this lets
        # KeyboardInterrupt / SystemExit through so the cell can be stopped.
        print(f"{f}: {exc}")

df = pd.concat(dfs)
del dfs  # release the per-file frames before writing the combined CSV

df.to_csv('./data/stc-test_tracked_poses_l.csv')