# EEG Quality Control

# Imports

In [None]:
import matplotlib.pyplot as plt
import mne
import numpy as np
import pandas as pd
import pyprep
import pyxdf
from utils import *
from scipy.signal import welch
import warnings
warnings.filterwarnings("ignore")

## Get Data

In [None]:
xdf_filename = '/Users/bryan.gonzalez/CUNY_subs/sub-P5029423/sub-P5029423_ses-S001_task-CUNY_run-001_mobi.xdf'
subject = xdf_filename.split('-')[1].split('/')[0]
df = get_event_data(event='RestingState', 
                    df=import_eeg_data(xdf_filename),
                    stim_df=get_stim(xdf_filename))

# Create MNE Raw Object

In [None]:
## Create MNE Raw object
ch_names = [f"E{i+1}" for i in range(df.shape[1] - 1)]
info = mne.create_info(ch_names, 
                       sfreq=1/df.lsl_time_stamp.diff().mean(), 
                       ch_types='eeg')
df.drop(columns=['lsl_time_stamp'], inplace=True)

raw = mne.io.RawArray(df.T * 1e-6, info=info) # multiplying by 1e-6 converts to volts

# Create a Cz reference
value = np.zeros((1, raw.n_times))
info = mne.create_info(["Cz"], raw.info['sfreq'], ch_types='eeg')
cz = mne.io.RawArray(value, info)
raw.add_channels([cz], force_update_info=True)

# Apply a montage
montage = mne.channels.make_standard_montage('GSN-HydroCel-129')
raw.set_montage(montage, on_missing='ignore')

# Run Automated Processing Pipeline
(this will take some time)

In [None]:
prep_params = {
        "ref_chs": "eeg",
        "reref_chs": "eeg",
        "line_freqs": np.arange(60, raw.info["sfreq"] / 2, 60),
    }
# these params set up the robust reference  - i.e. median of all channels and interpolate bad channels
prep = pyprep.PrepPipeline(raw, montage=montage, channel_wise=True, prep_params=prep_params)
prep_output = prep.fit()
raw_cleaned = prep_output.raw_eeg


In [None]:
print(f"Bad channels before robust reference: {prep.noisy_channels_original['bad_all']}")
print(f"Interpolated channels: {prep.interpolated_channels}")
print(f"Bad channels after interpolation: {prep.still_noisy_channels}")


# Plot PSD

In [None]:
fig = raw_cleaned.plot_psd(tmax=np.inf, fmax=250)
# add some arrows at 60 Hz and its harmonics:
'''
for ax in fig.axes[:2]:
    freqs = ax.lines[-1].get_xdata()
    psds = ax.lines[-1].get_ydata()
    for freq in (60, 120, 180, 240):
        idx = np.searchsorted(freqs, freq)
        ax.arrow(x=freqs[idx], y=psds[idx] + 18, dx=0, dy=-12, color='red',
                 width=0.1, head_width=3, length_includes_head=True)
        
'''
fig.savefig(f'report_images/{subject}_eeg_psd.png')

# Annotation Blinks and Muscle Artifacts

In [None]:
def annotate_blinks(
    raw: mne.io.Raw, ch_name: list[str] = ["E25", "E8"]
) -> mne.Annotations:
    """Annotate the blinks in the EEG signal.
 
    Args:
        raw (mne.io.Raw): The raw EEG data in mne format.
        ch_name (list[str]): The channels to use for the EOG. Default is
                             ["Fp1", "Fp2"]. I would suggest to use the
                             channels that are the most frontal (just above
                             the eyes). In the case of an EGI system the
                             channels would be "E25" and "E8".
 
    Returns:
        mne.Annotations: The annotations object containing the blink events.
    """
    eog_epochs = mne.preprocessing.create_eog_epochs(raw, ch_name=ch_name)
    blink_annotations = mne.annotations_from_events(
        eog_epochs.events,
        raw.info["sfreq"],
        event_desc={eog_epochs.events[0, 2]: "blink"},
    )
    return blink_annotations

def annotate_muscle(raw: mne.io.Raw) -> mne.Annotations:
    muscle_annotations, _ = mne.preprocessing.annotate_muscle_zscore(
        raw, 
        threshold=3, # this needs to be calibrated for the entire dataset
        ch_type='eeg', 
        min_length_good=0.1, 
        filter_freq=(95, 120), 
        )
 
    return muscle_annotations

In [None]:
fig = raw_cleaned.plot(show_scrollbars=False,
                       show_scalebars=False,events=None, start=0, duration=300,n_channels=75, scalings=11e-5,color='k')
