# 定義模型

In [191]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.metrics import accuracy_score
from tqdm import tqdm
'''
# 提高以下模型的"擬合能力" 改了那些程式碼旁邊都要標註 為何這樣改
# ===== 模型定義（簡化 ST-GCN） =====
class STGCNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, A, kernel_size=9):
        super().__init__()
        self.A = torch.tensor(A, dtype=torch.float32, requires_grad=False)
        self.gcn = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.tcn = nn.Conv2d(out_channels, out_channels, kernel_size=(kernel_size, 1), padding=(kernel_size // 2, 0))
        self.relu = nn.ReLU()

    def forward(self, x):
        x = torch.einsum('nctv,vw->nctw', x, self.A.to(x.device))
        x = self.gcn(x)
        x = self.tcn(x)
        return self.relu(x)

class STGCNClassifier(nn.Module):
    def __init__(self, num_class, A):
        super().__init__()
        self.block1 = STGCNBlock(3, 64, A)
        self.block2 = STGCNBlock(64, 128, A)
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, num_class)

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.pool(x)
        return self.fc(x.view(x.size(0), -1))
'''

# ===== 模型定義（簡化 ST-GCN，改進版） =====
class STGCNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, A, kernel_size=9, dropout=0.3):
        super().__init__()
        self.A = torch.tensor(A, dtype=torch.float32, requires_grad=False)
        self.gcn = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.tcn = nn.Conv2d(out_channels, out_channels, kernel_size=(kernel_size, 1), padding=(kernel_size // 2, 0))
        self.bn = nn.BatchNorm2d(out_channels)  # 加入BatchNorm穩定訓練，加速收斂，提高擬合能力
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)     # 加入dropout減少過擬合，提高泛化能力

    def forward(self, x):
        x = torch.einsum('nctv,vw->nctw', x, self.A.to(x.device))
        x = self.gcn(x)
        x = self.tcn(x)
        x = self.bn(x)          # BatchNorm放在激活函數前面或後面皆可，這裡放前面
        x = self.relu(x)
        x = self.dropout(x)     # 隨機丟棄部分神經元，減少過擬合
        return x

class STGCNClassifier(nn.Module):
    def __init__(self, num_class, A):
        super().__init__()
        self.block1 = STGCNBlock(3, 64, A)
        self.block2 = STGCNBlock(64, 128, A)
        self.block3 = STGCNBlock(128, 256, A)  # 加入第三層，提高模型容量，能學習更複雜特徵
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_class)    # 對應block3輸出通道數

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)    # 新增層
        x = self.pool(x)
        return self.fc(x.view(x.size(0), -1))

# ===== 鄰接矩陣 A（簡化骨架結構） =====
def build_adjacency_matrix(num_joints=33):
    A = np.eye(num_joints)
    for i, j in [
        (11,13),(13,15),(12,14),(14,16),
        (23,25),(25,27),(24,26),(26,28),
        (11,12),(23,24),(11,23),(12,24)
    ]:
        A[i, j] = A[j, i] = 1
    return A

# ===== 資料準備（隨機模擬） =====
def generate_mock_data(N=10000, C=3, T=30, V=33, num_classes=2):
    """
    產生只有 ST-GCN 類型模型能準確分類的資料（隨機森林會失效）
    """
    import numpy as np
    X = np.zeros((N, C, T, V), dtype=np.float32)
    y = np.random.randint(0, num_classes, size=(N,))
    
    for i in range(N):
        # 基礎雜訊
        X[i] = np.random.randn(C, T, V) * 0.05
        
        # 模擬「動作變化」：只在特定時間段 + 關節群中有明顯時空圖形差異
        if y[i] == 0:
            # 類別 0：joint 11, 12, 13（左臀部到膝蓋）呈「向前擺腿」的趨勢
            for t in range(10, 20):
                X[i, 1, t, 11] += 0.1 * (t - 10)   # y越來越小 → 腿抬高
                X[i, 1, t, 12] += 0.05 * (t - 10)
                X[i, 1, t, 13] += 0.02 * (t - 10)
        else:
            # 類別 1：相同關節走反方向「蹲下」動作
            for t in range(10, 20):
                X[i, 1, t, 11] -= 0.1 * (t - 10)   # y越來越大 → 下蹲
                X[i, 1, t, 12] -= 0.05 * (t - 10)
                X[i, 1, t, 13] -= 0.02 * (t - 10)
    
    # One-hot encoding
    y_onehot = np.eye(num_classes)[y].astype(np.float32)
    return X, y_onehot

