In [3]:
import os
import sys
import tempfile
import warnings
import cv2
import pickle
import json
from argparse import ArgumentParser

from mmpose.apis import (inference_top_down_pose_model, init_pose_model,
                         process_mmdet_results, vis_pose_result)
from mmpose.datasets import DatasetInfo

try:
    from mmdet.apis.inference import inference_detector, init_detector
    has_mmdet = True
except (ImportError, ModuleNotFoundError):
    has_mmdet = False
    print('Import error')

from tqdm.notebook import tqdm


  check_for_updates()


## Model config

In [2]:
det_config = "ViTPose/demo/mmdetection_cfg/faster_rcnn_r50_fpn_coco.py" 
det_checkpoint = "ViTPose/checkpoints/faster_rcnn_r50_fpn_1x_coco_20200130-047c8118.pth"

# ViTPose Large
#pose_config = "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/ViTPose_large_coco_256x192.py"  
#pose_checkpoint = "ViTPose/checkpoints/vitpose-l.pth"

# ViTPose++ Base
pose_config = "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/vitPose+_base_coco+aic+mpii+ap10k+apt36k+wholebody_256x192_udp.py"  
pose_checkpoint = "ViTPose/checkpoints/vitpose++_base.pth"

device = "cuda:0"

det_cat_id = 1 # Category id for bounding box detection model
bbox_thr = 0.5 # Bounding box score threshold
kpt_thr = 0.5 # Keypoint score threshold
radius = 4 # Keypoint radius for visualization
thickness = 1 # Link thickness for visualization

""" det_model = init_detector(det_config, det_checkpoint, device=device.lower())

# build the pose model from a config file and a checkpoint file
pose_model = init_pose_model(pose_config, pose_checkpoint, device=device.lower())

dataset = pose_model.cfg.data['test']['type']
dataset_info = pose_model.cfg.data['test'].get('dataset_info', None)
if dataset_info is None:
    warnings.warn(
        'Please set `dataset_info` in the config.'
        'Check https://github.com/open-mmlab/mmpose/pull/663 for details.',
        DeprecationWarning)
else:
    dataset_info = DatasetInfo(dataset_info)
 """

# Init detector
det_model = init_detector(det_config, det_checkpoint, device=device.lower())

# Init pose model
pose_model = init_pose_model(pose_config, pose_checkpoint, device=device.lower())

# === Dataset meta (MMPose >= 1.0) ===
if hasattr(pose_model, "dataset_meta") and pose_model.dataset_meta is not None:
    dataset_info = DatasetInfo(pose_model.dataset_meta)
else:
    dataset_info = None


load checkpoint from local path: ViTPose/checkpoints/faster_rcnn_r50_fpn_1x_coco_20200130-047c8118.pth


KeyError: 'TopDownMoE is not in the models registry'

## Run model on frames

In [None]:
def save_to_openpose(json_file_path, keypoints, scores):
    '''
    Save the keypoints and scores to a JSON file in the OpenPose format

    INPUTS:
    - json_file_path: Path to save the JSON file
    - keypoints: Detected keypoints
    - scores: Confidence scores for each keypoint

    OUTPUTS:
    - JSON file with the detected keypoints and confidence scores in the OpenPose format
    '''

    # Prepare keypoints with confidence scores for JSON output
    nb_detections = len(keypoints)
    # print('results: ', keypoints, scores)
    detections = []
    for i in range(nb_detections): # nb of detected people
        keypoints_with_confidence_i = []
        for kp, score in zip(keypoints[i], scores[i]):
            keypoints_with_confidence_i.extend([kp[0].item(), kp[1].item(), score.item()])
        detections.append({
                    "person_id": [-1],
                    "pose_keypoints_2d": keypoints_with_confidence_i,
                    "face_keypoints_2d": [],
                    "hand_left_keypoints_2d": [],
                    "hand_right_keypoints_2d": [],
                    "pose_keypoints_3d": [],
                    "face_keypoints_3d": [],
                    "hand_left_keypoints_3d": [],
                    "hand_right_keypoints_3d": []
                })
            
    # Create JSON output structure
    json_output = {"version": 1.3, "people": detections}
    
    # Save JSON output for each frame
    json_output_dir = os.path.abspath(os.path.join(json_file_path, '..'))
    if not os.path.isdir(json_output_dir): os.makedirs(json_output_dir)
    with open(json_file_path, 'w') as json_file:
        json.dump(json_output, json_file)


