In [None]:
import numpy as np
import pandas as pd
import sys, os
from collections import defaultdict
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

sys.path.append('../')
import Processing, Model


def call_data(base_path, sub_lst, sub_idx):
    data = pd.read_csv(base_path+sub_lst[sub_idx])
    return data

# Normal 용도

In [None]:
def data_setup(lst_normal, path_normal, window_size, step_size):
    subjects_data = defaultdict(list)
    subjects_label = defaultdict(list)

    for s_idx, filename in enumerate(lst_normal):
        name = filename.lower()

        subject_id = name.split("n")[0]
        data = call_data(path_normal, lst_normal, s_idx).to_numpy()  # (num_samples, num_channels)

        windows = Processing.sliding_window(data, window_size, step_size)  # (num_windows, win_len, ch)

        if "standing" in name: label = 0
        elif "gait" in name: label = 1
        elif "sitting" in name: label = 2
        else: label = -1
        #print(name, label)
        labels = np.full(len(windows), label)

        subjects_data[subject_id].append(windows)
        subjects_label[subject_id].append(labels)

    return subjects_data, subjects_label


def get_X_y(X, y):
    all_X, all_y = [], []

    #for subject_id in subjects_data.keys():
    data_list = X
    label_list = y

    for data, labels in zip(data_list, label_list):
        for w, label in zip(data, labels):  # w: (win_len, ch)
            feat = Processing.extract_features_WL(w)  # (num_channels*5,)
            all_X.append(feat)
            all_y.append(label)

    all_X, all_y = np.array(all_X), np.array(all_y)   # (N, num_channels*5)

    return all_X, all_y

# Abnormal 용도

In [None]:
def data_setup(lst_normal, path_normal, window_size, step_size):
    subjects_data = defaultdict(list)
    subjects_label = defaultdict(list)

    for s_idx, filename in enumerate(lst_normal):
        name = filename.lower()

        subject_id = name.split("a")[0]
        data = call_data(path_normal, lst_normal, s_idx).to_numpy()  # (num_samples, num_channels)

        windows = Processing.sliding_window(data, window_size, step_size)  # (num_windows, win_len, ch)

        if "standing" in name: label = 0
        elif "gait" in name: label = 1
        elif "sitting" in name: label = 2
        else: label = -1
        #print(name, label)
        labels = np.full(len(windows), label)

        subjects_data[subject_id].append(windows)
        subjects_label[subject_id].append(labels)

    return subjects_data, subjects_label


def get_X_y(X, y):
    all_X, all_y = [], []

    #for subject_id in subjects_data.keys():
    data_list = X
    label_list = y

    for data, labels in zip(data_list, label_list):
        for w, label in zip(data, labels):  # w: (win_len, ch)
            #feat = processing.extract_features(w)  # (num_channels*5,)
            feat = Processing.extract_features_WL(w)
            if len(feat) > 5: ##########################################MAKE SURE to attach this when running ABnormal
                feat = feat[:5]
            all_X.append(feat)
            all_y.append(label)

    all_X, all_y = np.array(all_X), np.array(all_y)   # (N, num_channels*5)
    #print(pd.Series(all_y).value_counts())
    return all_X, all_y

# Inter-subject

In [None]:
import tensorflow as tf
from typing import Dict, Tuple, List

SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

PATH_ABNORMAL = r"C:/Users/hml76/PycharmProjects/Cross-motor-decoding/Data/Gait1-UCI/Abnormal/"
PATH_NORMAL   = r"C:/Users/hml76/PycharmProjects/Cross-motor-decoding/Data/Gait1-UCI/normal/"

WINDOW_SIZE = 200
STEP_SIZE   = 10
NUM_CHANNELS = 5
NUM_FEATURES = 30
NUM_CLASSES = 3

SUBJECT_LIST = [str(i) for i in range(1, 12)]   # '1' ~ '11'

normal_files = os.listdir(PATH_NORMAL)
subjects_data, subjects_label = data_setup(
    normal_files, PATH_NORMAL, WINDOW_SIZE, STEP_SIZE
)

# -----------------------
# LOSO Training Loop - 아까말한 leave one subject out
# -----------------------
accuracy_list = []

for test_id in SUBJECT_LIST:
    print(f"\n===== Test Subject: {test_id} =====")

    # Test set
    X_test, y_test = get_X_y(subjects_data[test_id], subjects_label[test_id])
    X_test = tf.expand_dims(X_test, axis=-1)

    # Train set: concatenate all other subjects
    train_X, train_y = [], []
    for sub_id in SUBJECT_LIST:
        if sub_id == test_id:
            continue
        X_tmp, y_tmp = get_X_y(subjects_data[sub_id], subjects_label[sub_id])
        train_X.append(X_tmp)
        train_y.append(y_tmp)

    X_train = np.concatenate(train_X, axis=0)
    y_train = np.concatenate(train_y, axis=0)
    X_train = tf.expand_dims(X_train, axis=-1)

    print("Train:", X_train.shape, y_train.shape)
    print("Test: ", X_test.shape, y_test.shape)

    model = Model.build_model_1D(NUM_CLASSES, X_train.shape[1:])

    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        batch_size=256,
        epochs=100,
        verbose=0
    )

    plt.figure()
    plt.plot(history.history['accuracy'], label="Train Acc")
    plt.plot(history.history['val_accuracy'], label="Val Acc")
    plt.title(f"Subject {test_id} Learning Curve")
    plt.legend()
    plt.show()

    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test Accuracy = {test_acc * 100:.2f}%")
    accuracy_list.append(test_acc)

    del model
    tf.keras.backend.clear_session()


# Final Results
print("\n========== Summary ==========")
for sid, acc in zip(SUBJECT_LIST, accuracy_list):
    print(f"Subject {sid} → {acc*100:.2f}%")

print(f"Average ACC: {np.mean(accuracy_list)*100:.2f}%")