# TCN 간단 확인

In [None]:
import os
import re
import numpy as np
import pandas as pd
import random
from collections import defaultdict
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F

## train, val, test 구성
-   데이터 분포
    -   normal: 10세트 X 10개 = 100개
    -   missing1
        -   A누락: 2세트 X 10개
        -   B누락: 2세트 X 10개
        -   C누락: 2세트 X 10개 -> 총 6세트, 60개
    -   missing2
        -   A만: 2세트 X 10개
        -   B만: 2세트 X 10개
        -   C만: 2세트 X 10개 -> 총 6세트, 60개
    -   idle: 6세트 X 10개 = 60개

-   세트 기준으로 스플릿, 하지만 missing이 유형별로 너무 적기 때문에 k-fold cross-validation 진행
-   test는 따로 수집
-   test 유형
    -   3행동이 전부 있는 유형
        -   빠르게 하는 버전
        -   느리게 하는 버전
        -   중간중간 멈칫/머뭇거림 있는 버전
    -   2행동
        -   뚜껑이 열려있는 상태로 진행
        -   약이 들어있는 상태로 진행
        -   뚜껑을 닫은 마무리가 없는 상태로 종료
    -   1행동
        -   뚜껑을 열기만 함
        -   약을 넣기만 함
        -   뚜껑을 닫기만 함
    -   idle
        -   손만 소소하게 움직이는데 작업은 안 하는 경우

-   test는 학습과 별개로, 학습용 데이터에 참여하지 않은 사람이 약간 다른 카메라 세팅에서 촬영하고 프레임 단위 라벨링을 진행하여 수집함

In [3]:
def extract_type_and_number(fname: str):
    """Parse the sample type and running number out of a label-CSV file name.

    The expected pattern is ``video_<type>_<number>_lange.csv``, e.g.::

        video_normal_023_lange.csv      -> ("normal", 23)
        video_missing1_A_007_lange.csv  -> ("missing1_A", 7)

    The whole name has to match: names carrying extra trailing text
    (``..._lange.csv.bak``) are rejected instead of being half-parsed.

    Args:
        fname: Bare file name (no directory part).

    Returns:
        ``(type_str, number)`` on success, ``(None, None)`` when the name does
        not follow the pattern.
    """
    # ``.+`` is greedy, so the *last* ``_<digits>_lange.csv`` run is taken as
    # the number and everything before it (underscores included) is the type.
    m = re.fullmatch(r"video_(.+)_(\d+)_lange\.csv", fname)
    if m is None:
        return None, None
    return m.group(1), int(m.group(2))


