In [23]:
%matplotlib qt
from pathlib import Path

import matplotlib.pyplot as plt
import mne
import numpy as np
from mne.datasets import fetch_fsaverage
from mne.decoding import (
    GeneralizingEstimator,
    LinearModel,
    Scaler,
    SlidingEstimator,
    Vectorizer,
    cross_val_multiscore,
    get_coef,
)
from mne.minimum_norm import apply_inverse, make_inverse_operator
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

Define the triggers and inspect their timing

In [None]:
## load the epochs, keep only the EEG channels, and drop epochs that exceed the EEG rejection threshold
epochs = mne.read_epochs("../sample/epochs/asjt-epo.fif.gz", preload=True)
epochs.pick(picks="eeg")
epochs.drop_bad(reject=dict(eeg=40e-6))

## build a filtered copy of the event dictionary without the "New Segment/" marker;
## popping the key from `epochs.event_id` itself would also alter the Epochs object
even_ids = {
    name: code for name, code in epochs.event_id.items() if name != "New Segment/"
}

## plot the timing of the remaining events
## NOTE(review): sfreq is hard-coded to 1000 -- confirm it matches the recording
fig, ax = plt.subplots(1, 1, figsize=(10, 4), layout="tight")
mne.viz.plot_events(epochs.events, sfreq=1000, event_id=even_ids, axes=ax)
ax.get_legend().remove()
ax.spines[["right", "top"]].set_visible(False)

Reading /Users/payamsadeghishabestari/TinReg/test/../sample/epochs/asjt-epo.fif.gz ...
    Read a total of 2 projection items:
        EOG-eeg--0.200-0.200-PCA-01 (1 x 59) active
        EOG-eeg--0.200-0.200-PCA-02 (1 x 59) active
    Found the data of interest:
        t =   -1000.00 ...    1000.00 ms
        0 CTF compensation matrices available
Reading /Users/payamsadeghishabestari/TinReg/test/../sample/epochs/asjt-epo.fif.gz ...
    Read a total of 2 projection items:
        EOG-eeg--0.200-0.200-PCA-01 (1 x 59) active
        EOG-eeg--0.200-0.200-PCA-02 (1 x 59) active
    Found the data of interest:
        t =   -1000.00 ...    1000.00 ms
        0 CTF compensation matrices available
Not setting metadata
6000 matching events found
No baseline correction applied
Created an SSP operator (subspace dimension = 2)
2 projection items activated


  mne.viz.plot_events(epochs.events, sfreq=1000, event_id=even_ids, axes=ax)


In [None]:
## balance trial counts across every random-sequence condition (all keys ending in "rndm",
## i.e. both the "_std_rndm" and "_tin_rndm" conditions selected further below)
rnd_ids = [key for key in even_ids if key.endswith("rndm")]
eps_list = [epochs[rnd_id] for rnd_id in rnd_ids]
mne.epochs.equalize_epoch_counts(eps_list, method="mintime")
epochs_rnd = mne.concatenate_epochs(eps_list)

## ordered-sequence conditions (keys ending in "or") are selected as-is, without balancing
ord_ids = [key for key in even_ids if key.endswith("or")]
epochs_ord = epochs[ord_ids]

## compute covariance matrix from rnd epochs, using samples up to t = 0 only
cov = mne.compute_covariance(epochs_rnd, tmax=0.0)

## define epochs: split each set into its "std" and "tin" conditions for tones f1..f4
ids = range(1, 5) 
epochs_rnd_std = epochs_rnd[[f"f{i}_std_rndm" for i in ids]]
epochs_rnd_tin = epochs_rnd[[f"f{i}_tin_rndm" for i in ids]]

epochs_ord_std = epochs_ord[[f"f{i}_std_or" for i in ids]]
epochs_ord_tin = epochs_ord[[f"f{i}_tin_or" for i in ids]]

## the combined objects are not used again in this cell once the four subsets exist
del epochs_ord, epochs_rnd

Dropped 7 epochs: 0, 1, 2, 3, 4, 5, 6
Dropped 19 epochs: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18
Dropped 10 epochs: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
Dropped 32 epochs: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31
Dropped 0 epochs: 
Dropped 25 epochs: 0, 1, 4, 155, 192, 246, 247, 252, 254, 279, 280, 297, 298, 313, 314, 315, 316, 317, 318, 357, 358, 359, 360, 364, 370
Dropped 37 epochs: 2, 10, 11, 156, 157, 158, 166, 172, 173, 174, 182, 183, 185, 261, 262, 264, 301, 302, 304, 317, 319, 320, 321, 323, 324, 328, 354, 355, 357, 367, 368, 369, 373, 375, 381, 382, 392
Dropped 6 epochs: 351, 354, 355, 356, 358, 362


  epochs_rnd = mne.concatenate_epochs(eps_list)


