In [516]:
"""docstring"""

import numpy as np
import pandas as pd

import os
from datetime import datetime, date
from pathlib import Path
from xml.dom import minidom

import aind_data_schema.core.data_description as data_description
import aind_data_schema.components.devices as devices
import aind_data_schema.components.coordinates as coordinates
import aind_data_schema.core.instrument as instrument
import aind_data_schema.core.acquisition as acquisition
import aind_data_schema.components.connections as connection
import aind_data_schema.components.stimulus as stimulus
from aind_data_schema.components.configs import EphysAssemblyConfig, ProbeConfig, MISModuleConfig, ManipulatorConfig
from aind_data_schema.components.identifiers import Person
from aind_data_schema.components.measurements import Calibration
from aind_data_schema_models.brain_atlas import CCFv3


from aind_data_schema_models.organizations import Organization
from aind_data_schema_models.harp_types import HarpDeviceType
from aind_data_schema_models.modalities import Modality
from aind_data_schema_models.stimulus_modality import StimulusModality


import aind_data_schema.components.stimulus as stimulus



from metadata_utils import *


In [517]:
print(os.listdir(os.path.abspath("D:/")))

['$RECYCLE.BIN', '787622_2025-06-10_13-04-06', '794591_2025-10-28_11-14-04', '794591_2025-10-29_11-27-44', '794591_2025-10-30_11-28-59', '794591_2025-10-31_10-44-02', '794591_2025-11-04_12-18-50', '794591_2025-11-04_12-22-52', 'System Volume Information']


In [518]:
rig_constants = pd.read_csv('ephys_rig_constants_rig5_v2.csv')

In [519]:
rig_constants["Basestation Manufacturer"][0]

'IMEC'

In [520]:
Organization.from_abbreviation(rig_constants["Basestation Manufacturer"][0])


Organization.from_abbreviation(rig_constants["Camera Manufacturer"][0]),
Organization.from_name(rig_constants["Manipulator Manufacturer"][0]),

Organization.from_abbreviation(rig_constants["Mouse Platform Manufacturer"][0]),
Organization.from_name(rig_constants["Monitor Manufacturer"][0]),
Organization.from_name(rig_constants["NIPC Manufacturer"][0]),
Organization.from_name(rig_constants["NIPX Manufacturer"][0]),


(_National_Instruments(name='National Instruments', abbreviation=None, registry=<Registry.ROR: 'Research Organization Registry (ROR)'>, registry_identifier='026exqw73'),)

In [None]:
# OBJECT CONSTRUCTION