def load_all_data_with_sets(data_root: str):
    """Load every (label CSV, landmark NPZ) pair found under ``data_root``.

    Expected layout::

        <data_root>/out_csv/<subfolder>/video_<type>_<number>_lange.csv
        <data_root>/out_npz/<subfolder>/hands_video_<type>_<number>.npz

    Every sub-directory of ``out_csv`` is scanned. A sample is skipped (with a
    printed warning) when its file name does not match the pattern, the NPZ
    file is missing, the NPZ has no ``hand_kps`` array, or the CSV and the
    landmark array have different lengths.

    Samples are grouped into "sets" of ten consecutive numbers
    (1-10 -> set 1, 11-20 -> set 2, ...); the set id is ``<type>_set<idx>``.

    Args:
        data_root: Directory that contains ``out_csv`` and ``out_npz``.

    Returns:
        ``(data_dict, meta_dict)`` where

        * ``data_dict[sample_name]`` is ``{"landmarks", "labels"}``, both
          float32 arrays with the same first-axis length, and
        * ``meta_dict[sample_name]`` is
          ``{"type", "number", "set_idx", "set_id"}``.

        ``sample_name`` is ``"<subfolder>/video_<type>_<number>"``.
    """
    csv_root = os.path.join(data_root, "out_csv")
    npz_root = os.path.join(data_root, "out_npz")

    subfolders = sorted(
        d for d in os.listdir(csv_root)
        if os.path.isdir(os.path.join(csv_root, d))
    )

    data_dict = {}
    meta_dict = {}
    label_suffix = "_lange"

    for sub in subfolders:
        csv_dir = os.path.join(csv_root, sub)
        npz_dir = os.path.join(npz_root, sub)

        csv_files = sorted(f for f in os.listdir(csv_dir) if f.endswith(".csv"))

        for csv_file in csv_files:
            type_str, number = extract_type_and_number(csv_file)
            if type_str is None:
                print("[WARN] 이름 패턴 안맞음:", csv_file)
                continue

            csv_base = os.path.splitext(csv_file)[0]          # video_xxx_007_lange
            # Strip only the trailing marker; a blanket str.replace would also
            # eat a "_lange" that happens to appear inside the type part.
            if csv_base.endswith(label_suffix):
                core_name = csv_base[:-len(label_suffix)]     # video_xxx_007
            else:
                core_name = csv_base
            npz_base = "hands_" + core_name                   # hands_video_xxx_007

            csv_path = os.path.join(csv_dir, csv_file)
            npz_path = os.path.join(npz_dir, npz_base + ".npz")

            if not os.path.exists(npz_path):
                print("[WARN] npz 없음:", npz_path)
                continue

            # Labels: drop the index column pandas writes by default.
            df = pd.read_csv(csv_path)
            if "Unnamed: 0" in df.columns:
                df = df.drop(columns=["Unnamed: 0"])
            labels = df.to_numpy(dtype=np.float32)

            # Landmarks: NpzFile keeps the archive open until closed, so use
            # it as a context manager to avoid leaking one handle per sample.
            with np.load(npz_path) as npz:
                if "hand_kps" not in npz.files:
                    print(f"[WARN] 'hand_kps' 키 없음: {npz_path}, keys={npz.files}")
                    continue
                landmarks = npz["hand_kps"].astype(np.float32)

            if len(landmarks) != len(labels):
                print("[WARN] 길이 불일치:", csv_file)
                continue

            # Ten consecutive recordings form one set: 1~10 -> set1, 11~20 -> set2 ...
            set_idx = (number - 1) // 10 + 1
            set_id = f"{type_str}_set{set_idx}"       # normal_set1, missing1_A_set2 ...

            sample_name = f"{sub}/{core_name}"        # e.g. normal/video_normal_023

            data_dict[sample_name] = {
                "landmarks": landmarks,
                "labels": labels,
            }
            meta_dict[sample_name] = {
                "type": type_str,
                "number": number,
                "set_idx": set_idx,
                "set_id": set_id,
            }

    print(f"[INFO] 총 샘플 수: {len(data_dict)}")
    print(f"[INFO] 세트 개수: {len(set(m['set_id'] for m in meta_dict.values()))}")
    return data_dict, meta_dict


In [4]:
def build_group_kfold_splits(meta_dict, n_folds=4, seed=42):
    """Split samples into K folds so that a whole set stays on one side.

    The unique ``set_id`` values are sorted, shuffled with a private
    ``random.Random(seed)`` and dealt round-robin into ``n_folds`` buckets.
    Fold ``k`` validates on bucket ``k`` and trains on all other sets.

    Args:
        meta_dict: sample_name -> metadata dict containing at least ``"set_id"``.
        n_folds: Number of folds.
        seed: Seed for the set shuffle.

    Returns:
        A list with one dict per fold::

            {
              "train_keys":    [sample_name, ...],
              "val_keys":      [sample_name, ...],
              "train_set_ids": {set_id, ...},
              "val_set_ids":   {set_id, ...},
            }

        Key lists follow the iteration order of ``meta_dict``.
    """
    # Sort before shuffling so the result depends only on the seed
    shuffled_sets = sorted({info["set_id"] for info in meta_dict.values()})
    print("[INFO] unique set_ids:", len(shuffled_sets))
    random.Random(seed).shuffle(shuffled_sets)

    # Deal the shuffled sets out one at a time, like cards
    buckets = [[] for _ in range(n_folds)]
    for position, set_name in enumerate(shuffled_sets):
        buckets[position % n_folds].append(set_name)

    folds = []
    for fold_no, bucket in enumerate(buckets):
        held_out = set(bucket)
        kept = {set_name for set_name in shuffled_sets if set_name not in held_out}

        train_keys = [
            name for name, info in meta_dict.items() if info["set_id"] in kept
        ]
        val_keys = [
            name for name, info in meta_dict.items() if info["set_id"] in held_out
        ]

        folds.append({
            "train_keys": train_keys,
            "val_keys":   val_keys,
            "train_set_ids": kept,
            "val_set_ids":   held_out,
        })

        print(f"[FOLD {fold_no}] train sets: {len(kept)}, "
              f"val sets: {len(held_out)}, "
              f"train samples: {len(train_keys)}, "
              f"val samples: {len(val_keys)}")

    return folds


