In [1]:
from __future__ import annotations
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any

from sklearn.linear_model import LogisticRegressionCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.inspection import permutation_importance

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, confusion_matrix

data_dir = '../data/'

In [2]:
@dataclass(frozen=True)
class FeatureSpec:
    channel: int
    shapelet_idx: int
    shapelet_len: int


def _zscore(x: np.ndarray, axis=None, eps: float = 1e-8) -> np.ndarray:
    m = x.mean(axis=axis, keepdims=True)
    s = x.std(axis=axis, keepdims=True)
    return (x - m) / (s + eps)


def min_sliding_distance(
    x: np.ndarray,
    s: np.ndarray,
    per_window_z: bool = False,
    eps: float = 1e-8,
) -> float:
    """
    Minimal Euclidean distance between a 1D time series x (len T)
    and a shapelet s (len L), computed over all T-L+1 windows.

    If per_window_z=True, each window and the shapelet are z-normalized
    before computing distances (useful for amplitude/offset invariance).
    """
    T = x.shape[0]
    L = s.shape[0]
    if L > T:
        return np.inf  # cannot slide shapelet longer than the series

    # rolling windows view: shape (T-L+1, L)
    W = sliding_window_view(x, L)

    if per_window_z:
        # z-normalize each window (row-wise) and the shapelet once
        Wn = _zscore(W, axis=1, eps=eps)
        sn = _zscore(s, axis=0, eps=eps)
        # broadcast subtract → (T-L+1, L)
        d2 = np.square(Wn - sn).sum(axis=1)
    else:
        d2 = np.square(W - s).sum(axis=1)

    return float(np.sqrt(d2.min()))


def build_shapelet_features(
    X: np.ndarray,                        # (n_samples, n_channels, n_timepoints)
    shapelets: List[List[np.ndarray]],    # list over channels -> list of 1D arrays
    *,
    per_window_z: bool = True,
    global_channel_z: bool = False,
    verbose: bool = True,
) -> Tuple[np.ndarray, List[FeatureSpec]]:
    """
    Returns:
      F: (n_samples, n_features) shapelet-distance features
      specs: list mapping feature index -> (channel, shapelet_idx, shapelet_len)
    """
    n_samples, n_channels, n_time = X.shape
    assert len(shapelets) == n_channels, "Provide shapelets per channel."

    # Optional: z-normalize each channel globally per sample (not window-wise).
    # This de-biases scale differences before the per-window computation.
    Xn = X.copy()
    if global_channel_z:
        for i in range(n_samples):
            for c in range(n_channels):
                Xn[i, c] = _zscore(Xn[i, c])

    # Precompute total features and a spec map
    specs: List[FeatureSpec] = []
    for c in range(n_channels):
        for j, shp in enumerate(shapelets[c]):
            specs.append(FeatureSpec(channel=c, shapelet_idx=j, shapelet_len=len(shp)))
    n_features = len(specs)

    if verbose:
        total_shapelets = sum(len(lst) for lst in shapelets)
        print(f"[build] samples={n_samples}, channels={n_channels}, time={n_time}")
        print(f"[build] total shapelets={total_shapelets}, features={n_features}")
        if per_window_z:
            print("[build] distance = min Euclidean on z-scored windows")
        elif global_channel_z:
            print("[build] distance = min Euclidean (channels globally z-scored)")
        else:
            print("[build] distance = min Euclidean (raw)")

    F = np.empty((n_samples, n_features), dtype=np.float32)

    # Compute features
    # Feature index traverses channel-major then shapelet index
    f_idx = 0
    for c in range(n_channels):
        S_c = shapelets[c]
        if verbose:
            print(f"[build] channel {c}: {len(S_c)} shapelets")
        for j, shp in enumerate(S_c):
            # ensure 1D float array
            s = np.asarray(shp, dtype=float).ravel()
            for i in range(n_samples):
                x = np.asarray(Xn[i, c], dtype=float).ravel()
                F[i, f_idx] = min_sliding_distance(x, s, per_window_z=per_window_z)
            f_idx += 1

    return F, specs


