In [1]:

# Notebook: Subject-level Stratified Train/Test + ML baseline pipeline

import os
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (accuracy_score, balanced_accuracy_score,
                             classification_report, confusion_matrix)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
import joblib
import warnings
warnings.filterwarnings('ignore')
sns.set_theme(style='whitegrid')



In [2]:
# Pain scores mapping
PAIN_SCORE = {
    0: 7, 1: 4, 2: 3, 3: 8, 4: 5, 5: 2, 6: 7, 7: 3, 8: 4, 9: 9,
    10: 3, 11: 6, 13: 3, 14: 3, 15: 8, 16: 5, 18: 5, 19: 8, 20: 7,
    21: 6, 22: 7, 23: 6, 24: 9, 25: 8, 26: 0, 27: 1, 30: 3, 31: 9,
    33: 6, 35: 1, 37: 0, 38: 7, 39: 8, 40: 4, 41: 7, 43: 6
}

def pain_label(score):
    if score in (0, 1, 2):
        return "low"
    elif score in (3, 4, 5, 6):
        return "mid"
    else:
        return "high"

In [3]:
#  CONFIG
DATA_DIR = Path("FP1_FeatureData_Simple_Overlapping")   # <-- change to your folder with per-subject CSVs
OUTPUT_DIR = Path("ml_results_subject_split")
OUTPUT_DIR.mkdir(exist_ok=True)

TEST_SIZE = 0.2   # fraction of subjects for test
RANDOM_STATE = 42

# Provide your grouping mapping (from your message)
GROUPS = {
    "low":    [5, 26, 27, 35, 37],
    "mid":    [1,2,4,7,8,10,11,13,14,16,18,21,23,30,33,40,43],
    "high":   [0,3,6,9,15,19,20,22,24,25,31,38,39,41]
}

# Flatten mapping into id->label for safety
PAIN_LABEL_MAP = {}
for lab, ids in GROUPS.items():
    for i in ids:
        PAIN_LABEL_MAP[i] = lab

print("CONFIG:")
print(" DATA_DIR:", DATA_DIR)
print(" OUTPUT_DIR:", OUTPUT_DIR)
print(" TEST_SIZE:", TEST_SIZE)
print(" RANDOM_STATE:", RANDOM_STATE)

CONFIG:
 DATA_DIR: FP1_FeatureData_Simple_Overlapping
 OUTPUT_DIR: ml_results_subject_split
 TEST_SIZE: 0.2
 RANDOM_STATE: 42


In [4]:

# READ CSVs & CHECK MAPPING
# Gather all ID*_feature.csv files
csv_files = sorted(DATA_DIR.glob("ID*_feature.csv"))
if not csv_files:
    raise FileNotFoundError(f"No CSVs found in {DATA_DIR}. Check path and filenames.")

# Build per-subject dataframe mapping
subjects = []
for p in csv_files:
    # parse ID number from filename (assumes 'ID<number>_feature.csv')
    stem = p.stem
    # extract first integer in the stem
    nums = ''.join([c if c.isdigit() else ' ' for c in stem]).split()
    if not nums:
        print("Cannot parse ID from", p.name)
        continue
    sid = int(nums[0])
    subjects.append({'id': sid, 'path': p})
subj_df = pd.DataFrame(subjects).sort_values('id').reset_index(drop=True)
print(f"Found {len(subj_df)} subject CSVs.")



# Verify grouping consistency vs present IDs
present_ids = set(subj_df['id'].tolist())
mapped_ids = set(PAIN_LABEL_MAP.keys())
missing_in_map = present_ids - mapped_ids
missing_in_folder = mapped_ids - present_ids
print("IDs present in folder:", len(present_ids))
if missing_in_map:
    print("Warning: these subject IDs have no label mapping (PAIN_LABEL_MAP):", sorted(missing_in_map))
if missing_in_folder:
    print("Note: mapping contains IDs not present as CSV:", sorted(missing_in_folder))



Found 36 subject CSVs.
IDs present in folder: 36


In [5]:


