In [2]:
from os import listdir, PathLike
from os.path import join
from typing import List, Tuple, Dict
from dataclasses import dataclass
import h5py
import numpy as np
import pandas as pd
from hdmf.backends.hdf5 import H5DataIO
from pynwb import NWBFile, TimeSeries
from pynwb.file import Subject
from pynwb.ecephys import ElectricalSeries, ElectrodeGroup, LFP
from pynwb.behavior import BehavioralEvents
import nixio
import regex as re
from usz_neuro_conversion.common import (
    SessionContext,
    NixContext,
    get_metadata_row,
    read_nix,
    get_date,
    write_nwb,
    standardize_sex,
    find_nix_files, get_matlab_matrix_scalars_ragged, get_matlab_matrix, get_micro_dir,
)

In [3]:
def convert_nix_to_nwb(subject: int, session: int) -> SessionContext:
    ctx = create_context(subject, session)
    write_subject(ctx)
    add_electrode_columns(ctx)
    write_eeg_electrodes(ctx)
    write_eeg_measurements(ctx)
    ieeg_electrode_group = write_ieeg_electrodes(ctx)
    write_ieeg_measurements(ctx)
    read_matlab(ctx)
    write_lfp(ctx)
    write_behavior(ctx)
    write_events(ctx)
    write_trial_data(ctx)
    write_waveforms(ctx, ieeg_electrode_group)
    return ctx

In [4]:
def create_context(subject: int, session: int) -> SessionContext:
    nix_context = NixContext(
        subject, session, project="Human_MTL_units_scalp_EEG_and_iEEG_verbal_WM"
    )
    nix = read_nix(nix_context)
    general = nix.sections["General"]
    nwb = NWBFile(
        session_description="Running experiment as described in the the experiment description",
        identifier=f"Human_MTL_units_scalp_EEG_and_iEEG_verbal_WM_subject{subject:02}_session{session:02}",
        session_start_time=get_date(nix_context),
        lab=general.props["Recording location"].values[0],
        institution="Universitätsspital Zürich, 8091 Zurich, Switzerland",  # Broken UTF-8 in file
        related_publications=_get_related_publications(nix),
        experimenter="Boran, Ece",
        experiment_description=_get_experiment(nix),
        keywords=[
            "Neuroscience",
            "Electrophysiology",
            "Human",
            "Awake",
            "Local field potential",
            "Neuronal action potential",
            "Spikes",
            "Medial temporal lobe",
            "Hippocampus",
            "Entorhinal cortex",
            "Amygdala",
            "Scalp EEG",
            "Intracranial EEG",
            "Cognitive task",
            "Verbal working memory",
            "Epilepsy",
        ],
    )
    return nix_context.to_session_context(nix, nwb)

In [5]:
def _get_experiment(nix: nixio.File) -> str:
    task = nix.sections["Task"].props
    task_name = task["Task name"].values[0]
    # Broken UTF-8 in file
    task_desc = "The task is a modified Sternberg task in which the encoding of memory items, maintenance, and recall were temporally separated. Each trial starts with a fixation period ([-6, -5] s), followed by the stimulus ([-5, -3] s). The stimulus consists of a set of eight consonants at the center of the screen. The middle four, six, or eight letters are the memory items,which determine the set size for the trial (4, 6, or 8, respectively). The outer positions are filled with “X,” which is never a memory item. After the stimulus, the letters disappear from the screen, and the maintenance interval starts ([-3, 0] s).A fixation square is shown throughout fixation, encoding, and maintenance. After maintenance, a probe letter is presented. The subjects respond with a button press to indicate whether the probe was part of the stimulus.The subjects are instructed to respond as rapidly as possible without making errors. The hand used for the response is counterbalanced across subjects within the clinical constraints. After the response, the probe is turned off, and the subjects receive acoustic feedback regarding whether their response was correct or incorrect. Before initiating the next trial, the subjects were encouraged to blink and relax. The subjects perform 50 trials in one session, which last approximately 10 min. Trials with different set sizes are presented in a random order,with the single exception that a trial with an incorrect response is always followed by a trial with a set size of 4."
    task_url = task["Task URL"].values[0]
    return (
        f"Task Name: {task_name}\nTask Description: {task_desc}\nTask URL: {task_url}"
    )