def build_instrument_object(
    constants_file,
    targets_insertions_file,
    insertion_numbers_list,
):
    """
    Returns an ephys_rig from given data.
    
    Parameters
    ----------
    constants_file : str
                        csv filename
    targets_insertions_file : str
                        excel filename containing targets on first sheet and insertions on second
    insertion_numbers_list : list of str or int
                        from setup no. column of csv
    
    Returns
    -------
    EphysRig object from aind_data_schema
    """

    rig_constants = csv_opener(constants_file)
    insertion_tracking = xl_opener(targets_insertions_file)[1]
    target_tracking = xl_opener(targets_insertions_file)[0]

    insertions = []
    targets = []
    for i in insertion_numbers_list:
        targets.append(get_insertion_details(target_tracking, i))
        insertions.append(get_insertion_details(insertion_tracking, i))

    config_number = None
    if pd.isna(insertions[0]["Config Number"]):
        config_number = 1
    else:
        config_number = int(insertions[0]["Config Number"])

    my_xml = xml_opener(targets)

    port_dict = {}
    port_sources = {}
    for i in range(len(my_xml["recording"].getElementsByTagName("NP_PROBE"))):
        port_dict[i] = devices.ProbePort(
            index=xml_attribute(my_xml["recording"], "NP_PROBE", i, "port"),
            probes=[
                xml_attribute(
                    my_xml["recording"], "NP_PROBE", i, "custom_probe_name"
                )
            ],
        )
        port_sources[i] = ("recording", i)
    
    if my_xml["surface_finding"] is not None:
        for i in range(len(my_xml["surface_finding"].getElementsByTagName("NP_PROBE"))):
            if xml_attribute(my_xml["surface_finding"], "NP_PROBE", i, "custom_probe_name") not in [k.probes[0] for k in port_dict.values()]:
                ind = int(np.max(list(port_dict.keys()))) + 1
                port_dict[ind] = instrument.ProbePort(
                    index=xml_attribute(my_xml["surface_finding"], "NP_PROBE", i, "port"),
                    probes=[
                        xml_attribute(
                            my_xml["surface_finding"], "NP_PROBE", i, "custom_probe_name"
                        )
                    ],
                )
                port_sources[ind] = ("surface_finding", i)

    basestation = devices.NeuropixelsBasestation(
        name=rig_constants["Basestation Name"][0],
        basestation_firmware_version=xml_attribute(
            my_xml["recording"], "NP_PROBE", 0, "bs_firmware_version"
        ),
        bsc_firmware_version=xml_attribute(
            my_xml["recording"], "NP_PROBE", 0, "bsc_firmware_version"
        ),
        slot=int(xml_attribute(my_xml["recording"], "BASESTATION", 0, "Slot")),
        ports=[i for i in port_dict.values()],
        manufacturer=Organization.from_abbreviation(rig_constants["Basestation Manufacturer"][0]),
        model=xml_attribute(
            my_xml["recording"], "NP_PROBE", 0, "bsc_part_number"
        ),
        serial_number=str(
            int(
                xml_attribute(
                    my_xml["recording"], "NP_PROBE", 0, "bsc_serial_number"
                )
            )
        ),
    )

    nipx = devices.DAQDevice(
        data_interface=rig_constants["NIPX Data Interface"][0],
        name=rig_constants["NIPX Name"][0],
        serial_number=rig_constants["NIPX Serial Number"][0],
        manufacturer=Organization.from_name(rig_constants["NIPX Manufacturer"][0]),
        model=rig_constants["NIPX Model"][0]
    )

    nipc = devices.DAQDevice(
        data_interface=rig_constants["NIPC Data Interface"][0],
        name=rig_constants["NIPC Name"][0],
        serial_number=rig_constants["NIPC Serial Number"][0],
        manufacturer=Organization.from_name(rig_constants["NIPC Manufacturer"][0]),
        model=rig_constants["NIPC Model"][0]
    )

    camera_dict = {}
    for i in range(len(rig_constants["Camera Assembly Name"].values())):
        if not pd.isna(rig_constants["Camera Assembly Name"][i]):
            cam_id = str(rig_constants["Camera Assembly Name"][i])
            camera_dict[cam_id] = devices.Camera(
                serial_number=str(rig_constants["Camera Serial Number"][i]),
                model=rig_constants["Camera Model"][0],
                notes=rig_constants["Camera Notes"][0],
                name=str(int(rig_constants["Camera Name"][i])),
                data_interface=rig_constants["Camera Data Interface"][0],
                manufacturer=Organization.from_abbreviation(rig_constants["Camera Manufacturer"][0]),
                size_unit=rig_constants["Camera Size Unit"][0],
                sensor_format=rig_constants["Camera Format Unit"][0],
                sensor_format_unit="Inches",
                chroma=rig_constants["Camera Chroma"][0],
                recording_software=devices.Software(
                    name=rig_constants["Camera Recording Software"][0],
                    version=rig_constants["Camera Recording Software Version"][
                        0
                    ],
                ),
            )

    stick_lens = devices.Lens(
        manufacturer=Organization.INFINITY_PHOTO_OPTICAL,
        name="Stick Lens"
        )

    microscope_assembly_dict = {}
    for i in camera_dict.keys():
        microscope_assembly_dict[i] = devices.CameraAssembly(
            name=str(int(float(i))),
            target=devices.CameraTarget.BRAIN,
            relative_position=[devices.AnatomicalRelative.SUPERIOR],
            camera=camera_dict[i],
            lens=stick_lens,
        )

    probe_dict = {}
    for i in port_dict.keys():
        prb_id = str(port_dict[i].probes[0])
        assert (
            xml_attribute(
                my_xml[port_sources[i][0]], "NP_PROBE", port_sources[i][1], "probe_name"
            )
            == "Neuropixels 2.0 - Single Shank"
        ), "Probe model from settings XML is either incorrect or in the wrong format for AIND JSON schema"
        probe_dict[prb_id] = devices.EphysProbe(
            name=prb_id,
            serial_number=xml_attribute(
                my_xml[port_sources[i][0]], "NP_PROBE", port_sources[i][1], "probe_serial_number"
            ),
            manufacturer=Organization.from_abbreviation(rig_constants["Probe Manufacturer"][0]),
            probe_model="Neuropixels 2.0 (Single Shank)",
            headstage=devices.Device(
                name='Headstage ' + prb_id,
                model=xml_attribute(
                    my_xml[port_sources[i][0]],
                    "NP_PROBE",
                    port_sources[i][1],
                    "headstage_part_number",
                ),
                serial_number=xml_attribute(
                    my_xml[port_sources[i][0]],
                    "NP_PROBE",
                    port_sources[i][1],
                    "headstage_serial_number",
                ),
            ),
        )

    ephys_assembly_dict = {}
    for i in range(len(probe_dict.keys())):
        ephys_assembly_dict[
            xml_attribute(
                my_xml[port_sources[i][0]], "NP_PROBE", port_sources[i][1], "custom_probe_name"
            )
        ] = devices.EphysAssembly(
            name=xml_attribute(
                my_xml[port_sources[i][0]], "NP_PROBE", port_sources[i][1], "custom_probe_name"
            ),
            probes=[k for (j, k) in probe_dict.items() if j == xml_attribute(
                    my_xml[port_sources[i][0]], "NP_PROBE", port_sources[i][1], "custom_probe_name"
                )],
            manipulator=devices.Manipulator(
                name=xml_attribute(
                    my_xml[port_sources[i][0]], "NP_PROBE", port_sources[i][1], "custom_probe_name"
                ),
                serial_number=xml_attribute(
                    my_xml[port_sources[i][0]], "NP_PROBE", port_sources[i][1], "custom_probe_name"
                ),
                model=rig_constants["Manipulator Model"][0],
                manufacturer=Organization.NEW_SCALE_TECHNOLOGIES,
            ),
        )

    white_rabbit = devices.HarpDevice(name="White Rabbit", is_clock_generator=True, harp_device_type=HarpDeviceType.WHITERABBIT)

    # basestation sync --> NI PXI digital input 0
    basestation_sync_connection = connection.Connection(
        source_device = rig_constants["Basestation Name"][0],
        source_port = rig_constants["BSChnl Channel Name"][0],
        target_device = rig_constants["BSChnl Device Name"][0],
        target_port = str(rig_constants["BSChnl Port"][0])
    )

    # white rabbit --> NI PXI digital input
    harp_sync_connection = connection.Connection(
        source_device = white_rabbit.name,
        source_port ="harp_clock",
        target_device = nipx.name,
        target_port = "6"
    )

    #INSTRUMENT_ID = ()
    #                + date_converter_no_dashes(insertions[0]["Date"])
    #                + "_0"
    #                + str(config_number))

    components = [basestation, nipx, nipc, white_rabbit]
    components.extend(list(ephys_assembly_dict.values()))
    components.extend(list(microscope_assembly_dict.values()))

    #print(insertions[0]["Date"])
    #date_converter_no_dashes(insertions[0]["Date"])

    return instrument.Instrument(
        instrument_id = str(rig_constants["Rig ID Prefix"][0]),
        modification_date = datetime.strptime(date_converter(insertions[0]["Date"]), '%Y-%m-%d'),
        components = components, 
        modalities = [Modality.ECEPHYS],
        coordinate_system = coordinates.CoordinateSystemLibrary.BREGMA_ARI,
        connections = [basestation_sync_connection, harp_sync_connection]
    )

