In [None]:
# Imports 
import os, glob, warnings
import numpy as np
import pandas as pd

from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support,
    confusion_matrix, classification_report
)

import tensorflow as tf
from tensorflow.keras import layers, models, callbacks, regularizers

from scipy.signal import resample
import matplotlib.pyplot as plt

# Install a blanket "ignore" filter: from here on no Python warning is shown.
# NOTE(review): this also hides deprecation and numerical warnings — consider
# narrowing the filter to specific categories/modules.
warnings.filterwarnings("ignore")
# Print the TensorFlow version in use so a run can be matched to its environment.
print("TensorFlow:", tf.__version__)

In [None]:
# Configuration and Parameters
# Tunable values used by the cells below.

# REQUIRED: set this to the root directory of the dataset
# (the directory that contains the folders listed in TEST_FOLDERS).
DATASET_ROOT = "PATH/TO/DATASET_ROOT"

TEST_FOLDERS = ["test1", "test2", "test3"]   # one sub-folder per class
FILE_GLOBS   = ["*.csv", "*.xlsx", "*.xls"]  # file patterns collected in each folder

# Sequence handling
TARGET_LEN   = 1024   # resampled/padded length per sample
MIN_LEN_OK   = 64     # skip samples with fewer timesteps than this
USE_RESAMPLE = True   # True: resample time axis to TARGET_LEN; False: pad/truncate
FILL_NA      = True   # replace missing / non-numeric cells with FILL_VALUE
FILL_VALUE   = 0.0
SCALE_PER_FEATURE = True  # standardize features (fit on TRAIN folds only)

# Training hyperparameters
BATCH_SIZE   = 32
EPOCHS       = 50      # Increase or decrease based on your results analysis
VAL_FRAC     = 0.2     # fraction of each training fold held out for validation
PATIENCE     = 12      # early-stopping patience, in epochs
SEED         = 42

# Labels
LABEL_MAP   = {"test1":0, "test2":1, "test3":2}   # folder name -> integer class id
LABEL_NAMES = ["Open", "Closed", "Dynamic"]       # display name, indexed by class id
N_CLASSES   = len(LABEL_MAP)

# Output directory (checkpoints and confusion-matrix figures are written here)
SAVE_DIR = "./cnn_balance_results"
os.makedirs(SAVE_DIR, exist_ok=True)

# Reproducibility
np.random.seed(SEED)
tf.random.set_seed(SEED)
# Seeds Python's `random`, NumPy and TensorFlow in one call. The two calls above
# leave Python's `random` unseeded, and Keras can draw default initializer seeds
# from it, so without this line weight initialisation may differ between runs.
tf.keras.utils.set_random_seed(SEED)

In [None]:
# Discover Files in Folders
def list_files(dataset_root, test_folders, file_globs, label_map=None):
    """Collect every data file below the per-class folders together with its label.

    Parameters
    ----------
    dataset_root : str
        Directory that contains one sub-folder per entry of ``test_folders``.
    test_folders : sequence of str
        Sub-folder names to scan, in the order they should appear in the result.
    file_globs : sequence of str
        Glob patterns (e.g. ``"*.csv"``) applied inside each sub-folder.
    label_map : dict, optional
        Mapping ``folder name -> integer label``. Defaults to the module-level
        ``LABEL_MAP`` so existing three-argument calls keep working.

    Returns
    -------
    list of (str, int)
        ``(file path, label)`` tuples; paths are sorted within each folder.

    Raises
    ------
    FileNotFoundError
        If ``dataset_root`` or one of the sub-folders does not exist.
    KeyError
        If a folder has no entry in ``label_map``.
    """
    if label_map is None:
        label_map = LABEL_MAP

    if not os.path.isdir(dataset_root):
        raise FileNotFoundError(
            f"DATASET_ROOT does not exist: {dataset_root}\n"
            "Please set DATASET_ROOT to the path containing test1/test2/test3."
        )

    samples = []
    for folder in test_folders:
        dir_path = os.path.join(dataset_root, folder)
        if not os.path.isdir(dir_path):
            raise FileNotFoundError(f"Missing folder: {dir_path}")
        if folder not in label_map:
            raise KeyError(f"No label defined for folder '{folder}' in label_map.")
        label = label_map[folder]

        # A set removes duplicates when two patterns match the same file.
        matched = set()
        for pat in file_globs:
            matched.update(glob.glob(os.path.join(dir_path, pat)))

        for fp in sorted(matched):
            # "~$name.xlsx" files are lock files Office creates next to an open
            # workbook; they match "*.xlsx" but are not readable spreadsheets.
            if os.path.basename(fp).startswith("~$"):
                continue
            # A directory whose name happens to match a pattern is not a sample.
            if not os.path.isfile(fp):
                continue
            samples.append((fp, label))

    print(f"Found {len(samples)} files total.")
    return samples

