# Preprocessing

In [5]:
import numpy as np
import mne
import pyxdf
import matplotlib.pyplot as plt
from mne.io import get_channel_type_constants
import os

### Read xdf-file into MNE object

In [6]:
def _is_markerstream(stream):
    srate = float(stream["info"]["nominal_srate"][0])
    n_chans = int(stream["info"]["channel_count"][0])
    return srate == 0 and n_chans == 1
    
def read_raw_xdf(
    fname,
    stream_ids,
    marker_ids=None,
    prefix_markers=False,
    fs_new=None,
    *args,
    **kwargs,
):
    if len(stream_ids) > 1 and fs_new is None:
        raise ValueError("Argument `fs_new` required when reading multiple streams.")
        
    streams, _ = pyxdf.load_xdf(fname)
    streams = {stream["info"]["stream_id"]: stream for stream in streams}
 
    if all(_is_markerstream(streams[stream_id]) for stream_id in stream_ids):
        raise RuntimeError(
            "Loading only marker streams is not supported, at least one stream must be a "
            "regular stream."
        )
                                     
    labels_all, types_all, units_all = [], [], []
    for stream_id in stream_ids:
        stream = streams[stream_id]
    
        n_chans = int(stream["info"]["channel_count"][0])
        labels, types, units = [], [], []
        try:
            for ch in stream["info"]["desc"][0]["channels"][0]["channel"]:
                labels.append(str(ch["label"][0]))
                if ch["type"] and ch["type"][0].lower() in get_channel_type_constants(True):
                    types.append(ch["type"][0].lower())
                else:
                    types.append("misc")
                units.append(ch["unit"][0] if ch["unit"] else "NA")
        except (TypeError, IndexError):  # no channel labels found
            pass
            
        if not labels:
            labels = [f"{stream['info']['name'][0]}_{n}" for n in range(n_chans)]   
        if not units:
            units = ["NA" for _ in range(n_chans)]
        if not types:
            types = ["misc" for _ in range(n_chans)]
        labels_all.extend(labels)
        types_all.extend(types)
        units_all.extend(units)
       
    if fs_new is not None:
        all_time_series, first_time = _resample_streams(streams, stream_ids, fs_new)
        fs = fs_new
    else:  # only possible if a single stream was selected
        all_time_series = streams[stream_ids[0]]["time_series"]
        first_time = streams[stream_ids[0]]["time_stamps"][0]
        fs = float(np.array(stream["info"]["effective_srate"]).item())
    
    info = mne.create_info(ch_names=labels_all, sfreq=fs, ch_types=types_all, verbose=False)
    
    microvolts = ("microvolt", "microvolts", "µV", "?V", "uV")
    scale = np.array([1e-6 if u in microvolts else 1 for u in units_all])
    all_time_series_scaled = (all_time_series * scale).T

    raw = mne.io.RawArray(all_time_series_scaled, info, verbose=False)
    raw._filenames = [fname]

    # convert marker streams to annotations
    for stream_id, stream in streams.items():
        if marker_ids is not None and stream_id not in marker_ids:
            continue
        if not _is_markerstream(stream):
            continue

        onsets = stream["time_stamps"] - first_time
        prefix = f"{stream_id}-" if prefix_markers else ""
    
        # Create descriptions for the annotations
        descriptions = [f"{prefix}{item}" for sub in stream["time_series"] for item in sub]
        
        # Find the index where "start_run" appears
        start_run_index = None
        for i, description in enumerate(descriptions):
            if description == "start_run":
                start_run_index = i
                break

        # If "start_run" exists, start adding annotations from that point
        true_descriptions = []
        if start_run_index is not None:
            for onset, description in zip(onsets[start_run_index:], descriptions[start_run_index:]):
                # Add annotations starting from "start_run"
                #print([description])
                #print(onset)
                raw.annotations.append(onset, [0], [description])

    return raw

### Visualize MNE object: channels + markers

In [7]:
def visualize(raw, j):
    #1/j the number of data points you want to show:
    
    data = raw.get_data()
    times = raw.times
    times = times[:int(len(times)/j)]
    #print(len(times))
    #print(times)
    
    # Get the channel names from raw.info
    channel_names = raw.info['ch_names']
    print("channel_names", channel_names)
    fig, axs = plt.subplots(32, 2, figsize=(30, 200))
    for i in range(64):
        row = i // 2    # Determine the row index (0 to 20)
        col = i % 2
        data_plot = data[1+i]
        data_plot = data_plot[:int(len(data_plot)/j)]
        axs[row,col].plot(times,data_plot)
        axs[row, col].set_title(channel_names[1+i]) 
        axs[row,col].set_xlabel('Times (s)')
        axs[row,col].set_ylabel('EG Signal (uV)')
        axs[row,col].set_xlim([times[0], times[-1]])
        for onset in raw.annotations.onset:
            axs[row,col].axvline(x=onset, color='r', linestyle='-', label="Annotation Onset")
    plt.subplots_adjust(hspace=0.5) 
    plt.show()

