In [None]:
# Mount Google Drive at /content/drive (Colab-only helper).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### 1D GRU 처리

In [None]:
import os
import glob
import json
import math
import random
from typing import List, Dict, Any, Tuple

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import re

# =========================
# 0. Basic configuration
# =========================
BASE_DIR = "/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json"
TARGET_LEN = 16         # T (sequence length)
BATCH_SIZE = 32
EPOCHS = 40
LEARNING_RATE = 3e-4
WEIGHT_DECAY = 1e-2
LABEL_SMOOTHING = 0.1
RANDOM_SEED = 42

# Fractions of the samples assigned to the train / validation / test splits.
TRAIN_RATIO = 0.7
VAL_RATIO = 0.15
TEST_RATIO = 0.15

# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

# Seed the Python, NumPy and PyTorch random generators with the same value.
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if DEVICE == "cuda":
    torch.cuda.manual_seed_all(RANDOM_SEED)


# =========================
# 1. Utility functions
# =========================

EXPECTED_POSE = 33  # pose landmarks per frame
EXPECTED_HAND = 21  # landmarks per hand (left and right alike)


def load_json(path: str) -> Dict[str, Any]:
    """Parse the UTF-8 encoded JSON file at ``path`` and return its content."""
    with open(path, mode="r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    return parsed


def get_landmark_array(lst: List[Dict[str, float]], expected_len: int) -> np.ndarray:
    """Convert a list of landmark dicts into a fixed-size float32 array.

    Args:
        lst: Landmarks such as ``[{"x": ..., "y": ..., "z": ..., "v": ...}, ...]``.
            ``None`` (e.g. a JSON ``null`` for an undetected hand) is treated
            as an empty list.
        expected_len: Number of rows in the result. Extra landmarks are
            dropped, missing ones stay zero.

    Returns:
        Array of shape ``(expected_len, 4)`` with columns x, y, z, v.
        Missing keys, ``None`` entries and NaN/Inf values all become 0.
    """
    arr = np.zeros((expected_len, 4), dtype=np.float32)
    if lst is None:
        # A null landmark group would otherwise fail on the slice below.
        lst = []

    for i, lm in enumerate(lst[:expected_len]):
        if not lm:
            # None or empty dict: leave the zero row in place.
            continue
        for col, key in enumerate(("x", "y", "z", "v")):
            value = lm.get(key)
            if value is not None:
                # None cannot be stored in a float array; keep the 0 default.
                arr[i, col] = value

    return np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)


def normalize_skeleton(frame_kpts: np.ndarray) -> np.ndarray:
    """Normalize one frame of keypoints relative to the shoulders.

    frame_kpts: (75, 4) = pose(33) + left_hand(21) + right_hand(21)

    When both shoulders have ``v`` above 0.3:
    - translate so the shoulder midpoint becomes the origin,
    - scale by the shoulder distance measured in the (x, y) plane,
    - rotate (x, y) so that the shoulder line becomes horizontal.
    Otherwise the frame is only centred on the mean of all keypoints.
    The fourth column is left untouched; NaN/Inf are replaced with 0.
    """
    finite = dict(nan=0.0, posinf=0.0, neginf=0.0)
    frame_kpts = np.nan_to_num(frame_kpts, **finite)

    # MediaPipe pose indices: 11 = left_shoulder, 12 = right_shoulder
    body = frame_kpts[:EXPECTED_POSE]  # (33, 4)
    left, right = body[11], body[12]   # each (x, y, z, v)

    # Defaults used when the shoulders are unreliable or degenerate.
    scale, theta = 1.0, 0.0

    if left[3] > 0.3 and right[3] > 0.3:
        origin = (left[:3] + right[:3]) / 2.0  # (x, y, z)
        across = right[:2] - left[:2]
        width = np.linalg.norm(across)
        # A non-finite or near-zero width keeps the unit scale.
        if np.isfinite(width) and width >= 1e-4:
            scale = width
        theta = math.atan2(float(across[1]), float(across[0]))
    else:
        origin = np.nan_to_num(frame_kpts[:, :3].mean(axis=0), **finite)

    out = frame_kpts.copy()
    out[:, :3] = (out[:, :3] - origin[None, :]) / (scale + 1e-6)

    # Skip the rotation when the shoulder line is already (almost) horizontal.
    if abs(theta) > 1e-3:
        cos_t, sin_t = math.cos(theta), math.sin(theta)
        xs = out[:, 0].copy()
        ys = out[:, 1].copy()
        out[:, 0] = xs * cos_t + ys * sin_t
        out[:, 1] = -xs * sin_t + ys * cos_t

    return np.nan_to_num(out, **finite)


def resample_sequence(seq: np.ndarray, target_len: int) -> np.ndarray:
    """Resample ``seq`` along time to ``target_len`` rows by nearest index.

    seq: (L, D)
    target_len: T

    A sequence that already has T rows is returned after NaN/Inf cleanup;
    a sequence with at most one row is repeated T times.
    """
    seq = np.nan_to_num(seq, nan=0.0, posinf=0.0, neginf=0.0)
    n_steps = seq.shape[0]

    if n_steps == target_len:
        return seq
    if n_steps <= 1:
        return np.repeat(seq, target_len, axis=0)

    # Evenly spaced positions over [0, L-1], rounded to the nearest row.
    last = n_steps - 1
    positions = np.linspace(0, last, target_len)
    picks = np.clip(np.round(positions).astype(np.int32), 0, last)
    return np.nan_to_num(seq[picks], nan=0.0, posinf=0.0, neginf=0.0)


def add_velocity_feature(seq: np.ndarray) -> np.ndarray:
    """Append frame-to-frame differences to the features.

    seq: (T, D)
    -> [position, velocity] concatenated to (T, 2D)

    The first velocity row is zero because the first frame is used as its
    own predecessor. NaN/Inf are replaced with 0 at every stage.
    """
    clean = dict(nan=0.0, posinf=0.0, neginf=0.0)
    positions = np.nan_to_num(seq, **clean)
    first_row = positions[0:1]
    velocities = np.nan_to_num(
        np.diff(positions, axis=0, prepend=first_row), **clean
    )  # (T, D)
    stacked = np.concatenate([positions, velocities], axis=-1)  # (T, 2D)
    return np.nan_to_num(stacked, **clean)


# =========================
# 2. Data loading & preprocessing
# =========================