# ===== 訓練函式（含驗證與測試） =====
def train_stgcn(X, y, A, epochs=10, batch_size=16):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # 分割資料
    print(X.shape,y.shape)
    dataset = TensorDataset(torch.tensor(X), torch.tensor(y))
    N = len(dataset)
    train_len, val_len = int(0.8*N), int(0.1*N)
    test_len = N - train_len - val_len
    train_ds, val_ds, test_ds = random_split(dataset, [train_len, val_len, test_len])
    train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_dl = DataLoader(val_ds, batch_size=batch_size)
    test_dl = DataLoader(test_ds, batch_size=batch_size)

    model = STGCNClassifier(num_class=y.shape[1], A=A).to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for xb, yb in train_dl:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            out = model(xb)
            loss = loss_fn(out, yb)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        # 訓練準確率
        model.eval()
        with torch.no_grad():
            train_preds, train_labels = [], []
            for xb, yb in train_dl:
                xb = xb.to(device)
                out = model(xb)
                preds = out.cpu().numpy()
                train_preds.extend(preds)
                train_labels.extend(yb.numpy())
            acc = accuracy_score(np.argmax(train_labels,axis=1), np.argmax(train_preds,axis=1))
        print(f"Epoch {epoch+1}/{epochs} | train Loss: {total_loss:.4f} | train Acc: {acc:.4f}")

        # 驗證準確率
        model.eval()
        with torch.no_grad():
            val_preds, val_labels = [], []
            for xb, yb in val_dl:
                xb = xb.to(device)
                out = model(xb)
                preds = out.cpu().numpy()
                val_preds.extend(preds)
                val_labels.extend(yb.numpy())
            acc = accuracy_score(np.argmax(val_labels,axis=1), np.argmax(val_preds,axis=1))

        print(f"Epoch {epoch+1}/{epochs} | Val Loss: {total_loss:.4f} | Val Acc: {acc:.4f}")

    # 訓練集準確率
    model.eval()
    with torch.no_grad():
        train_preds, train_labels = [], []
        for xb, yb in train_dl:
            xb = xb.to(device)
            preds = model(xb).cpu().numpy()
            train_preds.extend(preds)
            train_labels.extend(yb.numpy())
        train_acc = accuracy_score(np.argmax(train_labels,axis=1), np.argmax(train_preds,axis=1))
        print(f"✅ Train Accuracy: {train_acc:.4f}")

    # 測試集準確率
    model.eval()
    with torch.no_grad():
        test_preds, test_labels = [], []
        for xb, yb in test_dl:
            xb = xb.to(device)
            preds = model(xb).cpu().numpy()
            test_preds.extend(preds)
            test_labels.extend(yb.numpy())
        test_acc = accuracy_score(np.argmax(test_labels,axis=1), np.argmax(test_preds,axis=1))
        print(f"✅ Test Accuracy: {test_acc:.4f}")

    return model


# 載入資料

In [192]:
import os
import numpy as np
import pandas as pd

# 指定資料夾路徑
folder_path1 = './data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz'
folder_path2 = './data/Gerrit_Cole_FF_videos_4S-20250603T012329Z-1-001/features_all_xyz'
folder_path3 = './data/Gerrit_Cole_SL_videos_4S-20250603T012332Z-1-001/features_all_xyz'
csv1_path = './data/data_csv-20250603T012350Z-1-001/data_csv/Gerrit_Cole_CH.csv'
csv2_path = './data/data_csv-20250603T012350Z-1-001/data_csv/Gerrit_Cole_FF.csv'
csv3_path = './data/data_csv-20250603T012350Z-1-001/data_csv/Gerrit_Cole_SL.csv'
file_number = 200  # 假設每個資料夾有200個檔案

