In [46]:
import numpy as np
import mne
from scipy import signal


def delay_data(data, time_delay, sfreq):
    sample_shift = int(np.ceil(time_delay * sfreq))
    sample_shifts = np.arange(-sample_shift, sample_shift+1, 1)

    delayed_signals = []
    for shift in sample_shifts:
        delayed_signal = np.roll(data, shift, axis=1)
        if shift > 0:
            delayed_signal[:, :shift] = data[:, :shift]  # Zero out the initial part to avoid wrap-around
        elif shift < 0:
            delayed_signal[:, shift:] = data[:, shift:]  # Zero out the end part to avoid wrap-around
        delayed_signals.append(delayed_signal)

    delay_signals = np.concatenate(delayed_signals, axis=0)
    return delay_signals

# My way
def _correct_data(eeg_data, cwl_data, window_size, overlap):
    hanning_window = np.hanning(window_size)
    
    n_channels, n_times = eeg_data.shape
    eeg_corrected = np.zeros_like(eeg_data)
    weight_sum = np.zeros_like(eeg_data)
    step = int(window_size * (1 - overlap))
    starts = np.arange(0, n_times, step)

    for start in starts:
        end = min(start + window_size, n_times)
        actual_window_size = end - start
        cwl_segment = cwl_data[:, start:end] #* hanning_window[:actual_window_size]

        for ch in range(n_channels):
            eeg_segment = eeg_data[ch, start:end] #* hanning_window[:actual_window_size]
            # Regression
            coeffs = np.linalg.lstsq(cwl_segment.T, eeg_segment.T, )[0]
            correction = np.dot(coeffs.T, cwl_segment)
            corrected_segment = eeg_segment - correction
            # Apply Hanning window to the corrected segment
            corrected_segment *= hanning_window[:actual_window_size]
            eeg_corrected[ch, start:end] += corrected_segment
            weight_sum[ch, start:end] += hanning_window[:actual_window_size]
            
    # Normalize the corrected signal by the weight sum
    weight_sum[weight_sum == 0] = 1 # Avoid division by zero
    eeg_corrected /= weight_sum
    return eeg_corrected


# the MNE way
def __correct_data(eeg_data, cwl_data, window_size, overlap):
    hanning_window = np.hanning(window_size)
    
    n_channels, n_times = eeg_data.shape
    eeg_corrected = np.zeros_like(eeg_data)
    weight_sum = np.zeros_like(eeg_data)
    step = int(window_size * (1 - overlap))
    starts = np.arange(0, n_times, step)

    for start in starts:
        end = min(start + window_size, n_times)
        actual_window_size = end - start

        cwl_segment = cwl_data[:, start:end] #* hanning_window[:actual_window_size]
        ref_data = cwl_segment - np.mean(cwl_segment, axis=-1, keepdims=True)
        cov_ref = ref_data @ ref_data.T

        for ch in range(n_channels):
            eeg_segment = eeg_data[ch, start:end] #* hanning_window[:actual_window_size]
            # Regression
            eeg_segment_demean = eeg_segment - np.mean(eeg_segment, axis=-1, keepdims=True)
            eeg_segment_demean = eeg_segment_demean.reshape(1, -1)

            coef = np.linalg.solve(cov_ref, ref_data @ eeg_segment_demean.T).T[0]
            corrected_segment = eeg_segment - (coef @ ref_data)

            # Apply Hanning window to the corrected segment
            corrected_segment *= hanning_window[:actual_window_size]
            eeg_corrected[ch, start:end] += corrected_segment
            weight_sum[ch, start:end] += hanning_window[:actual_window_size]
            
    # Normalize the corrected signal by the weight sum
    weight_sum[weight_sum == 0] = 1 # Avoid division by zero
    eeg_corrected /= weight_sum
    return eeg_corrected
    
def cwl_correction_raw(raw, eeg_picks, cwl_picks, time_delay=21e-3, window_duration=4, overlap=0.5):
    # Get data
    sfreq = raw.info['sfreq']
    eeg_data = raw.get_data(picks=eeg_picks)
    cwl_data = raw.get_data(picks=cwl_picks)
    # Create delayed versions of the CWL data
    cwl_data = delay_data(cwl_data, time_delay, sfreq)
    # compute Hanning window
    window_size = int(np.ceil((window_duration * sfreq)))
    eeg_corrected = _correct_data(eeg_data, cwl_data, window_size, overlap)
    # Create new MNE Raw object
    info = mne.pick_info(raw.info, eeg_picks)
    raw_corrected = mne.io.RawArray(eeg_corrected, info)
    # Add CWL channels to the new Raw
    cwl_raw = raw.copy().pick(cwl_picks)
    raw_corrected.add_channels([cwl_raw], force_update_info=True)
    return raw_corrected

In [43]:
raw = mne.io.read_raw(r"C:\Users\victor.ferat\Documents\Soraya\EEG-MRI\P05_eyes_open_mrion.vhdr", preload=True)
raw.set_channel_types({'CWL1': 'misc', 'CWL2': 'misc', 'CWL3': 'misc', 'CWL4': 'misc', 'ECG': 'ecg'})
raw.resample(250)
raw.crop(0, 3*60)
raw.filter(0.5, 120)