# Scan the dataset once; the resulting (path, label) list is consumed by the
# dataset-building loop further down.
all_files = list_files(DATASET_ROOT, TEST_FOLDERS, FILE_GLOBS)
# Stop here with a clear message instead of failing later on an empty dataset.
assert len(all_files) > 0, "No files found. Check DATASET_ROOT and folder structure."

In [None]:
# Read one Excel/CSV to 2D array [time, F]
def read_excel_like(path, fill_na=True, fill_value=0.0):
    """Load one CSV/Excel recording as a float32 array of shape [T, F].

    Parameters
    ----------
    path : str
        File to read. Paths ending in ``.csv`` (case-insensitive) go through
        ``pd.read_csv``; everything else goes through ``pd.read_excel``.
    fill_na : bool
        If True, missing values (including cells that could not be parsed as
        numbers) are replaced by ``fill_value``; if False they stay NaN.
    fill_value : float
        Replacement used when ``fill_na`` is True.

    Returns
    -------
    np.ndarray
        2-D float32 array, one row per table row and one column per remaining
        table column.
    """
    if path.lower().endswith(".csv"):
        df = pd.read_csv(path)
    else:
        df = pd.read_excel(path)

    # Drop fully empty columns
    df = df.dropna(axis=1, how="all")

    # Remove 'time' column(s) if present. Labels are passed through str() because
    # a header row of numbers yields non-string labels that have no .lower();
    # strip() tolerates headers exported with surrounding blanks. Selection is
    # positional so duplicated labels cannot confuse it.
    normalized = [str(c).strip().lower() for c in df.columns]
    if "time" in normalized:
        keep = [i for i, name in enumerate(normalized) if name != "time"]
        df = df.iloc[:, keep]

    # Convert to numeric column by column; unparseable cells become NaN.
    # DataFrame.apply works positionally, so duplicate column labels are fine.
    df = df.apply(pd.to_numeric, errors="coerce")

    if fill_na:
        df = df.fillna(fill_value)

    # A DataFrame always converts to a 2-D array, so no reshape is required.
    return df.to_numpy(dtype=np.float32)  # [T, F]

In [None]:
# Fix sample length (resample/pad/trunc)
def fix_length(x, target_len=1024, min_len_ok=64, use_resample=True):
    """Bring a [T, F] sample to exactly ``target_len`` timesteps.

    Samples with fewer than ``min_len_ok`` rows are rejected (``None`` is
    returned). Otherwise the time axis is either resampled with
    ``scipy.signal.resample`` (``use_resample=True``) or cut to the first
    ``target_len`` rows / extended with trailing zero rows.

    Returns a float32 array of shape [target_len, F], or ``None``.
    """
    n_steps, n_feats = x.shape

    # Too short to be useful: let the caller skip it.
    if n_steps < min_len_ok:
        return None

    if use_resample:
        out = resample(x, target_len, axis=0)
    elif n_steps >= target_len:
        # Keep the leading part of the recording.
        out = x[:target_len]
    else:
        # Append zero rows until the target length is reached.
        tail = np.zeros((target_len - n_steps, n_feats), dtype=x.dtype)
        out = np.concatenate([x, tail], axis=0)

    return out.astype(np.float32)

In [None]:
# Build dataset arrays X (N,T,F), y (N,)
# Each listed file is read, brought to TARGET_LEN timesteps and collected;
# files shorter than MIN_LEN_OK are reported and skipped.
X_list, y_list, name_list = [], [], []

for fp, label in all_files:
    arr = read_excel_like(fp, fill_na=FILL_NA, fill_value=FILL_VALUE)  # [T, F]
    fixed = fix_length(arr, TARGET_LEN, MIN_LEN_OK, USE_RESAMPLE)
    if fixed is None:
        print("Skipping short file:", os.path.basename(fp))
        continue
    X_list.append(fixed)
    y_list.append(label)
    name_list.append(fp)

