In [None]:
# This notebook runs in two steps.
# Step 1: Export every video frame and its person bounding boxes using YOLO.

from pathlib import Path
import numpy as np
import cv2
from tqdm.notebook import tqdm
import torch
from ultralytics import YOLO
import shutil

# Minimum detection confidence a box needs in order to be kept as a person.
CONFIDENCE_THRESHOLD = 0.15

# === Paths ===
# Input videos live in `video_folder`; the per-video frames and bounding boxes
# are written below `output_base`, which is a subfolder of the input folder.
video_folder = Path("/Users/Christian/Downloads/Microadaptive Teaching Dritter Teil - LAs/Marlon/Erste Sitzung/YOLO")
output_base = Path("/Users/Christian/Downloads/Microadaptive Teaching Dritter Teil - LAs/Marlon/Erste Sitzung/YOLO/DLC")

# Collect the videos in one pass with a case-insensitive suffix check. This
# accepts .mp4/.MP4/.Mp4/.mP4 alike, can never list the same file more than
# once, and skips directories. Sorting makes the processing order reproducible.
video_files = sorted(
    p for p in video_folder.glob("*")
    if p.is_file() and p.suffix.lower() == ".mp4"
)

print(f"🎥 Found {len(video_files)} video(s) to process.")
if not video_files:
    raise FileNotFoundError("No video files found!")

# Only wipe the previous output once we know there is something to process,
# so a wrong input path does not destroy earlier results.
if output_base.exists():
    shutil.rmtree(output_base)
output_base.mkdir(parents=True)

# === Load YOLO11 model ===
# Prefer Apple's Metal backend when available, otherwise run on the CPU.
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"⚡ Using device: {device}")
model = YOLO("yolo11l.pt")

# === Detection Loop ===
# For every video: write each frame as <index>.jpg and the person boxes found
# in that frame as <index>.npy, an (N, 4) float32 array of [x, y, w, h] where
# (x, y) is the top-left corner of the box.

PERSON_CLASS_ID = 0  # class 0 = person
# Upper bound of boxes stored per frame; step 2 configures its pose runner
# with max_individuals=20, so more boxes than that are not needed.
MAX_PERSONS_PER_FRAME = 20

for i, video_path in enumerate(tqdm(video_files, desc="YOLO Detection", position=0)):
    print(f"\n📹 [{i+1}/{len(video_files)}] Processing: {video_path.name}")

    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            print(f"⚠️ Could not open video: {video_path}")
            continue

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        video_name = video_path.stem
        frame_output_dir = output_base / video_name / "frames"
        bbox_output_dir = output_base / video_name / "bboxes"
        frame_output_dir.mkdir(parents=True, exist_ok=True)
        bbox_output_dir.mkdir(parents=True, exist_ok=True)

        frame_idx = 0

        # Some containers report no (or a bogus) frame count; fall back to an
        # open-ended progress bar in that case.
        with tqdm(total=total_frames if total_frames > 0 else None,
                  desc="Processing frames", leave=False, position=1) as pbar:
            while True:
                ret, frame = cap.read()
                if not ret or frame is None:
                    break

                # Save frame. A silently missing image would only surface in
                # step 2 as a frame/bbox count mismatch, so fail right here.
                frame_path = frame_output_dir / f"{frame_idx:05d}.jpg"
                if not cv2.imwrite(str(frame_path), frame):
                    raise OSError(f"Could not write frame image: {frame_path}")

                # Run YOLO detection (no classes filter here!). The confidence
                # threshold has to be handed to predict(): otherwise the
                # library applies its own default cutoff and boxes between
                # CONFIDENCE_THRESHOLD and that default never reach us.
                results = model.predict(
                    frame,
                    verbose=False,
                    device=device,
                    imgsz=640,
                    conf=CONFIDENCE_THRESHOLD,
                )

                # Manually filter for persons above the confidence threshold.
                person_boxes = []
                for result in results:
                    boxes = result.boxes
                    if boxes is None or boxes.xyxy is None:
                        continue
                    xyxy = boxes.xyxy.cpu().numpy()
                    labels = boxes.cls.cpu().numpy()
                    scores = boxes.conf.cpu().numpy()
                    keep = (labels.astype(int) == PERSON_CLASS_ID) & (scores >= CONFIDENCE_THRESHOLD)
                    person_boxes.append(xyxy[keep])

                if person_boxes:
                    corners = np.concatenate(person_boxes, axis=0)[:MAX_PERSONS_PER_FRAME]
                else:
                    corners = np.empty((0, 4), dtype=np.float32)

                # Convert corner format (x1, y1, x2, y2) to (x, y, w, h).
                # A frame without persons yields an empty (0, 4) array.
                bboxes_xywh = np.empty((len(corners), 4), dtype=np.float32)
                bboxes_xywh[:, :2] = corners[:, :2]
                bboxes_xywh[:, 2:] = corners[:, 2:] - corners[:, :2]

                bbox_path = bbox_output_dir / f"{frame_idx:05d}.npy"
                np.save(str(bbox_path), bboxes_xywh)

                frame_idx += 1
                pbar.update(1)
    finally:
        # Always free the capture handle, including on errors and on `continue`.
        cap.release()

    print(f"✅ Finished: {video_name} with {frame_idx} frames processed.")