def summarize_top_features(
    importances: np.ndarray,
    specs: List[FeatureSpec],
    k: int = 20,
    title: str = "Top features",
) -> List[Dict[str, Any]]:
    idx = np.argsort(importances)[::-1]  # descending
    out = []
    for r in idx[:k]:
        spec = specs[r]
        out.append({
            "rank": len(out) + 1,
            "feature_index": int(r),
            "channel": spec.channel,
            "shapelet_idx": spec.shapelet_idx,
            "shapelet_len": spec.shapelet_len,
            "importance": float(importances[r]),
        })
    print(f"\n{title} (top {min(k, len(importances))}):")
    for row in out:
        print(
            f"#{row['rank']:>2}  f={row['feature_index']:>4}  "
            f"[ch={row['channel']}, shp={row['shapelet_idx']}, L={row['shapelet_len']}]  "
            f"imp={row['importance']:.6f}"
        )
    return out

def holdout_eval(model, F, y, test_size=0.25, random_state=0):
    X_tr, X_te, y_tr, y_te = train_test_split(
        F, y, test_size=test_size, stratify=y, random_state=random_state
    )
    model.fit(X_tr, y_tr)
    y_pred = model.predict(X_te)
    acc = accuracy_score(y_te, y_pred)

    # Optional extras
    try:
        auc = roc_auc_score(y_te, model.predict_proba(X_te)[:, 1])
    except Exception:
        auc = None
    f1 = f1_score(y_te, y_pred)
    cm = confusion_matrix(y_te, y_pred)

    print(f"[holdout] accuracy={acc:.3f} | f1={f1:.3f} | auc={auc if auc is not None else 'n/a'}")
    print("[holdout] confusion matrix:\n", cm)
    return {"accuracy": acc, "f1": f1, "auc": auc, "cm": cm}


def demo_train_and_rank(
    X: np.ndarray,
    y: np.ndarray,
    shapelets: List[List[np.ndarray]],
    *,
    per_window_z: bool = True,
    global_channel_z: bool = False,
    random_state: int = 0,
    n_perm_repeats: int = 10,
    use_random_forest: bool = True,
    rf_kwargs: Optional[dict] = None,
) -> Dict[str, Any]:
    """
    Build features, fit models, and compute feature importance.
    Returns a dict with features, mapping, models, and importance arrays.
    """
    # 1) Build features
    F, specs = build_shapelet_features(
        X, shapelets,
        per_window_z=per_window_z,
        global_channel_z=global_channel_z,
        verbose=True,
    )

    results: Dict[str, Any] = {"F": F, "specs": specs}

    # 2) Model A: Logistic Regression with L1 (sparse, interpretable weights)
    logi = make_pipeline(
        StandardScaler(with_mean=True, with_std=True),
        LogisticRegressionCV(
            Cs=20,
            penalty="l1",
            solver="liblinear",
            scoring="roc_auc",
            cv=5,
            max_iter=2000,
            n_jobs=None,
            random_state=random_state,
            refit=True,
        )
    )
    logi.fit(F, y)
    results["logistic_pipeline"] = logi

    # Extract absolute coefficients as a crude importance
    lr = logi.named_steps["logisticregressioncv"]
    # coef_ shape (1, n_features) for binary → flatten
    coef_abs = np.abs(lr.coef_.ravel())
    results["coef_abs"] = coef_abs
    summarize_top_features(coef_abs, specs, k=20, title="L1-LogReg | |coef|")

    # 3) Model B (optional): Random Forest + impurity importance
    if use_random_forest:
        rf = RandomForestClassifier(
            n_estimators=400,
            max_depth=None,
            min_samples_split=2,
            min_samples_leaf=1,
            random_state=random_state,
            n_jobs=-1,
            **(rf_kwargs or {})
        )
        rf.fit(F, y)
        results["rf"] = rf
        rf_imp = rf.feature_importances_
        results["rf_importance"] = rf_imp
        summarize_top_features(rf_imp, specs, k=20, title="RandomForest | impurity importance")

    # 4) Permutation importance (model-agnostic). Use the better of the two models by AUC.
    # Compute quick train AUCs to decide which model to permute on (not perfect, but simple).
    from sklearn.metrics import roc_auc_score
    auc_logi = roc_auc_score(y, logi.predict_proba(F)[:, 1])
    model_for_perm = logi
    model_name = "LogReg"
    if use_random_forest:
        auc_rf = roc_auc_score(y, rf.predict_proba(F)[:, 1])
        if auc_rf > auc_logi:
            model_for_perm = rf
            model_name = "RF"

    print(f"\n[perm] Using {model_name} for permutation importance (train AUC baseline).")
    perm = permutation_importance(
        model_for_perm, F, y,
        scoring="roc_auc",
        n_repeats=n_perm_repeats,
        random_state=random_state,
        n_jobs=-1,
    )
    perm_mean = perm.importances_mean
    perm_std = perm.importances_std
    results["perm_mean"] = perm_mean
    results["perm_std"] = perm_std
    summarize_top_features(perm_mean, specs, k=20, title="Permutation importance | mean ΔAUC")

    eval_res = holdout_eval(logi, F, y, test_size=0.25, random_state=0)

    return results, eval_res