# Build subject -> label (from CSV label column if exists else from PAIN_LABEL_MAP)
subj_labels = {}
for idx, row in subj_df.iterrows():
    sid = int(row['id'])
    # try to read the CSV's label column (first row) to confirm
    df_tmp = pd.read_csv(row['path'], nrows=2)  # small peek
    if 'label' in df_tmp.columns:
        label_val = df_tmp['label'].iloc[0]
        subj_labels[sid] = str(label_val)
    else:
        # fallback to provided mapping
        if sid in PAIN_LABEL_MAP:
            subj_labels[sid] = PAIN_LABEL_MAP[sid]
        else:
            raise ValueError(f"Cannot determine label for subject ID {sid}. Add mapping or ensure 'label' column in CSV.")
        
        

# Print counts per class (subject-level)
from collections import Counter
cnt = Counter(subj_labels.values())
print("Subject-level counts by label:", dict(cnt))


Subject-level counts by label: {'high': 14, 'mid': 17, 'low': 5}


In [6]:

# BUILD SUBJECT LIST & STRATIFIED SPLIT
# We'll split subjects (not windows) while stratifying on the subject-level label
subject_ids = np.array(sorted(list(subj_labels.keys())))
subject_labels = np.array([subj_labels[s] for s in subject_ids])

# train/test split of subjects
train_sids, test_sids, y_train_sub, y_test_sub = train_test_split(
    subject_ids, subject_labels,
    test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=subject_labels
)

print("Subjects -> Train:", len(train_sids), " Test:", len(test_sids))
print("Train label counts:", dict(Counter(y_train_sub)))
print("Test  label counts:", dict(Counter(y_test_sub)))

# Save lists
pd.DataFrame({'subject_id': train_sids}).to_csv(OUTPUT_DIR / 'train_subjects.csv', index=False)
pd.DataFrame({'subject_id': test_sids}).to_csv(OUTPUT_DIR / 'test_subjects.csv', index=False)


Subjects -> Train: 28  Test: 8
Train label counts: {np.str_('high'): 11, np.str_('mid'): 13, np.str_('low'): 4}
Test  label counts: {np.str_('mid'): 4, np.str_('high'): 3, np.str_('low'): 1}


In [7]:

# ASSEMBLE ROW-LEVEL DATA (no leakage)
# Load all rows for train subjects and test subjects. Use subj_id column to select.
def load_subject_csv(sid):
    p = DATA_DIR / f"ID{sid}_feature.csv"
    if not p.exists():
        raise FileNotFoundError(f"Expected file {p} not found.")
    df = pd.read_csv(p)
    # ensure subj_id column exists (if not, add)
    if 'subj_id' not in df.columns:
        df['subj_id'] = sid
    return df

train_rows = []
for sid in train_sids:
    df = load_subject_csv(sid)
    train_rows.append(df)
test_rows = []
for sid in test_sids:
    df = load_subject_csv(sid)
    test_rows.append(df)

train_df = pd.concat(train_rows, ignore_index=True)
test_df = pd.concat(test_rows, ignore_index=True)
print("Row-level: train rows =", len(train_df), ", test rows =", len(test_df))

# quick class balance check (row-level)
print("Row-level label distribution (train):\n", train_df['label'].value_counts())
print("Row-level label distribution (test):\n", test_df['label'].value_counts())


Row-level: train rows = 6688 , test rows = 1912
Row-level label distribution (train):
 label
mid     3107
high    2627
low      954
Name: count, dtype: int64
Row-level label distribution (test):
 label
mid     956
high    717
low     239
Name: count, dtype: int64


In [8]:

# PREPROCESS: feature selection, impute, scale
# Remove meta columns we don't want as features
META_COLS = ['subj_id', 'window_idx', 'label', 'pain_score']
# ensure these exist
for c in META_COLS:
    if c not in train_df.columns:
        print(f"Warning: expected meta column {c} not found in data.")


# Feature columns = all numeric columns except meta
feat_cols = [c for c in train_df.columns if c not in META_COLS]
print("Number of feature columns detected:", len(feat_cols))

X_train = train_df[feat_cols].values
y_train = train_df['label'].values
X_test = test_df[feat_cols].values
y_test = test_df['label'].values

# Encode labels to integers for classifiers
le = LabelEncoder().fit(y_train)  # fit on train labels
y_train_enc = le.transform(y_train)
y_test_enc = le.transform(y_test)

# Preprocessing pipeline (impute median + standard scale)
preproc = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])


X_train_p = preproc.fit_transform(X_train)
X_test_p = preproc.transform(X_test)