# Applied to file stems: group 1 is the base_id, group 2 the digits after "_s".
SEQ_GROUP_RE = re.compile(r"(.*)_s(\d+)_hands$")  # extracts base_id and segment index


def collect_sequences(base_dir: str):
    """Load every keypoint sequence stored under ``base_dir``.

    Expected layout::

        base_dir / <person> / <base_id>_s<NN>_hands.json

    Files that share a ``base_id`` inside one person directory form one
    sequence, and each segment index contributes one time step. If several
    files map to the same segment index, their normalized keypoints are
    averaged.

    Args:
        base_dir: Directory that contains one sub-directory per person.

    Returns:
        ``(sequences, labels)``. ``sequences`` is a list of arrays shaped
        ``(L, EXPECTED_POSE + 2 * EXPECTED_HAND, 4)``. ``labels`` holds the
        ``"word_folder"`` value of the first file read for each sequence,
        or ``"UNKNOWN"`` when that key is absent.
    """
    all_seq_arrays = []
    all_labels = []

    person_dirs = sorted(
        d for d in glob.glob(os.path.join(base_dir, "*")) if os.path.isdir(d)
    )
    print(f"Found person dirs: {person_dirs}")

    for p_dir in person_dirs:
        person_name = os.path.basename(p_dir)

        # glob returns matches in arbitrary order. Sorting keeps the sample
        # order stable, so the seeded train/val/test split is reproducible.
        json_files = sorted(glob.glob(os.path.join(p_dir, "*_hands.json")))

        groups = {}  # base_id -> {seg_idx:int -> [json_path, ...]}
        for jf in json_files:
            stem, _ = os.path.splitext(os.path.basename(jf))
            m = SEQ_GROUP_RE.match(stem)
            if not m:
                continue
            base_id, seg_str = m.groups()
            groups.setdefault(base_id, {}).setdefault(int(seg_str), []).append(jf)

        print(f"[Person {person_name}] #base sequences: {len(groups)}")

        for seg_dict in groups.values():
            segment_features = []
            label_word = None

            # Segments are visited in numeric order to form the time axis.
            for seg_idx in sorted(seg_dict):
                frame_kpts_list = []
                for jf in sorted(seg_dict[seg_idx]):
                    data = load_json(jf)

                    if label_word is None:
                        label_word = data.get("word_folder", "UNKNOWN")

                    pose = get_landmark_array(data.get("pose", []), EXPECTED_POSE)
                    lh = get_landmark_array(data.get("left_hand", []), EXPECTED_HAND)
                    rh = get_landmark_array(data.get("right_hand", []), EXPECTED_HAND)
                    frame_kpts = np.concatenate([pose, lh, rh], axis=0)  # (75, 4)
                    frame_kpts_list.append(normalize_skeleton(frame_kpts))

                if not frame_kpts_list:
                    continue

                # One feature per segment: the mean over its files.
                seg_arr = np.stack(frame_kpts_list, axis=0).mean(axis=0)  # (75, 4)
                seg_arr = np.nan_to_num(seg_arr, nan=0.0, posinf=0.0, neginf=0.0)
                segment_features.append(seg_arr)

            if not segment_features:
                continue

            seq_arr = np.stack(segment_features, axis=0)  # (L, 75, 4)
            seq_arr = np.nan_to_num(seq_arr, nan=0.0, posinf=0.0, neginf=0.0)
            all_seq_arrays.append(seq_arr)
            all_labels.append(label_word if label_word is not None else "UNKNOWN")

    print(f"Total sequences (base_id level): {len(all_seq_arrays)}")
    return all_seq_arrays, all_labels


def build_label_mapping(labels: List[str]) -> Dict[str, int]:
    """Assign consecutive integer ids to the distinct labels in sorted order.

    The resulting mapping is printed and returned.
    """
    ordered = sorted(set(labels))
    label2idx = dict(zip(ordered, range(len(ordered))))

    print("Label mapping:")
    for name, index in label2idx.items():
        print(f"  {name} -> {index}")
    return label2idx


def prepare_dataset(
    base_dir: str, target_len: int
) -> Tuple[List[np.ndarray], List[int], int]:
    """Build model-ready feature sequences and integer labels.

    (L, 75, 4) -> (T, 300) -> velocity concat (T, 600)

    Args:
        base_dir: Root directory handed to ``collect_sequences``.
        target_len: Number of time steps T every sequence is resampled to.

    Returns:
        ``(X_list, y_list, num_classes)``: float32 feature arrays, their
        integer class ids, and the number of distinct labels.

    Raises:
        ValueError: If no sequence was found under ``base_dir``.
    """
    raw_seqs, raw_labels = collect_sequences(base_dir)
    if not raw_seqs:
        # Otherwise the failure surfaces later as np.stack's opaque
        # "need at least one array to stack" error.
        raise ValueError(
            f"No keypoint sequences found under {base_dir!r}; "
            "check the path and the '*_s<NN>_hands.json' file naming."
        )
    label2idx = build_label_mapping(raw_labels)

    X_list = []
    y_list = []

    for seq_arr, lab in zip(raw_seqs, raw_labels):
        n_steps = seq_arr.shape[0]
        seq_flat = seq_arr.reshape(n_steps, -1)  # (L, 300)

        # resample_sequence and add_velocity_feature both replace NaN/Inf
        # with 0, so no extra cleanup passes are needed here.
        seq_resampled = resample_sequence(seq_flat, target_len)  # (T, 300)
        seq_with_vel = add_velocity_feature(seq_resampled)       # (T, 600)

        X_list.append(seq_with_vel.astype(np.float32))
        y_list.append(label2idx[lab])

    # Diagnostics over the whole dataset.
    X_arr = np.stack(X_list, axis=0)
    print("== After prepare_dataset (with nan_to_num) ==")
    print("Has NaN in X:", np.isnan(X_arr).any())
    print("Has Inf in X:", np.isinf(X_arr).any())
    print("Max abs value:", float(np.max(np.abs(X_arr))))

    input_dim = X_list[0].shape[1]
    num_classes = len(label2idx)
    print(f"Final input dim D' = {input_dim}")
    print(f"num_classes = {num_classes}")

    return X_list, y_list, num_classes