In [None]:
def build_ephys_acquisition_object(
    constants_file,
    targets_insertions_file,
    insertion_numbers_list,
):
    """
    Returns an ephys_session from given data.
    
    Parameters
    ----------
    constants_file : str
                        csv filename
    targets_insertions_file : str
                        excel filename containing targets on first sheet and insertions on second
    insertion_numbers_list : list of strings or ints
                        each from setup no. column of csv
    
    Returns
    -------
    EphysSession object from aind_data_schema
    """

    # CSV opening and basic organization

    rig_constants = csv_opener(constants_file)

    target_tracking = xl_opener(targets_insertions_file)[0]
    insertion_tracking = xl_opener(targets_insertions_file)[1]
    targets = []
    insertions = []
    for i in insertion_numbers_list:
        targets.append(get_insertion_details(target_tracking, i))
        insertions.append(get_insertion_details(insertion_tracking, i))

    # Setting of constants 

    subject_id = str(int(targets[0]["Mouse"]))
    date = insertions[0]["Date"]
    session_number = (
        None  # 1 means it's the first session of the day, 2 means second, etc.
    )
    if pd.isna(insertions[0]["Config Number"]):
        session_number = 1
    else:
        session_number = int(insertions[0]["Config Number"])

    ### XML opening

    xml_dict = xml_opener(targets)

    num_of_recordings_this_session = 0
    for i in xml_dict.keys():
        if xml_dict[i] is not None:
            num_of_recordings_this_session += 1
    
    # Saving the ecephys folder name as a variable to save time later on
    ephys_recording_node_folder = None
    potential_folder_names = set()
    for i in range(len(targets)):
        potential_folder_names.add(str(targets[i]["OEPh Recording"]))
    options = os.listdir(os.path.abspath("D:/"))
    for j in potential_folder_names:
        for k in options:
            if j in k:
                ephys_recording_node_folder = (
                    "D:/" + str(k) + "/Record Node 101"
                )

    surface_find_node_folder = None
    potential_folder_names = set()
    for i in range(len(targets)):
        potential_folder_names.add(str(targets[i]["OEPh Positioning"]))
    options = os.listdir(os.path.abspath("D:/"))
    for j in potential_folder_names:
        for k in options:
            if j in k:
                surface_find_node_folder = (
                    "D:/" + str(k) + "/Record Node 101"
                )

    # Building the fields

    # TODO -- figure out microscope config
    #microscope_dict = {}
    #for i in range(len(rig_constants["Camera Assembly Name"].values())):
    #    if not pd.isna(rig_constants["Camera Assembly Name"][i]):
    #        cam_id = str(rig_constants["Camera Assembly Name"][i])
    #        microscope_dict[cam_id] = aises.DomeModule(
    #            assembly_name=cam_id,
    #            arc_angle=rig_constants["Camera Arc Angle"][0],
    #            module_angle=rig_constants["Camera Module Angle"][0],
    #            rotation_angle=None, 
    #            angle_unit=rig_constants["Camera Angle Unit"][0],
    #            notes=rig_constants["Camera Notes"][0],
    #        )

    basestation = rig_constants["Basestation Name"][0]

    # Streams
    stream_dict = {}
    calibrations = [ ]
    for iStream in xml_dict.keys():
        if xml_dict[iStream] is not None:
            # See which inserts for this stream succeeded, set key as tuple:
            # (probe number, target, index number in targets/insertions list)
            successes = {}
            for iInsertionNumber in range(len(insertions)):
                if not pd.isna(insertions[iInsertionNumber]["Manipulator X"]):
                    successes[
                        (
                            str(
                                int(
                                    targets[iInsertionNumber][
                                        "Manip controller SN"
                                    ]
                                )
                            ),
                            targets[iInsertionNumber]["Target"],
                            iInsertionNumber,
                        )
                    ] = None
            # Link successful insertions from spreadsheets to OpenEphys probes
            xml_successes = {}
            for iProbe in range(
                len(xml_dict[iStream].getElementsByTagName("NP_PROBE"))
            ):
                probe_name = xml_attribute(
                    xml_dict[iStream], "NP_PROBE", iProbe, "custom_probe_name"
                )
                for iSuccesses in successes.keys():
                    if iSuccesses[0] == probe_name:
                        xml_successes[
                            iSuccesses
                        ] = iProbe  
                        # The value associated w/ the successes key is the index into xml_attribute
            
            # Ephys modules for each stream
            module_dict = {}
            for iInsertion in xml_successes.keys():

                probe_name = iInsertion[0]

                # TODO -- fix calibrations
                #calibrations.extend(Calibration(
                #    calibration_date = #
                #    description = #
                #    input = #
                #    output = #
                #    targets[iInsertion[2]]["Manipulator Calibration"]))
                
                module_dict[probe_name] = EphysAssemblyConfig(

                    device_name = "Ephys Assembly " + probe_name,

                    manipulator = ManipulatorConfig(
                        device_name = "Manipulator " + probe_name, 
                        coordinate_system=coordinates.CoordinateSystemLibrary.MPM_MANIP_RFB,
                        local_axis_positions = coordinates.Translation(
                                translation=[insertions[iInsertion[2]]["Manipulator X"],
                                insertions[iInsertion[2]]["Manipulator Y"],
                                insertions[iInsertion[2]]["Final Z"]]
                        )
                    ),

                    probes = [ProbeConfig(
                        device_name = probe_name,
                        primary_targeted_structure = CCFv3.by_acronym(iInsertion[1]),
                        # atlas_coordinate=[
                        #     coordinates.CcfCoords(
                        #         ccf_version=rig_constants["CCF Version"][0],
                        #         unit=rig_constants["CCF Unit"][0],
                        #         ml=targets[iInsertion[2]]["CCF ML (um)"],
                        #         ap=targets[iInsertion[2]]["CCF AP (um)"],
                        #         dv=targets[iInsertion[2]]["CCF DV (um)"],
                        #     )
                        # ],
                        coordinate_system=coordinates.CoordinateSystemLibrary.BREGMA_ARI,
                        transform = [coordinates.Translation(
                            translation=[0,0,0],
                        )],
                        notes=insertions[iInsertion[2]]["comments"]
                    )],

                    modules = [MISModuleConfig(
                        arc_angle = targets[iInsertion[2]]["Arc AP"],
                        module_angle = targets[iInsertion[2]]["Arc ML"],
                        angle_unit = rig_constants["Ephys Module Angle Unit"][0]
                    )]
                )

            if iStream == "recording":
                stream_ts = np.lib.format.open_memmap(
                    str(ephys_recording_node_folder)
                    + "/experiment1/recording1/continuous/Neuropix-PXI-100." + str(list(xml_successes.keys())[0][0]) + "/timestamps.npy",
                    mode="r",
                )
                xml_date = (
                    xml_dict[iStream].getElementsByTagName("DATE")[0]
                ).firstChild.nodeValue
                if xml_date[-8:][0] == " ":
                    xml_time = "0" + xml_date[-7:]
                else:
                    xml_time = xml_date[-8:]
                print(date)
                print(str(date_converter(date))
                    + "T"
                    + str(xml_time)
                    + ".000Z")
                
                print(str(date_converter(date))
                    + "T"
                    + str(
                        add_seconds_to_clock(
                            int(stream_ts[-1] - stream_ts[0]), xml_time
                        )
                    )
                    + ".000Z",)
                stream_dict[iStream] = acquisition.DataStream(
                    stream_start_time=str(date_converter(date))
                    + "T"
                    + str(xml_time)
                    + ".000Z",
                    stream_end_time=str(date_converter(date))
                    + "T"
                    + str(
                        add_seconds_to_clock(
                            int(stream_ts[-1] - stream_ts[0]), xml_time
                        )
                    )
                    + ".000Z",
                    active_devices = list(module_dict.keys()),
                    configurations = list(module_dict.values()),
                    modalities=[Modality.ECEPHYS],
                    notes=targets[list(xml_successes.keys())[0][2]][
                        "OEPh Recording"
                    ],
                )

            if iStream == "surface_finding":
                stream_ts = np.lib.format.open_memmap(
                    str(surface_find_node_folder)
                    + "/experiment1/recording1/continuous/Neuropix-PXI-100." + str(list(xml_successes.keys())[0][0]) + "/timestamps.npy",
                    mode="r",
                )
                xml_date = (
                    xml_dict[iStream].getElementsByTagName("DATE")[0]
                ).firstChild.nodeValue
                if xml_date[-8:][0] == " ":
                    xml_time = "0" + xml_date[-7:]
                else:
                    xml_time = xml_date[-8:]
                stream_dict[iStream] = acquisition.DataStream(
                    stream_start_time=str(date_converter(date))
                    + "T"
                    + str(xml_time)
                    + ".000Z",
                    stream_end_time=str(date_converter(date))
                    + "T"
                    + str(
                        add_seconds_to_clock(
                            int(stream_ts[-1] - stream_ts[0]), xml_time
                        )
                    )
                    + ".000Z",
                    active_devices = list(module_dict.keys()),
                    configurations = list(module_dict.values()),
                    modalities=[Modality.ECEPHYS],
                    notes=str(
                        targets[list(xml_successes.keys())[0][2]][
                            "OEPh Positioning"
                        ]
                    )
                    + "; Surface Finding",
                )
                
    return acquisition.Acquisition(
        subject_id=subject_id,
        acquisition_start_time=stream_dict["recording"].stream_start_time,
        acquisition_end_time=stream_dict["recording"].stream_end_time,

        experimenters=[
            rig_constants["Experimenters"][i]
            for i in range(len(rig_constants["Experimenters"]))
            if not pd.isna(rig_constants["Experimenters"][i])
        ],
        
        acquisition_type=rig_constants["Session Type"][0],
        calibrations=calibrations,

        ethics_review_id=[str(int(rig_constants["IACUC Protocol"][0]))],
        instrument_id=str(rig_constants["Rig ID Prefix"][0]),
        data_streams=list(stream_dict.values())
    )

