In [14]:
from ultralytics import YOLO
import cv2
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm

In [15]:
# Base directories, given relative to the notebook's working directory.
DATA_ROOT = Path("../data/")
EDA_ROOT = Path("../EDA/")

# Locations derived from DATA_ROOT.
VIDEO_ROOT = DATA_ROOT.joinpath("train", "video")
POSE_ROOT = DATA_ROOT.joinpath("pose")

# Make sure the pose output directory exists; a no-op when it already does.
POSE_ROOT.mkdir(parents=True, exist_ok=True)

In [16]:
# Weights file for the pose model; the model is loaded once and reused
# by the extraction function below.
POSE_WEIGHTS = "yolov8n-pose.pt"
model = YOLO(POSE_WEIGHTS)

In [17]:
def extract_pose_from_video(video_path: Path) -> np.ndarray:
    """Run the pose model on every frame of a video and stack the keypoints.

    Each decoded frame is passed to the module-level ``model``. The keypoints
    of the first detection (index 0) are stored as rows of
    ``(x, y, confidence)``. A frame with no detection contributes an all-zero
    ``(17, 3)`` block so the frame axis stays aligned with the video.

    Args:
        video_path: Path of the video file to open with OpenCV.

    Returns:
        A float32 array of shape ``(num_frames, 17, 3)``. When no frame can be
        read (missing/corrupt file), the result has shape ``(0, 17, 3)`` so
        downstream code can still rely on a 3-D array.
    """
    cap = cv2.VideoCapture(str(video_path))
    poses = []

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # End of stream, or the capture could not be read at all.
                break

            result = model(frame, verbose=False)[0]
            keypoints = result.keypoints

            if (
                keypoints is None or
                keypoints.xy is None or
                len(keypoints.xy) == 0
            ):
                # No detection in this frame: keep a zero placeholder.
                kpts = np.zeros((17, 3), dtype=np.float32)
            else:
                xy = keypoints.xy[0].cpu().numpy()      # (17,2)
                conf = keypoints.conf[0].cpu().numpy()  # (17,)
                kpts = np.concatenate([xy, conf[:, None]], axis=1)

            poses.append(kpts)
    finally:
        # Release the capture handle even if inference raises mid-video.
        cap.release()

    if not poses:
        # np.asarray([]) would be 1-D with shape (0,); keep the rank at 3.
        return np.zeros((0, 17, 3), dtype=np.float32)

    return np.asarray(poses, dtype=np.float32)

In [20]:
def fix_frame_length(pose, target_len=100):
    """Resample or zero-pad a pose sequence to exactly ``target_len`` frames.

    Args:
        pose: Array whose first axis is the frame axis, e.g. ``(F, 17, 3)``.
            An empty 1-D array (no frames at all) is accepted and treated as
            an empty ``(0, 17, 3)`` sequence.
        target_len: Number of frames in the returned array.

    Returns:
        An array of shape ``(target_len, *pose.shape[1:])``. Longer inputs are
        subsampled at evenly spaced frame indices (first and last frame kept);
        shorter inputs are extended with float32 zero frames at the end;
        inputs that already have ``target_len`` frames are returned unchanged.
    """
    pose = np.asarray(pose)

    # An empty 1-D array carries no per-frame shape; fall back to the
    # (17, 3) keypoint layout used for padding so concatenation works.
    if pose.ndim == 1 and pose.shape[0] == 0:
        pose = pose.reshape((0, 17, 3))

    F = pose.shape[0]

    if F > target_len:
        # Evenly spaced indices over [0, F-1], truncated to integers.
        idx = np.linspace(0, F - 1, target_len).astype(int)
        pose = pose[idx]
    elif F < target_len:
        # Pad with zero frames matching the per-frame shape of the input
        # instead of assuming a fixed keypoint count.
        pad = np.zeros((target_len - F, *pose.shape[1:]), dtype=np.float32)
        pose = np.concatenate([pose, pad], axis=0)

    return pose

In [21]:
df_all = pd.read_csv(EDA_ROOT / "train_clean.csv")

# Row masks for the two classes being compared.
is_fall = df_all["class"] == "Y/SY"
is_norm = df_all["class"] == "N/N"

# Draw 100 rows per class with a fixed seed so the subset is reproducible.
df_fall = df_all.loc[is_fall].sample(n=100, random_state=42)
df_norm = df_all.loc[is_norm].sample(n=100, random_state=42)

# Stack both draws into one frame with a fresh 0..N-1 index.
df_sample = pd.concat([df_fall, df_norm], ignore_index=True)

print(df_sample["class"].value_counts())

class
Y/SY    100
N/N     100
Name: count, dtype: int64


In [24]:
# Extract, length-normalise and cache one pose array per sampled scene.
for _, row in tqdm(df_sample.iterrows(), total=len(df_sample)):
    scene_id = row["scene_id"]
    video_dir = Path(row["video_dir"])

    # A saved array from an earlier run acts as a cache: skip the scene.
    out_path = POSE_ROOT / f"{scene_id}.npy"
    if out_path.exists():
        continue

    video_path = video_dir / f"{scene_id}.mp4"
    if not video_path.exists():
        print(f"[SKIP] video not found: {video_path}")
        continue

    pose = extract_pose_from_video(video_path)

    # A file that exists but yields no frames would otherwise abort the
    # whole run (or be cached as pure padding); report it and move on.
    if len(pose) == 0:
        print(f"[SKIP] no frames decoded: {video_path}")
        continue

    pose = fix_frame_length(pose, target_len=100)

    # Write to a temporary name and rename afterwards, so an interrupted run
    # never leaves a truncated .npy that the exists() check would later
    # mistake for a finished result. Saving through a file handle stops
    # np.save from appending ".npy" to the temporary name.
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    with open(tmp_path, "wb") as fh:
        np.save(fh, pose)
    tmp_path.replace(out_path)

100%|██████████| 200/200 [08:44<00:00,  2.62s/it]


In [25]:
# Count the saved pose arrays via POSE_ROOT (the directory the extraction
# loop writes to) rather than a second hard-coded copy of the path.
sum(1 for _ in POSE_ROOT.glob("*.npy"))

200