# EEG Viewer
![status](https://img.shields.io/badge/status-in%20progress-orange)



<div style="text-align: center;">
    <img src="./assets/230524_eeg-viewer.png" alt="eeg viewer preview" width="450"/>
</div>

## Summary

This workflow demonstrates how to visualize a set of 1D EEG time series with HoloViz and Bokeh tools.

For details specific to this workflow, such as goals, specifications, and bottlenecks, please see this workflow's [readme](./readme_eeg-viewer.md).

For a summary of EEG research, data, and software, please see [neuro/wiki/EEG-notes](https://github.com/holoviz-topics/neuro/wiki/EEG-notes).

## Imports and config

<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Requirements</p>
    <p>This workflow notebook requires the <a href="./environment.yml">environment</a> specified in this workflow directory.</p>
</div>


In [None]:
%load_ext autoreload
%autoreload 2

# Third-party scientific and plotting stack used throughout the notebook.
import colorcet as cc
import holoviews as hv
import numpy as np
import pandas as pd

# Select Bokeh as the HoloViews plotting backend. This call sits between the
# imports in the original cell; the ordering is kept as-is.
hv.extension("bokeh")
import panel as pn
from bokeh.models import HoverTool
from holoviews import Dataset
from holoviews.operation.datashader import rasterize
from holoviews.plotting.links import RangeToolLink
from hvneuro import download_file
from neurodatagen.eeg import generate_eeg_powerlaw
from scipy.stats import zscore

# Initialise Panel with the "material" template; the apps below are marked
# servable with target="main" (presumably the template's main area).
pn.extension(template="material")

## Synthetic data pipeline

By default, the `generate_eeg_powerlaw` function synthesizes EEG data as high-pass-filtered pink-noise (power-law) time series. It returns three values: a 2D NumPy array of synthetic EEG data (in microvolts) shaped as (number of channels, total samples), a 1D time array (in seconds), and a list of channel names. Parameters such as the high-pass filter factor (in Hz) and an amplitude scaling factor allow customization of the generated data.

### Generate synthetic data

In [None]:
# Size of the synthetic recording handed to the generator.
n_channels = 25  # number of channels to synthesize
n_seconds = 30  # recording length; presumably seconds, per the name
fs = 512  # sampling rate; presumably Hz (TODO confirm against neurodatagen)

# Per the description above: `data` is (n_channels, samples), `time` is the
# matching 1D time vector, and `channels` is the list of channel names.
data, time, channels = generate_eeg_powerlaw(n_channels, n_seconds, fs)

# Print the array shape, then show the array itself as the cell output.
print(f"shape: {data.shape} (n_channels, samples) ")
data

### Visualize synthetic data

In [None]:
# Initial viewport of the main plot.
max_ch_disp = 10  # max channels to initially display
max_t_disp = 3  # max time in seconds to initially display

# Vertical distance between neighbouring traces, expressed as a multiple of
# the standard deviation of the whole data array.
spacing = 5.5
offset = spacing * np.std(data)

# Example annotation: a vertical span covering (start, end) time.
annotation = hv.VSpan(1, 2)

# Hover shows the un-shifted amplitude rather than the plotted (offset) value.
hover = HoverTool(
    tooltips=[
        ("Channel", "@channel"),
        ("Time", "$x s"),
        ("Amplitude", "@original_amplitude µV"),
    ]
)

# One hv.Curve per channel, each shifted upwards by its index times `offset`.
# `max_data` tracks the highest plotted value for the y-range bounds.
column_names = ["Time", "Amplitude", "original_amplitude", "channel"]
channel_curves = []
max_data = data.max()
for i, channel_data in enumerate(data):
    offset_data = channel_data + (i * offset)
    trace_peak = offset_data.max()
    max_data = max(trace_peak, max_data)

    ds = Dataset((time, offset_data, channel_data, channels[i]), column_names)
    curve = hv.Curve(ds, column_names[0], column_names[1:])
    styled_curve = curve.opts(
        color="black",
        line_width=1,
        tools=[hover, "xwheel_zoom"],
        shared_axes=False,
    )
    channel_curves.append(styled_curve)

# Pair every channel's vertical position with its name so the continuous
# y-axis can carry categorical-looking tick labels.
# Note: this would/should change once independent coordinates are implemented.
yticks = [(position * offset, label) for position, label in enumerate(channels)]

# Hard limits handed straight to the Bokeh ranges.
# TODO: setting x/y_range bounds does not yet stop the RangeTool from going beyond these limits
# TODO: zooming out stops as soon as any single bound is hit instead of continuing along the other dims
viewer_range_bounds = {
    "x_range.bounds": (time.min(), time.max()),
    "y_range.bounds": (data.min(), max_data),
}

# Overlay the annotation and all channel curves into the main viewer.
# (default_tools=['hover', 'pan', 'box_zoom', 'save', 'reset'] was left disabled.)
curves_overlay = hv.Overlay(channel_curves, kdims="Channel")
eeg_viewer = (annotation * curves_overlay).opts(
    padding=0,
    xlabel="Time (s)",
    ylabel="Channel",
    yticks=yticks,
    show_legend=False,
    aspect=1.5,
    responsive=True,
    shared_axes=False,
    backend_opts=viewer_range_bounds,
)

# Reuse the ytick positions as the y-axis of the minimap image.
y_positions, _ = zip(*yticks)

# Z-score each channel across time (axis=1).
z_data = zscore(data, axis=1)

# Build the z-scored image for the minimap on the y tick positions of the
# eeg_viewer, then rasterize it.
minimap_image = hv.Image(
    (time, y_positions, z_data),
    kdims=["Time (s)", "Channel"],
    vdims="Amplitude (uV)",
)

# Colour limits are symmetric around zero and scaled by `clim_mul`.
clim_mul = 1.2
clim_limit = z_data.std() * clim_mul

# Rasterize and style the minimap in one go.
minimap = rasterize(minimap_image).opts(
    cmap="RdBu_r",
    colorbar=False,
    xlabel="",
    alpha=0.5,
    yticks=[yticks[0], yticks[-1]],
    height=100,
    responsive=True,
    default_tools=[""],
    shared_axes=False,
    clim=(-clim_limit, clim_limit),
)

# Create RangeToolLink between the minimap and the main EEG viewer
# (quirk: apply to just one element of the overlay and it will apply to all. see HoloViews #4472)

# Index of the top-most channel that should be visible initially. Clamped to
# the last available channel so that a recording with fewer than
# `max_ch_disp` channels does not raise an IndexError.
last_visible_ch = min(max_ch_disp, len(data)) - 1

# Upper y bound of the initial view: the peak of that channel's shifted trace.
max_y_disp = np.max(data[last_visible_ch, :] + (last_visible_ch * offset))

# A bound of None leaves that side of the range unset.
RangeToolLink(
    minimap,
    next(iter(eeg_viewer.values())),
    axes=["x", "y"],
    boundsx=(None, max_t_disp),
    boundsy=(None, max_y_disp),
)


# layout = (eeg_viewer + minimap).cols(1).opts(shared_axes=False, merge_tools=False)
# eeg_app = pn.Row(layout).servable() # too much spacing between plots in served app
# eeg_app = pn.Column(pn.Row(eeg_viewer, min_height=500, sizing_mode='stretch_both'), minimap, sizing_mode='stretch_both')#.servable()#target='main') # BUG Panel #5315: rangetool is variably active in the bokeh toolbar on eeg viewer plot.. not respecting shared_axes=False

# reverting approach because of the rangetool bug.. will deal with the spacing in the served app later
# Stack the main viewer above the annotated minimap in a single column.
minimap_with_annotation = minimap * annotation
stacked_plots = (eeg_viewer + minimap_with_annotation).cols(1)

# Wrap the layout in a Panel column and mark it servable in the template's
# "main" target.
eeg_app = pn.Column(stacked_plots, min_height=650)
eeg_app = eeg_app.servable(
    target="main", title="EEG Viewer with HoloViz and Bokeh"
)
eeg_app

## Real data pipeline

In [None]:
import mne

### Intake data

In [None]:
# This dataset is 2.6 MB on disk

# Remote EDF recording to fetch (per the URL path: PhysioNet eegmmidb v1.0.0,
# subject S001, run R04).
url = "https://physionet.org/files/eegmmidb/1.0.0/S001/S001R04.edf?download"
# Destination handed to download_file; a relative path, so it resolves against
# the notebook's working directory.
local_data_path = "../../data/"

# Will not download if already present at local_data_path
local_file_path = download_file(url, local_data_path)

In [None]:
# Read the downloaded EDF file with MNE; preload=True asks MNE to load the
# signal data into memory up front.
raw = mne.io.read_raw_edf(local_file_path, preload=True)
# Show the recording's metadata as the cell output.
raw.info

In [None]:
# Preview the channel names, types, signal ranges, and uncompressed size.

raw.describe()

### Gather the annotations

In [None]:
# Start of the experiment, with timezone info removed so it can be subtracted
# from the annotation timestamps below.
orig_time = raw.annotations.orig_time.replace(tzinfo=None)

# Annotations as a pandas DataFrame.
annotations_df = raw.annotations.to_data_frame()

# Derive absolute start/end stamps from onset + duration, then discard the
# source columns.
onset = annotations_df["onset"]
span = pd.to_timedelta(annotations_df["duration"], "s")
annotations_df["start_time"] = onset
annotations_df["end_time"] = onset + span
annotations_df = annotations_df.drop(columns=["onset", "duration"])

# Express both stamps as seconds elapsed since `orig_time`.
# Should be datetime object in final
for column in ("start_time", "end_time"):
    annotations_df[column] = (annotations_df[column] - orig_time).dt.total_seconds()


# One Glasbey colour per distinct annotation description.
unique_descriptions = annotations_df["description"].unique()
color_map = dict(zip(unique_descriptions, cc.glasbey))

annotations_df

In [None]:
# Stack three freshly built copies of the annotation table row-wise; `df` is
# the input of the row-iteration timings that follow.
annotation_frames = [raw.annotations.to_data_frame() for _ in range(3)]
df = pd.concat(annotation_frames, axis=0)
# Time a full pass over the rows with itertuples() and then with iterrows().
# The trailing semicolons suppress the echo of the resulting lists.
%time [_ for _ in df.itertuples()];
%time [_ for _ in df.iterrows()];

In [None]:
from holonote.annotate import Annotator

# Annotator over a single float-valued "time" dimension, with one free-text
# field per annotation.
# Want to add more automatic way to set this up.
# annotator = Annotator(spec={"time": np.datetime64}, fields=["description"])
annotator = Annotator(spec={"time": float}, fields=["description"])

# Seed the annotator from the MNE annotations, but only while it is still
# empty, so that re-running this cell does not insert duplicates.
if annotator.df.empty:
    # Only run once!
    for row in annotations_df.itertuples():
        annotator.set_range(row.start_time, row.end_time)
        annotator.add_annotation(description=row.description)
    annotator.commit()


# Style applied to the annotated ranges.
# NOTE(review): a per-description colour style,
#   {"color": hv.dim("description").categorize(color_map), "show_legend": False},
# used to be assigned here and was immediately overwritten by the dict below,
# so it never took effect. The dead assignment has been removed; reinstate it
# deliberately if categorical colouring is wanted.
range_style = {"show_legend": False}

# My hack in using an empty Curve as the element to overlay the ranges on.
annotations_overlay = annotator.overlay(hv.Curve([]), range_style=range_style)
annotations_overlay

### Clean channel names, set sensor positions, and reference data

In [None]:
# Clean up the channel names: strip leading/trailing "." characters from each.
# The trailing semicolon suppresses the notebook echo of the return value.
raw.rename_channels(lambda s: s.strip("."));

In [None]:
# # preview available montages that are shipped with MNE
# mne.channels.get_builtin_montages(descriptions=True)

In [None]:
# # Let's use the standard 10-20
# montage = mne.channels.make_standard_montage("standard_1020")

In [None]:
# # plot the assigned positions of our data channels
# raw.set_montage(montage, match_case=False)
# sphere=(0, 0.015, 0, 0.099) #manually adjust the y origin coord and radius a bit
# raw.plot_sensors(show_names=True, sphere=sphere);

In [None]:
# Re-reference the EEG data to the average over all recording channels.
# The trailing semicolon suppresses the notebook echo of the return value.
raw.set_eeg_reference("average");

### Gather the data for plotting into simple arrays

In [None]:
# These rebind the `time` and `channels` globals set by the synthetic pipeline.
time = raw.times
# NOTE(review): this takes every channel name, while `data` below is limited to
# EEG picks; the two only line up because all channels here are EEG.
channels = raw.ch_names

# get the EEG data (for this data set, all channels are EEG anyways)
eeg_indices = mne.pick_types(raw.info, eeg=True)
# Request the EEG signals in microvolts ("uV").
data = raw.get_data(picks=eeg_indices, units={"eeg": "uV"})

### Visualize real data

In [None]:
from holoviews.operation.downsample import downsample1d

# Initial viewport of the main plot.
max_ch_disp = 10  # max channels to initially display
max_t_disp = 5  # max time in seconds to initially display

# Vertical distance between neighbouring traces, expressed as a multiple of
# the standard deviation of the whole data array.
spacing = 2.5
offset = spacing * np.std(data)

# The annotation overlay built earlier from the annotator is reused as-is:
# annotations_overlay = annotator.overlay(hv.Curve([]))

# Hover shows the un-shifted amplitude rather than the plotted (offset) value.
hover = HoverTool(
    tooltips=[
        ("Channel", "@channel"),
        ("Time", "$x s"),
        ("Amplitude", "@original_amplitude µV"),
    ]
)

# One downsampled hv.Curve per channel, each shifted upwards by its index
# times `offset`. `max_data` tracks the highest plotted value.
column_names = ["Time", "Amplitude", "original_amplitude", "channel"]
channel_curves = []
max_data = data.max()
for i, channel_data in enumerate(data):
    offset_data = channel_data + (i * offset)
    trace_peak = offset_data.max()
    max_data = max(trace_peak, max_data)

    ds = Dataset((time, offset_data, channel_data, channels[i]), column_names)
    curve = hv.Curve(ds, column_names[0], column_names[1:])
    styled_curve = curve.opts(
        color="black",
        line_width=1,
        tools=[hover, "xwheel_zoom"],
        shared_axes=False,
    )
    channel_curves.append(downsample1d(styled_curve))

# Pair every channel's vertical position with its name for the y tick labels.
yticks = [(position * offset, label) for position, label in enumerate(channels)]

# Hard limits handed straight to the Bokeh ranges.
viewer_range_bounds = {
    "x_range.bounds": (time.min(), time.max()),
    "y_range.bounds": (data.min(), max_data),
}

# Collate the overlay of per-channel curves, then put the annotation ranges
# underneath and style the combined plot.
curves_overlay = hv.Overlay(channel_curves, kdims="Channel")
eeg_viewer = curves_overlay.collate()
eeg_with_annotations = (annotations_overlay * eeg_viewer).opts(
    padding=0,
    xlabel="Time (s)",
    ylabel="Channel",
    yticks=yticks,
    show_legend=False,
    aspect=1.5,
    responsive=True,
    shared_axes=False,
    backend_opts=viewer_range_bounds,
)

# Reuse the ytick positions as the y-axis of the minimap image.
y_positions, _ = zip(*yticks)

# Z-score each channel across time (axis=1).
z_data = zscore(data, axis=1)

# Build the z-scored image for the minimap on the y tick positions of the
# eeg_viewer, then rasterize it.
minimap_image = hv.Image(
    (time, y_positions, z_data),
    kdims=["Time (s)", "Channel"],
    vdims="Amplitude (uV)",
)

# Colour limits are symmetric around zero and scaled by `clim_mul`.
clim_mul = 1.2
clim_limit = z_data.std() * clim_mul

# Rasterize and style the minimap in one go.
minimap = rasterize(minimap_image).opts(
    cmap="RdBu_r",
    colorbar=False,
    xlabel="",
    alpha=0.5,
    yticks=[yticks[0], yticks[-1]],
    height=100,
    responsive=True,
    default_tools=[""],
    shared_axes=False,
    clim=(-clim_limit, clim_limit),
)

# Create RangeToolLink between the minimap and the main EEG viewer

# Index of the top-most channel that should be visible initially. Clamped to
# the last available channel so that a recording with fewer than
# `max_ch_disp` channels does not raise an IndexError.
last_visible_ch = min(max_ch_disp, len(data)) - 1

# Upper y bound of the initial view: the peak of that channel's shifted trace.
max_y_disp = np.max(data[last_visible_ch, :] + (last_visible_ch * offset))

# A bound of None leaves that side of the range unset.
RangeToolLink(
    minimap,
    eeg_viewer,
    axes=["x", "y"],
    boundsx=(None, max_t_disp),
    boundsy=(None, max_y_disp),
)


# Stack the annotated viewer above the annotated minimap in a single column.
minimap_with_annotations = minimap * annotations_overlay
stacked_plots = (eeg_with_annotations + minimap_with_annotations).cols(1)

# Wrap the layout in a Panel column and mark it servable in the template's
# "main" target.
eeg_app = pn.Column(stacked_plots, min_height=650)
eeg_app = eeg_app.servable(
    target="main", title="EEG Viewer with HoloViz and Bokeh"
)
eeg_app

In [None]:
# Scratch cell: rebuild `channel_curves` as plain (time, amplitude) curves
# without hover columns or downsampling.
column_names = ["Time", "Amplitude", "original_amplitude", "channel"]
channel_curves = []
for i, channel_data in enumerate(data):
    offset_data = channel_data + (i * offset)
    trace_peak = offset_data.max()
    max_data = max(trace_peak, max_data)  # update max

    # NOTE(review): `ds` is built from contiguous copies of the arrays but is
    # never used afterwards; the curve below is built from the raw arrays.
    contiguous_columns = (
        time,
        np.ascontiguousarray(offset_data),
        np.ascontiguousarray(channel_data),
        channels[i],
    )
    ds = Dataset(contiguous_columns, column_names)

    plain_curve = hv.Curve((time, offset_data), kdims="Time", vdims="Amplitude")
    channel_curves.append(
        plain_curve.opts(color="black", line_width=1, shared_axes=False)
    )
    # if i == 10:
    #    break

# yticks = [(i * offset, ich) for i, ich in enumerate(channels)]

# Create an overlay of curves
# hv.Overlay(channel_curves, kdims="Channel")

In [None]:
# Scratch: show a contiguous copy of the last channel's shifted trace
# (`offset_data` is whatever the most recent loop left behind).
np.ascontiguousarray(offset_data)

In [None]:
# Scratch: wrap every curve in downsample1d with the "nth" algorithm, then
# overlay and collate the results.
downsampled_curves = [
    downsample1d(curve, algorithm="nth") for curve in channel_curves
]
a = hv.Overlay(downsampled_curves).collate()
a

In [None]:
# Scratch: number of channel names and number of time samples.
len(channels), len(time)

In [None]:
import xarray as xr

# Shift every channel upwards by its row index times `offset` in one
# broadcasted operation (the column vector of indices spans the time axis).
data2 = data + (np.arange(len(data))[:, np.newaxis] * offset)
max_data = data2.max()

# Labelled (channel, Time) dataset holding the shifted amplitudes.
xds = xr.Dataset(
    coords={"channel": channels, "Time": time},
    data_vars={
        "Amplitude": (("channel", "Time"), data2),
        # "original_amplitude": (("channel", "Time"), data),
    },
)

# One curve per channel, overlaid. This used to be bound to
# `annotations_overlay`, which clobbered the annotator overlay that the
# "Visualize real data" cell relies on when re-run; it now has its own name.
xr_curves_overlay = (
    hv.Dataset(xds)
    .to(hv.Curve, groupby="channel", vdims="Amplitude")
    .overlay()
    .opts(hv.opts.Curve(color="black", line_width=1, tools=[hover, "xwheel_zoom"]))
)
xr_curves_overlay.data



In [None]:
# Scratch: overlay the current `channel_curves` without collating, and show it.
a = hv.Overlay(channel_curves, kdims="Channel")
a

In [None]:
# Scratch: inspect the overlay's underlying data container.
a.data

In [None]:
# IPython help lookup for downsample1d.
?downsample1d

In [None]:
# Scratch: print the shape of each channel's sample array, one line per channel.
# The index from enumerate() was never used, so iterate the rows directly.
for channel_data in data:
    print(channel_data.shape)

In [None]:
# Scratch: add an annotation with the description "Hello".
# NOTE(review): no set_range() call precedes this in the cell; presumably the
# range comes from an interactive selection on the plot. Confirm.
annotator.add_annotation(description="Hello")

In [None]:
# Commit the pending annotation changes (same call as used after seeding).
annotator.commit()

In [None]:
# Show the annotator's current table of annotations.
annotator.df