In [None]:
# Step 2: Estimate body landmarks with RTMPose via DeepLabCut (DLC), using the frames and boxes from step 1.

import deeplabcut.pose_estimation_pytorch as dlc_torch
from deeplabcut.utils.video_processor import VideoProcessorCV
from deeplabcut.utils.make_labeled_video import CreateVideo
import numpy as np
import torch
import cv2
import gc
from tqdm.notebook import tqdm
from pathlib import Path
import sys
import os
from contextlib import contextmanager
import deeplabcut.utils
deeplabcut.utils.tqdm = tqdm
import shutil

@contextmanager
def suppress_stdout():
    """Temporarily send everything written to ``sys.stdout`` to ``os.devnull``.

    On exit the stream that was active when the context was entered is
    restored, even if the wrapped code raised. Only the devnull handle opened
    here is closed, so a stream that the wrapped code may have bound to
    ``sys.stdout`` in the meantime is left untouched.
    """
    original_stdout = sys.stdout
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            yield
        finally:
            sys.stdout = original_stdout



# === Model Configuration Paths ===
# Model config and weights for the pose model, plus the folder that step 1
# filled with one subfolder (frames/ + bboxes/) per video.
path_model_config = Path("/Users/Christian/rtm_pose/rtmpose-x_simcc-body7_pytorch_config.yaml")
path_snapshot = Path("/Users/Christian/rtm_pose/rtmpose-x_simcc-body7.pt")
input_folder = Path("/Users/Christian/Downloads/Microadaptive Teaching Dritter Teil - LAs/Marlon/Erste Sitzung/YOLO/DLC") # Change the folder here!!!

# === Pose Model Settings ===
# Same device selection as in step 1: Apple Silicon MPS when available,
# otherwise fall back to the CPU instead of failing.
device = "mps" if torch.backends.mps.is_available() else "cpu"
pose_cfg = dlc_torch.config.read_config_as_dict(path_model_config)
runner = dlc_torch.get_pose_inference_runner(
    pose_cfg,
    snapshot_path=path_snapshot,
    batch_size=4,
    max_individuals=20,
    device=device
)

# === Load video directories ===
# Every subfolder of the input folder is treated as one video; sorted so the
# processing order is reproducible.
video_dirs = sorted(d for d in input_folder.iterdir() if d.is_dir())
print(f"📂 Found {len(video_dirs)} videos to process.")

# === Pose Estimation Loop ===
# For every video folder: run the pose model frame by frame on the boxes
# exported in step 1 and write the predictions to <video>_predictions.csv.

# Number of individual slots in the output table; matches the
# max_individuals=20 the inference runner is created with.
MAX_INDIVIDUALS = 20
# The CSV is rewritten after this many frames so a crash loses little work.
SAVE_EVERY_N_FRAMES = 100

create_labeled_video = False  # Set this to False if you DON'T want labeled videos!!!


def _frame_sort_key(path):
    """Order numbered files by their numeric value, not lexicographically.

    Plain string sorting puts "100000" before "99999" once the zero padding
    overflows; files with a non-numeric stem are sorted after the numbered ones.
    """
    stem = path.stem
    return (0, int(stem), "") if stem.isdigit() else (1, 0, stem)


def _build_predictions_df(predictions):
    """Turn the {frame index: prediction} dict into a DLC predictions table."""
    return dlc_torch.build_predictions_dataframe(
        scorer="rtmpose-body7",
        predictions=predictions,
        parameters=dlc_torch.PoseDatasetParameters(
            bodyparts=pose_cfg["metadata"]["bodyparts"],
            unique_bpts=pose_cfg["metadata"]["unique_bodyparts"],
            individuals=[f"idv_{i}" for i in range(MAX_INDIVIDUALS)]
        )
    )


