In [87]:
import os
import numpy as np
import pandas as pd

def pad_and_flatten_features(features, max_len=None):
    """
    features: list of np.array, each shape like (T_i, 17, 3)
    max_len: int or None, padding後的最大時間軸長度，None代表用目前最大長度
    """

    # 找最大長度 (若沒指定)
    if max_len is None:
        max_len = max(f.shape[0] for f in features)

    padded_features = []
    for f in features:
        T, J, C = f.shape
        if T < max_len:
            # pad zeros 到 max_len
            pad_width = ((0, max_len - T), (0, 0), (0, 0))
            f_padded = np.pad(f, pad_width=pad_width, mode='constant', constant_values=0)
        else:
            f_padded = f[:max_len]  # 如果長度超過 max_len，截斷

        # flatten 成 1維向量
        f_flat = f_padded.flatten()
        padded_features.append(f_flat)

    return np.array(padded_features)

def build_feature_label_dataframe(npy_folder, csv_path):
    # 讀取描述的 CSV
    df_csv = pd.read_csv(csv_path)

    data = []

    # 遍歷每個 .npy 檔案
    for file in os.listdir(npy_folder):
        if file.endswith('.npy'):
            base_name = os.path.splitext(file)[0]  # 去掉 .npy
            video_filename = base_name + '.mp4'    # 在 CSV 中查找

            # 找出對應的 label
            match = df_csv[df_csv['Filename'] == video_filename]

            if not match.empty:
                label = match.iloc[0]['description']
                feature_path = os.path.join(npy_folder, file)
                feature = np.load(feature_path)

                data.append({
                    'filename': file,
                    'feature': feature,
                    'label': label
                })
            else:
                print(f"⚠️ 無法在 CSV 中找到 {video_filename}")

    # 組成 DataFrame
    df = pd.DataFrame(data)
    return df

def convert_feature_to_dict(feature_array):
    n_frames = feature_array.shape[0]
    pose_sequence = []
    for frame_idx in range(len(feature_array)):
        pose_sequence.append({
            "frame":frame_idx,
            "keypoints":feature_array[frame_idx]
        })
    return pose_sequence

df = build_feature_label_dataframe('npy','Gerrit_Cole_SL.csv')
df['label'] = df['label'].str.contains('strike', case=False).astype(int)
df['feature'] = df['feature'].apply(convert_feature_to_dict)
df

Unnamed: 0,filename,feature,label
0,pitch_0001.npy,"[{'frame': 0, 'keypoints': [[691.13201904 235....",0
1,pitch_0002.npy,"[{'frame': 0, 'keypoints': [[813.06262207 451....",1
2,pitch_0003.npy,"[{'frame': 0, 'keypoints': [[598.640625 222....",1
3,pitch_0004.npy,"[{'frame': 0, 'keypoints': [[587.71087646 172....",0
4,pitch_0005.npy,"[{'frame': 0, 'keypoints': [[414.98080444 319....",1
...,...,...,...
195,pitch_0196.npy,"[{'frame': 0, 'keypoints': [[789.10772705 342....",0
196,pitch_0197.npy,"[{'frame': 0, 'keypoints': [[483.5071106 309....",1
197,pitch_0198.npy,"[{'frame': 0, 'keypoints': [[744.29425049 264....",0
198,pitch_0199.npy,"[{'frame': 0, 'keypoints': [[486.62567139 323....",0


In [88]:
import numpy as np


def calculate_pixel_angle_from_points(a, b, c):
    """
    使用三個 numpy pixel 座標點 a, b, c 計算夾角（degree）。
    - a, b, c 為 shape = (2,) 的 numpy 陣列
    - b 為頂點
    """
    ba = a - b
    bc = c - b
    cosine = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
    angle = np.degrees(np.arccos(np.clip(cosine, -1.0, 1.0)))
    return angle