def compute_global_stats(
    X_list: List[np.ndarray], indices: List[int]
) -> Tuple[np.ndarray, np.ndarray]:
    """Compute per-feature mean and std over the selected sequences.

    Only the sequences named by ``indices`` (the train split) are used.
    A small epsilon is added to the std. Both results are float32.

    Raises:
        ValueError: If ``indices`` is empty.
    """
    if len(indices) == 0:
        raise ValueError("No indices provided for computing global stats.")

    # Stack all selected frames into one (sum of T, D) matrix.
    stacked = np.nan_to_num(
        np.concatenate([X_list[i] for i in indices], axis=0),
        nan=0.0, posinf=0.0, neginf=0.0,
    )
    mean = np.nan_to_num(stacked.mean(axis=0), nan=0.0, posinf=0.0, neginf=0.0)
    std = np.nan_to_num(stacked.std(axis=0) + 1e-6, nan=1.0, posinf=1.0, neginf=1.0)
    return mean.astype(np.float32), std.astype(np.float32)


# =========================
# 3. Dataset & Augmentation
# =========================

def augment_seq_tensor(seq: torch.Tensor) -> torch.Tensor:
    """Randomly rescale and time-jitter one sequence.

    seq: (T, D)

    With probability 0.5 all values are multiplied by a factor drawn from
    [0.95, 1.05]. Independently, with probability 0.5 and only when T > 8,
    up to two leading frames are dropped and the last remaining frame is
    repeated so the length stays T.
    """
    n_steps, _ = seq.shape

    # global scale
    if random.random() < 0.5:
        seq = seq * random.uniform(0.95, 1.05)

    # time jitter: the random draw happens before the length check, which
    # keeps the consumption of the random stream independent of T
    jitter_draw = random.random()
    if jitter_draw < 0.5 and n_steps > 8:
        drop = random.randint(0, 2)
        if drop > 0:
            kept = seq[drop:]
            tail = kept[-1:].repeat(drop, 1)
            seq = torch.cat([kept, tail], dim=0)

    return seq


class KeypointSequenceDataset(Dataset):
    """Dataset view over a subset of the prepared keypoint sequences.

    Items are ``(seq, label)`` where ``seq`` is a float tensor of shape
    (T, D). When both ``mean`` and ``std`` are given the sequence is
    standardized; when ``augment`` is true random noise, a circular time
    shift and ``augment_seq_tensor`` are applied afterwards.
    """

    def __init__(
        self,
        X_list: List[np.ndarray],
        y_list: List[int],
        indices: List[int],
        mean: np.ndarray = None,
        std: np.ndarray = None,
        augment: bool = False,
    ):
        self.X_list = X_list
        self.y_list = y_list
        self.indices = indices
        self.augment = augment

        # Statistics are only used when both of them are supplied.
        have_stats = mean is not None and std is not None
        self.mean = torch.from_numpy(mean).float() if have_stats else None
        self.std = torch.from_numpy(std).float() if have_stats else None

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        sample_id = self.indices[idx]
        features = self.X_list[sample_id]  # (T, D)
        label = self.y_list[sample_id]

        seq = torch.from_numpy(features).float()  # (T, D)

        if self.mean is not None and self.std is not None:
            seq = (seq - self.mean) / self.std

        if not self.augment:
            return seq, label

        # Additive Gaussian noise.
        if random.random() < 0.7:
            seq = seq + torch.randn_like(seq) * 0.01

        # Circular shift along the time axis by up to three steps.
        if random.random() < 0.5:
            shift = random.randint(-3, 3)
            if shift != 0:
                seq = torch.roll(seq, shifts=shift, dims=0)

        return augment_seq_tensor(seq), label


# =========================
# 4. GRU + Attention Pooling
# =========================

class GRUClassifier(nn.Module):
    """GRU encoder with attention pooling over time and a linear head.

    Input is (B, T, D); output is one row of class logits per sample.
    """

    def __init__(
        self,
        input_dim: int,
        num_classes: int,
        hidden_dim: int = 256,
        num_layers: int = 2,
        bidirectional: bool = True,
        dropout: float = 0.2,
    ):
        super().__init__()
        # GRU dropout is only passed on when there is more than one layer.
        inter_layer_dropout = 0.0 if num_layers <= 1 else dropout
        self.gru = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=inter_layer_dropout,
        )

        # A bidirectional GRU concatenates both directions.
        directions = 2 if bidirectional else 1
        out_dim = hidden_dim * directions

        self.attn_fc = nn.Linear(out_dim, 1)
        self.head = nn.Sequential(
            nn.LayerNorm(out_dim),
            nn.Dropout(0.2),
            nn.Linear(out_dim, num_classes),
        )

    def forward(self, x):
        # x: (B, T, D)
        hidden, _ = self.gru(x)  # (B, T, H*)
        hidden = torch.nan_to_num(hidden, nan=0.0, posinf=0.0, neginf=0.0)

        # One attention weight per time step, normalized over T.
        attn = torch.softmax(self.attn_fc(hidden), dim=1)  # (B, T, 1)
        attn = torch.nan_to_num(attn, nan=0.0, posinf=0.0, neginf=0.0)

        pooled = (attn * hidden).sum(dim=1)  # (B, H*)
        return self.head(pooled)


# =========================
# 5. 학습/평가 루프
# =========================