# Fail with an explicit message instead of producing an array that has no
# time/feature axes and breaks later cells.
if not X_list:
    raise ValueError(
        "No usable samples were loaded: the file list is empty or every file "
        f"has fewer than MIN_LEN_OK={MIN_LEN_OK} rows."
    )

# Stacking needs the same number of feature columns in every sample; name the
# offending files rather than failing on a ragged array.
feature_counts = sorted({sample.shape[1] for sample in X_list})
if len(feature_counts) > 1:
    majority = max(feature_counts,
                   key=lambda n: sum(s.shape[1] == n for s in X_list))
    offenders = [os.path.basename(name)
                 for name, s in zip(name_list, X_list) if s.shape[1] != majority]
    raise ValueError(
        f"Inconsistent number of feature columns across files: {feature_counts}. "
        f"Files differing from the most common count ({majority}): {offenders}"
    )

X = np.stack(X_list)                  # [N, T, F]
y = np.array(y_list)
print("X:", X.shape, "y:", y.shape, "samples:", len(name_list))

In [None]:
# Model (compact 1D ResNet with regularization)
def res_block(x, filters, kernel=7, wd=1e-4):
    """Residual unit for 1-D feature maps, followed by 2x max-pooling.

    Main path: Conv1D(``filters``, ``kernel``) -> BatchNorm -> ReLU ->
    Conv1D(``filters``, 3) -> BatchNorm. The input is added back as a skip
    connection; when its channel count differs from ``filters`` it is first
    projected with a 1x1 Conv1D + BatchNorm. Every convolution uses "same"
    padding and an L2 kernel penalty of strength ``wd``.

    Returns the tensor after Add -> ReLU -> MaxPool1D(2).
    """
    skip = x

    # First convolution stage (configurable kernel size).
    main = layers.Conv1D(
        filters, kernel, padding="same", kernel_regularizer=regularizers.l2(wd)
    )(x)
    main = layers.BatchNormalization()(main)
    main = layers.Activation("relu")(main)

    # Second convolution stage (fixed kernel size 3, no activation before the add).
    main = layers.Conv1D(
        filters, 3, padding="same", kernel_regularizer=regularizers.l2(wd)
    )(main)
    main = layers.BatchNormalization()(main)

    # Match the channel dimension of the skip path when it differs.
    needs_projection = skip.shape[-1] != filters
    if needs_projection:
        skip = layers.Conv1D(
            filters, 1, padding="same", kernel_regularizer=regularizers.l2(wd)
        )(skip)
        skip = layers.BatchNormalization()(skip)

    merged = layers.Add()([main, skip])
    merged = layers.Activation("relu")(merged)
    return layers.MaxPool1D(2)(merged)

def build_resnet1d(input_shape, n_classes, wd=1e-4,
                   label_smooth=0.05, noise_std=0.01, dropout=0.35):
    """Build and compile the compact 1-D residual classifier.

    Layout: GaussianNoise(``noise_std``) on the input -> stem
    (Conv1D 64x7, BatchNorm, ReLU, MaxPool 2) -> residual block with 128
    filters (kernel 5) -> residual block with 256 filters (kernel 3) ->
    global average pooling -> Dropout(``dropout``) -> softmax Dense layer with
    ``n_classes`` units. Convolution and Dense kernels carry an L2 penalty of
    strength ``wd``.

    The model is compiled with Adam (learning rate 3e-4), categorical
    cross-entropy with ``label_smooth`` label smoothing (so targets must be
    one-hot encoded) and the accuracy metric.

    Returns the compiled ``Model``.
    """
    inputs = layers.Input(shape=input_shape)
    net = layers.GaussianNoise(noise_std)(inputs)

    # Stem
    net = layers.Conv1D(
        64, 7, padding="same", kernel_regularizer=regularizers.l2(wd)
    )(net)
    net = layers.BatchNormalization()(net)
    net = layers.Activation("relu")(net)
    net = layers.MaxPool1D(2)(net)

    # Residual stages: (filters, first-kernel size)
    for n_filters, first_kernel in ((128, 5), (256, 3)):
        net = res_block(net, n_filters, kernel=first_kernel, wd=wd)

    # Classification head
    net = layers.GlobalAveragePooling1D()(net)
    net = layers.Dropout(dropout)(net)
    probs = layers.Dense(
        n_classes, activation="softmax", kernel_regularizer=regularizers.l2(wd)
    )(net)

    model = models.Model(inputs, probs)
    optimizer = tf.keras.optimizers.Adam(3e-4)
    smoothed_ce = tf.keras.losses.CategoricalCrossentropy(label_smoothing=label_smooth)
    model.compile(optimizer=optimizer, loss=smoothed_ce, metrics=["accuracy"])
    return model