all_features = []
idx = 0
error_idx = []
for folder_path in [folder_path1, folder_path2, folder_path3]:
    for i in range(1, file_number+1):
        file_name = f'pitch_{i:04d}_all_xyz.npy'  # 正確的四位數格式
        try:
            all_features.append(np.load(os.path.join(folder_path, file_name)))
        except Exception as e:
            print(f"檔案 {file_name} 在資料夾 {folder_path} 中不存在或無法讀取。錯誤：{e}")
            error_idx.append(idx)
        idx += 1

# 可選：顯示第一筆資料的形狀來確認內容
print(f"共有 {len(all_features)} 筆資料")
if all_features:
    for i in range(10):
        print(f"第{i}筆資料形狀：", all_features[i].shape)
print(f"錯誤索引：{error_idx}")
csv_feat = pd.DataFrame()
for csv_path in [csv1_path, csv2_path, csv3_path]:
    csv_feat = pd.concat([csv_feat, pd.read_csv(csv_path).head(200)],ignore_index=True)
csv_feat = csv_feat.drop(index=error_idx)
print(f"csv_feat shape: {csv_feat.shape}")
display(csv_feat.head())

# 處理y
from sklearn.preprocessing import OneHotEncoder
def simplify_pitch_type(pitch_type: str) -> str:
    """
    將詳細的投球結果類別簡化為 'ball' 或 'strike'。

    參數:
        pitch_type (str): 原始投球結果類別

    回傳:
        str: 簡化後的類別（'ball' 或 'strike'）
    """
    ball_types = {'ball', 'foul', 'blocked_ball'}
    strike_types = {'hit_into_play', 'called_strike', 'swinging_strike', 'foul_tip'}

    if pitch_type in ball_types:
        return 'ball'
    elif pitch_type in strike_types:
        return 'strike'
    else:
        return 'unknown'  # 避免非預期輸入

csv_feat['description'] = csv_feat['description'].apply(simplify_pitch_type)
encoder = OneHotEncoder()
y = encoder.fit_transform(csv_feat[['description']]).toarray()
print(f"y shape: {y.shape}")
print(f"Class names: {encoder.categories_[0]}")

import numpy as np

# 處理x
def pad_features(features, max_len=None):
    """
    Pad a list of feature arrays to the same frame length.
    
    Args:
        features (list of np.ndarray): Each element is of shape (frames, joints, coords)
        max_len (int, optional): Max frame length to pad to. If None, use the longest one.

    Returns:
        np.ndarray: Padded array of shape (batch, max_len, joints, coords)
    """
    batch_size = len(features)
    max_len = max_len or max(f.shape[0] for f in features)
    joints = features[0].shape[1]
    coords = features[0].shape[2]

    padded = np.zeros((batch_size, max_len, joints, coords), dtype=np.float32)
    
    for i, f in enumerate(features):
        length = f.shape[0]
        padded[i, :length, :, :] = f

    return padded

# X,Y SHAPE
X = np.array(pad_features(all_features)) 
print(f"X shape: {X.shape}, y shape: {y.shape}")

檔案 pitch_0003_all_xyz.npy 在資料夾 ./data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz 中不存在或無法讀取。錯誤：[Errno 2] No such file or directory: './data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz\\pitch_0003_all_xyz.npy'
檔案 pitch_0004_all_xyz.npy 在資料夾 ./data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz 中不存在或無法讀取。錯誤：[Errno 2] No such file or directory: './data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz\\pitch_0004_all_xyz.npy'
檔案 pitch_0005_all_xyz.npy 在資料夾 ./data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz 中不存在或無法讀取。錯誤：[Errno 2] No such file or directory: './data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz\\pitch_0005_all_xyz.npy'
檔案 pitch_0006_all_xyz.npy 在資料夾 ./data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz 中不存在或無法讀取。錯誤：[Errno 2] No such file or directory: './data/Gerrit_Cole_CH_videos_4S-20250603T012318Z-1-001/features_all_xyz\\pitch_0006_all_xyz.n