for video_dir in tqdm(video_dirs, desc="Pose Estimation", position=0):
    print(f"\n🧍‍♂️ Processing: {video_dir.name}")
    frame_dir = video_dir / "frames"
    bbox_dir = video_dir / "bboxes"

    frame_files = sorted(frame_dir.glob("*.jpg"), key=_frame_sort_key)
    bbox_files = sorted(bbox_dir.glob("*.npy"), key=_frame_sort_key)

    # Explicit check instead of `assert`, which is removed under `python -O`.
    if len(frame_files) != len(bbox_files):
        raise ValueError(
            f"Mismatch between frames and bbox files in {video_dir.name}: "
            f"{len(frame_files)} frames vs {len(bbox_files)} bbox files."
        )

    n_frames = len(frame_files)
    output_csv_path = input_folder / f"{video_dir.name}_predictions.csv"
    partial_predictions = {}

    with tqdm(total=n_frames, desc="Pose estimation frames", leave=False, position=1) as pbar:
        for idx, (frame_file, bbox_file) in enumerate(zip(frame_files, bbox_files)):
            frame = cv2.imread(str(frame_file))
            if frame is None:
                # No prediction for this frame, but keep going so the progress
                # bar and the checkpoint logic below still run.
                print(f"⚠️ Failed to load frame: {frame_file}")
            else:
                # The boxes are plain float arrays, so pickle support is not
                # needed (and loading pickles from disk is best avoided).
                bboxes = np.load(str(bbox_file))
                frame_context = {"bboxes": bboxes}

                # Run inference on single frame
                pred = runner.inference([(frame, frame_context)])[0]
                partial_predictions[idx] = pred

            # Checkpoint periodically and always after the last frame, even
            # when that last frame could not be read.
            at_checkpoint = (idx + 1) % SAVE_EVERY_N_FRAMES == 0 or (idx + 1) == n_frames
            if at_checkpoint and partial_predictions:
                df_partial = _build_predictions_df(partial_predictions)
                df_partial.to_csv(output_csv_path)
                print(f"💾 Saved intermediate predictions at frame {idx+1}")

            pbar.update(1)

    print(f"✅ Finished pose estimation: {video_dir.name}")

    # === Optional: Create labeled video IF NEEDED
    if create_labeled_video:
        # "XXX" is a placeholder for the folder holding the original videos.
        original_video_path = Path("XXX") / f"{video_dir.name}.mp4"
        output_video_path = input_folder / f"{video_dir.name}_labeled.mp4"

        if original_video_path.exists():
            clip = VideoProcessorCV(str(original_video_path), sname=str(output_video_path), codec="mp4v")
            df_final = _build_predictions_df(partial_predictions)

            print(f"🎬 Creating labeled video: {output_video_path.name}", end="", flush=True)

            # CreateVideo is run with stdout silenced.
            with suppress_stdout():
                CreateVideo(
                    clip,
                    df_final,
                    pcutoff=0.4,
                    dotsize=5,
                    colormap="rainbow",
                    bodyparts2plot=pose_cfg["metadata"]["bodyparts"],
                    trailpoints=0,
                    cropping=False,
                    x1=0,
                    x2=clip.w,
                    y1=0,
                    y2=clip.h,
                    # Pairs of bodypart indices that are joined by a line.
                    bodyparts2connect=[
                        [15, 13], [13, 11], [16, 14], [14, 12], [11, 12],
                        [5, 11], [6, 12], [5, 6], [5, 7], [6, 8],
                        [7, 9], [8, 10], [1, 2], [0, 1], [0, 2],
                        [1, 3], [2, 4], [3, 5], [4, 6]
                    ],
                    skeleton_color="k",
                    draw_skeleton=True,
                    displaycropped=False,
                    color_by="bodypart",
                )
            print(f"🎬 Labeled video saved: {output_video_path.name}")
        else:
            print(f"⚠️ Original video {original_video_path.name} not found, skipping labeled video.")

    # Continue cleanup regardless of the flag: drop this video's predictions
    # and release cached memory before the next video starts.
    del partial_predictions
    if torch.backends.mps.is_available():
        torch.mps.empty_cache()
    gc.collect()

print("\n🎉 All pose estimations complete!")

# === Final cleanup ===
# Every directory directly inside the input folder is removed; plain files in
# that folder (such as the prediction CSVs written there) are not touched.
# A failure on one directory is reported and the remaining ones are still tried.
for subfolder in filter(Path.is_dir, input_folder.iterdir()):
    try:
        shutil.rmtree(subfolder)
        print(f"🗑️ Deleted DLC subfolder: {subfolder}")
    except Exception as e:
        print(f"⚠️ Error deleting {subfolder}: {e}")