def run_rotate(det_model, img_name, frame_names, out_img_root, keypoint_pkl_path, pose_dir):
    drawn_frames = []
    all_keypoints = {}  # frame_name → list of person dicts
    frame_paths = [os.path.join(img_name, frame_name) for frame_name in frame_names]

    for frame_idx, image_file in tqdm(enumerate(frame_paths)):
        out_file = os.path.join(out_img_root, f'vis_{frame_names[frame_idx]}')

        # rotate to portrait
        """ img = cv2.imread(image_file)
        rotated_img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)

        tmp_fd, tmp_img_path = tempfile.mkstemp(suffix=".png")
        os.close(tmp_fd)
        cv2.imwrite(tmp_img_path, rotated_img) """

        mmdet_results = inference_detector(det_model, image_file)
        person_results = process_mmdet_results(mmdet_results, det_cat_id)

        for person in person_results:
            person['bbox'][2] = max(person['bbox'][2], 1)  # éviter w=0
            person['bbox'][3] = max(person['bbox'][3], 1)  # éviter h=0
            person['dataset_idx'] = 0  # <-- hack pour ViTPose++

        pose_results, returned_outputs = inference_top_down_pose_model(
            pose_model,
            image_file,
            person_results,
            bbox_thr=bbox_thr,
            format='xyxy',
            dataset=dataset,
            dataset_info=dataset_info,
            return_heatmap=False,
            outputs=None)

        keypoints = [r['keypoints'][:, :2] for r in pose_results]
        scores = [r['keypoints'][:, 2] for r in pose_results]

        json_output_dir = os.path.join(pose_dir, f'{os.path.basename(img_name)}_json')
        json_file_path = os.path.join(json_output_dir, f"{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.json")
        
        save_to_openpose(json_file_path, keypoints, scores)

        # Collect keypoints with person ID
        keypoints_per_frame = []
        for pid, result in enumerate(pose_results):
            keypoints_per_frame.append({
                "id": pid,
                "bbox": result['bbox'],  # [x1, y1, x2, y2]
                "keypoints": result['keypoints'].tolist()
            })

        all_keypoints[frame_names[frame_idx]] = keypoints_per_frame

        vis_pose_result(
            pose_model,
            image_file,
            pose_results,
            dataset=dataset,
            dataset_info=dataset_info,
            kpt_score_thr=kpt_thr,
            radius=radius,
            thickness=thickness,
            show=False,
            out_file=out_file)

        drawn_frame = cv2.imread(out_file)
        drawn_frames.append(drawn_frame)

        #os.remove(tmp_img_path)

    # Save all keypoints as .pkl
    with open(keypoint_pkl_path, 'wb') as f:
        pickle.dump(all_keypoints, f)

    return drawn_frames


## Convert results to video

In [22]:
def convert_to_video(drawn_frames, out_img_root, img_name):
    output_video_path = os.path.join(out_img_root, img_name.split('/')[-1].split('.')[0] + '_vit.mp4')
    height, width, _ = drawn_frames[0].shape

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps=120.0, frameSize=(width, height))

    for frame in drawn_frames:
        video_writer.write(frame)

    video_writer.release()

def convert_imfolder_to_video(images_path, video_path):
    frame0 = cv2.imread(os.path.join(images_path, os.listdir(images_path)[0]))
    height, width, _ = frame0.shape

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    video_writer = cv2.VideoWriter(video_path, fourcc, fps=120.0, frameSize=(width, height))

    image_names = sorted(os.listdir(images_path), key=lambda x: int(str(x).split('.')[0][-5:]))

    for file in image_names:
        frame = cv2.imread(os.path.join(images_path, file))
        frame = cv2.resize(frame, (width, height))
        video_writer.write(frame)

    video_writer.release()

In [23]:
path_im = '/mnt/D494C4CF94C4B4F0/Trampoline_avril2025/Images_trampo_avril2025/20250429_vitL'
video_path_root = '/mnt/D494C4CF94C4B4F0/Trampoline_avril2025/Results_video/20250429_vitL'
if not os.path.isdir(video_path_root):
    os.makedirs(video_path_root)

for seq in tqdm(sorted(os.listdir(path_im))):
    video_path = os.path.join(video_path_root, seq)
    if not os.path.isfile(video_path+'.mp4'):
        convert_imfolder_to_video(os.path.join(path_im, seq), video_path+'.mp4')

  0%|          | 0/713 [00:00<?, ?it/s]

