<a href="https://colab.research.google.com/github/kty3452/ML-Class/blob/main/HumanActivityResNet_BiLSTM_Attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
import os, random, numpy as np, pandas as pd
from typing import Tuple, Dict, Literal
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import StratifiedGroupKFold

# -------------------------------
# Reproducibility
# -------------------------------
SEED = 42
# Seed Python's `random`, NumPy and TensorFlow with the same value.
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# -------------------------------
# Data path
# -------------------------------
DATA_ROOT = "/content/drive/MyDrive/ML_Class"  # <- change if your data lives elsewhere

# Channel load order (9 channels): three sensor groups, each with x/y/z axes.
SIGNAL_ORDER = [
    f"{sensor}_{axis}"
    for sensor in ("total_acc", "body_acc", "body_gyro")
    for axis in ("x", "y", "z")
]

# -------------------------------
# Loading
# -------------------------------
def _load_group_xy(root: str, group: str):
    """Load the signal windows, labels and subject ids of one dataset split.

    Files read, all whitespace-separated and without a header row:
      * ``<root>/<group>/Inertial Signals/<signal>_<group>.txt`` for every
        name in ``SIGNAL_ORDER``
      * ``<root>/<group>/y_<group>.txt``
      * ``<root>/<group>/subject_<group>.txt``

    Args:
        root: Dataset root directory.
        group: Split directory name, also used as the file-name suffix
            (this file calls it with "train" and "test").

    Returns:
        Tuple ``(X, y, subs)``:
          * ``X``: float32 array with the signals stacked along the last axis
            in ``SIGNAL_ORDER`` order, i.e. (rows, columns, len(SIGNAL_ORDER));
            the original author's note gives (N, 128, 9) for this dataset.
          * ``y``: 1-D int32 labels, shifted by -1 (original note: 0..5).
          * ``subs``: 1-D int32 subject ids.
    """
    def _read_table(path: str) -> np.ndarray:
        return pd.read_csv(path, sep=r"\s+", header=None).to_numpy()

    group_dir = os.path.join(root, group)
    signal_dir = os.path.join(group_dir, "Inertial Signals")

    X = np.dstack([
        _read_table(os.path.join(signal_dir, f"{n}_{group}.txt"))
        for n in SIGNAL_ORDER
    ]).astype("float32")

    # reshape(-1) instead of squeeze(): squeeze() would collapse a one-row
    # file to a 0-d scalar, which cannot be concatenated or used as a mask.
    y = _read_table(os.path.join(group_dir, f"y_{group}.txt"))\
        .reshape(-1).astype("int32") - 1
    subs = _read_table(os.path.join(group_dir, f"subject_{group}.txt"))\
        .reshape(-1).astype("int32")
    return X, y, subs

def load_uci_har(root: str):
    """Load the "train" and "test" splits found under ``root``.

    Returns:
        A flat 6-tuple ``(Xtr, ytr, sub_tr, Xte, yte, sub_te)``: the
        ``(X, y, subs)`` triple of the train split followed by the triple of
        the test split, each as produced by ``_load_group_xy``.
    """
    train_part = _load_group_xy(root, "train")
    test_part = _load_group_xy(root, "test")
    return (*train_part, *test_part)


In [10]:

# -------------------------------
# Utilities
# -------------------------------
def split_to_branches(X: np.ndarray) -> Dict[str, np.ndarray]:
    """Slice the last axis of a 3-D array into the three model inputs.

    Channels 0-2 become "in_total", 3-5 "in_body" and 6-8 "in_gyro".
    The returned dict is keyed (and ordered) "in_body", "in_gyro", "in_total".
    """
    channel_ranges = {
        "in_body": slice(3, 6),
        "in_gyro": slice(6, 9),
        "in_total": slice(0, 3),
    }
    return {key: X[:, :, cols] for key, cols in channel_ranges.items()}

