# Get HDF5 FILE(inclusing eeg data and aligened envelope)

In [None]:
# Imports
import numpy as np
from scipy.signal import correlate, correlation_lags
import librosa
import mne
from matplotlib import pyplot as plt
%matplotlib qt

from mne.preprocessing import ICA
from mne_icalabel import label_components
from matplotlib.cm import get_cmap

import numpy as np
import librosa as lb
from scipy import signal
from typing import Union

import numpy as np
import h5py
from scipy.signal import resample


# 1. Data Loading and preparation

In [None]:
root_dir = r'D:\A-BCIÂ£∞Èü≥ËÑëÊú∫Êé•Âè£\4'

eeg_files = sorted(glob.glob(os.path.join(root_dir, "Data", "Session*", "*.vhdr")))
audio_files = sorted(glob.glob(os.path.join(root_dir, 'audio', 'M*.wav')))
out_file = os.path.join(root_dir, 'my_backward_dataset1.hdf5')

print(f"find {len(eeg_files)} EEG Session files")
print(f"find {len(audio_files)} Audio files")

In [None]:
#Loading audio data for alignment
fs = 1000  # Hz

# Load audio wav-file
audios = []

for audionr in range(1, 16):

    # Lade und resample direkt auf 1000 Hz
    data, sr = librosa.load(audio_files, sr=fs)  # resample the audio data from 48kHz to 1kHz to align it with EEG
    
    audios.append(data)

In [None]:
# 32-channel mapping (to rename the channels)
channel_map = {
    '1': 'Fp1',
    '2': 'Fp2',
    '3': 'F7',
    '4': 'F3',
    '5': 'Fz',
    '6': 'F4',
    '7': 'F8',
    '8': 'FC5',
    '9': 'FC1',
    '10': 'FC2',
    '11': 'FC6',
    '12': 'T7',
    '13': 'C3',
    '14': 'Cz',
    '15': 'C4',
    '16': 'T8',
    '17': 'CP5',
    '18': 'CP1',
    '19': 'CP2',
    '20': 'CP6',
    '21': 'P7',
    '22': 'P3',
    '23': 'Pz',
    '24': 'P4',
    '25': 'P8',
    '26': 'POz',
    '27': 'O1',
    '28': 'Oz',
    '29': 'O2',
    '30': 'FT9',
    '31': 'FT10',
    '32': 'TP9'
}

# Set montage
montage = mne.channels.make_standard_montage('standard_1020')

In [None]:
#Alignment EEG (StimTrak) and Audio

## Pad shorter signal with zeroes
def pad_zeros_right(s, padding_length):
    return np.pad(s, (0, padding_length), mode='constant', constant_values=0)

def padding(a, b, pad_function=None):
    if len(a) != len(b) and pad_function is None:
        raise ValueError(f"len(a)={len(a)} != len(b)={len(b)} and no pad_function provided")
    elif len(a) != len(b):
        if len(a) < len(b):
            a = pad_function(a, len(b) - len(a))
        else:
            b = pad_function(b, len(a) - len(b))
    return a, b

def crosscorrelation(ref, sig):
    # ref = StimTrak/EEG-reference, sig = audio
    c = correlate(ref, sig, mode='full')
    lags = correlation_lags(len(ref), len(sig), mode='full')
    return c, lags

# 2. Get audio envelopes

In [None]:
import numpy as np
import librosa as lb
from scipy import signal
from typing import Union

# helper functions
def get_envelope_from_hilbert(data: np.ndarray):
    analytic_signal = signal.hilbert(data)
    return np.abs(analytic_signal)

def apply_butterworth_bandpass(data, order=4, cutoff_low=1, cutoff_high=20, fs=1000, axis=-1):
    sos = signal.butter(N=order, Wn=[cutoff_low, cutoff_high], 
                        btype="bandpass", fs=fs, output="sos")
    return signal.sosfiltfilt(sos, data, axis=axis)

# ========== audio preprocessing ==========
target_fs = 1000
envelopes_list = []