def run_epoch(model, loader, optimizer=None, criterion=None):
    """Run one pass over ``loader`` and return ``(avg_loss, avg_acc)``.

    Args:
        model: Network mapping a batch of sequences to class logits.
        loader: Iterable of ``(seq, label)`` tensor batches.
        optimizer: If given, the model is trained. If ``None``, the model
            is put in eval mode and no gradients are tracked.
        criterion: Loss callable ``criterion(logits, label)``. It is
            required even though it defaults to ``None``.

    Returns:
        Sample-weighted mean loss and accuracy over all batches.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    is_train = optimizer is not None
    model.train(is_train)  # train(False) is the same as eval()

    # Batches are moved to the device the model lives on. The global DEVICE
    # is only the fallback for a model without parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device(DEVICE)

    total_loss = 0.0
    total_correct = 0
    total_count = 0

    # Skipping autograd during evaluation saves memory and time.
    with torch.set_grad_enabled(is_train):
        for seq, label in loader:
            seq = seq.to(device)
            label = label.to(device)

            if is_train:
                optimizer.zero_grad()

            logits = model(seq)
            loss = criterion(logits, label)

            if is_train:
                loss.backward()
                optimizer.step()

            batch_size = seq.size(0)
            total_loss += loss.item() * batch_size
            total_correct += (logits.argmax(dim=1) == label).sum().item()
            total_count += batch_size

    if total_count == 0:
        # An empty split would otherwise end in a bare ZeroDivisionError.
        raise ValueError("run_epoch received a loader that yielded no samples.")

    avg_loss = total_loss / total_count
    avg_acc = total_correct / total_count
    return avg_loss, avg_acc


# =========================
# 6. 랜덤 train/val/test split
# =========================

def random_split_indices(
    num_samples: int,
    train_ratio: float,
    val_ratio: float,
    test_ratio: float,
    seed: int = 42,
) -> Tuple[List[int], List[int], List[int]]:
    """Shuffle ``range(num_samples)`` with ``seed`` and cut it into three parts.

    The train and validation sizes are the ratios times ``num_samples``,
    truncated to integers; the test split receives whatever is left. The
    three ratios must sum to 1.
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, "비율 합이 1이 되게 설정해줘."

    order = list(range(num_samples))
    random.Random(seed).shuffle(order)

    val_start = int(num_samples * train_ratio)
    test_start = val_start + int(num_samples * val_ratio)

    train_idx = order[:val_start]
    val_idx = order[val_start:test_start]
    test_idx = order[test_start:]  # the remainder goes to test

    print(f"[Random Split] N={num_samples}, #train={len(train_idx)}, #val={len(val_idx)}, #test={len(test_idx)}")
    return train_idx, val_idx, test_idx


# =========================
# 7. 메인
# =========================

def main():
    """Train the GRU classifier and report test accuracy.

    Steps: load and featurize the data, split it randomly, standardize
    with train-split statistics, train for ``EPOCHS`` epochs while keeping
    the weights with the best validation accuracy, then evaluate those
    weights on the test split.
    """
    X_list, y_list, num_classes = prepare_dataset(BASE_DIR, TARGET_LEN)
    input_dim = X_list[0].shape[1]
    print(f"Input dim for model: {input_dim}")

    num_samples = len(X_list)
    train_idx, val_idx, test_idx = random_split_indices(
        num_samples,
        TRAIN_RATIO,
        VAL_RATIO,
        TEST_RATIO,
        seed=RANDOM_SEED,
    )

    # mean/std come from the train split only
    train_mean, train_std = compute_global_stats(X_list, train_idx)
    print("[Stats] mean/std computed from train set.")

    train_dataset = KeypointSequenceDataset(
        X_list, y_list, train_idx,
        mean=train_mean, std=train_std,
        augment=True
    )
    val_dataset = KeypointSequenceDataset(
        X_list, y_list, val_idx,
        mean=train_mean, std=train_std,
        augment=False
    )
    test_dataset = KeypointSequenceDataset(
        X_list, y_list, test_idx,
        mean=train_mean, std=train_std,
        augment=False
    )

    train_loader = DataLoader(
        train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0
    )
    val_loader = DataLoader(
        val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )
    test_loader = DataLoader(
        test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )

    model = GRUClassifier(
        input_dim=input_dim,
        num_classes=num_classes,
        hidden_dim=256,
        num_layers=2,
        bidirectional=True,
        dropout=0.2,
    ).to(DEVICE)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=LEARNING_RATE,
        weight_decay=WEIGHT_DECAY
    )
    criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)
    # mode='max' because the monitored quantity is validation accuracy.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=3
    )

    best_val_acc = 0.0
    best_state = None

    for epoch in range(1, EPOCHS + 1):
        train_loss, train_acc = run_epoch(model, train_loader, optimizer, criterion)
        val_loss, val_acc = run_epoch(model, val_loader, optimizer=None, criterion=criterion)

        print(
            f"[Epoch {epoch:02d}] "
            f"train_loss={train_loss:.4f}, train_acc={train_acc:.3f}, "
            f"val_loss={val_loss:.4f}, val_acc={val_acc:.3f}"
        )

        scheduler.step(val_acc)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() returns references to the live parameter tensors.
            # Without cloning, later optimizer steps would overwrite the
            # snapshot and the "best" weights would be the last-epoch ones.
            best_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

    if best_state is not None:
        model.load_state_dict(best_state)
    test_loss, test_acc = run_epoch(model, test_loader, optimizer=None, criterion=criterion)
    print(
        f"[Final] Best val_acc={best_val_acc:.3f}, "
        f"Test loss={test_loss:.4f}, Test acc={test_acc:.3f}"
    )


# Run the full pipeline when this code is executed directly.
if __name__ == "__main__":
    main()


