# Mease lab - CED Converter

In [None]:
from mease_lab_to_nwb import CEDNWBConverter
from pynwb import NWBHDF5IO
from nwbwidgets import nwb2widget
from pathlib import Path

In [None]:
# Source data
base_path = Path(r"/mnt/sds/PainData/Corrected_Channel_Map/L6/Cortex/16.12.20")

file_recording = str(base_path / "Dual_S1_VPL_Freq_10mW.smrx")
# file_recording = str(base_path / '10minsSpike2Recordingm365/m365_pt1_590-1190secs.smrx')
# file_recording = str(base_path / 'CED_example_data/short_example/M113_C4.smrx')

source_data = dict(
    CEDRecording=dict(file_path=file_recording, smrx_channel_ids=[73]),
    CEDStimulus=dict(file_path=file_recording),
)

In [None]:
print(source_data["CEDRecording"])

In [None]:
print(source_data["CEDStimulus"])

In [None]:
# Initialize converter
converter = CEDNWBConverter(source_data=source_data)

In [None]:
print(len(source_data["CEDRecording"]))

In [None]:
print(source_data["CEDStimulus"])

In [None]:
# Get metadata from source data and modify any values you want
metadata = converter.get_metadata()
metadata["NWBFile"]["session_description"] = "my example conversion"
metadata["NWBFile"]["session_start_time"] = "2020-12-08T16:30:00"

# Get conversion options and modify any values you want
conversion_options = converter.get_conversion_options()
conversion_options["CEDRecording"] = dict(stub_test=False)

# OPTIONAL - Validate source_data, metadata and conversion_options
converter.validate_source(source_data)
converter.validate_metadata(metadata)
converter.validate_conversion_options(conversion_options)

# Run conversion
output_file = str("liam_test.nwb")

converter.run_conversion(
    metadata=metadata,
    nwbfile_path=output_file,
    save_to_file=True,
    overwrite=True,
    conversion_options=conversion_options,
)

## Check NWB file with widgets

In [None]:
io = NWBHDF5IO(output_file, "r")
nwbfile = io.read()
nwb2widget(nwbfile)

In [None]:
print(nwbfile)

In [None]:
from matplotlib import pyplot as plt
import matplotlib.colors as mcolors

In [None]:
nwbfile.stimulus["10mW Laser"]

In [None]:
fig, ax = plt.subplots(figsize=(20, 10))
plt.plot(
    np.arange(
        nwbfile.stimulus["10mW Laser"].starting_time,
        len(nwbfile.stimulus["10mW Laser"].data) / nwbfile.stimulus["10mW Laser"].rate,
        1.0 / nwbfile.stimulus["10mW Laser"].rate,
    )[0:2000000],
    nwbfile.stimulus["10mW Laser"].data[:2000000],
)
plt.show()

In [None]:
import numpy as np

In [None]:
# get pulse start time, length & interval to next pulse start time for each pulse
dts = np.diff(ts)
times = ts[0::2]
pulses = dts[0::2]
intervals = ts[2::2] - ts[0:-2:2]

In [None]:
# enumerate possible intervals (rounding to nearest 1/10 of smallest interval)
interval_res = 0.1 * np.min(intervals)
unique_intervals, unique_interval_counts = np.unique(
    np.round(intervals / interval_res) * interval_res, return_counts=True
)
unique_intervals, unique_interval_counts

In [None]:
# enumerate possible intervals (rounding to nearest 1/10 of smallest pulse)
pulse_res = 0.1 * np.min(pulses)
unique_pulses, unique_pulse_counts = np.unique(
    np.round(pulses / pulse_res) * pulse_res, return_counts=True
)
unique_pulses, unique_pulse_counts

In [None]:
times[:10], pulses[:10], intervals[:10]

In [None]:
# approximate inequality operator: returns true if they differ by at least ~5%
def differ(a, b):
    return not np.isclose(a, b, rtol=0.05)

In [None]:
# convert measured interval to nearest unique_interval and return as string
def to_hz_str(interval, unique_intervals):
    return (
        f"{1.0/unique_intervals[(np.abs(unique_intervals - interval)).argmin()]:.3g} Hz"
    )

In [None]:
# collect pulses into contiguous chunks of the same frequency

# note: interval of zero is a continuous laser pulse
conditions = {"Continuous": []}
for interval in unique_intervals:
    conditions[to_hz_str(interval, unique_intervals)] = []

i0 = 0
t0 = 0
n0 = 0
for t, p, i in zip(times, pulses, intervals):
    print(t, p, i)
    if i0 == 0:
        t0 = t
        p0 = p
        i0 = i
        n0 = 0
        print(f"starting at {t0} with pulse {p0}, intervals {i0}")
    elif differ(p, p0) or differ(i, i0):
        if n0 == 1:
            # single pulse -> start/stop times for continuous pulse
            print(f"single pulse -> continuous pulse {t0} -> {p0}")
            conditions["Continuous"].append((t0, p0))
            t0 = t
            p0 = p
            i0 = i
            n0 = 0
            print(f"starting at {t0} with pulse {p0}, intervals {i0}")
        else:
            # multiple pulses -> start/stop times for fixed frequency pulse
            print(f"ending at {t-t0+p} after {n0} pulses with interval {i0}")
            conditions[to_hz_str(i0, unique_intervals)].append((t0, t - t0 + p))
            i0 = 0
    n0 = n0 + 1
# deal with last pulse
# print(f"finishing last pulse")
if i0 == 0:
    # final single pulse -> start/stop times for continuous pulse
    print(f"single pulse -> continuous pulse {times[-1]} -> {pulses[-1]}")
    conditions["Continuous"].append((times[-1], pulses[-1]))
else:
    # multiple pulses -> start/stop times for fixed frequency pulse
    conditions[to_hz_str(i0, unique_intervals)].append((t0, times[-1] - t0 + p0))

# todo: above is not yet robust, end cases dubious, multiple continuous pulses dubious, etc etc

In [None]:
cols = [c for c in mcolors.TABLEAU_COLORS]
h = 1
hoffset = 0
labels = []
fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=(20, 10))
ax2.set_title("Laser Conditions")
for i, (label, chunks) in enumerate(conditions.items()):
    if chunks:
        labels.append(label)
        plt.broken_barh(chunks, (hoffset * i, h), label=labels[-1], color=cols[i + 1])
if hoffset != 0:
    ax2.set_yticks(np.arange(h / 2, len(labels) * h))
    ax2.set_yticklabels(labels)
ax1.plot(
    np.arange(
        nwbfile.stimulus["10mW Laser"].starting_time,
        len(nwbfile.stimulus["10mW Laser"].data) / nwbfile.stimulus["10mW Laser"].rate,
        1.0 / nwbfile.stimulus["10mW Laser"].rate,
    ),
    nwbfile.stimulus["10mW Laser"].data[:],
)
ax1.set_xlim([0, 650])
ax2.set_xlim([0, 650])
plt.legend()
plt.show()