# Per-sample model input shape, taken from axes 1 and 2 of the stacked dataset
# array X (the timestep and feature axes of [N, T, F]).
input_shape = (X.shape[1], X.shape[2])
print("Model input shape:", input_shape, "classes:", N_CLASSES)

In [None]:
# Utilities: splits, scaling, metrics, CM
def stratified_val_split(X_tr, y_tr, val_frac=0.2, seed=42):
    """Carve a class-stratified validation subset out of a training set.

    A single ``StratifiedShuffleSplit`` with ``test_size=val_frac`` and
    ``random_state=seed`` decides which rows go where.

    Returns ``(X_train, y_train, X_val, y_val)``.
    """
    splitter = StratifiedShuffleSplit(
        n_splits=1, test_size=val_frac, random_state=seed
    )
    # n_splits=1, so the generator yields exactly one (train, validation) index pair.
    (fit_rows, val_rows), = splitter.split(X_tr, y_tr)

    X_fit, y_fit = X_tr[fit_rows], y_tr[fit_rows]
    X_hold, y_hold = X_tr[val_rows], y_tr[val_rows]
    return X_fit, y_fit, X_hold, y_hold

def fit_scaler_on_train(X_tr):
    """Fit a ``StandardScaler`` on training data, one statistic set per feature.

    All leading axes of ``X_tr`` are flattened so that every row of the fitted
    matrix is one observation of the last (feature) axis.

    Returns the fitted scaler.
    """
    n_features = X_tr.shape[-1]
    observations = X_tr.reshape(-1, n_features)
    return StandardScaler().fit(observations)

def apply_scaler(sc, Z):
    """Transform ``Z`` with a fitted scaler while keeping its original shape.

    ``Z`` is flattened to 2-D (rows x last axis) for ``sc.transform`` and the
    result is reshaped back to ``Z.shape``.
    """
    original_shape = Z.shape
    flat = Z.reshape(-1, original_shape[-1])
    scaled = sc.transform(flat)
    return scaled.reshape(original_shape)

def to_onehot(y, n_classes):
    """One-hot encode integer class labels.

    Returns a float32 array of shape ``(len(y), n_classes)`` holding 1.0 at
    column ``y[i]`` of row ``i`` and 0.0 everywhere else.
    """
    n_samples = len(y)
    encoded = np.zeros((n_samples, n_classes), dtype=np.float32)
    row_ids = np.arange(n_samples)
    # Fancy indexing sets one cell per row: (row i, column y[i]).
    encoded[row_ids, y] = 1.0
    return encoded

def metrics_tuple(y_true, y_pred):
    """Summarise predictions as ``(accuracy, precision, recall, f1)``.

    Precision, recall and F1 are the support-weighted averages over classes;
    classes without predictions contribute 0 (``zero_division=0``).
    """
    accuracy = accuracy_score(y_true, y_pred)
    weighted = precision_recall_fscore_support(
        y_true, y_pred, average="weighted", zero_division=0
    )
    # The fourth element (support) is not needed.
    precision, recall, f_score = weighted[0], weighted[1], weighted[2]
    return accuracy, precision, recall, f_score