Unnamed: 0,Filename,pitch_type,game_date,release_speed,release_pos_x,release_pos_z,player_name,batter,pitcher,events,...,batter_days_until_next_game,api_break_z_with_gravity,api_break_x_arm,api_break_x_batter_in,arm_angle,attack_angle,attack_direction,swing_path_tilt,intercept_ball_minus_batter_pos_x_inches,intercept_ball_minus_batter_pos_y_inches
0,pitch_0001.mp4,CH,2023-09-27,89.7,-2.18,5.62,"Cole, Gerrit",595281,543037,,...,1.0,2.08,1.29,-1.29,38.9,,,,,
1,pitch_0002.mp4,CH,2023-09-27,87.7,-2.08,5.71,"Cole, Gerrit",656305,543037,,...,1.0,2.34,1.34,1.34,41.5,,,,,
28,pitch_0029.mp4,CH,2023-09-05,89.7,-2.15,5.89,"Cole, Gerrit",678009,543037,,...,1.0,1.52,0.91,-0.91,46.9,,,,,
33,pitch_0034.mp4,CH,2023-09-05,87.6,-2.08,6.04,"Cole, Gerrit",678009,543037,,...,1.0,2.31,1.09,-1.09,45.0,,,,,
37,pitch_0038.mp4,CH,2023-08-30,89.0,-2.3,5.59,"Cole, Gerrit",656716,543037,grounded_into_double_play,...,1.0,1.75,1.28,-1.28,42.5,10.786684,3.197531,30.752486,45.919343,25.068283


y shape: (442, 3)
Class names: ['ball' 'strike' 'unknown']
X shape: (442, 239, 33, 3), y shape: (442, 3)


In [193]:
csv_feat['description'].unique()

array(['ball', 'strike', 'unknown'], dtype=object)

# 訓練stgcn

In [194]:
from sklearn.model_selection import train_test_split

A = build_adjacency_matrix()  # shape: (V, V)
X = X.transpose(0, 3, 1, 2)   # 轉成 (N, C, T, V)

# 標準化
X_norm = np.empty_like(X)
for c in range(X.shape[1]):  # 對每個 channel (x/y/z)
    mean = X[:, c, :, :].mean()
    std = X[:, c, :, :].std()
    X_norm[:, c, :, :] = (X[:, c, :, :] - mean) / (std + 1e-8)

# 資料切分
X_train, X_test, y_train, y_test = train_test_split(X_norm, y, test_size=0.2, random_state=42)

# 資料擴增
def augment_data(X, y, augment_ratio=0.5):
    """
    對 X (N, C, T, V) 做簡單資料擴增，擴增比例為 augment_ratio，
    輸出為擴增後的 X_aug, y_aug，會疊加於原始資料後面。
    """
    N, C, T, V = X.shape
    num_aug = int(N * augment_ratio)
    
    # 隨機挑選要擴增的樣本index
    idx = np.random.choice(N, num_aug, replace=False)
    
    X_aug = X[idx].copy()
    y_aug = y[idx].copy()
    
    # 時間隨機裁剪 (從T的中間段截取一段長度80%~100%的序列，再用插值拉回T長度)
    def temporal_crop_resize(sample):
        t_len = sample.shape[1]
        crop_ratio = np.random.uniform(0.8, 1.0)
        crop_len = int(t_len * crop_ratio)
        start = np.random.randint(0, t_len - crop_len + 1)
        cropped = sample[:, start:start+crop_len, :]
        # 線性插值resize回t_len
        x_old = np.linspace(0, 1, crop_len)
        x_new = np.linspace(0, 1, t_len)
        cropped_resized = np.empty((sample.shape[0], t_len, sample.shape[2]))
        for c in range(sample.shape[0]):
            for v in range(sample.shape[2]):
                cropped_resized[c, :, v] = np.interp(x_new, x_old, cropped[c, :, v])
        return cropped_resized
    
    # 空間維度加小噪聲
    def add_noise(sample, noise_level=0.05):
        noise = np.random.normal(scale=noise_level, size=sample.shape)
        return sample + noise
    
    for i in range(num_aug):
        sample = X_aug[i]
        
        # temporal crop + resize
        sample = temporal_crop_resize(sample)
        
        # 加噪聲
        sample = add_noise(sample)
        
        X_aug[i] = sample
    
    # 將擴增資料接回原始資料後
    X_all = np.concatenate([X, X_aug], axis=0)
    y_all = np.concatenate([y, y_aug], axis=0)
    
    return X_all, y_all