Not setting metadata
2864 matching events found
Applying baseline correction (mode: mean)
Created an SSP operator (subspace dimension = 2)


In [21]:
## restrict the random "std" epochs to -250 ms .. 500 ms, then resample them to 100 Hz
## (only `epochs_rnd_std` is processed here; the other epoch sets are left untouched)
epochs_rnd_std.crop(tmin=-0.250, tmax=0.500).resample(100)

Unnamed: 0,General,General.1
,MNE object type,EpochsArray
,Measurement date,2025-07-29 at 10:40:38 UTC
,Participant,Unknown
,Experimenter,Unknown
,Acquisition,Acquisition
,Total number of events,1432
,Events counts,f1_std_rndm: 358  f2_std_rndm: 358  f3_std_rndm: 358  f4_std_rndm: 358
,Time range,-0.250 – 0.490 s
,Baseline,-1.000 – 0.000 s
,Sampling frequency,100.00 Hz


Train classifiers on random trials

In [None]:
n_splits = 5

## create masks separating post-stimulus (t >= 0) from pre-stimulus (t < 0) samples
post_mask = epochs_rnd_std.times >= 0
pre_mask = epochs_rnd_std.times < 0
X = epochs_rnd_std.get_data()
y = epochs_rnd_std.events[:, 2]

X_post = X[:, :, post_mask]
X_pre = X[:, :, pre_mask]

## define the generalization object: scale -> vectorize -> LDA wrapped in LinearModel
clf = make_pipeline(
    Scaler(epochs_rnd_std.info),
    Vectorizer(),
    LinearModel(LinearDiscriminantAnalysis(solver="svd")),
)
gen = GeneralizingEstimator(clf, scoring="accuracy", n_jobs=1, verbose=True)
cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

## post -> post (cross-validated)
scores_post_post = cross_val_multiscore(gen, X_post, y, cv=cv, n_jobs=1)

## post -> pre: same folds, train on post-stimulus data, test on held-out pre-stimulus data
scores_post_pre = []
for train_idx, test_idx in cv.split(X_post, y):
    gen.fit(X_post[train_idx], y[train_idx])  # train on post
    score = gen.score(X_pre[test_idx], y[test_idx])  # test on pre
    scores_post_pre.append(score)

scores_post_pre = np.array(scores_post_pre)

## compute coefficients
## note: at this point `gen` holds the fit from the last fold of the loop above
coef_filt = get_coef(gen, "filters_", inverse_transform=False)  # (n_chs, n_class, n_time) per original note
coef_patt = get_coef(gen, "patterns_", inverse_transform=True)[0]  # (n_chs, n_class, n_time) per original note

## save in numpy archive (the original `np.save("", score ...)` was a placeholder and a
## syntax error); the file name is relative to the working directory -- adjust as needed
np.savez(
    "decoding_rnd_std.npz",
    scores_post_post=scores_post_post,
    scores_post_pre=scores_post_pre,
    coef_filt=coef_filt,
    coef_patt=coef_patt,
)


## test on ordered tones
## NOTE(review): `epochs_ord_std` is not cropped/resampled like `epochs_rnd_std` in the
## cells shown here -- confirm the train/test preprocessing mismatch is intended
X_ord = epochs_ord_std.get_data()

## The classifier is trained on the event codes of the "*_rndm" conditions, while the
## ordered epochs are selected through "*_or" names (assumed to carry different integer
## codes). Translate each ordered code to the code of the matching random condition so
## that predictions and labels are comparable.
ord_code_to_name = {code: name for name, code in epochs_ord_std.event_id.items()}
y_ord = np.array(
    [
        epochs_rnd_std.event_id[ord_code_to_name[code][: -len("_or")] + "_rndm"]
        for code in epochs_ord_std.events[:, 2]
    ]
)

times_ord = epochs_ord_std.times
post_mask_ord = times_ord >= 0
pre_mask_ord = times_ord < 0  # was `epochs_ord_std < 0`, which compares the Epochs object itself

X_ord_post = X_ord[:, :, post_mask_ord]
X_ord_pre = X_ord[:, :, pre_mask_ord]

gen.fit(X_post, y)  # train again on all random post-stimulus data

## scores and coeffs
score_ord_post = gen.score(X_ord_post, y_ord)
score_ord_pre = gen.score(X_ord_pre, y_ord)

coef_filt_ord = get_coef(gen, "filters_", inverse_transform=False)  # (n_chs, n_class, n_time) per original note
coef_patt_ord = get_coef(gen, "patterns_", inverse_transform=True)[0]  # (n_chs, n_class, n_time) per original note