for i in range(1, 16):

    # 1. load audio
    stimuli, sr_orig = lb.load(audio_files, sr=None, mono=True)

    # 2. get envelope
    envelope = get_envelope_from_hilbert(stimuli)

    # 3. filter delta band
    delta_band_high = apply_butterworth_bandpass(
        envelope, order=4, cutoff_low=1, cutoff_high=4, fs=sr_orig
    )

    # 4. change sampling rate to 1000 Hz
    num_samples_1khz = int(len(delta_band_high) * target_fs / sr_orig)
    delta_band_1khz = signal.resample(delta_band_high, num_samples_1khz)

    # 5. finnal list
    envelopes_list.append(delta_band_1khz)


In [None]:
# --- 1. initalization ---

# delate old file 
if os.path.exists(out_file):
    os.remove(out_file) 
    print(f"previous file {out_file} has been delatedÔºåcreating new file.")

# start Subject ID
current_subject_id = 1

# --- 2. itterate Session ---
for sess_id, eeg_path in enumerate(eeg_files, start=1):
    print(f"\n=== processing Session {sess_id} : {os.path.basename(eeg_path)} ===")

    # readding data
    raw = mne.io.read_raw_brainvision(eeg_path, preload=True)
    
    # get StimTrak (assump every session need reprocessing)
    raw_data = raw.get_data()
    stimtrak = raw_data[-1, :]
    speech_stimulus_eeg = stimtrak

    # --- Timeshifts computation (current Session) ---
    lag_samples_list = []

    print("  Computating Audio Lags...")
    for i, audio in enumerate(audios, start=1):
        audio_p, stim_p = padding(audio, speech_stimulus_eeg, pad_function=pad_zeros_right)
        corr, lags = crosscorrelation(stim_p, audio_p)
        peak_idx = np.argmax(np.abs(corr)) 
        lag_samples = lags[peak_idx]
        # lag_seconds = lag_samples / fs
        lag_samples_list.append(lag_samples)
        # timeshift_s_list.append(lag_seconds)
    
    # Get Raw subject
    subj1_ch = [str(i) for i in range(1, 33)]
    subj2_ch = [str(i) for i in range(33, 65)]
    subj3_ch = [str(i) for i in range(65, 97)]

    raw_list = [
        raw.copy().pick_channels(subj1_ch),
        raw.copy().pick_channels(subj2_ch),
        raw.copy().pick_channels(subj3_ch)
    ]

    # --- 3. itterate 3 subjct in this Session ---
    for sub_idx, raw_sub in enumerate(raw_list):
        
        print(f" Precessing Subject {current_subject_id} ({sub_idx+1} in session {sess_id}...")

        # Set Montage & Rename
        raw_sub.rename_channels(channel_map)
        raw_sub.set_montage(montage)

        # --- preprocessing & ICA  ---
        # Highpass-filter for ICA
        raw_ica = raw_sub.copy().filter(1., 100., fir_design='firwin', verbose=False)
        raw_ica.set_eeg_reference('average', projection=False, verbose=False)

        # ICA
        ica = mne.preprocessing.ICA(n_components=15, method='infomax', fit_params=dict(extended=True), random_state=97, max_iter='auto')
        ica.fit(raw_ica, verbose=False)

        # ICLabel
        labels = label_components(raw_ica, ica, method='iclabel')
        
        artifact_comps = [i for i, label in enumerate(labels['labels']) if label in ['eye blink', 'muscle artifact']]
        ica.exclude = artifact_comps

        # Apply ICA
        raw_clean = ica.apply(raw_sub.copy(), verbose=False)

        # Notch & Filter
        raw_clean.notch_filter(freqs=50, verbose=False)
        delta = raw_clean.copy().filter(1., 4., fir_design='firwin', verbose=False)

        # --- Data Slicing ---
        eeg_mat = delta.get_data()
        eeg_trials = []
        env_trials = []

        for audio, lag in zip(envelopes_list, lag_samples_list):
            L = len(audio)
            start = int(lag)
            stop = start + L
            
            # edge checking, i.c.o crashing
            if stop > eeg_mat.shape[1]:
                print(f"Warning: Trial beyond data range for Sub {current_subject_id}")
                break 
                
            eeg_trial = eeg_mat[:, start:stop]
            env_trial = audio
            
            eeg_trials.append(eeg_trial)
            env_trials.append(env_trial)

        # --- downsampling ---
        current_fs = 1000
        target_fs = 128
        eeg_trials_low_fs = []
        env_trials_low_fs = []
        
        for eeg_t in eeg_trials:
            n_samples_new = int(eeg_t.shape[1] * target_fs / current_fs)
            eeg_low = np.zeros((eeg_t.shape[0], n_samples_new))
            for ch in range(eeg_t.shape[0]):
                eeg_low[ch, :] = resample(eeg_t[ch, :], n_samples_new)
            eeg_trials_low_fs.append(eeg_low)

        for env_t in env_trials:
            n_samples_new = int(env_t.shape[0] * target_fs / current_fs)
            env_low = resample(env_t, n_samples_new)
            env_trials_low_fs.append(env_low)

        # --- 4. Writing HDF5 file ---
        # Model has to be "a" (append)Ôºåotherwise will cover previous data
        with h5py.File(out_file, "a") as f:
            n_trials = len(eeg_trials_low_fs)
            
            print(f"      -> Writing Subject {current_subject_id} to HDF5...")
            
            for t in range(n_trials):
                trial_id = t + 1
                
                eeg_data = np.asarray(eeg_trials_low_fs[t])
                env_data = np.asarray(env_trials_low_fs[t])

                eeg_path = f"eeg/{current_subject_id}/{trial_id}"
                
                if eeg_path in f:
                    del f[eeg_path]

                eeg_ds = f.create_dataset(eeg_path, data=eeg_data)

                stim_code = f"s{current_subject_id}_t{trial_id}"
                eeg_ds.attrs["stimulus"] = stim_code

                # Envelope
                env_group_path = f"stimulus_files/{stim_code}"
                if env_group_path in f:
                     del f[env_group_path]
                
                stim_group = f.require_group(env_group_path)
                stim_group.create_dataset("attended_env", data=env_data)

        # --- 5. itteration para ---
        current_subject_id += 1