In [6]:
# 1) Load every sample (landmarks + labels) together with its set metadata
data_root = "data"
all_data_dict, meta_dict = load_all_data_with_sets(data_root)

# 2) Build the K-fold splits at set granularity (whole sets go to train or val)
folds = build_group_kfold_splits(meta_dict, n_folds=4, seed=42)


[INFO] 총 샘플 수: 280
[INFO] 세트 개수: 28
[INFO] unique set_ids: 28
[FOLD 0] train sets: 21, val sets: 7, train samples: 210, val samples: 70
[FOLD 1] train sets: 21, val sets: 7, train samples: 210, val samples: 70
[FOLD 2] train sets: 21, val sets: 7, train samples: 210, val samples: 70
[FOLD 3] train sets: 21, val sets: 7, train samples: 210, val samples: 70


## TCN
-   Colab으로 진행

In [8]:
def build_window_indices(n_frames: int, window: int = 15, step: int = 5):
    """Enumerate sliding-window ``(start, end)`` index pairs over a sequence.

    Windows start at 0, advance by ``step`` and always span exactly ``window``
    frames (``end`` is exclusive). A trailing remainder shorter than ``window``
    is dropped, so a sequence shorter than ``window`` yields an empty list.
    """
    last_start = n_frames - window
    return [(begin, begin + window) for begin in range(0, last_start + 1, step)]


In [11]:
class LandmarkWindowDataset(Dataset):
    """Fixed-length sliding windows over per-sample landmark/label sequences.

    Every sample is cut into windows with ``build_window_indices``; one
    dataset item is one window. The classification target of a window is the
    label row of its last frame.
    """

    def __init__(self,
                 landmarks_dict: dict,
                 labels_dict: dict,
                 window: int = 15,
                 step: int = 5):
        """
        landmarks_dict: sample_name -> array, expected (N, D)
        labels_dict   : sample_name -> array, expected (N, K)
        window        : frames per window
        step          : stride between consecutive window starts
        """
        super().__init__()
        self.landmarks_dict = landmarks_dict
        self.labels_dict    = labels_dict
        self.window = window
        self.step   = step

        self.samples = sorted(landmarks_dict.keys())

        # Flatten all windows of all samples into (sample_name, start, end) triples
        self.items = [
            (name, lo, hi)
            for name in self.samples
            for lo, hi in build_window_indices(
                landmarks_dict[name].shape[0], window, step
            )
        ]

        print(f"[Dataset] samples: {self.samples}")
        print(f"[Dataset] total windows: {len(self.items)}")

        # Feature / label widths are read from the first sample
        # (in sorted-name order)
        reference = self.samples[0]
        self.hand_dim    = landmarks_dict[reference].shape[1]
        self.num_actions = labels_dict[reference].shape[1]

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        name, lo, hi = self.items[idx]

        frames  = torch.from_numpy(self.landmarks_dict[name][lo:hi]).float()  # (T,D)
        targets = torch.from_numpy(self.labels_dict[name][lo:hi]).float()     # (T,K)

        return {
            "x": frames,              # (T,D)
            "y_seq": targets,         # (T,K)  per-frame labels, returned for optional use
            "y_last": targets[-1],    # (K,)   classification target
            "sample_name": name,
            "start": lo,
            "end": hi,
        }