In [6]:
def _get_related_publications(nix: nixio.File) -> List[str]:
    related_publications = (
        nix.sections["General"].sections["Related publications"].props
    )
    dois = related_publications["Publication DOI"].values
    return [doi.strip() for doi in dois]

In [7]:
def write_subject(ctx: SessionContext):
    metadata = get_metadata_row(ctx.to_nix_context())
    age = metadata["Age"]
    sex = metadata["Sex"]
    ctx.nwb.subject = Subject(
        subject_id=f"{ctx.subject:02}",
        age=f"P{int(age)}Y",
        description=_get_subject_description(ctx),
        species="Homo sapiens",
        sex=standardize_sex(sex),
    )

In [8]:
def _get_subject_description(ctx: SessionContext) -> str:
    metadata = get_metadata_row(ctx.to_nix_context())
    subject = ctx.nix.sections["Subject"].props
    handedness = metadata["Handedness"]
    pathology = metadata["Pathology"]
    depth_electrodes = subject["Depth electrodes"].values[0]
    electrodes_in_soz = metadata["Electrodes in seizure onset zone (SOZ)"]
    return f"Handedness: {handedness}\nPathology: {pathology}\nDepth electrodes: {depth_electrodes}\nElectrodes in seizure onset zone (SOZ): {electrodes_in_soz}"

In [9]:
def add_electrode_columns(ctx: SessionContext):
    ctx.nwb.add_electrode_column(
        name="label",
        description="Channel label referenced by other data arrays",
    )
    ctx.nwb.add_electrode_column(
        name="is_inside_soz",
        description="Indicates whether the electrode is inside the seizure onset zone (SOZ)",
    )

In [10]:
def write_eeg_electrodes(ctx: SessionContext):
    nwb = ctx.nwb

    device = nwb.create_device(
        name="NicoletOne EEG System",
        manufacturer="Natus Medical Incorporated",
        description="EEG recording system",
    )

    # create an electrode group for this group
    electrode_group = nwb.create_electrode_group(
        name=f"eeg",
        description=f"EEG electrodes on scalp",
        device=device,
        location="Scalp",
    )

    electrodes = _get_eeg_electrodes(ctx)
    electrodes.apply(
        lambda row: _add_row_to_eeg_electrodes(nwb, electrode_group, row), axis=1
    )

In [11]:
def _add_row_to_eeg_electrodes(
        nwb: NWBFile, electrode_group: ElectrodeGroup, row: pd.Series
):
    # got BESA map: +X is anterior, +Y is left, +Z is superior according to <https://eeglab.org/tutorials/ConceptsGuide/coordinateSystem.html>
    # But need NWB: +X is posterior, +Y is inferior, +Z is right according to <https://pynwb.readthedocs.io/en/stable/pynwb.file.html#pynwb.file.NWBFile.add_electrode>
    nwb.add_electrode(
        group=electrode_group,
        label=row["label"],
        location=row["label"],
        reference="Averaged mastoid channels",
        is_inside_soz=False,
        x=-row["x"],
        y=-row["z"],
        z=-row["y"],
        filtering="Passband, 0.5 to 5000 Hz",
    )

In [12]:
def _get_eeg_electrodes(ctx: SessionContext) -> pd.DataFrame:
    labels = _get_eeg_electrode_labels(ctx)
    locations = _get_eeg_electrode_locations(ctx)
    locations_array = np.ndarray(locations.shape)
    locations.read_direct(locations_array)
    df = pd.DataFrame(locations_array, columns=["x", "y", "z"])
    df.insert(0, "label", labels)
    return df.reset_index()

In [13]:
def _get_eeg_electrode_labels(ctx: SessionContext) -> List[str]:
    _assert_all_eeg_electrodes_have_same_labels(ctx)
    session_data = _get_session_data(ctx)
    return (
        session_data.groups["Scalp EEG data"]
        .data_arrays["Scalp_EEG_Data_Trial_01"]
        .dimensions[0]
        .labels
    )

In [14]:
def _assert_all_eeg_electrodes_have_same_labels(ctx: SessionContext):
    data_arrays = _get_session_data(ctx).groups["Scalp EEG data"].data_arrays
    electrode_labels = [data_array.dimensions[0].labels for data_array in data_arrays]
    assert len(set(electrode_labels)) == 1