Using device: cpu
Found person dirs: ['/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/1', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/10', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/2', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/3', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/4', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/5', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/6', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/7', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/8', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/9']
[Person 1] #base sequences: 107
[Person 10] #base sequences: 110
[Person 2] #base sequences: 110
[Person 3] #base sequences: 110
[Person 4] #base sequences: 110
[Person 5] #base sequences: 110
[Person 6] #base sequences: 110
[Person 7] #base sequences

### 1D TCN 처리

In [None]:
import os
import glob
import json
import math
import random
from typing import List, Dict, Any, Tuple

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import re

# =========================
# 0. Basic configuration
# =========================
BASE_DIR = "/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json"
TARGET_LEN = 16         # T (sequence length)
BATCH_SIZE = 32
EPOCHS = 40
LEARNING_RATE = 3e-4
WEIGHT_DECAY = 1e-2
LABEL_SMOOTHING = 0.1
RANDOM_SEED = 42

# Fractions of the samples assigned to the train / validation / test splits.
TRAIN_RATIO = 0.7
VAL_RATIO = 0.15
TEST_RATIO = 0.15

# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

# Seed the Python, NumPy and PyTorch random generators with the same value.
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if DEVICE == "cuda":
    torch.cuda.manual_seed_all(RANDOM_SEED)


# =========================
# 1. Utility functions
# =========================

EXPECTED_POSE = 33  # pose landmarks per frame
EXPECTED_HAND = 21  # landmarks per hand (left and right alike)


def load_json(path: str) -> Dict[str, Any]:
    """Parse the UTF-8 encoded JSON file at ``path`` and return its content."""
    with open(path, mode="r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    return parsed


def get_landmark_array(lst: List[Dict[str, float]], expected_len: int) -> np.ndarray:
    """Convert a list of landmark dicts into a fixed-size float32 array.

    Args:
        lst: Landmarks such as ``[{"x": ..., "y": ..., "z": ..., "v": ...}, ...]``.
            ``None`` (e.g. a JSON ``null`` for an undetected hand) is treated
            as an empty list.
        expected_len: Number of rows in the result. Extra landmarks are
            dropped, missing ones stay zero.

    Returns:
        Array of shape ``(expected_len, 4)`` with columns x, y, z, v.
        Missing keys, ``None`` entries and NaN/Inf values all become 0.
    """
    arr = np.zeros((expected_len, 4), dtype=np.float32)
    if lst is None:
        # A null landmark group would otherwise fail on the slice below.
        lst = []

    for i, lm in enumerate(lst[:expected_len]):
        if not lm:
            # None or empty dict: leave the zero row in place.
            continue
        for col, key in enumerate(("x", "y", "z", "v")):
            value = lm.get(key)
            if value is not None:
                # None cannot be stored in a float array; keep the 0 default.
                arr[i, col] = value

    return np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)


def normalize_skeleton(frame_kpts: np.ndarray) -> np.ndarray:
    """Normalize one frame of keypoints relative to the shoulders.

    frame_kpts: (75, 4) = pose(33) + left_hand(21) + right_hand(21)

    When both shoulders have ``v`` above 0.3:
    - translate so the shoulder midpoint becomes the origin,
    - scale by the shoulder distance measured in the (x, y) plane,
    - rotate (x, y) so that the shoulder line becomes horizontal.
    Otherwise the frame is only centred on the mean of all keypoints.
    The fourth column is left untouched; NaN/Inf are replaced with 0.
    """
    finite = dict(nan=0.0, posinf=0.0, neginf=0.0)
    frame_kpts = np.nan_to_num(frame_kpts, **finite)

    # MediaPipe pose indices: 11 = left_shoulder, 12 = right_shoulder
    body = frame_kpts[:EXPECTED_POSE]  # (33, 4)
    left, right = body[11], body[12]   # each (x, y, z, v)

    # Defaults used when the shoulders are unreliable or degenerate.
    scale, theta = 1.0, 0.0

    if left[3] > 0.3 and right[3] > 0.3:
        origin = (left[:3] + right[:3]) / 2.0  # (x, y, z)
        across = right[:2] - left[:2]
        width = np.linalg.norm(across)
        # A non-finite or near-zero width keeps the unit scale.
        if np.isfinite(width) and width >= 1e-4:
            scale = width
        theta = math.atan2(float(across[1]), float(across[0]))
    else:
        origin = np.nan_to_num(frame_kpts[:, :3].mean(axis=0), **finite)

    out = frame_kpts.copy()
    out[:, :3] = (out[:, :3] - origin[None, :]) / (scale + 1e-6)

    # Skip the rotation when the shoulder line is already (almost) horizontal.
    if abs(theta) > 1e-3:
        cos_t, sin_t = math.cos(theta), math.sin(theta)
        xs = out[:, 0].copy()
        ys = out[:, 1].copy()
        out[:, 0] = xs * cos_t + ys * sin_t
        out[:, 1] = -xs * sin_t + ys * cos_t

    return np.nan_to_num(out, **finite)


def resample_sequence(seq: np.ndarray, target_len: int) -> np.ndarray:
    """Resample ``seq`` along time to ``target_len`` rows by nearest index.

    seq: (L, D)
    target_len: T

    A sequence that already has T rows is returned after NaN/Inf cleanup;
    a sequence with at most one row is repeated T times.
    """
    seq = np.nan_to_num(seq, nan=0.0, posinf=0.0, neginf=0.0)
    n_steps = seq.shape[0]

    if n_steps == target_len:
        return seq
    if n_steps <= 1:
        return np.repeat(seq, target_len, axis=0)

    # Evenly spaced positions over [0, L-1], rounded to the nearest row.
    last = n_steps - 1
    positions = np.linspace(0, last, target_len)
    picks = np.clip(np.round(positions).astype(np.int32), 0, last)
    return np.nan_to_num(seq[picks], nan=0.0, posinf=0.0, neginf=0.0)


def add_velocity_feature(seq: np.ndarray) -> np.ndarray:
    """Append frame-to-frame differences to the features.

    seq: (T, D)
    -> [position, velocity] concatenated to (T, 2D)

    The first velocity row is zero because the first frame is used as its
    own predecessor. NaN/Inf are replaced with 0 at every stage.
    """
    clean = dict(nan=0.0, posinf=0.0, neginf=0.0)
    positions = np.nan_to_num(seq, **clean)
    first_row = positions[0:1]
    velocities = np.nan_to_num(
        np.diff(positions, axis=0, prepend=first_row), **clean
    )  # (T, D)
    stacked = np.concatenate([positions, velocities], axis=-1)  # (T, 2D)
    return np.nan_to_num(stacked, **clean)


# =========================
# 2. Data loading & preprocessing
# =========================

# Applied to file stems: group 1 is the base_id, group 2 the digits after "_s".
SEQ_GROUP_RE = re.compile(r"(.*)_s(\d+)_hands$")  # extracts base_id and segment index


def collect_sequences(base_dir: str):
    """Load every keypoint sequence stored under ``base_dir``.

    Expected layout::

        base_dir / <person> / <base_id>_s<NN>_hands.json

    Files that share a ``base_id`` inside one person directory form one
    sequence, and each segment index contributes one time step. If several
    files map to the same segment index, their normalized keypoints are
    averaged.

    Args:
        base_dir: Directory that contains one sub-directory per person.

    Returns:
        ``(sequences, labels)``. ``sequences`` is a list of arrays shaped
        ``(L, EXPECTED_POSE + 2 * EXPECTED_HAND, 4)``. ``labels`` holds the
        ``"word_folder"`` value of the first file read for each sequence,
        or ``"UNKNOWN"`` when that key is absent.
    """
    all_seq_arrays = []
    all_labels = []

    person_dirs = sorted(
        d for d in glob.glob(os.path.join(base_dir, "*")) if os.path.isdir(d)
    )
    print(f"Found person dirs: {person_dirs}")

    for p_dir in person_dirs:
        person_name = os.path.basename(p_dir)

        # glob returns matches in arbitrary order. Sorting keeps the sample
        # order stable, so the seeded train/val/test split is reproducible.
        json_files = sorted(glob.glob(os.path.join(p_dir, "*_hands.json")))

        groups = {}  # base_id -> {seg_idx:int -> [json_path, ...]}
        for jf in json_files:
            stem, _ = os.path.splitext(os.path.basename(jf))
            m = SEQ_GROUP_RE.match(stem)
            if not m:
                continue
            base_id, seg_str = m.groups()
            groups.setdefault(base_id, {}).setdefault(int(seg_str), []).append(jf)

        print(f"[Person {person_name}] #base sequences: {len(groups)}")

        for seg_dict in groups.values():
            segment_features = []
            label_word = None

            # Segments are visited in numeric order to form the time axis.
            for seg_idx in sorted(seg_dict):
                frame_kpts_list = []
                for jf in sorted(seg_dict[seg_idx]):
                    data = load_json(jf)

                    if label_word is None:
                        label_word = data.get("word_folder", "UNKNOWN")

                    pose = get_landmark_array(data.get("pose", []), EXPECTED_POSE)
                    lh = get_landmark_array(data.get("left_hand", []), EXPECTED_HAND)
                    rh = get_landmark_array(data.get("right_hand", []), EXPECTED_HAND)
                    frame_kpts = np.concatenate([pose, lh, rh], axis=0)  # (75, 4)
                    frame_kpts_list.append(normalize_skeleton(frame_kpts))

                if not frame_kpts_list:
                    continue

                # One feature per segment: the mean over its files.
                seg_arr = np.stack(frame_kpts_list, axis=0).mean(axis=0)  # (75, 4)
                seg_arr = np.nan_to_num(seg_arr, nan=0.0, posinf=0.0, neginf=0.0)
                segment_features.append(seg_arr)

            if not segment_features:
                continue

            seq_arr = np.stack(segment_features, axis=0)  # (L, 75, 4)
            seq_arr = np.nan_to_num(seq_arr, nan=0.0, posinf=0.0, neginf=0.0)
            all_seq_arrays.append(seq_arr)
            all_labels.append(label_word if label_word is not None else "UNKNOWN")

    print(f"Total sequences (base_id level): {len(all_seq_arrays)}")
    return all_seq_arrays, all_labels


def build_label_mapping(labels: List[str]) -> Dict[str, int]:
    """Assign consecutive integer ids to the distinct labels in sorted order.

    The resulting mapping is printed and returned.
    """
    ordered = sorted(set(labels))
    label2idx = dict(zip(ordered, range(len(ordered))))

    print("Label mapping:")
    for name, index in label2idx.items():
        print(f"  {name} -> {index}")
    return label2idx


def prepare_dataset(
    base_dir: str, target_len: int
) -> Tuple[List[np.ndarray], List[int], int]:
    """Build model-ready feature sequences and integer labels.

    (L, 75, 4) -> (T, 300) -> velocity concat (T, 600)

    Args:
        base_dir: Root directory handed to ``collect_sequences``.
        target_len: Number of time steps T every sequence is resampled to.

    Returns:
        ``(X_list, y_list, num_classes)``: float32 feature arrays, their
        integer class ids, and the number of distinct labels.

    Raises:
        ValueError: If no sequence was found under ``base_dir``.
    """
    raw_seqs, raw_labels = collect_sequences(base_dir)
    if not raw_seqs:
        # Otherwise the failure surfaces later as np.stack's opaque
        # "need at least one array to stack" error.
        raise ValueError(
            f"No keypoint sequences found under {base_dir!r}; "
            "check the path and the '*_s<NN>_hands.json' file naming."
        )
    label2idx = build_label_mapping(raw_labels)

    X_list = []
    y_list = []

    for seq_arr, lab in zip(raw_seqs, raw_labels):
        n_steps = seq_arr.shape[0]
        seq_flat = seq_arr.reshape(n_steps, -1)  # (L, 300)

        # resample_sequence and add_velocity_feature both replace NaN/Inf
        # with 0, so no extra cleanup passes are needed here.
        seq_resampled = resample_sequence(seq_flat, target_len)  # (T, 300)
        seq_with_vel = add_velocity_feature(seq_resampled)       # (T, 600)

        X_list.append(seq_with_vel.astype(np.float32))
        y_list.append(label2idx[lab])

    # Diagnostics over the whole dataset.
    X_arr = np.stack(X_list, axis=0)
    print("== After prepare_dataset (with nan_to_num) ==")
    print("Has NaN in X:", np.isnan(X_arr).any())
    print("Has Inf in X:", np.isinf(X_arr).any())
    print("Max abs value:", float(np.max(np.abs(X_arr))))

    input_dim = X_list[0].shape[1]
    num_classes = len(label2idx)
    print(f"Final input dim D' = {input_dim}")
    print(f"num_classes = {num_classes}")

    return X_list, y_list, num_classes


def compute_global_stats(
    X_list: List[np.ndarray], indices: List[int]
) -> Tuple[np.ndarray, np.ndarray]:
    """Compute per-feature mean and std over the selected sequences.

    Only the sequences named by ``indices`` (the train split) are used.
    A small epsilon is added to the std. Both results are float32.

    Raises:
        ValueError: If ``indices`` is empty.
    """
    if len(indices) == 0:
        raise ValueError("No indices provided for computing global stats.")

    # Stack all selected frames into one (sum of T, D) matrix.
    stacked = np.nan_to_num(
        np.concatenate([X_list[i] for i in indices], axis=0),
        nan=0.0, posinf=0.0, neginf=0.0,
    )
    mean = np.nan_to_num(stacked.mean(axis=0), nan=0.0, posinf=0.0, neginf=0.0)
    std = np.nan_to_num(stacked.std(axis=0) + 1e-6, nan=1.0, posinf=1.0, neginf=1.0)
    return mean.astype(np.float32), std.astype(np.float32)


# =========================
# 3. Dataset & Augmentation
# =========================

def augment_seq_tensor(seq: torch.Tensor) -> torch.Tensor:
    """Randomly rescale and time-jitter one sequence.

    seq: (T, D)

    With probability 0.5 all values are multiplied by a factor drawn from
    [0.95, 1.05]. Independently, with probability 0.5 and only when T > 8,
    up to two leading frames are dropped and the last remaining frame is
    repeated so the length stays T.
    """
    n_steps, _ = seq.shape

    # global scale
    if random.random() < 0.5:
        seq = seq * random.uniform(0.95, 1.05)

    # time jitter: the random draw happens before the length check, which
    # keeps the consumption of the random stream independent of T
    jitter_draw = random.random()
    if jitter_draw < 0.5 and n_steps > 8:
        drop = random.randint(0, 2)
        if drop > 0:
            kept = seq[drop:]
            tail = kept[-1:].repeat(drop, 1)
            seq = torch.cat([kept, tail], dim=0)

    return seq


class KeypointSequenceDataset(Dataset):
    """Dataset view over a subset of the prepared keypoint sequences.

    Items are ``(seq, label)`` where ``seq`` is a float tensor of shape
    (T, D). When both ``mean`` and ``std`` are given the sequence is
    standardized; when ``augment`` is true random noise, a circular time
    shift and ``augment_seq_tensor`` are applied afterwards.
    """

    def __init__(
        self,
        X_list: List[np.ndarray],
        y_list: List[int],
        indices: List[int],
        mean: np.ndarray = None,
        std: np.ndarray = None,
        augment: bool = False,
    ):
        self.X_list = X_list
        self.y_list = y_list
        self.indices = indices
        self.augment = augment

        # Statistics are only used when both of them are supplied.
        have_stats = mean is not None and std is not None
        self.mean = torch.from_numpy(mean).float() if have_stats else None
        self.std = torch.from_numpy(std).float() if have_stats else None

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        sample_id = self.indices[idx]
        features = self.X_list[sample_id]  # (T, D)
        label = self.y_list[sample_id]

        seq = torch.from_numpy(features).float()  # (T, D)

        if self.mean is not None and self.std is not None:
            seq = (seq - self.mean) / self.std

        if not self.augment:
            return seq, label

        # Additive Gaussian noise.
        if random.random() < 0.7:
            seq = seq + torch.randn_like(seq) * 0.01

        # Circular shift along the time axis by up to three steps.
        if random.random() < 0.5:
            shift = random.randint(-3, 3)
            if shift != 0:
                seq = torch.roll(seq, shifts=shift, dims=0)

        return augment_seq_tensor(seq), label


# =========================
# 4. TCN + Attention Pooling
# =========================

class TemporalConvBlock(nn.Module):
    """Residual block made of two dilated 1-D convolutions.

    Main path: conv -> batch norm -> ReLU -> dropout -> conv -> batch norm.
    The input is added back (through a 1x1 convolution when the channel
    count changes) and a final ReLU is applied.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, dilation=1, dropout=0.2):
        super().__init__()
        # Padding that keeps the time length unchanged for odd kernel sizes.
        pad = (dilation * (kernel_size - 1)) // 2
        conv_kwargs = dict(padding=pad, dilation=dilation)

        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, **conv_kwargs)
        self.bn1 = nn.BatchNorm1d(out_channels)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, **conv_kwargs)
        self.bn2 = nn.BatchNorm1d(out_channels)

        # The skip path only needs a projection when the channels differ.
        self.downsample = (
            nn.Conv1d(in_channels, out_channels, kernel_size=1)
            if in_channels != out_channels
            else None
        )
        self.init_weights()

    def init_weights(self):
        """Apply Kaiming-normal initialization to every convolution weight."""
        convs = [self.conv1, self.conv2]
        if self.downsample is not None:
            convs.append(self.downsample)
        for conv in convs:
            nn.init.kaiming_normal_(conv.weight, nonlinearity="relu")

    def forward(self, x):
        # x: (B, C_in, T)
        residual = x if self.downsample is None else self.downsample(x)

        hidden = self.dropout(self.relu(self.bn1(self.conv1(x))))
        hidden = self.bn2(self.conv2(hidden))

        return self.relu(hidden + residual)


class AttnPool1d(nn.Module):
    """Pool over the time axis with softmax attention weights."""

    def __init__(self, in_channels):
        super().__init__()
        self.attn = nn.Linear(in_channels, 1)

    def forward(self, x):
        # x: (B, C, T)
        frames = x.permute(0, 2, 1)  # (B, T, C)
        # One score per time step, normalized over T.
        weights = self.attn(frames).squeeze(-1).softmax(dim=-1)  # (B, T)
        # Weighted sum of the frames: (B, 1, T) x (B, T, C) -> (B, 1, C)
        pooled = torch.bmm(weights[:, None, :], frames)
        return pooled.squeeze(1)  # (B, C)


class TCNClassifier(nn.Module):
    """Sequence classifier: linear projection -> dilated TCN -> attention pooling -> linear head.

    Args:
        input_dim: Feature size D of each time step of the (B, T, D) input.
        num_classes: Number of output logits.
        hidden_channels: Channel width used throughout the network.
    """

    def __init__(self, input_dim: int, num_classes: int, hidden_channels: int = 256):
        super().__init__()
        width = hidden_channels

        # Per-time-step projection of the input features to the TCN width.
        self.input_proj = nn.Linear(input_dim, width)

        # Three residual blocks with dilations 1, 2 and 4.
        residual_blocks = [
            TemporalConvBlock(width, width, kernel_size=3, dilation=rate)
            for rate in (1, 2, 4)
        ]
        self.tcn = nn.Sequential(*residual_blocks)

        self.global_pool = AttnPool1d(width)
        self.fc = nn.Linear(width, num_classes)

    def forward(self, x):
        """Return logits of shape (B, num_classes) for ``x`` of shape (B, T, D)."""
        channels_first = self.input_proj(x).transpose(1, 2)  # (B, T, C) -> (B, C, T)
        pooled = self.global_pool(self.tcn(channels_first))  # (B, C)
        return self.fc(pooled)


# =========================
# 5. 학습/평가 루프
# =========================

def run_epoch(model, loader, optimizer=None, criterion=None):
    """Run one pass over ``loader`` and return the mean loss and accuracy.

    When ``optimizer`` is given, the model is put in train mode and updated
    after every batch. When it is ``None``, the model is put in eval mode and
    the pass runs with autograd disabled, so no computation graph is built for
    validation/test batches.

    Args:
        model: Module mapping a batch of sequences to class logits.
        loader: Iterable yielding ``(seq, label)`` batches.
        optimizer: Optimizer to step after each batch; ``None`` selects
            evaluation.
        criterion: Callable ``criterion(logits, label)`` returning a scalar
            loss. Despite the ``None`` default it is always called, so it
            must be provided.

    Returns:
        Tuple ``(avg_loss, avg_acc)``, both averaged over all samples seen
        (each batch is weighted by its size).

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    is_training = optimizer is not None
    if is_training:
        model.train()
    else:
        model.eval()

    total_loss = 0.0
    total_correct = 0
    total_count = 0

    # Gradients are only needed when parameters are going to be updated.
    with torch.set_grad_enabled(is_training):
        for seq, label in loader:
            seq = seq.to(DEVICE)
            label = label.to(DEVICE)

            if is_training:
                optimizer.zero_grad()

            logits = model(seq)
            loss = criterion(logits, label)

            if is_training:
                loss.backward()
                optimizer.step()

            batch_size = seq.size(0)
            # Weight by batch size so a smaller final batch does not skew the mean.
            total_loss += loss.item() * batch_size
            pred = logits.argmax(dim=1)
            total_correct += (pred == label).sum().item()
            total_count += batch_size

    avg_loss = total_loss / total_count
    avg_acc = total_correct / total_count
    return avg_loss, avg_acc


# =========================
# 6. 랜덤 train/val/test split
# =========================

def random_split_indices(
    num_samples: int,
    train_ratio: float,
    val_ratio: float,
    test_ratio: float,
    seed: int = 42,
) -> Tuple[List[int], List[int], List[int]]:
    """Shuffle ``range(num_samples)`` and cut it into train/val/test index lists.

    The shuffle uses a private ``random.Random(seed)`` instance, so the result
    is reproducible and does not disturb the global random state. Train and
    validation sizes are the ratios truncated to integers; the test split
    receives every remaining index.

    Args:
        num_samples: Total number of samples to split.
        train_ratio: Fraction of samples for the training split.
        val_ratio: Fraction of samples for the validation split.
        test_ratio: Fraction for the test split; only used to validate that
            the three ratios sum to 1.
        seed: Seed for the shuffle.

    Returns:
        Tuple ``(train_idx, val_idx, test_idx)`` of disjoint index lists that
        together cover ``range(num_samples)``.

    Raises:
        ValueError: If the three ratios do not sum to 1 (within 1e-6).
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    # ``not (... < tol)`` also rejects NaN ratios.
    # Message (Korean): "Please set the ratios so that they sum to 1."
    if not (abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6):
        raise ValueError("비율 합이 1이 되게 설정해줘.")

    rng = random.Random(seed)
    indices = list(range(num_samples))
    rng.shuffle(indices)

    n_train = int(num_samples * train_ratio)
    n_val = int(num_samples * val_ratio)

    train_idx = indices[:n_train]
    val_idx = indices[n_train:n_train + n_val]
    # Everything left over goes to the test split.
    test_idx = indices[n_train + n_val:]

    print(f"[Random Split] N={num_samples}, #train={len(train_idx)}, #val={len(val_idx)}, #test={len(test_idx)}")
    return train_idx, val_idx, test_idx


# =========================
# 7. 메인
# =========================

def main():
    """Train the TCN classifier and report test metrics for the best checkpoint.

    Steps: load the dataset via ``prepare_dataset``, split it randomly into
    train/val/test, compute normalisation statistics on the training split,
    train for ``EPOCHS`` epochs while tracking the best validation accuracy,
    then restore the best weights and evaluate once on the test split.
    """
    # Local import: only needed here, for snapshotting the best weights.
    import copy

    X_list, y_list, num_classes = prepare_dataset(BASE_DIR, TARGET_LEN)
    input_dim = X_list[0].shape[1]
    print(f"Input dim for model: {input_dim}")

    num_samples = len(X_list)
    train_idx, val_idx, test_idx = random_split_indices(
        num_samples,
        TRAIN_RATIO,
        VAL_RATIO,
        TEST_RATIO,
        seed=RANDOM_SEED,
    )

    # Normalisation statistics come from the training split only and are
    # reused for val/test.
    train_mean, train_std = compute_global_stats(X_list, train_idx)
    print("[Stats] mean/std computed from train set.")

    train_dataset = KeypointSequenceDataset(
        X_list, y_list, train_idx,
        mean=train_mean, std=train_std,
        augment=True
    )
    val_dataset = KeypointSequenceDataset(
        X_list, y_list, val_idx,
        mean=train_mean, std=train_std,
        augment=False
    )
    test_dataset = KeypointSequenceDataset(
        X_list, y_list, test_idx,
        mean=train_mean, std=train_std,
        augment=False
    )

    train_loader = DataLoader(
        train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0
    )
    val_loader = DataLoader(
        val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )
    test_loader = DataLoader(
        test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )

    model = TCNClassifier(input_dim=input_dim, num_classes=num_classes).to(DEVICE)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=LEARNING_RATE,
        weight_decay=WEIGHT_DECAY
    )
    criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)
    # mode='max' because the monitored quantity is validation accuracy.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=3
    )

    best_val_acc = 0.0
    best_state = None

    for epoch in range(1, EPOCHS + 1):
        train_loss, train_acc = run_epoch(model, train_loader, optimizer, criterion)
        val_loss, val_acc = run_epoch(model, val_loader, optimizer=None, criterion=criterion)

        print(
            f"[Epoch {epoch:02d}] "
            f"train_loss={train_loss:.4f}, train_acc={train_acc:.3f}, "
            f"val_loss={val_loss:.4f}, val_acc={val_acc:.3f}"
        )

        scheduler.step(val_acc)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() returns references to the live parameter tensors,
            # which later optimizer steps modify in place. Deep-copy so the
            # snapshot really holds this epoch's weights.
            best_state = copy.deepcopy(model.state_dict())

    # Restore the best-validation weights before the single test evaluation.
    if best_state is not None:
        model.load_state_dict(best_state)
    test_loss, test_acc = run_epoch(model, test_loader, optimizer=None, criterion=criterion)
    print(
        f"[Final] Best val_acc={best_val_acc:.3f}, "
        f"Test loss={test_loss:.4f}, Test acc={test_acc:.3f}"
    )


# Run the training pipeline only when this file is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


Using device: cpu
Found person dirs: ['/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/1', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/10', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/2', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/3', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/4', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/5', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/6', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/7', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/8', '/content/drive/MyDrive/2025CVproject/preprocessing/Keypoints_json/9']
[Person 1] #base sequences: 107
[Person 10] #base sequences: 110
[Person 2] #base sequences: 110
[Person 3] #base sequences: 110
[Person 4] #base sequences: 110
[Person 5] #base sequences: 110
[Person 6] #base sequences: 110
[Person 7] #base sequences