In [None]:
class Chomp1d(nn.Module):
    """Trim the trailing ``chomp_size`` time steps to keep a conv causal.

    A ``Conv1d`` with symmetric ``padding=p`` emits ``p`` extra steps at the
    end that have looked at right-hand padding; cutting them off leaves an
    output whose step ``t`` depends only on inputs ``<= t``.

    Args:
        chomp_size: Number of trailing steps to remove (``>= 0``). Zero is a
            no-op, which is what a ``kernel_size=1`` block needs.

    Raises:
        ValueError: If ``chomp_size`` is negative.
    """
    def __init__(self, chomp_size):
        super().__init__()
        if chomp_size < 0:
            raise ValueError(f"chomp_size must be >= 0, got {chomp_size}")
        self.chomp_size = chomp_size

    def forward(self, x):
        # x: (B, C, T_pad)
        if self.chomp_size == 0:
            # ``x[:, :, :-0]`` is ``x[:, :, :0]`` and would drop every time step
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):
    """Residual block made of two dilated conv stages.

    Each stage is Conv1d -> Chomp1d -> BatchNorm1d -> ReLU -> Dropout. The
    convs are padded by ``(kernel_size - 1) * dilation`` and the same amount is
    chomped afterwards, so the time length is preserved. The block input is
    added back (through a 1x1 conv when the channel count changes) and passed
    through a final ReLU.
    """

    def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout):
        super().__init__()
        pad = (kernel_size - 1) * dilation

        # Stage 1
        self.conv1 = nn.Conv1d(
            in_channels, out_channels, kernel_size,
            padding=pad, dilation=dilation,
        )
        self.chomp1 = Chomp1d(pad)
        self.bn1 = nn.BatchNorm1d(out_channels)

        # Stage 2
        self.conv2 = nn.Conv1d(
            out_channels, out_channels, kernel_size,
            padding=pad, dilation=dilation,
        )
        self.chomp2 = Chomp1d(pad)
        self.bn2 = nn.BatchNorm1d(out_channels)

        # Stateless layers, shared by both stages
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

        # Skip path: a 1x1 conv matches channel counts when they differ
        if in_channels != out_channels:
            self.downsample = nn.Conv1d(in_channels, out_channels, kernel_size=1)
        else:
            self.downsample = None

    def forward(self, x):
        stages = (
            (self.conv1, self.chomp1, self.bn1),
            (self.conv2, self.chomp2, self.bn2),
        )
        hidden = x
        for conv, chomp, norm in stages:
            hidden = self.dropout(self.relu(norm(chomp(conv(hidden)))))

        shortcut = self.downsample(x) if self.downsample is not None else x
        return self.relu(hidden + shortcut)


class TCNClassifier(nn.Module):
    """Stack of TemporalBlocks followed by a linear head on the last time step.

    Block ``i`` maps to ``channels[i]`` output channels and uses dilation
    ``2 ** i``. The head outputs raw logits (no sigmoid/softmax applied).
    """

    def __init__(self, input_dim, num_classes,
                 channels=(32, 32), kernel_size=3, dropout=0.5):
        super().__init__()
        # widths[i] -> widths[i + 1] is the channel mapping of block i
        widths = [input_dim, *channels]
        blocks = [
            TemporalBlock(widths[level], widths[level + 1],
                          kernel_size=kernel_size,
                          dilation=2 ** level,
                          dropout=dropout)
            for level in range(len(widths) - 1)
        ]

        self.tcn = nn.Sequential(*blocks)
        self.fc = nn.Linear(widths[-1], num_classes)

    def forward(self, x):
        """
        x: (B, T, D)  # layout produced by LandmarkWindowDataset
        return: (B, num_classes) logits
        """
        # Conv1d wants (B, C, T), so swap the time and feature axes
        features = self.tcn(x.transpose(1, 2))   # (B, C_out, T)
        # Only the final time step feeds the classifier
        return self.fc(features[:, :, -1])       # (B, num_classes)