In [529]:
def build_data_description_object(
    constants_file,
    targets_insertions_file,
    insertion_number,
):
    
    """
    Returns a data_description object from given data.
    
    Parameters
    ----------
    constants_file : str
                        csv filename
    targets_insertions_file : str
                        excel filename containing targets on first sheet and insertions on second
    insertion_number : str or int
                        from setup no. column of csv
    
    Returns
    -------
    DataDescription object from aind_data_schema
    """

    constants = csv_opener(constants_file)

    target_tracking = xl_opener(targets_insertions_file)[0]
    target = get_insertion_details(target_tracking, insertion_number)

    insertion_tracking = xl_opener(targets_insertions_file)[1]
    insertion = get_insertion_details(insertion_tracking, insertion_number)

    return data_description.DataDescription(
        modalities=[Modality.ECEPHYS, Modality.BEHAVIOR],
        license=constants["License"][0],
        subject_id=str(target["Mouse"]),
        creation_time=str(date_converter(insertion["Date"])) + "T" + str((target["OEPh Recording"][-8:]).replace("-", ":")),
        institution=Organization.AIND,
        investigators=[
            Person(name=constants["Experimenters"][i])
            for i in constants["Experimenters"].keys()
            if not pd.isna(constants["Experimenters"][i])
        ],
        funding_source=[data_description.Funding(funder=Organization.AI)],
        data_level=constants["Data Level"][0],
        group=constants["Data Group"][0],
        project_name=constants["Project Name"][0],
        data_summary=constants["Data Summary"][0],
    )