KeyboardInterrupt: 

## Data config

In [None]:
path_root = "/mnt/D494C4CF94C4B4F0/Trampoline_avril2025/Images_trampo_avril2025/20250429"
out_root = path_root + "_vit++B"
keypoint_path = path_root+ "_keypts"
pose_dir = path_root + "_pose"

os.makedirs(out_root, exist_ok=True)
#os.makedirs(keypoint_path, exist_ok=True)
os.makedirs(pose_dir, exist_ok=True)

files = os.listdir(path_root)
sorted_files = sorted(files)

for i in tqdm(sorted_files):
    print(i)
    if i + '_vis' not in os.listdir(out_root):
        img_name = os.path.join(path_root, i)
        out_img_root = os.path.join(out_root, i + '_vis')
        frame_names = os.listdir(img_name)
        frame_names.sort(key=lambda x: int(x.split('.')[0].split('_')[1]))
        
        drawn_frames = run_rotate(det_model, img_name, frame_names, out_img_root, keypoint_path+'/'+i+'.pkl', pose_dir)
        #convert_to_video(drawn_frames, out_img_root, img_name)

  0%|          | 0/712 [00:00<?, ?it/s]

1_partie_0429-Camera1_M11139
1_partie_0429-Camera2_M11140


0it [00:00, ?it/s]

KeyError: 'dataset_idx'

## ViTPose ++

### Implementation with Hugging face

In [None]:
import os
import glob
import pickle
from pathlib import Path
from typing import List, Dict

import numpy as np
import torch
import cv2
from PIL import Image
from tqdm import tqdm
import json

from transformers import AutoProcessor, RTDetrForObjectDetection, VitPoseForPoseEstimation

# Helper: save OpenPose-like JSON
def save_to_openpose(json_path: str, keypoints_list: List[np.ndarray], scores_list: List[np.ndarray]):
    """
    keypoints_list: list of (K,2)
    scores_list: list of (K,)
    Saves JSON in the simple OpenPose format:
    { "people": [ {"pose_keypoints_2d": [x1,y1,s1, x2,y2,s2, ...]}, ... ] }
    """
    people = []
    for kps, sc in zip(keypoints_list, scores_list):
        flat = []
        for (x, y), s in zip(kps, sc):
            flat.extend([float(x), float(y), float(s)])
        people.append({"pose_keypoints_2d": flat})
    out = {"people": people}
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(out, f, indent=2)

def draw_pose_on_image(img_bgr: np.ndarray, pose_results: List[Dict]):
    """
    Draw keypoints on BGR image.
    pose_results: list of dicts with 'keypoints' (Kx3)
    Returns image with skeletons drawn.
    """
    img = img_bgr.copy()
    for person in pose_results:
        kpts = np.array(person["keypoints"], dtype=float)  # (K,3)
        # Draw keypoints
        for x, y, s in kpts:
            cv2.circle(img, (int(x), int(y)), 3, (0, 0, 255), -1)
        # Optional: draw bbox around keypoints
        x_min, y_min = kpts[:,0].min(), kpts[:,1].min()
        x_max, y_max = kpts[:,0].max(), kpts[:,1].max()
        cv2.rectangle(img, (int(x_min), int(y_min)), (int(x_max), int(y_max)), (0, 255, 0), 2)
    return img

# ----------------- CONFIG -----------------
DETECTOR_MODEL = "PekingU/rtdetr_r50vd_coco_o365"
POSE_MODEL = "usyd-community/vitpose-plus-base"
DET_CONF_THR = 0.5
POSE_KPT_THR = 0.5
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# ------------------------------------------

# ------------------ MODEL ------------------
print("Loading detector...")
person_image_processor = AutoProcessor.from_pretrained(DETECTOR_MODEL)
person_model = RTDetrForObjectDetection.from_pretrained(DETECTOR_MODEL).to(DEVICE)

print("Loading pose model...")
image_processor = AutoProcessor.from_pretrained(POSE_MODEL)
pose_model = VitPoseForPoseEstimation.from_pretrained(POSE_MODEL).to(DEVICE)
# ------------------------------------------

path = "/mnt/D494C4CF94C4B4F0/Trampoline_avril2025/Images_trampo_avril2025/20250429/"
out_path = "/mnt/D494C4CF94C4B4F0/Trampoline_avril2025/Images_trampo_avril2025/out_vitpose++_base"
os.makedirs(out_path, exist_ok=True)