In [15]:
def _get_session_data(ctx: SessionContext) -> nixio.Block:
    return ctx.nix.blocks[f"Data_Subject_{ctx.subject:02}_Session_{ctx.session:02}"]

In [16]:
def _get_eeg_electrode_locations(ctx: SessionContext) -> nixio.DataArray:
    return (
        _get_session_data(ctx)
        .groups["Scalp EEG electrode information"]
        .data_arrays["Scalp_Electrode_EEGLAB_BESA_Coordinates"]
    )

In [17]:
def write_ieeg_electrodes(ctx: SessionContext) -> ElectrodeGroup:
    nwb = ctx.nwb

    device = nwb.create_device(
        name="ATLAS Neurophysiology System",
        manufacturer="Neuralynx, Inc.",
        description="iEEG recording system",
    )

    # create an electrode group for this group
    electrode_group = nwb.create_electrode_group(
        name="ieeg",
        description=f"iEEG electrodes",
        device=device,
        location="Intracranial",
    )

    electrodes = _get_ieeg_electrodes(ctx)
    electrodes.apply(
        lambda row: _add_row_to_ieeg_electrodes(nwb, electrode_group, row), axis=1
    )
    return electrode_group

In [18]:
def _get_ieeg_electrodes(ctx: SessionContext) -> pd.DataFrame:
    labels = _get_ieeg_electrode_labels(ctx)
    anatomical_locations = _get_ieeg_electrode_anatomical_locations(ctx)
    inside_soz = _get_ieeg_electrode_inside_soz(ctx)
    locations = _get_ieeg_electrode_locations(ctx)
    locations_array = np.ndarray(locations.shape)
    locations.read_direct(locations_array)
    df = pd.DataFrame(locations_array, columns=["x", "y", "z"])
    df.insert(0, "label", labels)
    df.insert(1, "anatomical_location", anatomical_locations)
    df.insert(2, "inside_soz", inside_soz)
    return df.reset_index()

In [19]:
def _get_ieeg_electrode_labels(ctx: SessionContext) -> List[str]:
    return [
        channel.sources[0].name[1:]  # Strip the leading "m"
        for channel in _get_session_data(ctx)
        .groups["iEEG electrode information"]
        .data_arrays["iEEG_Electrode_Map"]
        .sources
    ]

In [20]:
def _get_ieeg_electrode_anatomical_locations(ctx: SessionContext) -> List[str]:
    return [
        channel.sources[1].name
        if channel.sources[1].name != "no_label_found"
        else "unspecific"
        for channel in _get_session_data(ctx)
        .groups["iEEG electrode information"]
        .data_arrays["iEEG_Electrode_Map"]
        .sources
    ]

In [21]:
def _get_ieeg_electrode_inside_soz(ctx: SessionContext) -> List[bool]:
    metadata = get_metadata_row(ctx.to_nix_context())
    electrodes_in_soz = metadata["Electrodes in seizure onset zone (SOZ)"].split(",")
    electrodes = [label[:-1] for label in _get_ieeg_electrode_labels(ctx)]
    return [electrode in electrodes_in_soz for electrode in electrodes]

In [22]:
def _get_ieeg_electrode_locations(ctx: SessionContext) -> nixio.DataArray:
    return (
        _get_session_data(ctx)
        .groups["iEEG electrode information"]
        .data_arrays["iEEG_Electrode_MNI_Coordinates"]
    )

In [23]:
def _add_row_to_ieeg_electrodes(
        nwb: NWBFile, electrode_group: ElectrodeGroup, row: pd.Series
):
    # Got MNI map: +X is right, +Y is anterior, +Z is superior according to <https://kathleenhupfeld.com/mni-template-coordinate-systems/>
    # But need NWB: +X is posterior, +Y is inferior, +Z is right according to <https://pynwb.readthedocs.io/en/stable/pynwb.file.html#pynwb.file.NWBFile.add_electrode>
    nwb.add_electrode(
        group=electrode_group,
        label=row["label"],
        location=row["anatomical_location"],
        reference="Common intracranial reference",
        is_inside_soz=row["inside_soz"],
        x=-row["y"],
        y=-row["z"],
        z=row["x"],
        filtering="Passband, 0.5 to 1000 Hz",
    )