In [530]:
# MASTER WRITE

def write_all_jsons(
    constants_file,
    targets_insertions_file,
    jobs_list,
    save_folder
):

    """
    For each job, writes three JSON files to directory
    
    Parameters
    ----------
    constants_file : str
                        csv filename
    targets_insertions_file : str
                        excel filename containing targets on first sheet and insertions on second
    jobs_list : list of lists of strings or ints
                        from setup no. column of csv
    save_folder : full path string, no slash at the end
                        should lead to an existing folder where new folder and subfolders will be made
    
    Returns
    -------
    None

    Notes
    -----
    Within specified folder, creates a "Metadata JSONs" folder containing folders named after each job/session, each with
    a session, rig, and data description file
    """
    json_folder = Path(str(save_folder) + "/Metadata JSONs")
    json_folder.mkdir(parents=True, exist_ok=True)

    for job in jobs_list:
        
        rig_x = build_instrument_object(constants_file, targets_insertions_file, job)
        ses_x = build_ephys_acquisition_object(constants_file, targets_insertions_file, job)
        dd_x = build_data_description_object(constants_file, targets_insertions_file, job[0])

        data_folder = dd_x.name[-26:]
        data_folder = Path(str(save_folder) + "/Metadata JSONs/" + str(data_folder) + "/metadata")
        data_folder.mkdir(parents=True, exist_ok=True)

        ses_x.write_standard_file(data_folder)
        rig_x.write_standard_file(data_folder)
        dd_x.write_standard_file(data_folder)
    
    print("hooray")
    