for seq in tqdm(os.listdir(path)):
    if '0429' in str(seq) and str(seq) not in os.listdir(out_path):
        IMAGES_DIR = path + str(seq)
        OUT_DIR = out_path + f"/{str(seq)}"

        os.makedirs(OUT_DIR, exist_ok=True)
        vis_dir = os.path.join(OUT_DIR, "vis")
        json_dir = os.path.join(OUT_DIR, "json")
        os.makedirs(vis_dir, exist_ok=True)
        os.makedirs(json_dir, exist_ok=True)

        # Gather images
        img_paths = sorted(glob.glob(os.path.join(IMAGES_DIR, "*.*")))
        img_paths = [p for p in img_paths if p.lower().endswith((".jpg", ".jpeg", ".png", ".bmp"))]
        if len(img_paths) == 0:
            raise FileNotFoundError("Aucune image trouvée dans IMAGES_DIR")

        all_keypoints = {}  # mapping frame_name -> list of persons dicts
        frame_names = [Path(p).stem for p in img_paths]

        for frame_idx, img_path in enumerate(tqdm(img_paths, desc="Frames")):
            image = Image.open(img_path).convert("RGB")
            image_size = (image.height, image.width)

            # --- Stage 1: person detection (RTDetr) ---
            inputs = person_image_processor(images=image, return_tensors="pt").to(DEVICE)
            with torch.no_grad():
                det_outputs = person_model(**inputs)

            # Post-process detection results (HF processor provides helper)
            results = person_image_processor.post_process_object_detection(
                det_outputs, target_sizes=torch.tensor([(image.height, image.width)], dtype=torch.long), threshold=DET_CONF_THR
            )
            det_result = results[0]
            person_boxes = det_result["boxes"][det_result["labels"] == 0]  # select label 0 = person
            person_boxes = person_boxes.cpu().numpy()

            # Convert from (x1,y1,x2,y2) -> (x1,y1,w,h) for the vitpose processor
            if person_boxes.size > 0:
                person_boxes_xywh = person_boxes.copy()
                person_boxes_xywh[:, 2] = person_boxes[:, 2] - person_boxes[:, 0]
                person_boxes_xywh[:, 3] = person_boxes[:, 3] - person_boxes[:, 1]
                # ensure w,h >= 1
                person_boxes_xywh[:, 2] = np.maximum(person_boxes_xywh[:, 2], 1.0)
                person_boxes_xywh[:, 3] = np.maximum(person_boxes_xywh[:, 3], 1.0)
            else:
                person_boxes_xywh = np.zeros((0, 4), dtype=np.float32)

            # If no person detected, still save empty json and continue
            if person_boxes_xywh.shape[0] == 0:
                json_file = os.path.join(json_dir, f"{Path(img_path).stem}.json")
                save_to_openpose(json_file, [], [])
                all_keypoints[frame_names[frame_idx]] = []
                # copy original image to vis folder (no poses)
                out_vis = os.path.join(vis_dir, f"{Path(img_path).stem}.png")
                img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
                cv2.imwrite(out_vis, img_cv)
                continue

            # --- Stage 2: ViTPose inference per image (top-down) ---
            # The HF vitpose processor expects boxes as list-of-arrays per image
            inputs_pose = image_processor(image, boxes=[person_boxes_xywh], return_tensors="pt").to(DEVICE)

            # MOE dataset index hack (like you did): model expects dataset_index (here set to 0)
            # inputs_pose can be augmented in-place
            inputs_pose["dataset_index"] = torch.tensor([0], device=DEVICE)

            with torch.no_grad():
                outputs = pose_model(**inputs_pose)

            # Post-process (HF helper) to get keypoints per person
            # threshold filters out low-confidence kps in the post-processing stage
            pose_results_list = image_processor.post_process_pose_estimation(outputs, boxes=[person_boxes_xywh], threshold=POSE_KPT_THR)

            # pose_results_list is a list per image; take first (only one image)
            pose_results = pose_results_list[0]  # list of dicts, one per person

            # The HF postprocess returns keypoints in the coordinate system of original image.
            # Ensure bbox field exists and is xyxy so downstream code matches your format
            """ processed_pose_results = []
            keypoints_for_openpose = []
            scores_for_openpose = []

            for pid, pr in enumerate(pose_results):
                kpts = pr["keypoints"]  # typically tensor/np array shape (K,3)
                # The HF result may include 'bbox' in xywh or xyxy depending on version - check
                bbox = pr.get("bbox", None)
                if bbox is None:
                    # fallback: reconstruct bbox from the passed person_boxes_xywh
                    if pid < len(person_boxes):
                        x, y, w, h = person_boxes_xywh[pid]
                        bbox_xyxy = [float(x), float(y), float(x + w), float(y + h)]
                    else:
                        bbox_xyxy = [0, 0, 0, 0]
                else:
                    # If bbox is xywh -> convert to xyxy
                    bbox = np.array(bbox, dtype=float)
                    if len(bbox) == 4:
                        # decide if it's xywh or xyxy by heuristic: if x2 > width or y2 > height then it's probably xywh
                        # We'll assume HF returns xywh (as we passed), so convert:
                        x, y, w, h = bbox
                        bbox_xyxy = [float(x), float(y), float(x + w), float(y + h)]
                    else:
                        bbox_xyxy = bbox.tolist()

                # normalize types
                kpts_arr = np.array(kpts, dtype=float)
                if kpts_arr.ndim == 2 and kpts_arr.shape[1] == 2:
                    # If scores missing, append zeros
                    kpts_arr = np.concatenate([kpts_arr, np.zeros((kpts_arr.shape[0], 1), dtype=float)], axis=1)

                processed_pose_results.append({
                    "id": int(pid),
                    "bbox": bbox_xyxy,
                    "keypoints": kpts_arr.tolist(),  # list of [x,y,s]
                })

                keypoints_for_openpose.append(kpts_arr[:, :2])  # (K,2)
                scores_for_openpose.append(kpts_arr[:, 2])      # (K,) """
            processed_pose_results = []
            keypoints_for_openpose = []
            scores_for_openpose = []

            for pid, pr in enumerate(pose_results):
                kpts_arr = np.array(pr["keypoints"], dtype=float)  # (K,3) normalement

                # Vérification et fallback si jamais il manque la 3ᵉ colonne
                if kpts_arr.ndim == 2 and kpts_arr.shape[1] == 2:
                    print("⚠️ Aucun score renvoyé par HF, ajout de -1 comme placeholder")
                    kpts_arr = np.concatenate(
                        [kpts_arr, np.ones((kpts_arr.shape[0], 1), dtype=float) * -1],
                        axis=1
                    )

                # Logging debug : afficher 3 premiers keypoints avec score
                if pid == 0:
                    print(f"[DEBUG] Frame {frame_names[frame_idx]} - Person {pid} - sample kpts: {kpts_arr[:3]}")

                # bbox
                bbox = pr.get("bbox", None)
                if bbox is None:
                    if pid < len(person_boxes):
                        x, y, w, h = person_boxes_xywh[pid]
                        bbox_xyxy = [float(x), float(y), float(x + w), float(y + h)]
                    else:
                        bbox_xyxy = [0, 0, 0, 0]
                else:
                    bbox = np.array(bbox, dtype=float)
                    if len(bbox) == 4:
                        x, y, w, h = bbox
                        bbox_xyxy = [float(x), float(y), float(x + w), float(y + h)]
                    else:
                        bbox_xyxy = bbox.tolist()

                processed_pose_results.append({
                    "id": int(pid),
                    "bbox": bbox_xyxy,
                    "keypoints": kpts_arr.tolist(),  # [x,y,score]
                })

                keypoints_for_openpose.append(kpts_arr[:, :2])  # (K,2)
                scores_for_openpose.append(kpts_arr[:, 2])      # (K,)

            # Save per-image JSON (OpenPose-like)
            json_file = os.path.join(json_dir, f"{Path(img_path).stem}.json")
            save_to_openpose(json_file, keypoints_for_openpose, scores_for_openpose)

            # Collect for global pkl
            all_keypoints[frame_names[frame_idx]] = processed_pose_results

            # Visualization: draw and save
            img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            drawn = draw_pose_on_image(img_cv, processed_pose_results)
            out_vis = os.path.join(vis_dir, f"{Path(img_path).stem}.png")
            cv2.imwrite(out_vis, drawn)

        # Save global pickle of all keypoints
        pkl_path = os.path.join(OUT_DIR, "all_keypoints.pkl")
        with open(pkl_path, "wb") as f:
            pickle.dump(all_keypoints, f)