fig.grab().save(f'report_images/{subject}_eeg_annotations.png')

In [None]:
# Applying a high pass filter to remove low frequency noise
raw_cleaned.filter(l_freq=0.1, h_freq=None)

blink_annotations = annotate_blinks(raw_cleaned, ch_name=["E25", "E8"])

muscle_annotations = annotate_muscle(raw_cleaned)

all_annotations = blink_annotations + muscle_annotations + raw.annotations
raw_cleaned.set_annotations(all_annotations)

In [None]:
# Compute a covariance matrix
# DO IT FROM THE ARRAY IN NUMPY
dat = raw_cleaned.get_data()
cov = np.cov(dat)
# plot the covariance matrix
fig = plt.imshow(cov, cmap='coolwarm', origin='lower')
plt.colorbar()
plt.title('Covariance matrix')


## Computing % Good Data

In [None]:
# Create a binary array
binary_mask = np.zeros(len(raw_cleaned.times), dtype=int)

# Iterate over annotations
for annot in raw_cleaned.annotations:
    onset_sample = int(annot['onset'] * raw_cleaned.info['sfreq'])
    duration_sample = int(annot['duration'] * raw_cleaned.info['sfreq'])
    binary_mask[onset_sample:onset_sample + duration_sample] = 1

percent_good = 1 - np.sum(binary_mask) / len(binary_mask)
print(f'Percent Good Data: {percent_good * 100:.2f}%')

In [None]:
(1 - percent_good)*100

## Quantifying Blinks and Muscle Artifacts through ICA

In [None]:
# First exclude bad channels


In [None]:
filt_raw = raw.copy().filter(l_freq=1.0, h_freq=None)
filt_raw.info['bads'] = prep.still_noisy_channels 
ica = mne.preprocessing.ICA(n_components=.99, method='picard')
ica.fit(filt_raw)
#ica.plot_sources(filt_raw)


In [None]:
ica.plot_sources(filt_raw)

In [None]:
ica.plot_components().savefig(f'report_images/{subject}_ica_components.png')
# save the figure
#plt.savefig(f'report_images/{subject}_ica_components.png')

In [None]:
comp_idx, scores = ica.find_bads_muscle(filt_raw)

# Remove the muscle artifacts
raw_ica = ica.apply(filt_raw, exclude=comp_idx)

In [None]:

ica = mne.preprocessing.ICA(n_components=None, method='picard')
ica.fit(raw_cleaned)
ica.plot_sources(raw_cleaned)
comp_idx, scores = ica.find_bads_muscle(raw_cleaned)

# Remove the muscle artifacts
raw_cleaned_ica = ica.apply(raw_cleaned, exclude=comp_idx)


In [None]:
ica.plot_sources(raw_cleaned)

# Ocular Artifacts

In [None]:
eog_evoked = mne.preprocessing.create_eog_epochs(filt_raw, ch_name=['E8', 'E25']).average(picks="all")
eog_evoked.apply_baseline((None, None))
eog_evoked.plot_joint().savefig(f'report_images/{subject}_eog_evoked.png')

In [None]:
eog_projs, _ = mne.preprocessing.compute_proj_eog(raw_cleaned, n_eeg=1, reject=None, no_proj=True,
                                                  ch_name=['E8', 'E25'])


In [None]:
mne.viz.plot_projs_topomap(eog_projs, info=raw.info)

In [None]:
fig = mne.viz.plot_projs_joint(eog_projs, eog_evoked,['E8', 'E25'] )
fig.suptitle("EOG projectors");

In [None]:
filt_raw.add_proj(eog_projs)
filt_raw.plot()

In [None]:
fig = raw_cleaned.plot_psd(tmax=np.inf, fmax=250)
# apply a notch filter to remove 60 Hz noise
raw_cleaned.notch_filter(np.arange(60, 241, 60), filter_length='auto', phase='zero')
fig = raw_cleaned.plot_psd(tmax=np.inf, fmax=250)

In [None]:
#perform an autocorrelation of a single channel
fig, ax = plt.subplots()
ax.acorr(raw_cleaned.get_data()[0], maxlags=1000)
ax.set_xlim([0, 1000])  # only show the first 1000 lags
ax.set_title('Autocorrelation of Channel 1')
# without plotting


In [None]:
autocorr = np.correlate(raw_cleaned.get_data()[0], raw_cleaned.get_data()[0], mode='full')

# Peak-to-Peak Amplitude

