In [6]:
!pip install mne hmmlearn -q


[notice] A new release of pip is available: 25.0.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [7]:
import os
import numpy as np
import mne
from hmmlearn import hmm
from sklearn.decomposition import PCA

In [8]:
epochs_dir = "./epochs_fif"   
results_dir_train = "./results"
results_dir_test  = "./test"

os.makedirs(results_dir_train, exist_ok=True)
os.makedirs(results_dir_test, exist_ok=True)

train_subs = [f"sub-{str(i).zfill(2)}" for i in range(1, 21)]
test_subs  = [f"sub-{str(i).zfill(2)}" for i in range(21, 26)]

In [None]:
def load_epochs_for_subjects(subject_list, data_dir):
    X_list, y_list = [], []
    for sub in subject_list:
        epo_path = os.path.join(data_dir, f"{sub}_epo.fif")
        if not os.path.exists(epo_path):
            print(f"Skip {sub}: {epo_path} not found")
            continue

        epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
        data = epochs.get_data()          # (n_trials, n_channels, n_times)
        labels = epochs.events[:, 2].copy().astype(int)  # 0 = frequent, 1 = target

        n_trials, C, T = data.shape
        X_flat = data.reshape(n_trials, C * T)  # flatten for ML/HMM

        X_list.append(X_flat)
        y_list.append(labels)

        print(f"{sub}: X={X_flat.shape}, y={labels.shape}")

    if len(X_list) == 0:
        raise RuntimeError("Subjects data not found!")

    X = np.concatenate(X_list, axis=0)
    y = np.concatenate(y_list, axis=0)
    return X.astype(np.float32), y.astype(int)

In [10]:
print("Loading TRAIN subjects...")
X_all_train, y_all_train = load_epochs_for_subjects(train_subs, epochs_dir)
print("Loading TEST subjects...")
X_all_test,  y_all_test  = load_epochs_for_subjects(test_subs,  epochs_dir)

print("TRAIN:", X_all_train.shape, y_all_train.shape)
print("TEST :", X_all_test.shape,  y_all_test.shape)

Loading TRAIN subjects...


  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)


sub-01: X=(162, 9009), y=(162,)
sub-02: X=(162, 9009), y=(162,)
sub-03: X=(162, 9009), y=(162,)
sub-04: X=(162, 9009), y=(162,)
sub-05: X=(162, 9009), y=(162,)


  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)


sub-06: X=(162, 9009), y=(162,)
sub-07: X=(162, 9009), y=(162,)
sub-08: X=(162, 9009), y=(162,)
sub-09: X=(162, 9009), y=(162,)
sub-10: X=(162, 9009), y=(162,)


  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)


sub-11: X=(162, 9009), y=(162,)
sub-12: X=(162, 9009), y=(162,)
sub-13: X=(162, 9009), y=(162,)
sub-14: X=(162, 9009), y=(162,)
sub-15: X=(162, 9009), y=(162,)


  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)


sub-16: X=(162, 9009), y=(162,)
sub-17: X=(162, 9009), y=(162,)
sub-18: X=(162, 9009), y=(162,)
sub-19: X=(162, 9009), y=(162,)
sub-20: X=(162, 9009), y=(162,)
Loading TEST subjects...


  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)


sub-21: X=(162, 9009), y=(162,)
sub-22: X=(162, 9009), y=(162,)
sub-23: X=(162, 9009), y=(162,)
sub-24: X=(162, 9009), y=(162,)
sub-25: X=(162, 9009), y=(162,)
TRAIN: (3240, 9009) (3240,)
TEST : (810, 9009) (810,)


  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
  epochs = mne.read_epochs(epo_path, preload=True, verbose=False)


In [11]:
def fit_hmm_fast(X_train_norm, y_train, X_test_norm,
                 n_components_pca=50,
                 n_states=3,
                 cov_type="diag"):
    print(f"Running PCA dim reduction ... (from {X_train_norm.shape[1]} dims)")
    pca = PCA(
        n_components=n_components_pca,
        svd_solver="randomized",
        whiten=False,
        random_state=3407
    )
    X_train_pca = pca.fit_transform(X_train_norm)
    X_test_pca  = pca.transform(X_test_norm)

    print(f"PCA done: train {X_train_pca.shape}, test {X_test_pca.shape}")

    X_freq_train = X_train_pca[y_train == 0]
    X_targ_train = X_train_pca[y_train == 1]

    print("Frequent train (PCA):", X_freq_train.shape)
    print("Target   train (PCA):", X_targ_train.shape)

    hmm_freq = hmm.GaussianHMM(
        n_components=n_states,
        covariance_type=cov_type,  
        n_iter=50,                  
        tol=1e-2,                  
        random_state=3407,
        verbose=False
    )
    hmm_targ = hmm.GaussianHMM(
        n_components=n_states,
        covariance_type=cov_type,
        n_iter=50,
        tol=1e-2,
        random_state=3407,
        verbose=False
    )

    print("Fitting HMM for frequent (PCA space) ...")
    hmm_freq.fit(X_freq_train)
    print("Fitting HMM for target   (PCA space) ...")
    hmm_targ.fit(X_targ_train)

    def compute_hmm_scores(X_pca, name):
        feats = np.empty((X_pca.shape[0], 3), dtype=np.float32)
        for i in range(X_pca.shape[0]):
            sample = X_pca[i:i+1]   # 2D [1, d]
            s_f = hmm_freq.score(sample)
            s_t = hmm_targ.score(sample)
            feats[i, 0] = s_f
            feats[i, 1] = s_t
            feats[i, 2] = s_f - s_t
        print(f"{name}: HMM feats shape={feats.shape}")
        return feats

    X_hmm_train = compute_hmm_scores(X_train_pca, "TRAIN")
    X_hmm_test  = compute_hmm_scores(X_test_pca,  "TEST")

    return X_hmm_train, X_hmm_test