def save_normalized_cm(y_true, y_pred, labels, title, out_prefix, show_colorbar=False,
                       label_names=None):
    """Plot a row-normalised confusion matrix and save it as PDF and PNG.

    Parameters
    ----------
    y_true, y_pred : array-like
        True and predicted class ids.
    labels : sequence
        Class ids to include, in row/column order.
    title : str
        Figure title.
    out_prefix : str
        File name (without extension) inside ``SAVE_DIR``.
    show_colorbar : bool
        Whether to draw a colour bar next to the matrix.
    label_names : sequence of str, optional
        Tick labels, one per entry of ``labels``. By default each integer label
        that is a valid index into ``LABEL_NAMES`` is shown with that name and
        any other label is shown as ``str(label)``.

    Raises
    ------
    ValueError
        If ``label_names`` is given with a length different from ``labels``.

    Side effects: writes ``<out_prefix>.pdf`` and ``<out_prefix>.png`` to
    ``SAVE_DIR``, shows the figure, prints the saved paths and closes the figure.
    """
    # Uses matplotlib only and avoids specifying colors/styles explicitly.
    if label_names is None:
        # Index by label id instead of slicing the first len(labels) names, so a
        # subset such as [0, 2] gets the names of classes 0 and 2.
        label_names = [
            LABEL_NAMES[lab]
            if isinstance(lab, (int, np.integer)) and 0 <= lab < len(LABEL_NAMES)
            else str(lab)
            for lab in labels
        ]
    elif len(label_names) != len(labels):
        raise ValueError(
            f"label_names has {len(label_names)} entries but labels has {len(labels)}."
        )

    cm = confusion_matrix(y_true, y_pred, labels=labels).astype(np.float32)
    # Normalise each row by its total; rows without any true samples stay 0
    # without going through a division by zero.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_norm = np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals > 0)

    fig, ax = plt.subplots(figsize=(5.0, 4.5))
    im = ax.imshow(cm_norm, interpolation='nearest')  # no explicit colormap

    if show_colorbar:
        ax.figure.colorbar(im, ax=ax, fraction=0.046, pad=0.04)

    ax.set(
        xticks=np.arange(len(labels)), yticks=np.arange(len(labels)),
        xticklabels=label_names, yticklabels=label_names,
        ylabel='True Label', xlabel='Predicted Label', title=title
    )
    plt.setp(ax.get_xticklabels(), rotation=0, ha="center")

    # Annotate every cell; switch the text colour at half the maximum value so
    # the number stays readable on both ends of the colour scale.
    fmt = '.2f'
    thresh = cm_norm.max() / 2.
    for i in range(len(labels)):
        for j in range(len(labels)):
            ax.text(j, i, format(cm_norm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm_norm[i, j] > thresh else "black")

    fig.tight_layout()
    pdf_path = os.path.join(SAVE_DIR, f"{out_prefix}.pdf")
    png_path = os.path.join(SAVE_DIR, f"{out_prefix}.png")
    # Save through the figure object rather than pyplot's "current figure".
    fig.savefig(pdf_path, bbox_inches='tight')
    fig.savefig(png_path, dpi=300, bbox_inches='tight')
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
    print("Saved CM:", pdf_path, "and", png_path)

In [None]:
# 3-class 5-fold CV + Confusion Matrix
# For every fold: (optionally) standardise with statistics from the training
# folds, hold out a stratified validation subset, train a fresh model and score
# it on the fold's test split. Per-fold metrics are averaged at the end and the
# pooled predictions are drawn as one normalised confusion matrix.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
class_ids = list(range(N_CLASSES))

fold_stats = []
all_true, all_pred = [], []

for fold, (tr_idx, te_idx) in enumerate(skf.split(X, y), start=1):
    # A new model is built per fold; clearing the Keras session first releases
    # the state kept for models created in earlier iterations.
    tf.keras.backend.clear_session()

    X_tr, X_te = X[tr_idx], X[te_idx]
    y_tr, y_te = y[tr_idx], y[te_idx]

    # Scale per-feature on training data (skipped when SCALE_PER_FEATURE is off)
    if SCALE_PER_FEATURE:
        scaler = fit_scaler_on_train(X_tr)
        X_tr = apply_scaler(scaler, X_tr)
        X_te = apply_scaler(scaler, X_te)

    # Stratified validation split
    X_tr2, y_tr2, X_val, y_val = stratified_val_split(X_tr, y_tr, val_frac=VAL_FRAC, seed=fold)

    # One-hot labels for label-smoothing
    y_tr2_oh = to_onehot(y_tr2, N_CLASSES)
    y_val_oh = to_onehot(y_val, N_CLASSES)

    model = build_resnet1d(input_shape, N_CLASSES, wd=1e-4, label_smooth=0.05,
                           noise_std=0.01, dropout=0.35)

    # The best model of the fold (by validation accuracy) is also written to disk.
    ckpt_path = os.path.join(SAVE_DIR, f"best_fold{fold}.keras")
    cbs = [
        callbacks.ModelCheckpoint(ckpt_path, monitor="val_accuracy",
                                  save_best_only=True, verbose=0),
        callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5,
                                    patience=5, verbose=0),
        callbacks.EarlyStopping(monitor="val_accuracy", patience=PATIENCE,
                                restore_best_weights=True, verbose=0)
    ]

    model.fit(
        X_tr2, y_tr2_oh,
        validation_data=(X_val, y_val_oh),
        epochs=EPOCHS, batch_size=BATCH_SIZE,
        verbose=0, callbacks=cbs, shuffle=True
    )

    # Evaluate on test split
    y_prob = model.predict(X_te, verbose=0)
    y_hat  = np.argmax(y_prob, axis=1)

    acc, prec, rec, f1 = metrics_tuple(y_te, y_hat)
    print(f"FOLD {fold}: Acc={acc:.3f} Prec={prec:.3f} Rec={rec:.3f} F1={f1:.3f}")
    # Passing the full label list keeps the matrix/report shape fixed and keeps
    # target_names aligned even if a class is absent from this fold.
    print(confusion_matrix(y_te, y_hat, labels=class_ids))
    print(classification_report(y_te, y_hat, labels=class_ids,
                                target_names=LABEL_NAMES, zero_division=0))

    fold_stats.append([acc, prec, rec, f1])
    all_true.append(y_te)
    all_pred.append(y_hat)