In [24]:
def write_eeg_measurements(ctx: SessionContext):
    nwb = ctx.nwb
    eeg_electrode_indices = list(range(_get_eeg_electrode_count(ctx)))
    eeg_table_region = nwb.create_electrode_table_region(
        region=eeg_electrode_indices,  # reference row indices 0 to N-1
        description="eeg electrodes",
    )
    trials = _get_session_data(ctx).groups["Scalp EEG data"].data_arrays
    data = np.concatenate([trial[:] for trial in trials], axis=1).transpose()
    timestamps = _get_data_collection_timestamps(ctx, trials)
    raw_electrical_series = ElectricalSeries(
        name="ecephys.eeg",
        description="Scalp EEG data",
        data=data,
        electrodes=eeg_table_region,
        timestamps=timestamps,
    )
    nwb.add_acquisition(raw_electrical_series)

In [25]:
def _get_data_collection_timestamps(ctx: SessionContext, trials: np.ndarray) -> np.ndarray:
    sampling_interval = trials[0].dimensions[1].sampling_interval
    collection_duration = _get_data_collection_duration(ctx)
    total_trial_duration = _get_total_trial_duration(ctx)
    timestamps = np.array([])
    for i in range(len(trials)):
        trial_start = i * total_trial_duration
        trial_end = trial_start + collection_duration
        trial_timestamps = np.arange(trial_start, trial_end, sampling_interval)
        timestamps = np.concatenate([timestamps, trial_timestamps])
    return timestamps

In [26]:
def _get_eeg_electrode_count(ctx: SessionContext) -> int:
    return len(_get_eeg_electrode_labels(ctx))

In [27]:
def write_ieeg_measurements(ctx: SessionContext):
    nwb = ctx.nwb
    min_index = _get_eeg_electrode_count(ctx)
    ieeg_electrode_indices = list(
        range(min_index, min_index + _get_ieeg_electrode_count(ctx))
    )
    ieeg_table_region = nwb.create_electrode_table_region(
        region=ieeg_electrode_indices,  # reference row indices 0 to N-1
        description="ieeg electrodes",
    )
    trials = _get_session_data(ctx).groups["iEEG data"].data_arrays
    data = np.concatenate([trial[:] for trial in trials], axis=1).transpose()
    timestamps = _get_data_collection_timestamps(ctx, trials)

    compressed_data = H5DataIO(
        data=data,
        compression="gzip",
    )
    electrical_series = ElectricalSeries(
        name="ecephys.ieeg",
        description="iEEG data",
        data=compressed_data,
        electrodes=ieeg_table_region,
        timestamps=timestamps,
    )
    nwb.add_acquisition(electrical_series)

In [28]:
def _get_ieeg_electrode_count(ctx: SessionContext) -> int:
    return len(_get_ieeg_electrode_labels(ctx))

In [29]:
def write_events(ctx: SessionContext):
    nwb = ctx.nwb
    session = _get_session_data(ctx)
    tags = session.groups[
        "Trial events single tags spike times"
    ].tags  # same as EEG and iEEG in this case
    tags_by_trial = [(_EVENT_RE.findall(tag.name)[0], tag.position) for tag in tags]
    events = [
        (int(trial_number) - 1, name, position[0])
        for (name, trial_number), position in tags_by_trial
        if name != "Response"
    ]
    events.sort(key=lambda x: x[0])
    offset = _get_time_offset(ctx)
    total_trial_duration = _get_total_trial_duration(ctx)
    events = [
        (name, time - offset + trial_number * total_trial_duration)
        for trial_number, name, time in events
    ]
    events.append(("END", len(events) * total_trial_duration))

    for (name, start), (_, end) in zip(events, events[1:]):
        nwb.add_epoch(
            start_time=start,
            stop_time=end,
            tags=name,
            timeseries=_get_main_time_series(ctx),
        )
        assert start < end

In [30]:
def _get_main_time_series(ctx: SessionContext) -> List[TimeSeries]:
    nwb = ctx.nwb
    return [nwb.acquisition["ecephys.eeg"],
            nwb.acquisition["ecephys.ieeg"],
            nwb.processing["ecephys"].get("LFP").get_electrical_series("ecephys.lfp"),
            nwb.processing["behavior"].get("BehavioralEvents.response").get_timeseries("response")]