In [3]:
import pickle
import torch

from utils import Data_Loader

In [13]:
config = {
    'data_dir': data_dir + 'FingerMovements',
    'Norm': False
}
Data = Data_Loader(config)
X = Data['train_data']
y = Data['train_label']

with open('../store/FingerMovements_sd2.pkl', 'rb') as f:
    sd = pickle.load(f)
shapelets_info = sd.get_shapelet_info(number_of_shapelet=30)

sw = torch.tensor(shapelets_info[:,3])
sw = torch.softmax(sw*20, dim=0)*sw.shape[0]
shapelets_info[:,3] = sw.numpy()

shapelets = [[] for i in range(28)]
for si in shapelets_info:
    sc = X[int(si[0]), int(si[5]), int(si[1]):int(si[2])]
    for i in range(28):
        shapelets[i].append(sc)

_ = demo_train_and_rank(
    X, y, shapelets,
    per_window_z=False,         # z-normalize each window + shapelet (recommended)
    global_channel_z=False,    # or True, if you want an extra global channel z-norm
    n_perm_repeats=8,
    use_random_forest=False,
)

2025-10-22 22:55:54,119 | INFO : Loading preprocessed data ...
2025-10-22 22:55:54,122 | INFO : 158 samples will be used for training
2025-10-22 22:55:54,123 | INFO : 158 samples will be used for validation
2025-10-22 22:55:54,123 | INFO : 100 samples will be used for testing


[build] samples=158, channels=28, time=50
[build] total shapelets=1680, features=1680
[build] distance = min Euclidean (raw)
[build] channel 0: 60 shapelets
[build] channel 1: 60 shapelets
[build] channel 2: 60 shapelets
[build] channel 3: 60 shapelets
[build] channel 4: 60 shapelets
[build] channel 5: 60 shapelets
[build] channel 6: 60 shapelets
[build] channel 7: 60 shapelets
[build] channel 8: 60 shapelets
[build] channel 9: 60 shapelets
[build] channel 10: 60 shapelets
[build] channel 11: 60 shapelets
[build] channel 12: 60 shapelets
[build] channel 13: 60 shapelets
[build] channel 14: 60 shapelets
[build] channel 15: 60 shapelets
[build] channel 16: 60 shapelets
[build] channel 17: 60 shapelets
[build] channel 18: 60 shapelets
[build] channel 19: 60 shapelets
[build] channel 20: 60 shapelets
[build] channel 21: 60 shapelets
[build] channel 22: 60 shapelets
[build] channel 23: 60 shapelets
[build] channel 24: 60 shapelets
[build] channel 25: 60 shapelets
[build] channel 26: 60 shap

In [10]:
config = {
    'data_dir': data_dir + 'SelfRegulationSCP1',
    'Norm': False
}
Data = Data_Loader(config)
X = Data['train_data']
y = Data['train_label']

with open('../store/SelfRegulationSCP1_sd2.pkl', 'rb') as f:
    sd = pickle.load(f)
shapelets_info = sd.get_shapelet_info(number_of_shapelet=100)

sw = torch.tensor(shapelets_info[:,3])
sw = torch.softmax(sw*20, dim=0)*sw.shape[0]
shapelets_info[:,3] = sw.numpy()

shapelets = [[] for i in range(6)]
for si in shapelets_info:
    sc = X[int(si[0]), int(si[5]), int(si[1]):int(si[2])]
    shapelets[int(si[5])].append(sc)

_ = demo_train_and_rank(
    X, y, shapelets,
    per_window_z=False,         # z-normalize each window + shapelet (recommended)
    global_channel_z=False,    # or True, if you want an extra global channel z-norm
    n_perm_repeats=8,
    use_random_forest=False,
)

2025-10-22 22:54:36,311 | INFO : Loading preprocessed data ...
2025-10-22 22:54:36,328 | INFO : 134 samples will be used for training
2025-10-22 22:54:36,328 | INFO : 134 samples will be used for validation
2025-10-22 22:54:36,329 | INFO : 293 samples will be used for testing