### Implementation with MMPose (VitPose++)

In [1]:
from mmpose.apis import inference_topdown, init_model
from mmpose.utils import register_all_modules

register_all_modules()

config_file = 'td-hm_hrnet-w48_8xb32-210e_coco-256x192.py'
checkpoint_file = 'td-hm_hrnet-w48_8xb32-210e_coco-256x192-0e67c616_20220913.pth'
model = init_model(config_file, checkpoint_file, device='cpu')  # or device='cuda:0'

# please prepare an image with person
results = inference_topdown(model, 'demo.jpg')

ModuleNotFoundError: No module named 'mmpose.apis'

In [14]:
from mmpose.apis import MMPoseInferencer

pose_config = "/home/lea/trampo/vitpose/ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/vitPose+_small_coco+aic+mpii+ap10k+apt36k+wholebody_256x192_udp.py"
pose_checkpoint = "ViTPose/checkpoints/vitpose++_base.pth"

inferencer = MMPoseInferencer(
    pose_config,
    pose_checkpoint,
    device='cuda:0'  # ou 'cpu'
)

# Exemple d’inférence sur une image
results = inferencer('demo/demo.jpg', show=True, draw_heatmap=True)



Loads checkpoint by local backend from path: ViTPose/checkpoints/vitpose++_base.pth


