# Extract Number of Eye Blinks per Epoch

# Import Libraries

In [1]:
# Import libraries
import mne
import json
import numpy as np
import pandas as pd
import scipy.signal as signal
from scipy.signal import butter, filtfilt, find_peaks
from scipy.stats import zscore
import matplotlib.pyplot as plt
import sys
import os

sys.path.append(os.path.abspath("../Functions"))

# Import custom scripts
import import_data
import data_tools
import processing

# Enable interactive plots
%matplotlib qt

# Import Data

In [2]:
# Load list of files to import
files = [  
    "sub-P001_ses-S001_task-T1_run-001_eeg",
    "sub-P002_ses-S001_task-T1_run-001_eeg",
    "sub-P003_ses-S001_task-T1_run-001_eeg",
    "sub-P004_ses-S001_task-T1_run-001_eeg",
    "sub-P005_ses-S001_task-T1_run-001_eeg",
    "sub-P006_ses-S001_task-T1_run-001_eeg",
    "sub-P007_ses-S001_task-T1_run-001_eeg",
    "sub-P008_ses-S001_task-T1_run-001_eeg", 
    "sub-P009_ses-S001_task-T1_run-001_eeg",
    "sub-P010_ses-S001_task-T1_run-001_eeg",  
]

# Get unique subject IDs
subject_ids = [file.split('_')[0] for file in files]

# Preallocate eeg_ts, eeg_data, eeg_fs
eeg_ts =   [None] * len(files)
eeg_data = [None] * len(files)
raw_eeg =  [None] * len(files)

ch_names = ["Fz", "F4", "F8", "C3", "Cz", "C4", "T8", "P7", "P3", "P4", "P8", "PO7", "PO8", "O1", "Oz", "O2"]

for f, file in enumerate(files):
    for sub in subject_ids:
        if sub == file.split('_')[0]:
            [eeg_ts[f], eeg_data[f], eeg_fs] = import_data.read_xdf(f"..\\Data\\Pilot2\\EEG\\{sub}\\ses-S001\\eeg\\{file}.xdf", picks=ch_names)

            # Create MNE array
            info = mne.create_info(ch_names, eeg_fs, ch_types = 'eeg')  # Create info properties
            raw_eeg[f] = mne.io.RawArray(eeg_data[f], info = info)            

            # Set standard channel montage
            raw_eeg[f].set_montage('standard_1020')

Creating RawArray with float64 data, n_channels=16, n_times=207136
    Range : 0 ... 207135 =      0.000 ...   809.121 secs
Ready.
Creating RawArray with float64 data, n_channels=16, n_times=194944
    Range : 0 ... 194943 =      0.000 ...   761.496 secs
Ready.
Creating RawArray with float64 data, n_channels=16, n_times=199168
    Range : 0 ... 199167 =      0.000 ...   777.996 secs
Ready.
Creating RawArray with float64 data, n_channels=16, n_times=201344
    Range : 0 ... 201343 =      0.000 ...   786.496 secs
Ready.
Creating RawArray with float64 data, n_channels=16, n_times=195456
    Range : 0 ... 195455 =      0.000 ...   763.496 secs
Ready.
Creating RawArray with float64 data, n_channels=16, n_times=196992
    Range : 0 ... 196991 =      0.000 ...   769.496 secs
Ready.
Creating RawArray with float64 data, n_channels=16, n_times=200960
    Range : 0 ... 200959 =      0.000 ...   784.996 secs
Ready.
Creating RawArray with float64 data, n_channels=16, n_times=190080
    Range : 0 ..

# Filter Data

In [3]:
# Preallocate lists for filtered data
filt_raw = [None] * len(files)

# Loop through each file and corresponding raw EEG
for f, file in enumerate(files):
    raw = raw_eeg[f]
    filt_raw[f] = raw.copy().filter(l_freq=0.5, h_freq=8, picks=ch_names)

Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 8 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 8.00 Hz
- Upper transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 9.00 Hz)
- Filter length: 1691 samples (6.605 s)

Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 8 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 8

# Fix P001 Labels

In [4]:
[marker_ts_P001, markers_P001] = import_data.read_xdf_unity_markers(f"..\\Data\\Pilot2\\EEG\\sub-P001\\ses-S001\\eeg\\sub-P001_ses-S001_task-T1_run-001_eeg.xdf")

for temp in markers_P001:
    if temp[0] == "stimulus ended, getting score":
        temp[0] = "bleh" 

epoch_end_P001 = "bleh"

# Z score data

In [5]:
z_scored_data = [None] * len(files)

for f, file in enumerate(files):
    z_scored_data[f] = zscore(filt_raw[f].get_data(), axis=1)

# Epoch Z-scored Data

In [6]:
list_of_events = [] # same for all files

# Initialize lists for markers and marker timestamps
markers = [None] * len(files)
marker_ts = [None] * len(files)

# Initialize lists for epochs and EEG data for each frequency band
eeg_epochs = [None] * len(files)
events_epochs = [None] * len(files)
epochs_organized = [None] * len(files)