# Save preproc
joblib.dump(preproc, OUTPUT_DIR / 'preprocessing_pipeline.joblib')


Number of feature columns detected: 52


['ml_results_subject_split\\preprocessing_pipeline.joblib']

In [9]:
print(X_train.shape, X_test.shape)
print(y_train.shape, y_test.shape)

# after preprocessing
print(X_train_p.shape, X_test_p.shape)
print(y_train_enc.shape, y_test_enc.shape)


(6688, 52) (1912, 52)
(6688,) (1912,)
(6688, 52) (1912, 52)
(6688,) (1912,)


In [10]:

# MODEL LIST
models = {
    "LogisticRegression": LogisticRegression(max_iter=2000, random_state=RANDOM_STATE),
    "RandomForest": RandomForestClassifier(n_estimators=200, random_state=RANDOM_STATE, n_jobs=-1),
    "GradientBoosting": GradientBoostingClassifier(n_estimators=200, random_state=RANDOM_STATE),
    "HistGradientBoosting": HistGradientBoostingClassifier(random_state=RANDOM_STATE),
    "SVC": SVC(probability=True, random_state=RANDOM_STATE),
    "KNN": KNeighborsClassifier(n_neighbors=5)
}
# Try to add XGBoost if available
try:
    from xgboost import XGBClassifier
    models["XGBoost"] = XGBClassifier(use_label_encoder=False, eval_metric='mlogloss', random_state=RANDOM_STATE)
    print("XGBoost available and added.")
except Exception:
    print("XGBoost not available (skipping).")

# %% TRAIN, EVALUATE, PLOT
results = []
for name, clf in models.items():
    print("\n" + "="*50)
    print("Training:", name)
    clf.fit(X_train_p, y_train_enc)
    # Save model
    joblib.dump(clf, OUTPUT_DIR / f"{name}.joblib")
    y_pred = clf.predict(X_test_p)
    y_prob = None
    if hasattr(clf, "predict_proba"):
        y_prob = clf.predict_proba(X_test_p)

    acc = accuracy_score(y_test_enc, y_pred)
    bacc = balanced_accuracy_score(y_test_enc, y_pred)
    report = classification_report(y_test_enc, y_pred, target_names=le.classes_, output_dict=True)
    report_text = classification_report(y_test_enc, y_pred, target_names=le.classes_)
    print(report_text)
    print("Accuracy:", acc, " Balanced Accuracy:", bacc)

    # Confusion matrix
    cm = confusion_matrix(y_test_enc, y_pred)
    fig, ax = plt.subplots(figsize=(5,4))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=le.classes_, yticklabels=le.classes_, cmap='Blues', ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title(f"{name} — Confusion Matrix")
    plt.tight_layout()
    fig_path = OUTPUT_DIR / f"{name}_confusion_matrix.png"
    fig.savefig(fig_path)
    plt.close(fig)

    # Collect numeric metrics summary
    results.append({
        'model': name,
        'accuracy': acc,
        'balanced_accuracy': bacc,
        'precision_macro': report['macro avg']['precision'],
        'recall_macro': report['macro avg']['recall'],
        'f1_macro': report['macro avg']['f1-score'],
        'n_train_subjects': len(train_sids),
        'n_test_subjects': len(test_sids),
        'n_train_rows': len(train_df),
        'n_test_rows': len(test_df)
    })

    # Save detailed report
    with open(OUTPUT_DIR / f"{name}_classification_report.txt", 'w') as fh:
        fh.write(report_text)


XGBoost available and added.

Training: LogisticRegression
              precision    recall  f1-score   support

        high       0.20      0.11      0.14       717
         low       0.01      0.01      0.01       239
         mid       0.44      0.62      0.51       956

    accuracy                           0.35      1912
   macro avg       0.22      0.25      0.22      1912
weighted avg       0.30      0.35      0.31      1912

Accuracy: 0.3530334728033473  Balanced Accuracy: 0.24628079962807994

Training: RandomForest
              precision    recall  f1-score   support

        high       0.17      0.26      0.20       717
         low       0.05      0.03      0.03       239
         mid       0.28      0.20      0.24       956

    accuracy                           0.20      1912
   macro avg       0.17      0.16      0.16      1912
weighted avg       0.21      0.20      0.20      1912

Accuracy: 0.200836820083682  Balanced Accuracy: 0.16166899116689912