KeyError: 'TopDownMoE is not in the mmpose::model registry. Please check whether the value of `TopDownMoE` is correct or it was registered as expected. More details can be found at https://mmengine.readthedocs.io/en/latest/advanced_tutorials/config.html#import-the-custom-module'

In [8]:
import mmpose
print(mmpose.__version__)

1.3.2


In [7]:
import torch
from mmpose.apis import MMPoseInferencer

DET_CONFIG = "ViTPose/demo/mmdetection_cfg/faster_rcnn_r50_fpn_coco.py"
DET_CHECKPOINT = "ViTPose/checkpoints/faster_rcnn_r50_fpn_1x_coco_20200130-047c8118.pth"

POSE_CONFIG = "ViTPose/configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/vitPose+_base_coco+aic+mpii+ap10k+apt36k+wholebody_256x192_udp.py"
POSE_CHECKPOINT = "ViTPose/checkpoints/vitpose++_base.pth"

# Initialize inferencer for top-down pose estimation
inferencer = MMPoseInferencer(
    pose2d=POSE_CONFIG, # Specify the ViTPose++ model
    pose2d_weights = POSE_CHECKPOINT,
    det_model=DET_CONFIG, # Specify the object detector
    det_weights=DET_CHECKPOINT,
    device='cuda:0' if torch.cuda.is_available() else 'cpu'
)

Loads checkpoint by local backend from path: ViTPose/checkpoints/vitpose++_base.pth


KeyError: 'TopDownMoE is not in the mmpose::model registry. Please check whether the value of `TopDownMoE` is correct or it was registered as expected. More details can be found at https://mmengine.readthedocs.io/en/latest/advanced_tutorials/config.html#import-the-custom-module'

In [None]:
import os
import glob
import pickle
from pathlib import Path

import cv2
from tqdm import tqdm

from mmpose.apis import (
    init_pose_model,
    inference_top_down_pose_model,
    vis_pose_result,
)
from mmdet.apis import init_detector, inference_detector, process_mmdet_results
import json

# ---------------- CONFIG ----------------
DET_CONFIG = "demo/mmdetection_cfg/faster_rcnn_r50_fpn_coco.py"
DET_CHECKPOINT = "checkpoints/faster_rcnn_r50_fpn_1x_coco_20200130-047c8118.pth"

POSE_CONFIG = "configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/vitPose+_base_coco+aic+mpii+ap10k+apt36k+wholebody_256x192_udp.py"
POSE_CHECKPOINT = "checkpoints/vitpose++_base.pth"

DEVICE = "cuda:0"
DET_CONF_THR = 0.5
POSE_KPT_THR = 0.3
# ---------------------------------------

# Initialize models
print("Loading detector...")
det_model = init_detector(DET_CONFIG, DET_CHECKPOINT, device=DEVICE)

print("Loading pose model...")
pose_model = init_pose_model(POSE_CONFIG, POSE_CHECKPOINT, device=DEVICE)

# Paths
root_path = "/mnt/D494C4CF94C4B4F0/Trampoline_avril2025/Images_trampo_avril2025/"
out_root = os.path.join(root_path, "out_vitpose++_mmpose")
os.makedirs(out_root, exist_ok=True)