In [14]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over ``loader``.

    Each batch is a dict with ``"x"`` (model input) and ``"y_last"`` (0/1
    multi-label target); both are moved to the module-level ``device``.

    Returns:
        ``(avg_loss, acc)``: the sample-weighted mean loss, and the fraction of
        samples whose thresholded prediction (sigmoid > 0.5) matches the
        target on every label (exact-match accuracy).
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch in loader:
        inputs = batch["x"].to(device)          # (B, T, D)
        targets = batch["y_last"].to(device)    # (B, K) 0/1
        batch_size = inputs.size(0)

        optimizer.zero_grad()
        logits = model(inputs)                  # (B, K)
        loss = criterion(logits, targets)
        loss.backward()
        optimizer.step()

        # Weight by batch size so a short final batch does not skew the mean
        loss_sum += loss.item() * batch_size
        n_seen += batch_size

        # A sample counts as correct only if all K labels match
        exact_match = (torch.sigmoid(logits) > 0.5).float().eq(targets).all(dim=1)
        n_correct += exact_match.sum().item()

    return loss_sum / n_seen, n_correct / n_seen


def eval_one_epoch(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without updating any weights.

    Batches are dicts with ``"x"`` and ``"y_last"``; both are moved to the
    module-level ``device``. The model is put in eval mode and gradients are
    disabled for the whole pass.

    Returns:
        ``(avg_loss, acc)``: the sample-weighted mean loss, and the exact-match
        accuracy (all labels of a sample must agree after thresholding the
        sigmoid at 0.5).
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch in loader:
            inputs = batch["x"].to(device)
            targets = batch["y_last"].to(device)
            batch_size = inputs.size(0)

            logits = model(inputs)
            batch_loss = criterion(logits, targets)

            loss_sum += batch_loss.item() * batch_size
            n_seen += batch_size

            exact_match = (torch.sigmoid(logits) > 0.5).float().eq(targets).all(dim=1)
            n_correct += exact_match.sum().item()

    return loss_sum / n_seen, n_correct / n_seen


In [12]:
WINDOW = 15
STEP   = 5

from torch.utils.data import DataLoader

def build_fold_dataloaders(fold_info, batch_size=64):
    """Create the train/val window datasets and loaders for one fold.

    Reads sample arrays from the module-level ``all_data_dict`` and windows
    them with the module-level ``WINDOW`` / ``STEP``.

    Args:
        fold_info: Dict with ``"train_keys"`` and ``"val_keys"`` sample-name lists.
        batch_size: Batch size used by both loaders.

    Returns:
        ``(train_dataset, val_dataset, train_loader, val_loader)``; only the
        train loader shuffles.
    """
    def _select(keys, field):
        # sample_name -> array for one field ("landmarks" or "labels")
        return {name: all_data_dict[name][field] for name in keys}

    def _windowed(landmarks, labels):
        return LandmarkWindowDataset(
            landmarks_dict=landmarks,
            labels_dict=labels,
            window=WINDOW,
            step=STEP,
        )

    train_x = _select(fold_info["train_keys"], "landmarks")
    train_y = _select(fold_info["train_keys"], "labels")
    val_x   = _select(fold_info["val_keys"], "landmarks")
    val_y   = _select(fold_info["val_keys"], "labels")

    train_dataset = _windowed(train_x, train_y)
    val_dataset   = _windowed(val_x, val_y)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False)

    return train_dataset, val_dataset, train_loader, val_loader

# Example: build the datasets/loaders for fold 0 and sanity-check their sizes
fold_idx = 0
fold_info = folds[fold_idx]
train_dataset, val_dataset, train_loader, val_loader = build_fold_dataloaders(fold_info)

print("FOLD", fold_idx)
print("train windows:", len(train_dataset))
print("val windows  :", len(val_dataset))

# Pull a single batch to inspect the tensor shapes the model will receive
batch = next(iter(train_loader))
print("x:", batch["x"].shape)
print("y_last:", batch["y_last"].shape)