def instance_norm(X: np.ndarray, eps: float = 1e-6) -> np.ndarray:
    """Standardise along axis 1, separately for every sample and feature.

    Each (sample, feature) series has its axis-1 mean subtracted and is
    divided by its axis-1 standard deviation plus ``eps``; the ``eps`` term
    keeps the division finite for constant series.
    """
    centered = X - X.mean(axis=1, keepdims=True)
    scale = X.std(axis=1, keepdims=True) + eps
    return centered / scale

def subject_norm(X: np.ndarray, subs: np.ndarray, eps: float = 1e-6) -> np.ndarray:
    """Standardise each subject's samples with that subject's own mean/std (optional step).

    For every distinct id in ``subs`` the mean and standard deviation are
    taken over axes 0 and 1 of that subject's rows (one value per last-axis
    feature), and those rows become ``(x - mean) / (std + eps)``.
    ``subs`` is flattened first; the result has the shape and dtype of ``X``.
    """
    subject_ids = np.asarray(subs).reshape(-1)
    normalized = np.empty_like(X)
    for subject in np.unique(subject_ids):
        rows = np.flatnonzero(subject_ids == subject)
        chunk = X[rows]
        center = chunk.mean(axis=(0, 1), keepdims=True)
        spread = chunk.std(axis=(0, 1), keepdims=True) + eps
        normalized[rows] = (chunk - center) / spread
    return normalized

def subject_wise_masks(subs: np.ndarray, val_ratio: float = 0.1, seed: int = 42):
    """Build boolean masks for a subject-wise train/validation split.

    ``int(n_subjects * val_ratio)`` distinct subjects (at least one) are drawn
    without replacement by a generator seeded with ``seed`` and assigned to
    validation; every other subject goes to training.

    Returns:
        ``(train_mask, val_mask, val_subs)``: two boolean arrays over the
        flattened ``subs`` (complements of each other) and the sorted array
        of held-out subject ids.
    """
    subject_ids = np.asarray(subs).reshape(-1)
    candidates = np.unique(subject_ids)
    holdout_count = max(1, int(len(candidates) * val_ratio))
    picked = np.random.default_rng(seed).choice(
        candidates, size=holdout_count, replace=False
    )
    in_val = np.isin(subject_ids, picked)
    return ~in_val, in_val, np.sort(picked)

In [8]:


# -------------------------------
# Model parts
# -------------------------------
def res_bilstm_block(x, units: int, dropout: float = 0.3, name: str | None = None):
    """Residual bidirectional-LSTM block.

    Main path: Bidirectional(LSTM(units, return_sequences=True)) ->
    BatchNormalization -> Dropout.
    Shortcut: kernel-size-1 Conv1D to ``2 * units`` channels ->
    BatchNormalization, so the shortcut can be added to the BiLSTM output.
    The two paths are summed and passed through ReLU.

    Args:
        x: Input sequence tensor.
        units: LSTM units per direction; the shortcut uses ``2 * units`` filters.
        dropout: Dropout rate applied on the main path.
        name: Optional prefix for the layer names (``<name>_proj``,
            ``<name>_bilstm``, ...). When omitted, Keras assigns unique names
            itself instead of every layer being called ``"None_*"``, which
            would collide as soon as two unnamed blocks share a model.

    Returns:
        The output tensor of the final ReLU activation.
    """
    def _layer_name(suffix: str) -> str | None:
        return None if name is None else f"{name}_{suffix}"

    proj = layers.Conv1D(2*units, 1, padding="same", name=_layer_name("proj"))(x)
    proj = layers.BatchNormalization(name=_layer_name("proj_bn"))(proj)

    y = layers.Bidirectional(
        layers.LSTM(units, return_sequences=True), name=_layer_name("bilstm")
    )(x)
    y = layers.BatchNormalization(name=_layer_name("bn"))(y)
    y = layers.Dropout(dropout, name=_layer_name("drop"))(y)

    out = layers.Add(name=_layer_name("add"))([proj, y])
    return layers.Activation("relu", name=_layer_name("relu"))(out)