def detect_release(pose_sequence):
    """
    根據關鍵點條件與角度計算出手幀。
    - pose_sequence: list，每幀包含 'frame' 和 'keypoints'
    回傳最佳出手 frame 編號
    """

    candidate_frames = []

    for item in pose_sequence:
        frame_idx = item["frame"]
        keypoints = item["keypoints"]  # (17, 3)
        if keypoints.shape[0] < 17:
            continue

        # 關鍵點信心值過濾
        if np.min(keypoints[[11, 12, 14, 16], 2]) < 0.3:
            continue

        right_shoulder = keypoints[12]
        left_shoulder = keypoints[11]
        right_elbow = keypoints[14]
        right_wrist = keypoints[16]

        # 基本量
        shoulder_dist = abs(right_shoulder[0] - left_shoulder[0])

        elbow_angle = calculate_pixel_angle_from_points(
            right_wrist[:2], right_elbow[:2], right_shoulder[:2]
        )

        shoulder_vec = np.array(
            [right_shoulder[0] - left_shoulder[0], right_shoulder[1] - left_shoulder[1]]
        )
        horizontal_vec = np.array([1, 0])
        cos_theta = np.dot(shoulder_vec, horizontal_vec) / np.linalg.norm(shoulder_vec)
        shoulder_angle = np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))

        # 出手條件
        if shoulder_dist > 25 and right_wrist[1] > right_shoulder[1]:
            candidate_frames.append(
                {
                    "frame": frame_idx,
                    "elbow_angle": elbow_angle,
                    "shoulder_angle": shoulder_angle,
                    "shoulder_dist": shoulder_dist,
                }
            )

    if not candidate_frames:
        return None

    # 選出 shoulder_angle 最小的前三幀
    top3 = sorted(candidate_frames, key=lambda f: f["shoulder_angle"])[:3]

    def compute_score(f):
        return -1.5 * f["shoulder_angle"] + 0.8 * f["elbow_angle"]

    best_frame = max(top3, key=compute_score)
    frame_id = best_frame["frame"]
    return frame_id

def detect_landing(
    pose_sequence, release_frame, back_offset=9
):
    """
    從出手點 frame 向前推 back_offset 幀作為落地幀，並輸出落地骨架與圖片
    - pose_sequence: 由 load_pose_sequence() 輸出的骨架序列
    - release_frame: 出手點的 frame 編號
    - output_json: 儲存 landing_frame.json 的路徑
    - back_offset: 向前推估的幀數，預設 9
    回傳：landing_frame（int）
    """
    release_index = next(
        (i for i, item in enumerate(pose_sequence) if item["frame"] == release_frame),
        None,
    )
    if release_index is None:
        pass
        #raise ValueError(f"❌ 找不到 release_frame = {release_frame} 的對應資料")

    target_index = release_index - back_offset
    if target_index < 0:
        pass
        #raise ValueError(f"❌ 推估 index = {target_index} 超出序列長度")

    landing_item = pose_sequence[target_index]
    landing_frame = landing_item["frame"]
    return landing_frame

def detect_shoulder(
    pose_sequence,
    release_frame,
    image_dir="output_detectron2_first_person_tracked",
):
    """
    偵測肩膀最打開的幀：
    - 起始：右手腕高於右肩（代表投球已啟動）
    - 條件：右手腕仍在右肩左側且未落下
    - 評分：先取肩膀水平距離最大的前3幀，再從中挑肩角最大的幀

    回傳：
    - shoulder_frame（int）
    """
    LEFT_SHOULDER = 5
    RIGHT_SHOULDER = 6
    LEFT_HIP = 11
    RIGHT_WRIST = 10
    candidate_list = []
    start_found = False

    for item in pose_sequence:
        frame_idx = item["frame"]
        if frame_idx > release_frame:
            break

        keypoints = item["keypoints"]
        if (
            np.min(keypoints[[LEFT_SHOULDER, RIGHT_SHOULDER, LEFT_HIP, RIGHT_WRIST], 2])
            < 0.3
        ):
            continue

        l_sh = keypoints[LEFT_SHOULDER][:2]
        r_sh = keypoints[RIGHT_SHOULDER][:2]
        l_hip = keypoints[LEFT_HIP][:2]
        r_wr = keypoints[RIGHT_WRIST][:2]

        # 起始條件：右手腕高於右肩
        if not start_found:
            if r_wr[1] < r_sh[1]:
                start_found = True
            else:
                continue

        # 排除：右手腕落下 或 手腕已超過肩膀
        if r_wr[0] > r_sh[0] or r_wr[1] >= r_sh[1]:
            continue

        # 肩膀開啟角度（l_sh - r_sh - l_hip）
        angle = calculate_pixel_angle_from_points(l_sh, r_sh, l_hip)
        shoulder_distance = abs(r_sh[0] - l_sh[0])
        candidate_list.append((angle, shoulder_distance, frame_idx))

    if not candidate_list:
        return 0 # None

    # 取肩膀 X 軸距離最大的前三名 → 選角度最大者
    top3 = sorted(candidate_list, key=lambda x: -x[1])[:3]
    best = max(top3, key=lambda x: x[0])
    shoulder_frame = best[2]
    return shoulder_frame

def get_pose_window(pose_sequence, center_frame, window_size):
    """
    取得前後 window_size 幀內的骨架資料。
    回傳 list，每筆為 {frame, keypoints}
    """
    center_idx = None
    for i, item in enumerate(pose_sequence):
        if item["frame"] == center_frame:
            center_idx = i
            break
    if center_idx is None:
        return []

    window = []
    for offset in range(-window_size, window_size + 1):
        idx = center_idx + offset
        if 0 <= idx < len(pose_sequence):
            window.append(pose_sequence[idx])
    return window