In [12]:
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

def clean_and_scale(X, name):
    if np.isnan(X).any():
        print(f"{name}: NaN detected â†’ imputing (median)")
        imp = SimpleImputer(strategy="median")
        X = imp.fit_transform(X)

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    print(f"{name}: mean={X_scaled.mean():.4f}, std={X_scaled.std():.4f}")
    return X_scaled

X_all_train_norm = clean_and_scale(X_all_train, "TRAIN")
X_all_test_norm  = clean_and_scale(X_all_test,  "TEST")

TRAIN: mean=-0.0000, std=1.0000
TEST: mean=0.0000, std=1.0000


In [13]:
def fit_hmm_features(X_train_norm, y_train, X_test_norm):
    X_freq_train = X_train_norm[y_train == 0]
    X_targ_train = X_train_norm[y_train == 1]

    print("Frequent train:", X_freq_train.shape)
    print("Target   train:", X_targ_train.shape)

    hmm_freq = hmm.GaussianHMM(
        n_components=3,
        covariance_type="full",
        n_iter=100,
        random_state=3407,
    )
    hmm_targ = hmm.GaussianHMM(
        n_components=3,
        covariance_type="full",
        n_iter=100,
        random_state=3407,
    )

    print("Fitting HMM for frequent ...")
    hmm_freq.fit(X_freq_train)
    print("Fitting HMM for target   ...")
    hmm_targ.fit(X_targ_train)

    def compute_feats(X_norm, name):
        feats = []
        for i in range(X_norm.shape[0]):
            sample = X_norm[i:i+1]
            try:
                s_f = hmm_freq.score(sample)
            except Exception:
                s_f = 0.0
            try:
                s_t = hmm_targ.score(sample)
            except Exception:
                s_t = 0.0
            feats.append([s_f, s_t, s_f - s_t])
        feats = np.asarray(feats, dtype=np.float32)
        print(f"{name}: HMM feats shape={feats.shape}")
        return feats

    X_hmm_train = compute_feats(X_train_norm, "TRAIN")
    X_hmm_test  = compute_feats(X_test_norm,  "TEST")
    return X_hmm_train, X_hmm_test

X_all_hmm_train, X_all_hmm_test = fit_hmm_fast(
    X_all_train_norm, y_all_train, X_all_test_norm,
    n_components_pca=64,  
    n_states=3,            
    cov_type="diag"       
)

Running PCA dim reduction ... (from 9009 dims)
PCA done: train (3240, 64), test (810, 64)
Frequent train (PCA): (1620, 64)
Target   train (PCA): (1620, 64)
Fitting HMM for frequent (PCA space) ...
Fitting HMM for target   (PCA space) ...
TRAIN: HMM feats shape=(3240, 3)
TEST: HMM feats shape=(810, 3)


In [14]:
np.save(os.path.join(results_dir_train, "X_all.npy"),      X_all_train_norm)
np.save(os.path.join(results_dir_train, "y_all.npy"),      y_all_train)
np.save(os.path.join(results_dir_train, "X_all_hmm.npy"),  X_all_hmm_train)

print("\nSaved TRAIN to ./results:")
print(" X_all.npy     ", X_all_train_norm.shape)
print(" y_all.npy     ", y_all_train.shape)
print(" X_all_hmm.npy ", X_all_hmm_train.shape)

np.save(os.path.join(results_dir_test, "X_all_test.npy"),      X_all_test_norm)
np.save(os.path.join(results_dir_test, "y_all_test.npy"),      y_all_test)
np.save(os.path.join(results_dir_test, "X_all_hmm_test.npy"),  X_all_hmm_test)

print("\nSaved TEST to ./test:")
print(" X_all_test.npy     ", X_all_test_norm.shape)
print(" y_all_test.npy     ", y_all_test.shape)
print(" X_all_hmm_test.npy ", X_all_hmm_test.shape)


Saved TRAIN to ./results:
 X_all.npy      (3240, 9009)
 y_all.npy      (3240,)
 X_all_hmm.npy  (3240, 3)

Saved TEST to ./test:
 X_all_test.npy      (810, 9009)
 y_all_test.npy      (810,)
 X_all_hmm_test.npy  (810, 3)