# Helper: save OpenPose-like JSON
def save_to_openpose(json_path, keypoints_list):
    people = []
    for kps in keypoints_list:
        flat = []
        for x, y, s in kps['keypoints']:
            flat.extend([float(x), float(y), float(s)])
        people.append({"pose_keypoints_2d": flat})
    with open(json_path, "w") as f:
        json.dump({"people": people}, f, indent=2)

# ----------------- LOOP OVER SEQUENCES -----------------
sequences = sorted(set([d for d in os.listdir(root_path) if os.path.isdir(os.path.join(root_path, d))]))
for seq in sequences:
    seq_path = os.path.join(root_path, seq)
    out_seq_dir = os.path.join(out_root, seq)
    vis_dir = os.path.join(out_seq_dir, "vis")
    json_dir = os.path.join(out_seq_dir, "json")
    os.makedirs(vis_dir, exist_ok=True)
    os.makedirs(json_dir, exist_ok=True)

    # Gather images
    img_paths = sorted(glob.glob(os.path.join(seq_path, "*.*")))
    img_paths = [p for p in img_paths if p.lower().endswith((".jpg", ".jpeg", ".png", ".bmp"))]
    if len(img_paths) == 0:
        print(f"No images found in {seq_path}, skipping...")
        continue

    all_keypoints = {}  # frame_name -> list of persons dict
    frame_names = [Path(p).stem for p in img_paths]

    # Loop over frames
    for frame_idx, img_path in enumerate(tqdm(img_paths, desc=f"Processing {seq}")):
        frame_name = frame_names[frame_idx]

        # --- Stage 1: detect persons ---
        mmdet_results = inference_detector(det_model, img_path)
        person_results = process_mmdet_results(mmdet_results, cat_id=1)  # cat_id=1 -> person

        for person in person_results:
            person['bbox'][2] = max(person['bbox'][2], 1)
            person['bbox'][3] = max(person['bbox'][3], 1)
            person['dataset_idx'] = 0  # required for ViTPose++

        # --- Stage 2: pose estimation ---
        if len(person_results) > 0:
            pose_results, _ = inference_top_down_pose_model(
                pose_model,
                img_path,
                person_results,
                bbox_thr=DET_CONF_THR,
                format='xyxy',
                dataset='TopDownCocoDataset',
                return_heatmap=False
            )
        else:
            pose_results = []

        # Collect keypoints + scores
        keypoints_per_frame = []
        for pid, r in enumerate(pose_results):
            keypoints_per_frame.append({
                "id": pid,
                "bbox": r['bbox'],            # [x1,y1,x2,y2]
                "keypoints": r['keypoints'].tolist()  # (K,3) [x,y,score]
            })
        all_keypoints[frame_name] = keypoints_per_frame

        # --- Save JSON ---
        json_file = os.path.join(json_dir, f"{frame_name}.json")
        save_to_openpose(json_file, keypoints_per_frame)

        # --- Visualization ---
        img_vis = vis_pose_result(
            pose_model,
            img_path,
            pose_results,
            dataset='TopDownCocoDataset',
            kpt_score_thr=POSE_KPT_THR,
            show=False
        )
        out_vis_file = os.path.join(vis_dir, f"{frame_name}.png")
        cv2.imwrite(out_vis_file, img_vis)

    # Save global pickle for the sequence
    pkl_path = os.path.join(out_seq_dir, "all_keypoints.pkl")
    with open(pkl_path, "wb") as f:
        pickle.dump(all_keypoints, f)

    print(f"Finished sequence {seq}, saved {len(img_paths)} frames.")


ModuleNotFoundError: No module named 'mmpose'

In [None]:
import shutil

dist_path = '/home/lea/trampo/Pose2Sim/pose_all_vit++'
src_path = '/mnt/D494C4CF94C4B4F0/Trampoline_avril2025/Images_trampo_avril2025/20250429_vit++B'

os.makedirs(dist_path, exist_ok=True)

for seq in os.listdir(src_path):
    if os.path.isdir(os.path.join(src_path, seq)):
        os.mkdir(os.path.join(dist_path, seq))
        for file in os.listdir(os.path.join(src_path, seq, 'json')):
            shutil.copy2(os.path.join(src_path, seq, 'json', file), os.path.join(dist_path, seq, file))