# Columns: accuracy, precision, recall, F1 — one row per fold.
fold_stats = np.array(fold_stats)
print("\n=== 3-Class CNN (5-fold) — Mean ± SD ===")
print(f"Acc  : {fold_stats[:,0].mean():.3f} ± {fold_stats[:,0].std():.3f}")
print(f"Prec : {fold_stats[:,1].mean():.3f} ± {fold_stats[:,1].std():.3f}")
print(f"Rec  : {fold_stats[:,2].mean():.3f} ± {fold_stats[:,2].std():.3f}")
print(f"F1   : {fold_stats[:,3].mean():.3f} ± {fold_stats[:,3].std():.3f}")

# Pool the test predictions of all folds (every sample is tested exactly once).
all_true = np.concatenate(all_true)
all_pred = np.concatenate(all_pred)

# Save normalized CM (3-class) as PDF + PNG
save_normalized_cm(
    all_true, all_pred, labels=[0,1,2],
    title="CNN - Balance Assessment (3 Classes)",
    out_prefix="cnn_balance_cm_3class"
)

In [None]:
# Pairwise helpers and 5-fold CV
def subset_pair(X, y, a, b):
    """Restrict a dataset to two classes and re-encode them as a binary problem.

    Only samples whose label equals ``a`` or ``b`` are kept. The returned labels
    are int32 with ``a`` mapped to 0 and ``b`` mapped to 1.

    Returns ``(X_pair, y_pair)``.
    """
    in_pair = np.logical_or(y == a, y == b)
    X_pair = X[in_pair]
    # Binary target: 1 where the original label is b, 0 where it is a.
    y_pair = np.equal(y[in_pair], b).astype(np.int32)
    return X_pair, y_pair