In [31]:
def write_trial_data(ctx: SessionContext):
    nwb = ctx.nwb
    nwb.add_trial_column(
        name="set_size",
        description="Number of letters shown during encoding period (4, 6, or 8 letters)",
    )
    nwb.add_trial_column(
        name="probe_letter",
        description="The letter presented to the participant during the retrieval period",
    )
    nwb.add_trial_column(
        name="solution",
        description='The solution to the question "Was the letter at hand present in the encoding set?"',
    )
    nwb.add_trial_column(
        name="artifact",
        description="Whether the trial data was visually marked as an artifact by the experimenter",
    )
    nwb.add_trial_column(
        name="set_letters",
        description="Letters shown during encoding period. Note that this information is currently not part of the dataset, so all entries are \"not available\"",
    )

    nix = ctx.nix
    trials = nix.sections["Session"].sections["Trial properties"].sections
    duration = _get_total_trial_duration(ctx)
    for trial in trials:
        trial = trial.props
        trial_number = int(trial["Trial number"].values[0]) - 1
        start_time = trial_number * duration
        stop_time = start_time + duration
        assert start_time < stop_time
        solution = int(trial["Match"].values[0]) == 1
        nwb.add_trial(
            id=trial_number,
            start_time=start_time,
            stop_time=stop_time,
            set_size=int(trial["Set size"].values[0]),
            set_letters="not available",
            probe_letter=str(trial["Probe letter"].values[0]),
            solution=solution,
            artifact=bool(trial["Artifact"].values[0]),
            timeseries=_get_main_time_series(ctx),
        )

In [32]:
def write_behavior(ctx: SessionContext):
    nwb = ctx.nwb
    behavior_module = nwb.create_processing_module(
        name="behavior", description="Data for all trials in this session."
    )
    nix = ctx.nix
    trials = nix.sections["Session"].sections["Trial properties"].sections
    offset = _get_time_offset(ctx)
    duration = _get_total_trial_duration(ctx)
    data = []
    timestamps = []
    for trial in trials:
        trial_number = int(trial["Trial number"]) - 1
        trial = trial.props
        # See https://gin.g-node.org/USZ_NCH/Human_MTL_units_scalp_EEG_and_iEEG_verbal_WM/issues/2#issuecomment-3729
        data.append(str(trial["Response"].values[0])[1] == 1)
        time = trial["Response time"].values[0] - offset + trial_number * duration
        timestamps.append(time)
    time_series = TimeSeries(
        name="response",
        data=data,
        timestamps=timestamps,
        description='The participant\'s answer to the question "Was the letter at hand present in the encoding set?"',
        unit="n/a",  # Might as well use https://github.com/rly/ndx-events, but it's not built-in...
        continuity="instantaneous",
    )

    behavioral_events = BehavioralEvents(
        name=f"BehavioralEvents.response", time_series=time_series
    )

    behavior_module.add(behavioral_events)

In [33]:
_EVENT_RE = re.compile(r"Event_([a-zA-Z]+)_.*Trial_(\d\d)_.*")