[build] samples=134, channels=6, time=896
[build] total shapelets=200, features=200
[build] distance = min Euclidean (raw)
[build] channel 0: 44 shapelets
[build] channel 1: 34 shapelets
[build] channel 2: 35 shapelets
[build] channel 3: 25 shapelets
[build] channel 4: 36 shapelets
[build] channel 5: 26 shapelets

L1-LogReg | |coef| (top 20):
# 1  f= 137  [ch=3, shp=24, L=106]  imp=0.457578
# 2  f= 108  [ch=2, shp=30, L=40]  imp=0.433581
# 3  f= 197  [ch=5, shp=23, L=2]  imp=0.000000
# 4  f= 199  [ch=5, shp=25, L=2]  imp=0.000000
# 5  f= 195  [ch=5, shp=21, L=7]  imp=0.000000
# 6  f= 194  [ch=5, shp=20, L=3]  imp=0.000000
# 7  f= 193  [ch=5, shp=19, L=45]  imp=0.000000
# 8  f= 198  [ch=5, shp=24, L=2]  imp=0.000000
# 9  f= 192  [ch=5, shp=18, L=2]  imp=0.000000
#10  f= 191  [ch=5, shp=17, L=2]  imp=0.000000
#11  f= 189  [ch=5, shp=15, L=2]  imp=0.000000
#12  f= 190  [ch=5, shp=16, L=2]  imp=0.000000
#13  f= 187  [ch=5, shp=13, L=3]  imp=0.000000
#14  f= 186  [ch=5, shp=12, L=39]  imp=0

In [14]:
config = {
    'data_dir': data_dir + 'SelfRegulationSCP2',
    'Norm': False
}
Data = Data_Loader(config)
X = Data['train_data']
y = Data['train_label']

with open('../store/SelfRegulationSCP2_sd2.pkl', 'rb') as f:
    sd = pickle.load(f)
shapelets_info = sd.get_shapelet_info(number_of_shapelet=100)

sw = torch.tensor(shapelets_info[:,3])
sw = torch.softmax(sw*20, dim=0)*sw.shape[0]
shapelets_info[:,3] = sw.numpy()

shapelets = [[] for i in range(7)]
for si in shapelets_info:
    sc = X[int(si[0]), int(si[5]), int(si[1]):int(si[2])]
    for i in range(7):
        shapelets[i].append(sc)

_ = demo_train_and_rank(
    X, y, shapelets,
    per_window_z=False,         # z-normalize each window + shapelet (recommended)
    global_channel_z=False,    # or True, if you want an extra global channel z-norm
    n_perm_repeats=8,
    use_random_forest=False,
)

2025-10-22 22:56:04,664 | INFO : Loading preprocessed data ...
2025-10-22 22:56:04,673 | INFO : 100 samples will be used for training
2025-10-22 22:56:04,674 | INFO : 100 samples will be used for validation
2025-10-22 22:56:04,674 | INFO : 180 samples will be used for testing


[build] samples=100, channels=7, time=1152
[build] total shapelets=1400, features=1400
[build] distance = min Euclidean (raw)
[build] channel 0: 200 shapelets
[build] channel 1: 200 shapelets
[build] channel 2: 200 shapelets
[build] channel 3: 200 shapelets
[build] channel 4: 200 shapelets
[build] channel 5: 200 shapelets
[build] channel 6: 200 shapelets

L1-LogReg | |coef| (top 20):
# 1  f=1313  [ch=6, shp=113, L=4]  imp=10.520085
# 2  f= 591  [ch=2, shp=191, L=4]  imp=9.723508
# 3  f=1367  [ch=6, shp=167, L=4]  imp=6.891448
# 4  f=1113  [ch=5, shp=113, L=4]  imp=5.973976
# 5  f= 907  [ch=4, shp=107, L=3]  imp=5.935302
# 6  f=1107  [ch=5, shp=107, L=3]  imp=5.408041
# 7  f= 100  [ch=0, shp=100, L=27]  imp=5.335140
# 8  f=1170  [ch=5, shp=170, L=6]  imp=5.193987
# 9  f= 858  [ch=4, shp=58, L=2]  imp=5.029822
#10  f= 423  [ch=2, shp=23, L=5]  imp=5.029698
#11  f= 991  [ch=4, shp=191, L=4]  imp=5.027256
#12  f=1391  [ch=6, shp=191, L=4]  imp=5.014746
#13  f=1191  [ch=5, shp=191, L=4]  im