print(f"\nProcessed all session, file was saved as: {out_file}")

# Decoding acoustic stimuli from EEG-data using backward models

Show the basics of linear backward models and how can reconstruct the presented audio books from brain signals.

Again we are working with pre-processed and aligned data. This includes Filtering of EEG data and pre-calculated speech envelopes. Both 1-8Hz.

# 1. Import necessary libraries

In [1]:
root_dir = "/Users/zhiyingliu/Desktop/FAU/3-semester/audio-computer"

import os
import sys
import sklearn
if root_dir not in sys.path:
    sys.path.append(root_dir)
from models.ridge import Ridge
import h5py
import numpy as np
import seaborn as sns
import pingouin as pg

import matplotlib.pyplot as plt

# 2. Define constants and paths

In [2]:
data_file = os.path.join(root_dir, 'my_backward_dataset.hdf5')
Fs = 128

# 3. Load Data

For more information on the data: look into the forward_modelling.ipynb Notebook

In [3]:
initial_subjects = list(range(1, 16))

trials = list(range(1,16))

In [4]:
def load_subject_data(subject, ica = False):
    subject_eeg = []
    subject_env = []
    with h5py.File(data_file, 'r') as f:
        for trial in trials: 
            eeg_path = f'eeg/{subject}/{trial}'
            # eeg_ica_path = f'eeg_ica/{subject}/{trial}'
            stim_code = f[eeg_path].attrs['stimulus']
            env_attended_path = f'stimulus_files/{stim_code}/attended_env'
            # if ica:
            #     eeg = f[eeg_ica_path][:]
            # else:
            eeg = f[eeg_path][:]
            # We drop the last two channels which are AUX channels (not EEG)
            eeg = eeg[:31,:]
            env_attended = f[env_attended_path][:]
            subject_eeg.append(np.array(eeg))
            subject_env.append(np.array(env_attended))
    return subject_eeg, subject_env