### bandpas filtering + epoching + downsampling

In [8]:
def preprocess(subject, data_dir, repo_dir):
    
    #trial time
    trial_time = 4.2
    baseline = None
    fs = 120
    pr = 60
    
    l_freq = 6.00   #0.05
    h_freq = 21.00  #25.05
    n_trials = 30
    
    # Create output folder
    if not os.path.exists(os.path.join(data_dir, "derivatives", subject)):
        os.makedirs(os.path.join(data_dir, "derivatives", subject))
    
    i_person = int(subject[3:])

    eeg = dict()
    labels = dict()
    for i_run in range(8):
                
        fn = os.path.join(data_dir, "data", f"sub-{subject}", "ses-01", "eeg",
                        f"sub-{subject}_ses-01_task-cvep_run-{i_run+1:03d}_eeg.xdf")

        # Load labels and conditions from marker stream
        streams = pyxdf.load_xdf(fn)[0]
        names = [stream["info"]["name"][0] for stream in streams]
        marker_stream = streams[names.index(f"KeyboardMarkerStream{i_run+1}")]
        y_ = [int(str(marker[0]).split(";")[2].split("=")[1]) for marker in marker_stream["time_series"] if 
                        str(marker[0]).startswith("start_cue") and str(marker[0]).split(";")[2].split("=")[0] == "target"]

        tmp = marker_stream["time_series"][0][0]
        condition = tmp.split(";")[1]
        if "grating" in tmp.split(";")[2]:
            condition += "_grating"
        else:
            condition += "_bw"
        print(condition)
            
        # Load EEG
        streams = pyxdf.resolve_streams(fn)
        names = [stream["name"] for stream in streams]
        stream_id = streams[names.index("BioSemi")]["stream_id"]
        marker_id = streams[names.index(f"KeyboardMarkerStream{i_run+1}")]["stream_id"]
        raw = read_raw_xdf(fn, stream_ids=[stream_id], marker_ids=[marker_id])
            
        # Filtering (demeans and detrends)
        raw.filter(l_freq=l_freq, h_freq=h_freq, fir_design='firwin', verbose=False)  # Common cVEP filtering

        #EPOCHING
        #change events variable to only include markers for the trial (not the cue)
        event_id = [f"start_trial;trial={i_trial}" for i_trial in range(n_trials)]
            
        # Slicing
        # N.B. add 0.5 sec pre and post trial to capture filtering artefacts of downsampling (removed later on)
        # N.B. Use largest trial time (samples are cut away later)
        epo = mne.Epochs(raw, events=None, event_id = event_id, tmin=-0.5, tmax=trial_time + 0.5, baseline=baseline, picks="eeg",
                                     preload=True, verbose=False)
            
        # Resampling
        # N.B. Downsampling is done after slicing to maintain accurate stimulus timing
        epo = epo.resample(sfreq=fs, verbose=False)
            
        #print(epo.get_data(tmin=0, tmax=trial_time, copy=True).shape)
            
        # Add EEG to database (trials channels samples)
        X_ = epo.get_data(tmin=0, tmax=trial_time, copy=True)

        if condition in eeg:
            eeg[condition].append(X_)
            labels[condition].extend(y_)
        else:
            eeg[condition] = [X_]
            labels[condition] = y_

    for condition in eeg.keys():
        # Extract data
        X = np.concatenate(eeg[condition], axis=0).astype("float32")  # trials channels samples
        y = np.array(labels[condition]).astype("uint8")
    
        # Load codes
        classes = condition.split('_')[0]
        fn = os.path.join(data_dir, "data", "codes", f"m_sequence_shift_{classes}.npz")
        V = np.load(fn)["codes_real"]
        V = np.repeat(V, int(fs / pr), axis=1).astype("uint8")
    
        # Print summary
        print("Condition:", condition)
        print("\tX:", X.shape)
        print("\ty:", y.shape)
        print("\tV:", V.shape)
    
        #Save data
        np.savez(os.path.join(data_dir, "derivatives", subject, f"{subject}_cvep_{condition}.npz"),
                 X=X, y=y, V=V, fs=fs)
        


SyntaxError: closing parenthesis ')' does not match opening parenthesis '[' (2446025763.py, line 85)

In [None]:
#subject 1
subjects = [
        "01", "02"]
for subject in subjects:
    preprocess(subject, os.path.join(os.path.expanduser("~"), "ideaProjects", "programming", "BCI", "Thesis", "steven", "steven"),
             os.path.join(os.path.expanduser("~"), "ideaProjects", "programming", "BCI", "Thesis", "steven", "steven", "cvep_codes"))