Training: Gradien

In [None]:

# Save results summary
results_df = pd.DataFrame(results).sort_values('f1_macro', ascending=False)
results_df.to_csv(OUTPUT_DIR / "model_results_summary.csv", index=False)
print("\nSummary:")
print(results_df)

# %% EXTRA: show per-class support in test set and mapping
print("\nTest set per-class row support:", test_df['label'].value_counts().to_dict())
print("Label encoding mapping (int -> label):", dict(enumerate(le.classes_)))

# EOF


In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import cv2
from tensorflow import keras

# ==========================
# Grad-CAM heatmap function
# ==========================
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """
    Generate a Grad-CAM heatmap for a given image and model.
    
    Args:
        img_array (np.ndarray): Input image array of shape (1, H, W, 3)
            with pixel values in [0, 255].
        model (tf.keras.Model): The trained model containing a Rescaling layer.
        last_conv_layer_name (str): Name of the last convolutional layer.
        pred_index (int, optional): Class index to visualize. Defaults to predicted class.
    
    Returns:
        np.ndarray: Normalized heatmap (H, W) values in [0, 1].
    """
    # Build a model mapping input to activations + output
    grad_model = tf.keras.models.Model(
        [model.inputs],
        [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Record gradients
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]

    # Compute gradient of class output wrt conv layer
    grads = tape.gradient(class_channel, conv_outputs)

    # Mean intensity of gradients for each feature map
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weight each feature map by its importance
    conv_outputs = conv_outputs[0]
    heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Normalize to [0, 1]
    heatmap = tf.maximum(heatmap, 0)
    max_val = tf.reduce_max(heatmap)
    if max_val > 0:
        heatmap /= max_val

    return heatmap.numpy()

# ==========================================
# Function to overlay Grad-CAM on the image
# ==========================================
def overlay_gradcam(img, heatmap, alpha=0.4, colormap=cv2.COLORMAP_JET):
    """
    Overlay Grad-CAM heatmap on the original image.
    
    Args:
        img (np.ndarray): Original image array (H, W, 3), uint8 [0–255].
        heatmap (np.ndarray): Grad-CAM heatmap (H, W), float [0–1].
        alpha (float): Transparency factor for blending.
        colormap (int): OpenCV colormap to apply.
    
    Returns:
        np.ndarray: RGB image with heatmap overlay.
    """
    # Ensure uint8 image
    if img.dtype != np.uint8:
        img = np.uint8(255 * img / np.max(img))

    # Resize heatmap to image size
    heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))

    # Convert heatmap to RGB using colormap
    heatmap = np.uint8(255 * heatmap)
    heatmap = cv2.applyColorMap(heatmap, colormap)
    heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)

    # Superimpose heatmap on original image
    superimposed_img = cv2.addWeighted(heatmap, alpha, img, 1 - alpha, 0)
    return superimposed_img

# ==================================================
# Display Grad-CAM results for a few sample images
# ==================================================
def show_gradcam_samples(dataset, class_names, model, last_conv_layer_name, num_images=3):
    """
    Display Grad-CAM visualizations for a few images from a dataset.
    
    Args:
        dataset (tf.data.Dataset): Dataset containing (image, label) pairs.
        class_names (list): List of class names in order.
        model (tf.keras.Model): Trained model.
        last_conv_layer_name (str): Name of the last convolutional layer.
        num_images (int): Number of images to visualize.
    """
    plt.figure(figsize=(12, 6))
    for images, labels in dataset.take(1):
        for i in range(min(num_images, images.shape[0])):
            img = images[i].numpy().astype("uint8")
            label = int(labels[i].numpy())

            # Expand dims (model includes its own normalization layer)
            img_array = np.expand_dims(img, axis=0)

            # Generate Grad-CAM
            heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)
            superimposed_img = overlay_gradcam(img, heatmap)

            # Plot original
            plt.subplot(2, num_images, i + 1)
            plt.imshow(img)
            plt.title(f"True: {class_names[label]}")
            plt.axis("off")

            # Plot Grad-CAM overlay
            plt.subplot(2, num_images, i + 1 + num_images)
            plt.imshow(superimposed_img)
            plt.title("Grad-CAM")
            plt.axis("off")

        break
    plt.tight_layout()
    plt.show()