In [531]:
rig_constants_csv = "ephys_rig_constants_rig5_v2.csv"
tracking_spreadsheet_path = r"C:\Users\svc_aind_ephys\Desktop\InsertionTracking_checkpoint_2025-10-27.xlsx"
insertion_numbers = [[685]]
output_path = r'C:\Users\svc_aind_ephys\Documents\metadata_test' #"E:/"

In [532]:
write_all_jsons(rig_constants_csv, tracking_spreadsheet_path, insertion_numbers, output_path)

10/30/2025
2025-10-30T11:28:59.000Z
2025-10-30T12:21:13.000Z
hooray


**ADD-ONS**

These can be placed in the write all jsons function to modify the objects before they are saved

In [None]:
# gaf = aistim.VisualStimulation(
# stimulus_type = "VisualStimulation",
# stimulus_name="Gratings and Flashes",
# stimulus_software="Bonsai",
# stimulus_software_version="2.7",
# stimulus_script="GratingAndFlashes",
# stimulus_script_version="Added Block structure (git commit)"
# )
# laser = aistim.OptoStimulation(
# stimulus_type = "OptoStimulation",
# stimulus_name = "Laser Stimulation",
# pulse_shape = aistim.PulseShape.RAMP,
# pulse_frequency = 0.4,
# number_pulse_trains = 1,
# pulse_width = 500,
# pulse_train_duration = 0,
# fixed_pulse_train_interval = True,
# baseline_duration = 0,
# notes = "50 ms linear ramp up and down; Did not measure baseline duration"
# )
# vis_epoch = aistim.StimulusEpoch(
# stimulus=gaf,
# stimulus_start_time="00:00:00",
# stimulus_end_time="00:10:50",
# )
# opt_epoch = aistim.StimulusEpoch(
# stimulus=laser,
# stimulus_start_time="00:00:00",
# stimulus_end_time="00:10:50",
# )