In [5]:
def get_data_from_indices(eeg, env, indices, test = False):

    # Extract data for the given indices
    eeg_data = [eeg[ind-1] for ind in indices]
    env_data = [env[ind-1] for ind in indices]

    if not test:
        # Concatenate all trials into single arrays
        eeg_data_array = np.hstack([eeg_data[i] for i in range(len(eeg_data))])
        env_data_array = np.hstack([env_data[i] for i in range(len(env_data))])
        env_data_array = env_data_array[np.newaxis,:]  # Add channel dimension
    else:
        eeg_data_array = eeg_data
        env_data_array = [np.array(env_data[i])[np.newaxis,:] for i in range(len(env_data))]
        #dont concate every trialÔºåcasue later evaluation per trial

    return eeg_data_array, env_data_array


In [6]:
def get_data_windows(eeg, env, window_len, Fs, null_model=False):
    """
    Returns Windows of eeg and env to evaluate

    Args:
        eeg (list): eeg_data each element is (n_channels, n_samples)
        env (list): env_data each element is (1, n_samples,)
        window_len (float): window_len in seconds
        Fs (float): sampling frequency

    Returns:
        tuple: eeg_windows, env_windows
    """
    eeg_windows = []
    env_windows = []
    step_size = window_len * Fs

    for i in range(len(eeg)):
        eeg_trial = eeg[i]
        env_trial = env[i]

        if null_model:
            max_shift = int(env_trial.shape[1] * 0.8)
            offset = np.random.randint(int(env_trial.shape[1]*0.2), max_shift)
            env_trial = np.roll(env_trial, offset, axis=1)
        
        num_samples = eeg_trial.shape[1]
        for start in range(0, num_samples - step_size + 1, step_size):
            end = start + step_size
            eeg_windows.append(eeg_trial[:, start:end])
            env_windows.append(env_trial[:, start:end])
    return eeg_windows, env_windows

# 4. üóÇÔ∏è Split Trials into Train, Validation, and Test Sets

In [7]:
from sklearn import model_selection
from scipy.stats import pearsonr
import pandas as pd
%matplotlib qt

initial_subjects = list(range(301, 316))
trials = list(range(1,16))

window_lengths = [1, 2, 5, 10, 20] 


alphas = np.logspace(-7, -4, 15)
tmin, tmax = -0.25, 0.0
start_lag = int(np.floor(tmin * Fs))
end_lag = int(np.ceil(tmax * Fs))
all_window_results = []