In [34]:
def write_waveforms(ctx: SessionContext, ieeg_electrode_group: ElectrodeGroup):
    nwb = ctx.nwb
    session = _get_session_data(ctx)
    waveforms = session.groups["Spike waveforms"].data_arrays
    spike_times = session.groups["Spike times"].data_arrays
    if len(waveforms) == 0:
        assert len(spike_times) == 0
        return

    waveforms = [
        (_WAVEFORM_RE.findall(waveform.name)[0], waveform) for waveform in waveforms
    ]
    waveforms = [
        (int(unit), electrode, channel, values)
        for (unit, electrode, channel), values in waveforms
    ]
    waveforms.sort(key=lambda x: x[0])

    spike_times = [
        (_SPIKE_TIMES_RE.findall(spike_time.name)[0], spike_time[:])
        for spike_time in spike_times
    ]
    unit_to_trial_to_spike_times = {}
    for (unit, electrode, channel, trial), values in spike_times:
        unit_to_trial_to_spike_times.setdefault(int(unit), {})[trial] = (
            electrode,
            channel,
            values,
        )

    nwb.add_unit_column(
        name="offset",
        description="The offset in seconds of the first waveform voltage relative to the spike event",
    )

    waveform_sampling_interval = session.groups["Spike waveforms"].data_arrays[0].dimensions[1].sampling_interval
    nwb.units.waveform_rate = 1.0 / waveform_sampling_interval
    waveform_offset = session.groups["Spike waveforms"].data_arrays[0].dimensions[1].offset

    for unit, electrode, channel, waveform_voltages in waveforms:
        trial_to_spike_times = unit_to_trial_to_spike_times[unit]

        spike_times_for_trials = []
        for trial, (electrode_, channel_, spike_times) in trial_to_spike_times.items():
            assert electrode == electrode_
            assert channel == channel_
            spike_times_for_trials.append((trial, spike_times))
        spike_times_for_trials.sort(key=lambda x: x[0])
        spike_times_for_trials = [
            spike_times for _, spike_times in spike_times_for_trials
        ]
        spike_times_for_trials = _untrialize_irregular_timestamps(
            spike_times_for_trials, ctx
        )

        electrode_label = f"{electrode}{channel}"
        electrode_index = _get_electrode_index(ctx, electrode_label)

        means = [micro_volt * 1e-6 for micro_volt in waveform_voltages[:][0]]
        sds = [micro_volt * 1e-6 for micro_volt in waveform_voltages[:][1]]

        obs_intervals = _get_obs_intervals(ctx)
        nwb.add_unit(
            id=int(unit),
            electrode_group=ieeg_electrode_group,
            electrodes=[electrode_index],
            waveform_mean=means,
            waveform_sd=sds,
            spike_times=spike_times_for_trials,
            obs_intervals=obs_intervals,
            offset=waveform_offset
        )

In [35]:
def _get_obs_intervals(ctx: SessionContext) -> List[Tuple[float, float]]:
    trials = ctx.nix.sections["Session"].sections["Trial properties"].sections
    observation_duration = _get_data_collection_duration(ctx)
    total_duration = _get_total_trial_duration(ctx)
    return [(i * total_duration, i * total_duration + observation_duration) for i in range(len(trials))]

In [36]:
# Spike_Waveform_Unit_1_uAHL_2
_WAVEFORM_RE = re.compile(r"Spike_Waveform_Unit_(\d+)_u([a-zA-Z]+)_(\d+)")
# Spike_Times_Unit_36_uPHR_1_Trial_16
_SPIKE_TIMES_RE = re.compile(r"Spike_Times_Unit_(\d+)_u([a-zA-Z]+)_(\d+)_Trial_(\d+)")

In [37]:
def _untrialize_irregular_timestamps(
        timestamps: List[List[float]], ctx: SessionContext
) -> List[float]:
    offset = _get_time_offset(ctx)
    duration = _get_total_trial_duration(ctx)
    untrialized = []
    for trial, times in enumerate(timestamps):
        times = [time - offset + trial * duration for time in times]
        untrialized.extend(times)
    return untrialized

In [38]:
def _get_electrode_index(ctx: SessionContext, electrode: str) -> int:
    nwb = ctx.nwb
    index = nwb.electrodes["label"][:].index(electrode)
    return nwb.electrodes["id"][index]

In [39]:
def _get_data_collection_duration(ctx: SessionContext) -> float:
    return ctx.nix.sections["Session"].props["Trial duration"].values[0]

In [40]:
def _get_uncollected_duration(_ctx: SessionContext) -> float:
    # Arbitrarily picked since the highest response time seems to be around 9.2 sec
    # Since 2 sec of the response time are still collected, this leaves us with a total response time of 10 sec
    return 8.0

In [41]:
def _get_total_trial_duration(ctx: SessionContext) -> float:
    data_collection_duration = _get_data_collection_duration(ctx)
    participant_response_time = _get_uncollected_duration(ctx)
    return data_collection_duration + participant_response_time

In [42]:
def _get_time_offset(ctx: SessionContext) -> float:
    return -abs(
        _get_session_data(ctx)
        .groups["Scalp EEG data"]
        .data_arrays[0]
        .dimensions[1]
        .offset
    )