class TemporalAdditiveAttention(layers.Layer):
    """Additive attention pooling over the time axis.

    x: (B, T, C) → (context(B, C), attn(B, T, 1))

    Scores come from Dense(units, tanh) followed by Dense(1), are
    softmax-normalised along axis 1, and weight a sum of ``x`` over that axis.

    Args:
        units: Width of the hidden tanh scoring layer.
        **kw: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, units=128, **kw):
        super().__init__(**kw)
        # Kept so get_config() can report it for model save / clone.
        self.units = units
        self.d1 = layers.Dense(units, activation="tanh")
        self.d2 = layers.Dense(1)
        self.sm = layers.Softmax(axis=1)

    def call(self, x):
        e = self.d2(self.d1(x))  # (B,T,1) unnormalised scores
        a = self.sm(e)           # (B,T,1) weights, softmax over axis 1
        ctx = tf.reduce_sum(a * x, axis=1)  # (B,C) weighted sum over axis 1
        return ctx, a

    def get_config(self):
        """Return the layer config including ``units`` so the layer can be re-created."""
        config = super().get_config()
        config.update({"units": self.units})
        return config

def se_block(x, ratio=8, name=None):
    """Squeeze-and-excitation style channel gating for a sequence tensor.

    Averages ``x`` over time (GlobalAveragePooling1D), passes the result
    through Dense(max(C // ratio, 8), relu) and Dense(C, sigmoid), reshapes
    the gates to (1, C) and multiplies them into ``x``.

    Args:
        x: Input tensor whose last dimension ``C`` is statically known.
        ratio: Bottleneck reduction factor; the hidden width is never below 8.
        name: Optional prefix; only the pooling layer is named
            (``<name>_gap``). When omitted, Keras auto-names that layer
            instead of it being called ``"None_gap"``.

    Returns:
        Tensor with the same shape as ``x``, scaled per channel.
    """
    c = x.shape[-1]
    gap_name = None if name is None else f"{name}_gap"
    s = layers.GlobalAveragePooling1D(name=gap_name)(x)
    s = layers.Dense(max(c // ratio, 8), activation="relu")(s)
    s = layers.Dense(c, activation="sigmoid")(s)
    s = layers.Reshape((1, c))(s)  # broadcast the gates over the time axis
    return layers.Multiply()([x, s])

In [5]:


# -------------------------------
# Builders
# -------------------------------
def build_resbilstm_baseline(shape=(128,3), num_classes=6):
    """Three-branch ResBiLSTM classifier with average pooling per branch.

    Each of the inputs "in_body", "in_gyro" and "in_total" goes through two
    residual BiLSTM blocks (128 units, dropout 0.3) and global average
    pooling. The pooled vectors are concatenated and fed to
    Dense(256, relu) -> Dropout(0.5) -> Dense(num_classes, softmax).
    """
    def pooled_branch(tensor, prefix):
        hidden = res_bilstm_block(tensor, 128, 0.3, f"{prefix}_b1")
        hidden = res_bilstm_block(hidden, 128, 0.3, f"{prefix}_b2")
        return layers.GlobalAveragePooling1D(name=f"{prefix}_gap")(hidden)

    # All inputs are created before any branch, in body / gyro / total order.
    body_in = layers.Input(shape=shape, name="in_body")
    gyro_in = layers.Input(shape=shape, name="in_gyro")
    total_in = layers.Input(shape=shape, name="in_total")
    model_inputs = [body_in, gyro_in, total_in]

    pooled = [
        pooled_branch(tensor, prefix)
        for tensor, prefix in zip(model_inputs, ("body", "gyro", "total"))
    ]
    features = layers.Concatenate(name="concat_vec")(pooled)

    head = layers.Dense(256, activation="relu")(features)
    head = layers.Dropout(0.5)(head)
    probs = layers.Dense(num_classes, activation="softmax", name="pred")(head)
    return keras.Model(model_inputs, probs, name="ResBLSTM_baseline")

def build_resbilstm_with_attention(shape=(128,3), num_classes=6):
    """Three-branch ResBiLSTM classifier pooled by temporal attention.

    Each input ("in_body", "in_gyro", "in_total") passes through two residual
    BiLSTM blocks (128 units, dropout 0.3). The branch sequences are
    concatenated on the channel axis, layer-normalised and reduced by
    TemporalAdditiveAttention(128). The head is
    Dropout(0.5) -> Dense(256, relu) -> Dropout(0.5) -> Dense(num_classes, softmax).
    """
    def sequence_branch(tensor, prefix):
        first = res_bilstm_block(tensor, 128, 0.3, f"{prefix}_b1")
        return res_bilstm_block(first, 128, 0.3, f"{prefix}_b2")

    # All inputs are created before any branch, in body / gyro / total order.
    body_in = layers.Input(shape=shape, name="in_body")
    gyro_in = layers.Input(shape=shape, name="in_gyro")
    total_in = layers.Input(shape=shape, name="in_total")
    model_inputs = [body_in, gyro_in, total_in]

    branch_seqs = [
        sequence_branch(tensor, prefix)
        for tensor, prefix in zip(model_inputs, ("body", "gyro", "total"))
    ]
    fused = layers.Concatenate(axis=-1, name="concat_seq")(branch_seqs)
    fused = layers.LayerNormalization(name="ln_seq")(fused)

    # The attention weights are returned by the layer but not used here.
    context, _weights = TemporalAdditiveAttention(128, name="attn")(fused)

    head = layers.Dropout(0.5)(context)
    head = layers.Dense(256, activation="relu")(head)
    head = layers.Dropout(0.5)(head)
    probs = layers.Dense(num_classes, activation="softmax", name="pred")(head)
    return keras.Model(model_inputs, probs, name="ResBLSTM_attention")

def build_resbilstm_with_attn_and_se(shape=(128,3), num_classes=6):
    """Three-branch ResBiLSTM classifier with SE gating and temporal attention.

    Each input ("in_body", "in_gyro", "in_total") passes through two residual
    BiLSTM blocks (128 units, dropout 0.35). The branch sequences are
    concatenated on the channel axis, layer-normalised, gated by
    ``se_block(ratio=8)`` and reduced by TemporalAdditiveAttention(128).
    The head is Dropout(0.55) -> Dense(256, relu) -> LayerNormalization ->
    Dropout(0.55) -> Dense(num_classes, softmax).
    """
    def sequence_branch(tensor, prefix):
        first = res_bilstm_block(tensor, 128, 0.35, f"{prefix}_b1")
        return res_bilstm_block(first, 128, 0.35, f"{prefix}_b2")

    # All inputs are created before any branch, in body / gyro / total order.
    body_in = layers.Input(shape=shape, name="in_body")
    gyro_in = layers.Input(shape=shape, name="in_gyro")
    total_in = layers.Input(shape=shape, name="in_total")
    model_inputs = [body_in, gyro_in, total_in]

    branch_seqs = [
        sequence_branch(tensor, prefix)
        for tensor, prefix in zip(model_inputs, ("body", "gyro", "total"))
    ]
    fused = layers.Concatenate(axis=-1, name="concat_seq")(branch_seqs)
    fused = layers.LayerNormalization(name="ln_seq")(fused)
    fused = se_block(fused, ratio=8, name="se")

    # The attention weights are returned by the layer but not used here.
    context, _weights = TemporalAdditiveAttention(128, name="attn")(fused)

    head = layers.Dropout(0.55)(context)
    head = layers.Dense(256, activation="relu")(head)
    head = layers.LayerNormalization(name="ln_head")(head)
    head = layers.Dropout(0.55)(head)
    probs = layers.Dense(num_classes, activation="softmax", name="pred")(head)
    return keras.Model(model_inputs, probs, name="ResBLSTM_attn_se")




In [11]:

from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import numpy as np

def cross_subject_5fold_eval(
    epochs: int = 60,
    batch: int = 128,
    lr: float = 1e-3,
    data_root: str = DATA_ROOT,
    do_instance_norm: bool = False,
    use_subject_norm: bool = False,
    folds: int = 5,
    seed: int = 42,
    n_val_groups: int = 4,
):
    """Cross-subject K-fold evaluation of the attention ResBiLSTM model.

    The "train" and "test" splits under ``data_root`` are pooled and then
    re-split with ``StratifiedGroupKFold`` (groups = subject ids), so no
    subject appears in more than one of train / validation / test within a
    fold. For each fold a fresh ``build_resbilstm_with_attention`` model is
    trained with Adam, ReduceLROnPlateau and EarlyStopping (both monitoring
    ``val_loss``) and scored on the held-out test subjects.

    Args:
        epochs: Maximum training epochs per fold.
        batch: Batch size for training and prediction.
        lr: Initial Adam learning rate.
        data_root: Dataset root passed to ``load_uci_har``.
        do_instance_norm: Apply ``instance_norm``. Only consulted when
            ``use_subject_norm`` is False.
        use_subject_norm: Apply ``subject_norm``; takes precedence over
            ``do_instance_norm``.
        folds: Number of outer folds.
        seed: Random state of the outer split; the inner split of fold ``k``
            uses ``seed + k``.
        n_val_groups: Approximate number of validation subjects per fold; the
            inner splitter uses ``max(2, n_trainval_subjects // n_val_groups)``
            splits and its first split is taken.

    Returns:
        ``(all_accs, fold_reports)``: per-fold test accuracies and per-fold
        ``classification_report(..., output_dict=True)`` dicts.

    Raises:
        ValueError: If ``n_val_groups`` is smaller than 1.
    """
    if n_val_groups < 1:
        raise ValueError("n_val_groups must be >= 1")

    # ---------- Load all ----------
    Xtr, ytr, sub_tr, Xte, yte, sub_te = load_uci_har(data_root)
    # Pool everything (the original train/test boundary is ignored).
    X_all  = np.concatenate([Xtr, Xte], axis=0)
    y_all  = np.concatenate([ytr, yte], axis=0)
    subs_all = np.concatenate([sub_tr, sub_te], axis=0)

    uniq_subs = np.unique(subs_all)
    print(f"[INFO] Total subjects: {len(uniq_subs)}")

    # ---------- Normalisation & branch split ----------
    # Neither step depends on the fold assignment, so both run once here
    # rather than once per fold.
    if use_subject_norm:
        Xn = subject_norm(X_all, subs_all)
    elif do_instance_norm:
        Xn = instance_norm(X_all)
    else:
        Xn = X_all  # index-array selection below copies, so no aliasing
    br_all = split_to_branches(Xn)

    input_shape = br_all["in_body"].shape[1:]
    num_classes = int(y_all.max() + 1)

    all_accs, fold_reports = [], []

    # ----- Outer split: grouped by subject, stratified by label -----
    sgkf_outer = StratifiedGroupKFold(n_splits=folds, shuffle=True, random_state=seed)

    for fold_id, (trainval_idx, test_idx) in enumerate(
            sgkf_outer.split(X_all, y_all, groups=subs_all), start=1):

        # ----- Inner split: train/val also grouped by subject and stratified -----
        sub_tv = subs_all[trainval_idx]
        y_tv   = y_all[trainval_idx]
        n_splits_inner = max(2, len(np.unique(sub_tv)) // n_val_groups)

        sgkf_inner = StratifiedGroupKFold(
            n_splits=n_splits_inner, shuffle=True, random_state=seed + fold_id
        )
        # The splitter only needs the sample count from X, so a placeholder
        # avoids copying the full signal array.
        placeholder = np.zeros((len(trainval_idx), 1))
        inner_train_rel, inner_val_rel = next(
            sgkf_inner.split(placeholder, y_tv, groups=sub_tv)
        )

        # Map positions within the train+val subset back to global indices.
        train_idx = trainval_idx[inner_train_rel]
        val_idx   = trainval_idx[inner_val_rel]

        # ---- Log the subject ids of each partition
        print(f"\n========== Fold {fold_id}/{folds} ==========")
        print("Train subs:", np.sort(np.unique(subs_all[train_idx])).tolist())
        print("Val subs:  ", np.sort(np.unique(subs_all[val_idx])).tolist())
        print("Test subs: ", np.sort(np.unique(subs_all[test_idx])).tolist())

        # ---------- Per-fold indexing of the branch inputs ----------
        br_train = {k: v[train_idx] for k, v in br_all.items()}
        br_val   = {k: v[val_idx]   for k, v in br_all.items()}
        br_test  = {k: v[test_idx]  for k, v in br_all.items()}
        y_train, y_val, y_test = y_all[train_idx], y_all[val_idx], y_all[test_idx]

        # ---------- Model ----------
        model = build_resbilstm_with_attention(input_shape, num_classes)
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=lr),
            loss=keras.losses.SparseCategoricalCrossentropy(),
            metrics=["accuracy"]
        )
        cb = [
            keras.callbacks.ReduceLROnPlateau(
                monitor="val_loss", factor=0.5, patience=6, min_lr=1e-6, verbose=0
            ),
            keras.callbacks.EarlyStopping(
                monitor="val_loss", patience=20, restore_best_weights=True, verbose=0
            ),
        ]

        # ---------- Train ----------
        model.fit(
            br_train, y_train,
            validation_data=(br_val, y_val),
            epochs=epochs, batch_size=batch, verbose=1, callbacks=cb
        )

        # ---------- Test ----------
        y_pred = np.argmax(model.predict(br_test, batch_size=batch, verbose=0), axis=1)
        acc = accuracy_score(y_test, y_pred)
        all_accs.append(acc)
        rep = classification_report(y_test, y_pred, digits=4, output_dict=True)
        fold_reports.append(rep)

        print(f"[Fold {fold_id}] Test Accuracy: {acc*100:.2f}%")

    # ---------- Summary ----------
    # Uses the actual fold count instead of a hard-coded "5".
    print(f"\n========== {folds}-Fold Summary ==========")
    print(f"Mean Accuracy: {np.mean(all_accs)*100:.2f}% ± {np.std(all_accs)*100:.2f}%")

    return all_accs, fold_reports


In [12]:
# Run the cross-subject evaluation with subject-wise normalisation enabled.
# NOTE: do_instance_norm is only consulted when use_subject_norm is False.
accs, reports = cross_subject_5fold_eval(
    epochs=60, batch=128, lr=1e-3,
    data_root=DATA_ROOT,
    do_instance_norm=False,
    use_subject_norm=True
)

[INFO] Total subjects: 30

Train subs: [2, 3, 4, 5, 8, 10, 11, 12, 13, 16, 18, 19, 20, 21, 22, 23, 24, 27, 28, 30]
Val subs:   [1, 7, 9, 15]
Test subs:  [6, 14, 17, 25, 26, 29]
Epoch 1/60
[1m54/54[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 234ms/step - accuracy: 0.8125 - loss: 0.5362 - val_accuracy: 0.4666 - val_loss: 3.3573 - learning_rate: 0.0010
Epoch 2/60
[1m54/54[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 204ms/step - accuracy: 0.9560 - loss: 0.1233 - val_accuracy: 0.4658 - val_loss: 4.1720 - learning_rate: 0.0010
Epoch 3/60
[1m54/54[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 205ms/step - accuracy: 0.9688 - loss: 0.0848 - val_accuracy: 0.5075 - val_loss: 3.0085 - learning_rate: 0.0010
Epoch 4/60
[1m54/54[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 206ms/step - accuracy: 0.9700 - loss: 0.0783 - val_accuracy: 0.5578 - val_loss: 2.2233 - learning_rate: 0.0010
Epoch 5/60
[1m54/54[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m