# === Outer loop: Iterate through all window sizes ===
for win_len in window_lengths:
    print(f"\n======== ‚öôÔ∏è Processing window size: {win_len} s ========")
    # initialization score list
    all_subjects_accuracy = [] 
    all_test_scores_null = []
    
    # --- Main loop: iterate every sujects ---
    for subject_id in initial_subjects:
        print(f"==================================================")
        print(f"üöÄ Processing participateÔºö{subject_id}")
        print(f"==================================================")
        
        # === 1. Data loading and segmentation ===
        
        # load current subject
        eeg, env = load_subject_data(subject_id, ica=False) 
        print(f"total trial number about {subject_id}: {len(eeg)}")
        
        #inter-trial split
        train_ind, test_ind = sklearn.model_selection.train_test_split(trials, test_size=0.4, random_state=42)
        val_ind, test_ind = sklearn.model_selection.train_test_split(test_ind, test_size=0.5, random_state=42)
        
        #get data
        eeg_train, env_train = get_data_from_indices(eeg, env, train_ind)
        eeg_val, env_val = get_data_from_indices(eeg, env, val_ind)
        eeg_test, env_test = get_data_from_indices(eeg, env, test_ind, test=True)
                
        # if train set is none
        if eeg_train.size == 0 or env_train.size == 0:
            print(f"üö® waring: Subject {subject_id} training set is none (size=0)ÔºÅignor this subject„ÄÇ")
            continue # 
            
        print(f"Train Shapes: EEG={eeg_train.shape}, ENV={env_train.shape}")
        
        # === 2. Model initialization and fitting ===
        ridge = Ridge(alpha=alphas, start_lag=start_lag, end_lag=end_lag)
        # ridge.fit(eeg_train.T, env_train.T)
        
        # b. fitting 
        try:
            ridge.fit(eeg_train.T, env_train.T)
        except Exception as e:
            print(f"‚ùå Faulse: Subject {subject_id} model training fail: {e}")
            continue
    
        # === 3. validation choose optimal alpha ===
        scores = ridge.model_selection(eeg_val.T, env_val.T)
        
        # get alpha
        best_alpha_idx = np.argmax(scores)
        best_alpha = alphas[best_alpha_idx]
        
        # ----------------------------------------------------------
        # üîë keypoint: refit model after setting optimal normalization parameter
        # ----------------------------------------------------------
        ridge.alpha = best_alpha
        try:
            ridge.fit(eeg_train.T, env_train.T) 
        except Exception as e:
            print(f"‚ùå Faulse: Subject {subject_id} fall to fit: {e}")
            continue
        # ----------------------------------------------------------

        # c. Final test
        # chunk test data into windows
        eeg_test_windows, env_test_windows = get_data_windows(eeg_test, env_test, window_len=win_len, Fs=Fs)
        
        # Evaluate on the test windows
    
        test_scores = []
        for i in range(len(eeg_test_windows)):
            X = eeg_test_windows[i].T
            y_true = env_test_windows[i]
            y_pred = ridge.predict(X)   # shape: (samples,)
            r, _ = pearsonr(y_pred.flatten(), y_true.flatten())
            
            test_scores.append(r)
        
        # === 3. Result storage ===
        # append current subject accuracy to list
        all_subjects_accuracy.append(test_scores)
    
        # === 4. Noise Control === 
        # Generate null distribution by shifting env data
        env_test_windows_null = get_data_windows(eeg_test, env_test, window_len=win_len, Fs=Fs, null_model=True)[1]
        null_scores = []
        for i in range(len(eeg_test_windows)):
            X = eeg_test_windows[i].T
            y_null_true = env_test_windows_null[i] 
            
            y_pred = ridge.predict(X) 
            r_null, _ = pearsonr(y_pred.flatten(), y_null_true.flatten())
            null_scores.append(r_null)
        test_scores_null_a = np.array(null_scores).flatten()  #make sure it is a clean 1D array to append 
        all_test_scores_null.append(test_scores_null_a)
        print(f"‚úÖ participate {subject_id} done„ÄÇ")

    # ----------------------------------------------------
            # otter loop (current win_len)
    # ----------------------------------------------------
    
    # 1. flatten all score
    all_scores_flattened = np.concatenate(all_subjects_accuracy)
    all_null_scores_flattened = np.concatenate(all_test_scores_null)
    
    # 2. mean and std of all subject at current window size
    mean_r = np.mean(all_scores_flattened)
    std_r = np.std(all_scores_flattened)
    
    # 3. (Wilcoxon Test)
    wilcoxon_results = pg.wilcoxon(all_scores_flattened, all_null_scores_flattened, alternative='greater')
    p_val = wilcoxon_results['p-val'].values[0]

    print(f"‚úÖ window {win_len} s -> mean r: {mean_r:.3f}, std: {std_r:.3f}, P value: {p_val:.2e}")
    
    # 4. store result to plot
    all_window_results.append({
        'window_len': win_len,
        'mean_r': mean_r,
        'std_r': std_r,
        'p_val': p_val
    })
    
    # ------------------------------------------
    # --------------visualization--------------
    # ------------------------------------------
    if win_len == window_lengths[4]: # control windowsize only for ws=20s
        print("\n--- Plot Test Scores vs Null Distribution (for model validation) ---")
        
        # transfer flatten array to Pandas DataFrame
        data = {
            'Reconstruction Score (r)': np.concatenate([all_scores_flattened, all_null_scores_flattened]),
            'Distribution Type': (
                ['Test Scores'] * len(all_scores_flattened) + 
                ['Null Scores'] * len(all_null_scores_flattened)
            )
        }
        df = pd.DataFrame(data)
        
        fig, ax = plt.subplots(figsize=(7,5))
        sns.boxplot(x='Distribution Type', y='Reconstruction Score (r)', data=df, ax=ax, palette=['#1f77b4', '#ff7f0e'])
        ax.axhline(0, color='k', linestyle='dashed', linewidth=1) 
        
        # ax.set_title(f'Test Scores vs Null Distribution ({win_len}s Window)', fontsize=14)
        ax.set_xlabel('')
        ax.set_ylabel('Reconstruction Score (r)')
        
        plt.tight_layout()
        plt.savefig("test_scores.png", dpi=300, bbox_inches="tight")
        plt.savefig("test_scores.pdf", bbox_inches="tight")   
        
        plt.show()
        
        # print p value
        pg.wilcoxon(all_scores_flattened, all_null_scores_flattened)