# Create a list of unique events
for x in range(4):
    for y in range(3):
        list_of_events.append(f"Contrast{x+1}Size{y+1}")

epoch_end = "getting score"

# Create a dict of stimuli using the unique events
dict_of_stimuli = {i: event for i, event in enumerate(list_of_events)}

for f, file in enumerate(files):
    for sub in subject_ids:
        if sub == file.split('_')[0]:
            if sub == "sub-P001": # Using special labels for P001
                [events_epochs[f], eeg_epochs[f]] = data_tools.create_epochs(
                        eeg_data = z_scored_data[f], 
                        eeg_ts = eeg_ts[f],
                        markers = markers_P001,
                        markers_ts = marker_ts_P001,
                        events = list_of_events,
                        epoch_end = epoch_end_P001
                )
            else:  # For all other subjects
                [marker_ts[f], markers[f]] = import_data.read_xdf_unity_markers(f"..\\Data\\Pilot2\\EEG\\{sub}\\ses-S001\\eeg\\{file}.xdf") # Import marker

                [events_epochs[f], eeg_epochs[f]] = data_tools.create_epochs(
                    eeg_data = z_scored_data[f],
                    eeg_ts = eeg_ts[f],
                    markers = markers[f],
                    markers_ts = marker_ts[f],
                    events = list_of_events,
                    epoch_end = epoch_end 
               )

# Organize epochs by stimuli and frequency
for f, file in enumerate(files):
    epochs_organized[f] = data_tools.epochs_stim(eeg_epochs = eeg_epochs[f], labels = events_epochs[f], stimuli = dict_of_stimuli)

# Eye Blink Detection Function

In [7]:
def count_eye_blinks_v2(eeg_epochs, sfreq, channel_names, frontal_channels=['Fz', 'F4'], min_dist_ms=100):
    frontal_idxs = [i for i, ch in enumerate(channel_names) if ch in frontal_channels]
    if not frontal_idxs:
        raise ValueError("No frontal channels found in data.")

    blink_counts = []
    min_dist_samples = int(sfreq * (min_dist_ms / 1000))

    for epoch in eeg_epochs:
        # Average signal from frontal channels
        frontal_avg = np.mean(epoch[frontal_idxs, :], axis=0)

        # Find both positive and negative peaks above threshold
        pos_peaks, _ = signal.find_peaks(frontal_avg, height=2)
        neg_peaks, _ = signal.find_peaks(-frontal_avg, height=2)

        # Combine and sort peaks
        all_peaks = np.sort(np.concatenate((pos_peaks, neg_peaks)))

        # Apply minimum distance filtering
        filtered_peaks = []
        last_peak = -np.inf

        for peak in all_peaks:
            if peak - last_peak >= min_dist_samples:
                filtered_peaks.append(peak)
                last_peak = peak

        blink_counts.append(len(filtered_peaks))

    return blink_counts

In [8]:
# Store results
blink_counts = {}
z_scores = {}

for f, file in enumerate(files):
    blink_counts[f] = {}  # Initialize per participant
    z_scores[f] = {}

    for stim_idx, stim_label in dict_of_stimuli.items():
        stim_epochs = epochs_organized[f][stim_idx]  # Shape: (n_epochs, n_channels, n_samples)

        if stim_epochs.shape[0] > 0:
            # Process all epochs at once
            bc = count_eye_blinks_v2( # bc: list of [n_channels] blink counts per epoch, s: list of arrays with shape [n_channels, n_times]
                eeg_epochs=stim_epochs,
                sfreq=eeg_fs,
                channel_names=ch_names,
                frontal_channels=['Fz', 'F4']
            )

            blink_counts[f][stim_label] = bc   # list of lists of ints
        else:
            blink_counts[f][stim_label] = []

In [12]:
participant = 0
stim_label = 2
epoch_idx = 0

# Get the z-scored data for one epoch
z_epoch = epochs_organized[participant][stim_label][epoch_idx]  # shape: (n_channels, n_samples)
n_samples = z_epoch.shape[1]

colors = ['red', 'blue', 'green', 'orange', 'purple', 'cyan', 'magenta', 'brown', 'gray', 'olive']

plt.figure(figsize=(12, 6))

# Plot each channel
for ch_idx in range(z_epoch.shape[0]):
    plt.plot(z_epoch[ch_idx], color=colors[ch_idx % len(colors)], label=f'Ch {ch_idx}')

# Horizontal threshold lines
plt.axhline(2, color='black', linestyle='--', label='Z=±2 Threshold')
plt.axhline(-2, color='black', linestyle='--')

# Vertical lines every 51 samples
for x in range(0, n_samples, 51):
    plt.axvline(x, color='blue', linestyle=':', linewidth=0.8)

plt.title(f"Participant {participant} - Stimulus: {stim_label} - Epoch {epoch_idx}")
plt.xlabel("Sample")
plt.ylabel("Z-score")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