# blue_laser = aidev.Laser(
# manufacturer = "QUANTIFI",
# wavelength = 450
# )

# laser_assembly = airig.LaserAssembly(
# laser_assembly_name = "Opto Stimulation Laser",
# manipulator = airig.Manipulator(serial_number = "45879", manufacturer="NEW_SCALE_TECHNOLOGIES"),
# lasers = [blue_laser]
# )

In [None]:
# modifiable = build_ephys_session_object("ephys_rig_constants.csv", "C:/Users/svc_aind_ephys/OneDrive - Allen Institute/Documents/MRI/InsertionTracking.xlsx", [153, 154, 155])
# gaf = aistim.VisualStimulation(
#         stimulus_type = "VisualStimulation",
#         stimulus_name="Gratings and Flashes",
#         stimulus_software="Bonsai",
#         stimulus_software_version="2.7",
#         stimulus_script="GratingAndFlashes",
#         stimulus_script_version="Added Block structure (git commit)"
#     )
# laser = aistim.OptoStimulation(
#     stimulus_type = "OptoStimulation",
#     stimulus_name = "Laser Stimulation",
#     pulse_shape = aistim.PulseShape.RAMP,
#     pulse_frequency = 0.4,
#     number_pulse_trains = 1,
#     pulse_width = 500,
#     pulse_train_duration = 649.6405,
#     fixed_pulse_train_interval = True,
#     baseline_duration = 0,
#     notes = "Did not measure baseline duration"
# )
# vis_epoch = aistim.StimulusEpoch(
#     stimulus=gaf,
#     stimulus_start_time="00:00:00",
#     stimulus_end_time="00:10:50",
# )
# opt_epoch = aistim.StimulusEpoch(
#     stimulus=laser,
#     stimulus_start_time="00:00:00",
#     stimulus_end_time="00:10:50",
# )
# modifiable.data_streams[0].stimulus_epochs.append(vis_epoch)
# print(modifiable.data_streams[0].stimulus_epochs)

In [None]:
# ses_x.data_streams[0].stimulus_epochs.append(opt_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(vis_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(opt_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(vis_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(opt_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(vis_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(opt_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(vis_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(opt_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(vis_epoch)
# ses_x.data_streams[0].stimulus_epochs.append(opt_epoch)

# rig_x.laser_assemblies = [laser_assembly]

In [None]:
# jobs_list_list = [[60, 61, 62], [63, 64, 65], [66, 67], [68, 69, 70], [71, 72, 73], [74, 75, 76], [77, 78, 79], [94, 95, 96], [104, 105, 106], [110, 111, 112], [113, 114, 115], [116, 117, 118], [119, 120, 121], [122, 123, 124], [125, 126, 127, 128], [129, 130, 131]]

# for iJob in jobs_list_list:
#     write_all_jsons("ephys_rig_constants.csv", "TargetData.csv", "InsertionData.csv", [iJob], "/results")