def run_pairwise_cv(X, y, class_a, class_b, title, fname_prefix):
    """Run 5-fold cross-validation of a binary classifier for one class pair.

    Parameters
    ----------
    X : np.ndarray
        Samples, indexed along the first axis.
    y : np.ndarray
        Integer class id per sample.
    class_a, class_b : int
        Class ids to compare; internally ``class_a`` is encoded as 0 and
        ``class_b`` as 1.
    title : str
        Human-readable name of the comparison (used in prints and the figure).
    fname_prefix : str
        Prefix for the checkpoint and confusion-matrix files in ``SAVE_DIR``.

    Returns
    -------
    None
        Results are printed and a normalised confusion matrix is saved. The
        function returns early if the subset does not contain both classes.
    """
    print(f"\n--- Pairwise: {title} ---")
    Xp, yp = subset_pair(X, y, class_a, class_b)
    if len(np.unique(yp)) < 2:
        print("Not enough classes in subset.")
        return

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
    fold_stats = []
    all_true, all_pred = [], []

    for fold, (tr, te) in enumerate(skf.split(Xp, yp), start=1):
        # A new model is built per fold; clearing the Keras session first
        # releases the state kept for models created in earlier iterations.
        tf.keras.backend.clear_session()

        X_tr, X_te = Xp[tr], Xp[te]
        y_tr, y_te = yp[tr], yp[te]

        # Standardise with training-fold statistics unless scaling is disabled.
        if SCALE_PER_FEATURE:
            scaler = fit_scaler_on_train(X_tr)
            X_tr = apply_scaler(scaler, X_tr)
            X_te = apply_scaler(scaler, X_te)

        X_tr2, y_tr2, X_val, y_val = stratified_val_split(X_tr, y_tr, val_frac=VAL_FRAC, seed=fold)
        # One-hot targets are required by the label-smoothed categorical loss.
        y_tr2_oh = to_onehot(y_tr2, 2)
        y_val_oh = to_onehot(y_val, 2)

        model = build_resnet1d((X_tr.shape[1], X_tr.shape[2]), 2,
                               wd=1e-4, label_smooth=0.05, noise_std=0.01, dropout=0.35)

        ckpt_path = os.path.join(SAVE_DIR, f"{fname_prefix}_best_fold{fold}.keras")
        cbs = [
            callbacks.ModelCheckpoint(ckpt_path, monitor="val_accuracy",
                                      save_best_only=True, verbose=0),
            callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5,
                                        patience=5, verbose=0),
            callbacks.EarlyStopping(monitor="val_accuracy", patience=PATIENCE,
                                    restore_best_weights=True, verbose=0)
        ]

        model.fit(X_tr2, y_tr2_oh,
                  validation_data=(X_val, y_val_oh),
                  epochs=EPOCHS, batch_size=BATCH_SIZE,
                  verbose=0, callbacks=cbs, shuffle=True)

        y_hat = np.argmax(model.predict(X_te, verbose=0), axis=1)
        acc, prec, rec, f1 = metrics_tuple(y_te, y_hat)
        print(f"Fold {fold}: Acc={acc:.3f} Prec={prec:.3f} Rec={rec:.3f} F1={f1:.3f}")
        # Fix the label order so the printed matrix is always 2x2.
        print(confusion_matrix(y_te, y_hat, labels=[0, 1]))

        fold_stats.append([acc, prec, rec, f1])
        all_true.append(y_te)
        all_pred.append(y_hat)

    # Columns: accuracy, precision, recall, F1 — one row per fold.
    fold_stats = np.array(fold_stats)
    print(f"\n{title} — Mean ± SD")
    print(f"Acc  : {fold_stats[:,0].mean():.3f} ± {fold_stats[:,0].std():.3f}")
    print(f"Prec : {fold_stats[:,1].mean():.3f} ± {fold_stats[:,1].std():.3f}")
    print(f"Rec  : {fold_stats[:,2].mean():.3f} ± {fold_stats[:,2].std():.3f}")
    print(f"F1   : {fold_stats[:,3].mean():.3f} ± {fold_stats[:,3].std():.3f}")

    all_true = np.concatenate(all_true)
    all_pred = np.concatenate(all_pred)

    # Translate the internal 0/1 encoding back to the original class ids so the
    # confusion matrix is computed and keyed on the real labels of this pair
    # (index 0 -> class_a, index 1 -> class_b).
    pair_ids = np.array([class_a, class_b])
    save_normalized_cm(
        pair_ids[all_true], pair_ids[all_pred], labels=[class_a, class_b],
        title=f"CNN - {title}",
        out_prefix=f"{fname_prefix}_cm"
    )

# Run pairwise analyses
# One entry per comparison: (first folder, second folder, title, output prefix).
_pairwise_runs = [
    ("test1", "test2", "Test1 (Open) vs Test2 (Closed)", "pair_1v2"),
    ("test1", "test3", "Test1 (Open) vs Test3 (Dynamic)", "pair_1v3"),
    ("test2", "test3", "Test2 (Closed) vs Test3 (Dynamic)", "pair_2v3"),
]

for _first, _second, _pair_title, _prefix in _pairwise_runs:
    run_pairwise_cv(X, y, LABEL_MAP[_first], LABEL_MAP[_second],
                    title=_pair_title,
                    fname_prefix=_prefix)