In [None]:
sensor_ptp = [np.ptp(filt_raw.get_data()[i]) for i in range(len(raw_cleaned.get_data()))]
f, ax = plt.subplots(figsize=(20, 5))
plt.bar(raw_cleaned.ch_names, sensor_ptp)
plt.xlabel('Channel')
# rotate xticks
plt.xticks(rotation=90 )
ax.tick_params(axis='x',  pad=0)
plt.ylabel('Peak-to-Peak Amplitude')
plt.title('Peak-to-Peak Amplitude for Each Channel')
plt.show()


In [None]:
np.median(sensor_ptp)

In [None]:
raw_cleaned.plot()

# Standard Deviation of Amplitude

In [None]:
# Compute Standard Deviation of Amplitude for each Channel
sensor_std = [np.std(raw_cleaned.get_data()[i]) for i in range(len(raw_cleaned.get_data()))]
plt.bar(range(len(sensor_ptp)), sensor_ptp)
plt.xlabel('Channel')
plt.ylabel('Std. Deviation of Amplitude')
plt.title('Standard Deviation of Amplitude for Each Channel')
plt.show()

In [None]:
np.median(sensor_std)

In [None]:
filt_raw.plot_psd()

# Power in 60Hz Noise Frequency

In [None]:
psds, freqs = mne.time_frequency.psd_array_welch(
    raw_cleaned.get_data(), 
    sfreq=raw_cleaned.info['sfreq'],
    fmin=59, 
    fmax=61,  # A small band around 60 Hz to capture power
    n_fft=2048,  # Adjust the window length for better frequency resolution
    verbose=False
)

In [None]:
mean_power_60hz = psds.mean(axis=0).mean()  # Mean across channels and frequency bins
print(f"Mean power at 60 Hz: {mean_power_60hz:.4e} V²/Hz")

In [None]:
signal_power = np.mean(raw_cleaned._data ** 2, axis=1)
signal_power = signal_power.mean()

# Calculate SNR in dB
snr = 10 * np.log10(signal_power / noise_power)

In [None]:
psds, freqs = mne.time_frequency.psd_array_welch(raw_cleaned.get_data(), sfreq=raw_cleaned.info['sfreq'], fmin=0.5, fmax=250)
psds = 10 * np.log10(psds)
plt.plot(freqs, psds.mean(axis=0))
plt.xlabel('Frequency (Hz)')
plt.ylabel('Power Spectral Density (dB)')
plt.show()
'''
idx_60hz = (freqs > 59) & (freqs < 61)
power_60hz = psds[idx_60hz].mean(axis=1)

noise = {}
for ch, power in zip(raw_cleaned.ch_names, power_60hz):
    noise[ch] = power
    print(f' Channel {ch}: Power at  60Hz:{power:.2f} dB')
'''

In [None]:
sum(power_60hz)

In [None]:
freq_min = 59
freq_max = 61
# Apply a bandpass filter to the data
raw_filtered = raw_cleaned.filter(freq_min, freq_max)
psds, freqs = mne.time_frequency.psd_array_welch(raw_filtered.get_data(), sfreq=raw_filtered.info['sfreq'], fmin=freq_min, fmax=freq_max, n_fft=2048)

# Extract the power at 60 Hz
idx_60hz = (freqs >= 60 - 0.5) & (freqs <= 60 + 0.5)
power_60hz = psds[:, idx_60hz].mean(axis=1)
noise = {}
# Print the power at 60 Hz for each channel
for ch_name, power in zip(raw.ch_names, power_60hz):
    noise[ch_name] = power
    print(f'Channel {ch_name}: Power at 60 Hz = {power:.2e} µV²/Hz')

In [None]:
noises = [noise[ch] for ch in raw.ch_names]
noise_df = pd.DataFrame({'Channel':list(noise.keys()), 'Noise Power (dB)': noise.values()})
plt.subplots(figsize=(30, 5))
plt.plot(noise_df['Channel'], noise_df['Noise Power (dB)'])
plt.xlabel('Channel')
# fix spacing of the xticks
plt.xticks(rotation=90)
plt.ylabel('Noise Power (dB)')          
plt.title('Noise Power at 60 Hz for Each Channel')
plt.show()

In [None]:
# Compute power of the signal and noise
signal_power = np.mean(raw_cleaned._data ** 2, axis=1)
noise_power = np.mean([n**2 for n in noises])

# Calculate SNR in dB
snr = 10 * np.log10(signal_power / noise_power)

In [None]:
# square each value of noise
plt.plot(snr)
plt.xlabel('Channel')
plt.ylabel('SNR (dB)')
plt.title('Signal-to-Noise Ratio for Each Channel')
plt.show()

In [None]:
np.mean(snr)

In [None]:
x = raw_cleaned.get_data()[0]

In [None]:
len(x)