Extracting parameters from C:\Users\victor.ferat\Documents\Soraya\EEG-MRI\P05_eyes_open_mrion.vhdr...


Setting channel info structure...
Reading 0 ... 1325899  =      0.000 ...   265.180 secs...


  raw.set_channel_types({'CWL1': 'misc', 'CWL2': 'misc', 'CWL3': 'misc', 'CWL4': 'misc', 'ECG': 'ecg'})


Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 1.2e+02 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 120.00 Hz
- Upper transition bandwidth: 5.00 Hz (-6 dB cutoff frequency: 122.50 Hz)
- Filter length: 1651 samples (6.604 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


0,1
Measurement date,"January 10, 2024 12:58:09 GMT"
Experimenter,Unknown
Participant,Unknown

0,1
Digitized points,Not available
Good channels,"31 EEG, 1 ECG, 4 misc"
Bad channels,
EOG channels,Not available
ECG channels,ECG

0,1
Sampling frequency,250.00 Hz
Highpass,0.50 Hz
Lowpass,120.00 Hz
Filenames,P05_eyes_open_mrion.eeg
Duration,00:03:00 (HH:MM:SS)


In [41]:
eeg_picks=mne.pick_types(raw.info, eeg=True)
eeg_picks

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30])

In [47]:
raw_corrected = cwl_correction_raw(raw, eeg_picks=mne.pick_types(raw.info, eeg=True), cwl_picks=mne.pick_types(raw.info, misc=True), time_delay=21e-3, window_duration=4, overlap=0.5)

  coeffs = np.linalg.lstsq(cwl_segment.T, eeg_segment.T, )[0]


Creating RawArray with float64 data, n_channels=31, n_times=45001
    Range : 0 ... 45000 =      0.000 ...   180.000 secs
Ready.


In [31]:
cwl_picks=mne.pick_types(raw.info, misc=True)
eeg_picks=mne.pick_types(raw.info, eeg=True)
time_delay=21e-3
window_duration=4
overlap=0.5

raw_ = raw.copy().crop(0, 4)
raw_.resample(250)

sfreq = raw.info['sfreq']
eeg_data = raw_.get_data(picks=eeg_picks)
cwl_data = raw_.get_data(picks=cwl_picks)
# Create delayed versions of the CWL data
cwl_data = delay_data(cwl_data, time_delay, sfreq)
# compute Hanning window
window_size = int(np.ceil((window_duration * sfreq)))
hanning_window = np.hanning(window_size)

n_channels, n_times = eeg_data.shape
eeg_corrected = np.zeros_like(eeg_data)
weight_sum = np.zeros_like(eeg_data)
step = int(window_size * (1 - overlap))
starts = np.arange(0, n_times, step)

for start in starts:
    end = min(start + window_size, n_times)
    actual_window_size = end - start

    cwl_segment = cwl_data[:, start:end] #* hanning_window[:actual_window_size]
    ref_data = cwl_segment - np.mean(cwl_segment, axis=-1, keepdims=True)
    cov_ref = ref_data @ ref_data.T

    for ch in range(n_channels):
        eeg_segment = eeg_data[ch, start:end] #* hanning_window[:actual_window_size]
        # Regression
        eeg_segment_demean = eeg_segment - np.mean(eeg_segment, axis=-1, keepdims=True)
        eeg_segment_demean = eeg_segment_demean.reshape(1, -1)

        coef = np.linalg.solve(cov_ref, ref_data @ eeg_segment_demean.T).T[0]
        corrected_segment = eeg_segment - (coef @ ref_data)

        # Apply Hanning window to the corrected segment
        corrected_segment *= hanning_window[:actual_window_size]
        eeg_corrected[ch, start:end] += corrected_segment
        weight_sum[ch, start:end] += hanning_window[:actual_window_size]
        
# Normalize the corrected signal by the weight sum
weight_sum[weight_sum == 0] = 1 # Avoid division by zero
eeg_corrected /= weight_sum

# Create new MNE Raw object
info = mne.pick_info(raw_.info, eeg_picks)
raw_corrected = mne.io.RawArray(eeg_corrected, info)

Creating RawArray with float64 data, n_channels=31, n_times=1000
    Range : 0 ... 999 =      0.000 ...     3.996 secs
Ready.


In [48]:
raw_corrected.plot()

<mne_qt_browser._pg_figure.MNEQtBrowser at 0x1c5847ba1f0>

Channels marked as bad:
none


In [27]:
raw.times

array([0.000000e+00, 2.000000e-04, 4.000000e-04, ..., 2.651794e+02,
       2.651796e+02, 2.651798e+02])

In [10]:
cwl_segment.shape

(844, 20000)

In [11]:
eeg_segment.shape

(10000,)

In [12]:
cwl_data.shape

(844, 300001)

In [13]:
raw_.get_data(picks=cwl_picks).shape

(4, 300001)

In [14]:
sfreq

5000.0