# COCO keypoints 順序
LEFT_HIP = 11
RIGHT_HIP = 12
LEFT_ANKLE = 15
RIGHT_ANKLE = 16
LEFT_SHOULDER = 5
RIGHT_SHOULDER = 6
LEFT_SHOULDER = 5
RIGHT_SHOULDER = 6
LEFT_HIP = 11
LEFT_WRIST = 9
RIGHT_WRIST = 10

def detect_landing_features(pose_sequence, landing_frame):
    """
    輸出 landing 點附近的特徵（跨步角、腳穩定度、展髖角）。
    - pose_sequence: list，每幀含 'frame' 與 'keypoints'
    - landing_frame: int，落地關鍵幀編號
    回傳 dict 特徵值
    """

    window = get_pose_window(pose_sequence, landing_frame, window_size=3)
    features = []

    for item in window:
        kpts = item["keypoints"]

        if (
            np.min(
                kpts[
                    [
                        LEFT_HIP,
                        RIGHT_HIP,
                        LEFT_ANKLE,
                        RIGHT_ANKLE,
                        LEFT_SHOULDER,
                        RIGHT_SHOULDER,
                    ],
                    2,
                ]
            )
            < 0.3
        ):
            continue

        # 特徵一：跨步角（左髖 - 左踝 - 右髖）
        hip_angle = calculate_pixel_angle_from_points(
            kpts[LEFT_HIP][:2], kpts[LEFT_ANKLE][:2], kpts[RIGHT_HIP][:2]
        )

        # 特徵二：腳穩定性（左右腳 heel Y 差）
        foot_y_diff = abs(kpts[LEFT_ANKLE][1] - kpts[RIGHT_ANKLE][1])

        # 特徵三：展髖角（左肩 - 左髖 - 右髖）
        hip_opening = calculate_pixel_angle_from_points(
            kpts[LEFT_SHOULDER][:2], kpts[LEFT_HIP][:2], kpts[RIGHT_HIP][:2]
        )

        features.append(
            {
                "frame": item["frame"],
                "hip_angle": hip_angle,
                "foot_y_diff": foot_y_diff,
                "hip_opening": hip_opening,
            }
        )

    if not features:
        #print("⚠️ 無法計算踏地特徵，資料不足")
        return {}

    # 找最穩定的幀（腳 Y 差最小）
    best = min(features, key=lambda f: f["foot_y_diff"])
    return best

def extract_shoulder_features(pose_sequence, shoulder_frame, landing_frame):
    """
    提取肩膀展開幀附近的特徵：
    - shoulder_angle：肩膀開展角度（左肩-右肩-左髖）
    - hand_symmetry_y：雙手在 y 軸的對稱程度
    - shoulder_open_speed：肩膀開展角度變化速度
    """
    window = get_pose_window(pose_sequence, shoulder_frame, window_size=3)
    if not window:
        raise ValueError(f"❌ 找不到 shoulder_frame = {shoulder_frame} 的骨架資料")

    angles = []
    symmetries = []

    for item in window:
        keypoints = item["keypoints"]

        def valid(i):
            return keypoints[i, 2] > 0.3

        if not all(
            valid(i)
            for i in [LEFT_SHOULDER, RIGHT_SHOULDER, LEFT_HIP, LEFT_WRIST, RIGHT_WRIST]
        ):
            continue

        angle = calculate_pixel_angle_from_points(
            keypoints[LEFT_SHOULDER, :2],
            keypoints[RIGHT_SHOULDER, :2],
            keypoints[LEFT_HIP, :2],
        )
        symmetry_y = abs(keypoints[LEFT_WRIST, 1] - keypoints[RIGHT_WRIST, 1])

        angles.append(angle)
        symmetries.append(symmetry_y)

    if not angles:
        pass
        #raise ValueError("❌ 所有幀的肩膀關鍵點皆無效")

    shoulder_angle = np.mean(angles)
    hand_symmetry_y = np.mean(symmetries)
    shoulder_open_speed = np.std(angles)

    return {
        "shoulder_angle": round(shoulder_angle, 1),
        "hand_symmetry_y": round(hand_symmetry_y, 1),
        "shoulder_open_speed": round(shoulder_open_speed, 2),
    }

RIGHT_SHOULDER = 6
RIGHT_ELBOW = 8
RIGHT_WRIST = 10
LEFT_SHOULDER = 5