In [61]:
def write_lfp(ctx: SessionContext):
    nwb = ctx.nwb

    indexed_labels = {}
    indices = []
    for i, label in enumerate(_get_ieeg_electrode_labels(ctx)):
        electrode = label[:-1]
        if electrode in indexed_labels or electrode not in micros:
            continue
        indexed_labels[electrode] = None
        indices.append(i)

    min_index = _get_eeg_electrode_count(ctx)
    channels = 8
    ieeg_electrode_indices = [range(i + min_index, i + min_index + channels) for i in indices]
    ieeg_electrode_indices = [item for sublist in ieeg_electrode_indices for item in sublist]
    ieeg_table_region = nwb.create_electrode_table_region(
        region=ieeg_electrode_indices,
        description="ieeg electrodes",
    )

    keys = list(indexed_labels.keys())
    data = micros[keys[0]].trials
    data = np.array(data)
    for electrode in keys[1:]:
        data = np.concatenate((data, micros[electrode].trials), axis=0)
    data = data.transpose()

    times = micros[keys[0]].times
    times = np.array(times)
    for electrode in keys[1:]:
        times = np.concatenate((times, micros[electrode].trials), axis=0)
    times = times.transpose()

    print("data shape", data.shape)
    print("times shape", times.shape)

    compressed_data = H5DataIO(
        data=data,
        compression="gzip",
    )
    electrical_series = ElectricalSeries(
        name="ecephys.lfp",
        description="iEEG data",
        data=compressed_data,
        electrodes=ieeg_table_region,
        timestamps=times,
    )
    lfp = LFP(electrical_series)
    ecephys_module = nwb.create_processing_module(
        name="ecephys", description="processed extracellular electrophysiology data"
    )
    ecephys_module.add(lfp)

In [51]:
def read_matlab(ctx: SessionContext):
    global micros
    if len(micros) > 0:
        return
    micro_files = _find_micro_data_files(ctx)
    files = micro_files[ctx.subject]
    for electrode, file in files.items():
        file = h5py.File(file, 'r')
        trials = get_matlab_matrix(file, "trial").transpose()
        times = get_matlab_matrix(file, "time").transpose()
        micros[electrode] = Micro(
            trials=trials,
            times=times
        )
    assert len(micros) > 0

In [52]:
@dataclass(frozen=True)
class Micro:
    trials: np.ndarray
    times: np.ndarray


micros = {}

In [46]:
def _find_micro_data_files(ctx: SessionContext) -> Dict[int, Dict[str, PathLike]]:
    dir = get_micro_dir(ctx)
    micro_files = {}
    for file in listdir(dir):
        match = MATLAB_RE.match(file)
        if match:
            subject, _electrode_index, electrode = match.groups()
            subject = int(subject)
            if subject not in CORRECTED_PATIENT:
                continue
            subject = CORRECTED_PATIENT[subject]
            if subject not in micro_files:
                micro_files[subject] = {}
            micro_files[subject][electrode] = join(dir, file)
    assert len(micro_files) > 0
    return micro_files


In [47]:
CORRECTED_PATIENT = {
    28: 1,
    22: 2,
    19: 3,
    30: 4,
    33: 5,
    13: 6,
    23: 7,
    29: 8,
    16: 9,
}

In [48]:
# Micro_Data_Patient_04_Electrode_01_uAR
MATLAB_RE = re.compile(r"Micro_Data_Patient_(\d+)_Electrode_(\d+)_u([A-Z]+).mat")

Main

In [62]:
if __name__ == "__main__":
    context = convert_nix_to_nwb(1, 3)
    write_nwb(context)

ValueError: all the input array dimensions except for the concatenation axis must match exactly, but along dimension 0, the array at index 0 has size 1 and the array at index 1 has size 8

In [None]:
if __name__ == "__main__":
    project = "Human_MTL_units_scalp_EEG_and_iEEG_verbal_WM"
    for subject, sessions in find_nix_files(project).items():
        for session, _ in sessions.items():
            print(f"Converting subject {subject} session {session}")
            try:
                context = convert_nix_to_nwb(subject, session)
                write_nwb(context)
                print("Done")
            except Exception as e:
                print(f"Failed to convert {subject} {session}")
                print(e)
    print("Everything done!")