## save in numpy archive (file name relative to the working directory -- adjust as needed)
np.savez(
    "decoding_rnd_std_to_ord_std.npz",
    score_ord_post=score_ord_post,
    score_ord_pre=score_ord_pre,
    coef_filt_ord=coef_filt_ord,
    coef_patt_ord=coef_patt_ord,
)

  0%|          | Fitting GeneralizingEstimator : 0/50 [00:00<?,       ?it/s]

  0%|          | Scoring GeneralizingEstimator : 0/2500 [00:00<?,       ?it/s]

  0%|          | Fitting GeneralizingEstimator : 0/50 [00:00<?,       ?it/s]

  0%|          | Scoring GeneralizingEstimator : 0/2500 [00:00<?,       ?it/s]

  0%|          | Fitting GeneralizingEstimator : 0/50 [00:00<?,       ?it/s]

  0%|          | Scoring GeneralizingEstimator : 0/1250 [00:00<?,       ?it/s]

  0%|          | Fitting GeneralizingEstimator : 0/50 [00:00<?,       ?it/s]

  0%|          | Scoring GeneralizingEstimator : 0/1250 [00:00<?,       ?it/s]

Now bring the classifier patterns into source space

In [None]:
def run_source_analysis(coef_patt, epochs, fname_prefix=None):
    """Localize per-class classifier patterns on the fsaverage template with dSPM.

    Parameters
    ----------
    coef_patt : array
        Pattern coefficients, indexed as ``coef_patt[:, i_cls, :]``; each slice is
        wrapped in an ``mne.EvokedArray``. Assumed to be
        (n_channels, n_classes, n_times) -- TODO confirm against the caller.
    epochs : mne.Epochs
        Supplies ``info`` for the evoked objects, forward and inverse models,
        ``times[0]`` as the start time of every evoked object, and the data
        (up to ``tmax=0.0``) for the noise covariance.
        NOTE(review): if ``coef_patt`` was fitted on post-stimulus samples only,
        ``epochs.times[0]`` may not be the right start time -- confirm.
    fname_prefix : str | None
        When given, the estimate of class ``i`` is saved with
        ``stc.save(f"{fname_prefix}-class{i}")``. Nothing is written when None.

    Returns
    -------
    stcs : list
        One source estimate per class, in class order. The original loop kept
        only the last estimate and saved it to the placeholder path "...".
    """
    # one evoked object per class instead of a hard-coded count of 4
    n_classes = coef_patt.shape[1]
    evokeds = [
        mne.EvokedArray(coef_patt[:, i_cls, :], epochs.info, tmin=epochs.times[0])
        for i_cls in range(n_classes)
    ]

    noise_cov = mne.compute_covariance(epochs, tmax=0.0)

    # Path(...) keeps the `/` joins below valid even if a plain string is returned
    fs_dir = Path(fetch_fsaverage())
    trans = fs_dir / "bem" / "fsaverage-trans.fif"
    src = fs_dir / "bem" / "fsaverage-ico-5-src.fif"
    bem = fs_dir / "bem" / "fsaverage-5120-5120-5120-bem-sol.fif"

    fwd = mne.make_forward_solution(
        epochs.info,
        trans=trans,
        src=src,
        bem=bem,
        meg=False,
        eeg=True,
    )
    inv = make_inverse_operator(epochs.info, fwd, noise_cov)

    stcs = []
    for i_cls, evoked in enumerate(evokeds):
        stc = apply_inverse(
            evoked,
            inv,
            lambda2=1.0 / 9.0,
            method="dSPM",
            pick_ori="normal",
        )
        if fname_prefix is not None:
            stc.save(f"{fname_prefix}-class{i_cls}")
        stcs.append(stc)

    return stcs

Test classifiers on ordered trials

In [None]:
## ordered "std" trials: data, labels and time axis
X_ord = epochs_ord_std.get_data()
times_ord = epochs_ord_std.times

## `gen` is trained on the event codes of the "*_rndm" conditions, whereas the ordered
## epochs are selected through "*_or" names (assumed to carry different integer codes).
## Translate each ordered code to the code of the matching random condition so the
## accuracy compares like with like.
ord_code_to_name = {code: name for name, code in epochs_ord_std.event_id.items()}
y_ord = np.array(
    [
        epochs_rnd_std.event_id[ord_code_to_name[code][: -len("_or")] + "_rndm"]
        for code in epochs_ord_std.events[:, -1]
    ]
)