[Dataset] samples: ['idle/video_idle_001', 'idle/video_idle_002', 'idle/video_idle_003', 'idle/video_idle_004', 'idle/video_idle_005', 'idle/video_idle_006', 'idle/video_idle_007', 'idle/video_idle_008', 'idle/video_idle_009', 'idle/video_idle_010', 'idle/video_idle_011', 'idle/video_idle_012', 'idle/video_idle_013', 'idle/video_idle_014', 'idle/video_idle_015', 'idle/video_idle_016', 'idle/video_idle_017', 'idle/video_idle_018', 'idle/video_idle_019', 'idle/video_idle_020', 'idle/video_idle_021', 'idle/video_idle_022', 'idle/video_idle_023', 'idle/video_idle_024', 'idle/video_idle_025', 'idle/video_idle_026', 'idle/video_idle_027', 'idle/video_idle_028', 'idle/video_idle_029', 'idle/video_idle_030', 'idle/video_idle_031', 'idle/video_idle_032', 'idle/video_idle_033', 'idle/video_idle_034', 'idle/video_idle_035', 'idle/video_idle_036', 'idle/video_idle_037', 'idle/video_idle_038', 'idle/video_idle_039', 'idle/video_idle_040', 'idle/video_idle_051', 'idle/video_idle_052', 'idle/video_id

In [15]:
# Cross-validation driver: train a fresh TCN on every fold and collect, per
# fold, the validation loss/accuracy at the epoch with the lowest val loss.
all_fold_results = []

for fold_idx, fold_info in enumerate(folds):
    print(f"\n========== FOLD {fold_idx} ==========")
    train_dataset, val_dataset, train_loader, val_loader = build_fold_dataloaders(fold_info, batch_size=64)

    # Draw one (shuffled) batch only to read the input / output widths
    sample_batch = next(iter(train_loader))
    input_dim = sample_batch["x"].shape[-1]
    num_classes = sample_batch["y_last"].shape[-1]

    # New model and optimizer per fold so nothing leaks between folds
    model = TCNClassifier(input_dim, num_classes,
                          channels=(32, 32),
                          kernel_size=3,
                          dropout=0.5).to(device)

    # Multi-label setup: independent sigmoid + BCE per label
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    best_val_loss = float("inf")
    best_val_acc  = 0.0

    for epoch in range(1, 101):
        train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion)
        val_loss,   val_acc   = eval_one_epoch(model, val_loader, criterion)

        # "Best" is selected by val loss; best_val_acc is the accuracy at that
        # epoch, not the maximum accuracy seen. Model weights are not saved.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_val_acc  = val_acc

        # Log the first epoch and every 10th one
        if epoch % 10 == 0 or epoch == 1:
            print(f"[fold {fold_idx}] epoch {epoch:03d} | "
                  f"train_loss={train_loss:.4f}, val_loss={val_loss:.4f}, "
                  f"train_acc={train_acc:.3f}, val_acc={val_acc:.3f}")

    print(f"[fold {fold_idx}] BEST val_loss={best_val_loss:.4f}, val_acc={best_val_acc:.3f}")
    all_fold_results.append((best_val_loss, best_val_acc))

# Per-fold summary followed by the unweighted mean accuracy across folds
print("\n=== CV 결과 요약 ===")
for i, (vl, va) in enumerate(all_fold_results):
    print(f"fold {i}: val_loss={vl:.4f}, val_acc={va:.3f}")
print("평균 val_acc:", sum(va for _, va in all_fold_results) / len(all_fold_results))



[Dataset] samples: ['idle/video_idle_001', 'idle/video_idle_002', 'idle/video_idle_003', 'idle/video_idle_004', 'idle/video_idle_005', 'idle/video_idle_006', 'idle/video_idle_007', 'idle/video_idle_008', 'idle/video_idle_009', 'idle/video_idle_010', 'idle/video_idle_011', 'idle/video_idle_012', 'idle/video_idle_013', 'idle/video_idle_014', 'idle/video_idle_015', 'idle/video_idle_016', 'idle/video_idle_017', 'idle/video_idle_018', 'idle/video_idle_019', 'idle/video_idle_020', 'idle/video_idle_021', 'idle/video_idle_022', 'idle/video_idle_023', 'idle/video_idle_024', 'idle/video_idle_025', 'idle/video_idle_026', 'idle/video_idle_027', 'idle/video_idle_028', 'idle/video_idle_029', 'idle/video_idle_030', 'idle/video_idle_031', 'idle/video_idle_032', 'idle/video_idle_033', 'idle/video_idle_034', 'idle/video_idle_035', 'idle/video_idle_036', 'idle/video_idle_037', 'idle/video_idle_038', 'idle/video_idle_039', 'idle/video_idle_040', 'idle/video_idle_051', 'idle/video_idle_052', 'idle/video_i

KeyboardInterrupt: 