def extract_release_features(pose_sequence, release_frame):
    """
    擷取出手幀 ±3 幀的特徵：
    - elbow_angle：右手肘角度（腕-肘-肩）
    - shoulder_angle：肩膀水平角度（左肩-右肩與 x 軸夾角）
    - elbow_stability：肘角度標準差
    - shoulder_stability：肩膀角度標準差
    """
    window = get_pose_window(pose_sequence, release_frame, window_size=3)
    if not window:
        raise ValueError(f"❌ 找不到 release_frame = {release_frame} 的骨架資料")

    elbow_angles = []
    shoulder_angles = []

    for item in window:
        kpts = item["keypoints"]

        def valid(i):
            return kpts[i, 2] > 0.3

        if not all(
            valid(i) for i in [RIGHT_SHOULDER, RIGHT_ELBOW, RIGHT_WRIST, LEFT_SHOULDER]
        ):
            continue

        # 肘角度：右腕 - 右肘 - 右肩
        elbow = calculate_pixel_angle_from_points(
            kpts[RIGHT_WRIST, :2], kpts[RIGHT_ELBOW, :2], kpts[RIGHT_SHOULDER, :2]
        )
        elbow_angles.append(elbow)

        # 肩膀角度：右肩 - 左肩 與水平軸夾角
        shoulder_vec = kpts[RIGHT_SHOULDER, :2] - kpts[LEFT_SHOULDER, :2]
        horizontal = np.array([1, 0])
        cos_theta = np.dot(shoulder_vec, horizontal) / np.linalg.norm(shoulder_vec)
        angle = np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))
        shoulder_angles.append(angle)

    if not elbow_angles:
        pass
        #raise ValueError("❌ 無法取得有效的肘角與肩角資料")

    return {
        "elbow_angle": round(np.mean(elbow_angles), 1),
        "shoulder_angle": round(np.mean(shoulder_angles), 1),
        "elbow_stability": round(np.std(elbow_angles), 2),
        "shoulder_stability": round(np.std(shoulder_angles), 2),
    }

In [89]:
from tqdm import tqdm

def extract_all_features(pose_sequence):
    try:
        release_frame = detect_release(pose_sequence)
        landing_frame = detect_landing(pose_sequence, release_frame)
        shoulder_frame = detect_shoulder(pose_sequence, release_frame)

        landing_features = detect_landing_features(pose_sequence, landing_frame)
        shoulder_features = extract_shoulder_features(pose_sequence, shoulder_frame, landing_frame)
        release_features = extract_release_features(pose_sequence, release_frame)

        all_features = {
            **landing_features,
            **shoulder_features,
            **release_features
        }

        return all_features
    except Exception as e:
        #print(f"Error: {e}")
        return None

# 提取所有特徵
all_feature_rows = []
valid_indices = []

for i in tqdm(range(len(df))):
    result = extract_all_features(df['feature'].iloc[i])
    if result is not None:
        all_feature_rows.append(result)
        valid_indices.append(i)

# 建立 DataFrame
features_df = pd.DataFrame(all_feature_rows)

# 可加回原本的 label / filename
features_df['label'] = df['label'].iloc[valid_indices].values
# 若有 filename 欄位可加上
# features_df['filename'] = df['filename'].iloc[valid_indices].values

features_df.head()


100%|██████████| 200/200 [00:01<00:00, 104.71it/s]


Unnamed: 0,frame,hip_angle,foot_y_diff,hip_opening,shoulder_angle,hand_symmetry_y,shoulder_open_speed,elbow_angle,elbow_stability,shoulder_stability,label
0,13,90.0,0.0,1.150366,178.7,0.0,2.53,97.6,34.55,0.61,0
1,1,90.0,0.0,90.0,176.0,0.0,6.62,91.0,43.0,0.51,1
2,30,21.614193,0.0,98.796972,174.2,9.8,0.4,159.3,5.31,1.28,0
3,9,14.318769,0.957275,102.382508,0.2,19.4,0.44,39.9,3.4,0.35,1
4,28,11.258183,1.918411,94.415805,22.4,62.2,1.62,132.0,2.2,0.69,1


In [90]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# 假設你已經有這個 DataFrame，叫做 df
# 如果從 CSV 讀入可以這樣寫：
# df = pd.read_csv('your_file.csv')

# 1. 準備資料：移除 'frame' 欄位
X = features_df.drop(columns=['label'])
y = features_df['label']

# 2. 資料切分
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# 3. 建立並訓練模型
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# 4. 預測與評估
y_pred = model.predict(X_test)

print("✅ 混淆矩陣：\n", confusion_matrix(y_test, y_pred))
print("\n✅ 分類報告：\n", classification_report(y_test, y_pred))
print("\n✅ 準確率：", accuracy_score(y_test, y_pred))


✅ 混淆矩陣：
 [[27  2]
 [18  0]]

✅ 分類報告：
               precision    recall  f1-score   support

           0       0.60      0.93      0.73        29
           1       0.00      0.00      0.00        18

    accuracy                           0.57        47
   macro avg       0.30      0.47      0.36        47
weighted avg       0.37      0.57      0.45        47


✅ 準確率： 0.574468085106383