# ----------------------------------------------------
# final stepÔºöplot comparation of Window Size
# ----------------------------------------------------
df_window = pd.DataFrame(all_window_results)

fig, ax = plt.subplots(figsize=(8, 6))

plt.figure(figsize=(6,4))

ax.errorbar(df_window['window_len'], df_window['mean_r'], 
            yerr=df_window['std_r'], fmt='-o', color='b', capsize=5, label='Mean $r \pm 1$ SD')

ax.set_xscale('log')

ax.set_xlabel('Window Length (seconds, log scale)')
ax.set_ylabel('Mean Reconstruction Score (r)')
# ax.set_title('Effect of Window Length on Reconstruction Score', fontsize=16)

ax.axhline(0, color='k', linestyle='dashed', linewidth=1)

ax.set_xticks(df_window['window_len'])
ax.set_xticklabels(df_window['window_len']) 

plt.grid(True, which="both", ls="--", alpha=0.5)
plt.legend()

plt.tight_layout()
plt.savefig("test_scores.png", dpi=300, bbox_inches="tight")
plt.savefig("test_scores.pdf", bbox_inches="tight")

plt.show()

print("\n--- Statistical summary of all window lengths ---")
print(df_window)


üöÄ Ê≠£Âú®Â§ÑÁêÜÂèÇ‰∏éËÄÖÔºö301
Âä†ËΩΩ 301 ÂêéÁöÑÊÄª Trial Êï∞: 15
Train Shapes: EEG=(31, 121570), ENV=(1, 121570)
fitting entered
Checking inputs...
Formatting data matrix...
Calculating coefficients...
fitting entered
Checking inputs...
Formatting data matrix...
Calculating coefficients...
‚úÖ ÂèÇ‰∏éËÄÖ 301 Â§ÑÁêÜÂÆåÊàê„ÄÇ
üöÄ Ê≠£Âú®Â§ÑÁêÜÂèÇ‰∏éËÄÖÔºö302
Âä†ËΩΩ 302 ÂêéÁöÑÊÄª Trial Êï∞: 15
Train Shapes: EEG=(31, 121570), ENV=(1, 121570)
fitting entered
Checking inputs...
Formatting data matrix...
Calculating coefficients...
fitting entered
Checking inputs...
Formatting data matrix...
Calculating coefficients...
‚úÖ ÂèÇ‰∏éËÄÖ 302 Â§ÑÁêÜÂÆåÊàê„ÄÇ
üöÄ Ê≠£Âú®Â§ÑÁêÜÂèÇ‰∏éËÄÖÔºö303
Âä†ËΩΩ 303 ÂêéÁöÑÊÄª Trial Êï∞: 15
Train Shapes: EEG=(31, 121570), ENV=(1, 121570)
fitting entered
Checking inputs...
Formatting data matrix...
Calculating coefficients...
fitting entered
Checking inputs...
Formatting data matrix...
Calculating coefficients...
‚úÖ ÂèÇ‰∏éËÄÖ 303 Â§ÑÁêÜÂÆåÊàê„ÄÇ
üöÄ Ê≠£Âú®Â§ÑÁ