## test windows: 0 .. 0.5 s after the event and -0.4 .. 0 s before it
post_mask_ord = (times_ord >= 0) & (times_ord <= 0.5)
pre_mask_ord = (times_ord >= -0.4) & (times_ord < 0)

X_ord_post = X_ord[:, :, post_mask_ord]
X_ord_pre = X_ord[:, :, pre_mask_ord]

gen.fit(X_post, y)  # train on the random post-stimulus data

# Test on ordered post
score_ord_post = gen.score(X_ord_post, y_ord)
print("Random-post → Ordered-post accuracy:", score_ord_post.mean())

# Test on ordered pre
score_ord_pre = gen.score(X_ord_pre, y_ord)
print("Random-post → Ordered-pre accuracy:", score_ord_pre.mean())

In [None]:
## NOTE(review): this cell repeats the training part of the "train on random trials"
## cell; consider keeping only one copy
n_splits = 5

## create masks separating post-stimulus (t >= 0) from pre-stimulus (t < 0) samples
post_mask = epochs_rnd_std.times >= 0
pre_mask = epochs_rnd_std.times < 0
X = epochs_rnd_std.get_data()
y = epochs_rnd_std.events[:, 2]

X_post = X[:, :, post_mask]
X_pre = X[:, :, pre_mask]

## define the generalization object: scale -> vectorize -> LDA wrapped in LinearModel
clf = make_pipeline(
    Scaler(epochs_rnd_std.info),
    Vectorizer(),
    LinearModel(LinearDiscriminantAnalysis(solver="svd")),
)
gen = GeneralizingEstimator(clf, scoring="accuracy", n_jobs=1, verbose=True)
cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

## post -> post (cross-validated; the original comment said "post -> ord")
scores_post_post = cross_val_multiscore(gen, X_post, y, cv=cv, n_jobs=1)

## post -> pre: same folds, train on post-stimulus data, test on held-out pre-stimulus data
scores_post_pre = []
for train_idx, test_idx in cv.split(X_post, y):
    gen.fit(X_post[train_idx], y[train_idx])  # train on post
    score = gen.score(X_pre[test_idx], y[test_idx])  # test on pre
    scores_post_pre.append(score)

scores_post_pre = np.array(scores_post_pre)

## compute coefficients
## note: at this point `gen` holds the fit from the last fold of the loop above
coef_filt = get_coef(gen, "filters_", inverse_transform=False)  # (n_chs, n_class, n_time) per original note
coef_patt = get_coef(gen, "patterns_", inverse_transform=True)[0]  # (n_chs, n_class, n_time) per original note

## save in numpy archive (the original `np.save("", score ...)` was a placeholder and a
## syntax error); the file name is relative to the working directory -- adjust as needed
np.savez(
    "decoding_rnd_std.npz",
    scores_post_post=scores_post_post,
    scores_post_pre=scores_post_pre,
    coef_filt=coef_filt,
    coef_patt=coef_patt,
)

Plotting

In [19]:
## diagonal of the generalization matrix: training and testing at the same time point
## NOTE(review): `avg_scores` is not defined in any cell shown here -- confirm it is
## computed (and matches `epochs_rnd_std.times` in length) before this cell runs
fig, ax = plt.subplots()
ax.plot(epochs_rnd_std.times, np.diag(avg_scores), label="score")
ax.axhline(0.25, color="k", linestyle="--", label="chance")
ax.set_xlabel("Time (s)")
## the estimators in this notebook score with "accuracy" and chance is drawn at 0.25,
## so the axis is labelled as accuracy rather than AUC
ax.set_ylabel("Accuracy")
ax.legend()
ax.axvline(0.0, color="k", linestyle="-")
ax.set_title("Decoding random prestimulus");  # trailing ';' hides the Text(...) repr

Text(0.5, 1.0, 'Decoding random prestimulus')

In [20]:
## full temporal-generalization matrix, drawn with both axes spanning epochs_rnd_std.times
## NOTE(review): `avg_scores` is not defined in any cell shown here -- confirm it is
## computed before this cell runs
fig, ax = plt.subplots(1, 1)
im = ax.imshow(
    avg_scores,
    interpolation="lanczos",
    origin="lower",
    cmap="RdBu_r",
    extent=epochs_rnd_std.times[[0, -1, 0, -1]],
    vmin=0.0,
    vmax=0.4,
)
ax.set_xlabel("Testing Time (s)")
ax.set_ylabel("Training Time (s)")
ax.set_title("Temporal generalization")
ax.axvline(0, color="k")
ax.axhline(0, color="k")
cbar = plt.colorbar(im, ax=ax)
## the estimators in this notebook score with "accuracy", so label the colorbar accordingly
cbar.set_label("Accuracy")