# 訓練
X_train_aug, y_train_aug = augment_data(X_train, y_train, augment_ratio=0.5)
model = train_stgcn(X_train_aug, y_train_aug, A, epochs=100)


(529, 3, 239, 33) (529, 3)
Epoch 1/100 | train Loss: 22.6057 | train Acc: 0.5792
Epoch 1/100 | Val Loss: 22.6057 | Val Acc: 0.4615
Epoch 2/100 | train Loss: 19.3976 | train Acc: 0.5792
Epoch 2/100 | Val Loss: 19.3976 | Val Acc: 0.4615
Epoch 3/100 | train Loss: 19.0310 | train Acc: 0.5792
Epoch 3/100 | Val Loss: 19.0310 | Val Acc: 0.4615
Epoch 4/100 | train Loss: 18.8210 | train Acc: 0.5934
Epoch 4/100 | Val Loss: 18.8210 | Val Acc: 0.4615
Epoch 5/100 | train Loss: 18.6201 | train Acc: 0.6170
Epoch 5/100 | Val Loss: 18.6201 | Val Acc: 0.3846
Epoch 6/100 | train Loss: 18.4524 | train Acc: 0.5839
Epoch 6/100 | Val Loss: 18.4524 | Val Acc: 0.4423
Epoch 7/100 | train Loss: 18.9445 | train Acc: 0.6099
Epoch 7/100 | Val Loss: 18.9445 | Val Acc: 0.4231
Epoch 8/100 | train Loss: 18.8612 | train Acc: 0.5508
Epoch 8/100 | Val Loss: 18.8612 | Val Acc: 0.5577
Epoch 9/100 | train Loss: 19.1180 | train Acc: 0.5792
Epoch 9/100 | Val Loss: 19.1180 | Val Acc: 0.4615
Epoch 10/100 | train Loss: 18.8697 | 

In [195]:
# 測試
import gc
import torch
from torch.utils.data import DataLoader, TensorDataset

gc.collect()
torch.cuda.empty_cache()

# 準備 test 資料
X_test_tensor = torch.Tensor(X_test)
y_test_tensor = torch.Tensor(y_test)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 模型預測
model.eval()
y_preds = []
with torch.no_grad():
    for x_batch, _ in test_loader:
        x_batch = x_batch.cuda()
        y_batch_pred = model(x_batch)
        y_preds.append(y_batch_pred.cpu())
        
# 合併所有預測
y_pred_all = torch.cat(y_preds).numpy()
acc = accuracy_score(np.argmax(y_test, axis=1), np.argmax(y_pred_all, axis=1))
print(f"Accuracy: {acc:.4f}")

Accuracy: 0.4944


# 與隨機森林比較

In [196]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

X_flat = X.reshape(X.shape[0], -1)  # (203, 3*239*33) = (203, 23661)
# 資料切分
X_train, X_test, y_train, y_test = train_test_split(X_flat, y, test_size=0.2, random_state=42)

# 建立模型並訓練
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

# 預測與評估
y_pred = clf.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"Accuracy: {acc:.4f}")


Accuracy: 0.5506