Passing `palette` without assigning `hue` is deprecated and will be removed in v0.14.0. Assign the `x` variable to `hue` and set `legend=False` for the same effect.

  sns.boxplot(x='Distribution Type', y='Reconstruction Score (r)', data=df, ax=ax, palette=['#1f77b4', '#ff7f0e'])



--- ÊâÄÊúâÁ™óÂè£ÈïøÂ∫¶ÁöÑÁªüËÆ°ÊëòË¶Å ---
   window_len    mean_r     std_r         p_val
0           1  0.080486  0.335231  1.336978e-28
1           2  0.086346  0.241676  2.458003e-38
2           5  0.085677  0.156873  3.402521e-30
3          10  0.085912  0.114541  7.483707e-26
4          20  0.085357  0.089676  6.150816e-19


  plt.legend()


In [8]:
pg.wilcoxon(all_scores_flattened, all_null_scores_flattened)

Unnamed: 0,W-val,alternative,p-val,RBC,CLES
Wilcoxon,4972.0,two-sided,1.230163e-18,0.656155,0.746493


# 10. ‚è±Ô∏è Study the Effect of Window Size

Here, we want to see how different window sizes (or "chunk lengths") affect the reconstruction scores.



Intuitively, a longer window might provide more information and lead to a better score, but it also results in fewer samples to average over. A shorter window gives us more samples, but each one might be less reliable.

To find the "sweet spot," we will create and evaluate test sets using several different window lengths.

In [9]:
window_lengths = [1, 2, 5, 10, 20]

In [10]:
# 1. Get a list of (eeg, env) pairs for each window length
windowed_data = [get_data_windows(eeg_test, env_test, window_len=w, Fs=Fs) for w in window_lengths]

# 2. "Unzip" the list of pairs into two separate lists
eeg_diff_windows, env_diff_windows = zip(*windowed_data)

In [11]:
windowed_mean_scores = []
windowed_std_scores = []
for i, w in enumerate(window_lengths):
    eeg_windows = eeg_diff_windows[i]
    env_windows = env_diff_windows[i]
    test_scores_w = [ridge.score(np.array(eeg_windows[j]).T, np.array(env_windows[j]).T) for j in range(len(eeg_windows))]
    test_scores_w = np.array(test_scores_w).flatten()

    windowed_mean_scores.append(np.mean(test_scores_w))
    windowed_std_scores.append(np.std(test_scores_w))

    print(f'Window Length: {w} sec - Mean Reconstruction Score: {np.mean(test_scores_w):.4f} ¬± {np.std(test_scores_w):.4f}')

Window Length: 1 sec - Mean Reconstruction Score: 0.0618 ¬± 0.3161
Window Length: 2 sec - Mean Reconstruction Score: 0.0639 ¬± 0.2155
Window Length: 5 sec - Mean Reconstruction Score: 0.0650 ¬± 0.1366
Window Length: 10 sec - Mean Reconstruction Score: 0.0632 ¬± 0.0848
Window Length: 20 sec - Mean Reconstruction Score: 0.0638 ¬± 0.0743


In [12]:
fig, ax = plt.subplots(figsize=(5,4))
ax.errorbar(window_lengths, windowed_mean_scores, yerr=windowed_std_scores, marker='o', capsize=5)
ax.set_xlabel('Window Length (seconds)')
ax.set_ylabel('Mean Reconstruction Score (r)')
ax.set_title('Effect of Window Length on Reconstruction Score', fontsize=14)

Text(0.5, 1.0, 'Effect of Window Length on Reconstruction Score')