In [2]:
import datetime
import json
import os

import ipywidgets as ipw
from IPython.display import display

import nanonis_importer
import utils
import widgets

In [3]:
# Application settings read from config.json, plus the ELN connection settings
# (only the "url" and "token" entries are used here).
CONFIG = utils.read_json("config.json")
CONFIG_ELN = utils.get_aiidalab_eln_config()
# CONFIG_ELN = utils.read_json("eln_config.json")
OPENBIS_SESSION, SESSION_DATA = utils.connect_openbis(CONFIG_ELN["url"], CONFIG_ELN["token"])

# openBIS type codes of every configured object flagged as a sample preparation step.
SAMPLE_PREPARATION_SAMPLE_TYPES = [
    object_info["openbis_object_type"]
    for object_info in CONFIG["objects"].values()
    if object_info["object_type"] == "sample_preparation"
]
# openBIS type codes of every configured object flagged as a slab.
SLABS_TYPES = [
    object_info["openbis_object_type"]
    for object_info in CONFIG["objects"].values()
    if object_info["object_type"] == "slab"
]
SAMPLES_COLLECTION_OPENBIS_PATH = CONFIG["samples_collection_openbis_path"]
MEASUREMENT_FILE_EXTENSIONS = CONFIG["measurement_file_extensions"]

# Experiment dropdown.
experiment_selector = widgets.ExperimentSelectionWidget()
experiment_selector.load_dropdown_box()

# Sample dropdown with explicit sizes for the dropdown and the details box.
sample_selector_config = dict(
    dropdown = dict(width = "315px"),
    details = dict(width = "589px", height = "250px"),
)
sample_selector = widgets.ObjectSelectionWidget("Sample", sample_selector_config)
sample_selector.load_dropdown_box("SAMPLE", "sample")

# Instrument dropdown (default widget configuration).
instrument_selector = widgets.ObjectSelectionWidget("Instrument")
instrument_selector.load_dropdown_box("INSTRUMENT", "instrument")

# Text box that load_sample_metadata fills with the selected sample's name.
sample_out_name_textbox = utils.Text(
    description = "Name",
    placeholder = "Write sample name here...",
    disabled = False,
    layout = ipw.Layout(width = '400px'),
    style = {'description_width': "110px"},
)

# HTML snippet assembled from the configuration; displayed together with the buttons.
increase_buttons_size = utils.HTML(data = "".join(CONFIG["save_home_buttons_settings"]))

# Icon-only "save" and "home" buttons; each gets its own Layout instance.
create_button = utils.Button(
    icon = 'save',
    tooltip = 'Save',
    description = '',
    button_style = '',
    disabled = False,
    layout = ipw.Layout(width = '100px', height = '50px'),
)
quit_button = utils.Button(
    icon = 'home',
    tooltip = 'Main menu',
    description = '',
    button_style = '',
    disabled = False,
    layout = ipw.Layout(width = '100px', height = '50px'),
)

save_close_buttons_hbox = ipw.HBox([create_button, quit_button])

# Directory-only chooser starting in the current working directory.
folder_selector = utils.FileChooser(
    path = '.',
    select_default = True,
    use_dir_icons = True,
    show_only_dirs = True,
)

In [None]:
def find_first_object_permid(openbis_object, openbis_type):
    """Walk up the parent chain while parents of type ``openbis_type`` exist.

    Every parent of ``openbis_object`` is fetched from openBIS. Whenever a parent's
    type equals ``openbis_type`` the search continues recursively from that parent.
    When several parents match, the result obtained through the last one wins.

    Returns:
        The object reached at the end of the chain, or ``openbis_object`` itself when
        none of its parents has the requested type. Note that, despite the function
        name, the object (not its permId) is returned.
    """
    topmost_object = openbis_object
    for identifier in openbis_object.parents:
        candidate = OPENBIS_SESSION.get_object(identifier)
        type_matches = candidate.type == openbis_type
        if type_matches:
            topmost_object = find_first_object_permid(candidate, openbis_type)

    return topmost_object

def load_sample_metadata(change):
    """Refresh the widgets that depend on the sample chosen in ``sample_selector``.

    Registered as an observer of the sample dropdown's ``value``. For a selected sample
    it lists the sample's materials and preparation steps in the details box, switches
    the experiment and instrument dropdowns to the ones used by the last preparation
    step found, and pre-fills the sample name text box.

    Args:
        change: Change notification passed by the observer mechanism. Unused; the
            current selection is read directly from ``sample_selector``.
    """
    if sample_selector.dropdown.value == -1:
        # -1 is treated as "nothing selected": reset everything derived from the sample.
        instrument_selector.dropdown.value = -1
        sample_selector.details_textbox.value = ''
        sample_out_name_textbox.value = ''
        return

    sample_object = OPENBIS_SESSION.get_object(sample_selector.dropdown.value)
    # Entries are indexed below as: [0] openBIS type code, [1] identifier accepted by
    # get_object, [2] timestamp formatted as "%Y-%m-%d %H:%M:%S".
    # [3] is presumably the object name - TODO confirm against utils.
    sample_parents_metadata = utils.get_openbis_parents_recursive(OPENBIS_SESSION, sample_object, [])
    last_sample_preparation = None
    sample_strings = {"sample_preparations": [], "materials": [], "sample_preparation_datetimes": []}
    number_parents = len(sample_parents_metadata)
    parent_idx = 0
    while parent_idx < number_parents:
        parent_metadata = sample_parents_metadata[parent_idx]
        if parent_metadata[0] == "DEPOSITION":
            deposition_object = OPENBIS_SESSION.get_object(parent_metadata[1])

            # Get substance used in the specific deposition
            substance_metadata = []
            for parent_id in deposition_object.parents:
                deposition_parent_object = OPENBIS_SESSION.get_object(parent_id)
                if deposition_parent_object.type == "MOLECULE":
                    deposition_parent_object_metadata = deposition_parent_object.props.all()
                    substance_metadata = [deposition_parent_object_metadata["$name"], deposition_parent_object.permId]

            if substance_metadata: # If deposition does not contain any substance, the app must not try to display it
                sample_metadata_string = f"> {parent_metadata[0]} ({parent_metadata[3]}, {parent_metadata[1]}, {parent_metadata[2]}) [{substance_metadata[0]} ({substance_metadata[1]})]"
            else:
                sample_metadata_string = f"> {parent_metadata[0]} ({parent_metadata[3]}, {parent_metadata[1]}, {parent_metadata[2]})"
            # NOTE(review): together with the increment at the end of the loop this skips
            # the entry that follows a DEPOSITION (presumably its substance) - confirm
            # that the entry order returned by utils guarantees this.
            parent_idx += 1
        else:
            sample_metadata_string = f"> {parent_metadata[0]} ({parent_metadata[3]}, {parent_metadata[1]}, {parent_metadata[2]})"

        if parent_metadata[0] in SAMPLE_PREPARATION_SAMPLE_TYPES:
            if sample_metadata_string not in sample_strings["sample_preparations"]:
                sample_strings["sample_preparations"].append(sample_metadata_string)
                sample_strings["sample_preparation_datetimes"].append(parent_metadata[2])
            # Get the last sample preparation method performed on the sample in order to search the correct experiment where the sample is being used.
            if last_sample_preparation is None:
                last_sample_preparation = parent_metadata[1]

        elif parent_metadata[0] in SLABS_TYPES:
            if sample_metadata_string not in sample_strings["materials"]:
                sample_strings["materials"].append(sample_metadata_string)

        parent_idx += 1

    # Parse the datetime strings and zip them with sample preparations and materials
    parsed_datetimes = [datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S") for dt in sample_strings["sample_preparation_datetimes"]]
    zipped_lists = list(zip(parsed_datetimes, sample_strings["sample_preparations"]))

    # Sort by the datetime, most recent first
    sorted_zipped_lists = sorted(zipped_lists, key=lambda x: x[0], reverse = True)

    # Unpack the sorted values back into sample_strings (skipped when there is nothing to unpack)
    if sample_strings["sample_preparations"]:
        _, sample_strings["sample_preparations"] = zip(*sorted_zipped_lists)

    sample_metadata_string = (f"PermId: {sample_object.attrs.permId}\nMaterial:\n" +
                            "\n".join(sample_strings["materials"]) +
                            "\nProcesses:\n" +
                            "\n".join(sample_strings["sample_preparations"]))

    if last_sample_preparation:
        last_sample_preparation_object = OPENBIS_SESSION.get_object(last_sample_preparation)
        # Automatically select the experiment where the last sample preparation task was saved
        last_sample_preparation_experiment = OPENBIS_SESSION.get_experiment(last_sample_preparation_object.attrs.experiment)
        # Notify the user in case the experiment changed
        if experiment_selector.dropdown.value != last_sample_preparation_experiment.permId and experiment_selector.dropdown.value != -1:
            # Dropdown options are (label, value) pairs; invert them to look labels up by value.
            dropdown_options_dict = {key: val for val, key in experiment_selector.dropdown.options}
            previous_experiment_name = dropdown_options_dict.get(experiment_selector.dropdown.value)
            new_experiment_name = dropdown_options_dict.get(last_sample_preparation_experiment.permId)
            alert_message = f"Experiment was changed from {previous_experiment_name} to {new_experiment_name}!"
            # json.dumps yields a correctly escaped string literal, so experiment names
            # containing quotes, backslashes or newlines cannot break the script.
            display(utils.Javascript(data = f"alert({json.dumps(alert_message)})"))
        experiment_selector.dropdown.value =  last_sample_preparation_experiment.permId

        # Automatically select the instrument used in the last sample preparation task
        for parent in last_sample_preparation_object.parents:
            parent_object = OPENBIS_SESSION.get_object(parent)
            if parent_object.type == "INSTRUMENT":
                instrument_selector.dropdown.value = parent_object.permId

    sample_selector.details_textbox.value = sample_metadata_string
    sample_out_name = sample_object.props['$name']
    sample_out_name_textbox.value = sample_out_name

def close_notebook(b):
    """Click handler of the home button: redirect the browser to ``home.ipynb``.

    Args:
        b: The clicked button, as passed by the ``on_click`` callback; unused.
    """
    redirect_script = 'window.location.replace("home.ipynb")'
    display(utils.Javascript(data = redirect_script))
    
def create_measurements(folderpath, sample_experiment, sample_object, object):
    """Upload the measurements in ``folderpath`` and recurse into its sub-folders.

    If at least one entry of the folder ends with one of ``MEASUREMENT_FILE_EXTENSIONS``
    the folder is handed to ``nanonis_importer`` for upload; otherwise a message is
    printed. Every sub-folder is then mapped to a MEASUREMENTS child of ``object``
    whose ``$name`` equals the sub-folder path (reused if it already exists, created
    otherwise) and processed recursively.

    Args:
        folderpath: Folder whose content is uploaded.
        sample_experiment: Experiment (collection) in which new objects are created.
        sample_object: openBIS sample the measurements belong to.
        object: openBIS MEASUREMENTS object that the content of ``folderpath`` is
            attached to. (The name shadows the builtin; kept for compatibility.)
    """
    # Entries are used as paths below (os.path.isdir), so presumably full paths - TODO confirm.
    paths = utils.full_listdir(folderpath)
    any_readable_measurement = any(f".{filename.split('.')[-1]}" in MEASUREMENT_FILE_EXTENSIONS for filename in paths)

    if any_readable_measurement:
        nanonis_importer.upload_measurements_into_openbis(
                SESSION_DATA['url'], folderpath,
                sample_experiment, sample_object.permId,
                object.permId,
                instrument_selector.dropdown.value
        )
    else:
        output_message = f"Folder does not contain any readable measurement file: {', '.join(MEASUREMENT_FILE_EXTENSIONS)}."
        print(output_message)

    subfolders = [path for path in paths if os.path.isdir(path)]
    if not subfolders:
        return

    # Query the existing children once instead of once per sub-folder. The children
    # created below are named after their own sub-folder, so they can never match a
    # later one and the lookup stays valid for the whole loop.
    existing_measurements = {}
    for child in object.get_children():
        if child.type == "MEASUREMENTS":
            # setdefault keeps the first child registered under a given name.
            existing_measurements.setdefault(child.props["$name"], child)

    for path in subfolders:
        measurements_object = existing_measurements.get(path)
        if measurements_object is None:
            measurements_object = utils.create_openbis_object(
                OPENBIS_SESSION, type = "MEASUREMENTS",
                collection = sample_experiment,
                props = {
                    "$name": path,
                    "$default_object_view": "IMAGING_GALLERY_VIEW",
                    "measurements_folder_path": path
                },
                parents = [object.permId]
            )

        create_measurements(path, sample_experiment, sample_object, measurements_object)

def upload_measurements_to_openbis(b):
    """Click handler of the save button: upload the selected folder for the selected sample.

    Looks for a MEASUREMENTS object already linked to the sample through one of its
    1D/2D measurement children. That object is reused when it lives in the selected
    experiment. Otherwise a new MEASUREMENTS object is created in the selected
    experiment. The selected folder is then uploaded through ``create_measurements``.

    Args:
        b: The clicked button, as passed by the ``on_click`` callback; unused.
    """
    selected_sample = sample_selector.dropdown.value
    # -1 is the "nothing selected" value of the dropdowns.
    if selected_sample is None or selected_sample == -1:
        print("No sample selected.")
        return

    # Get sample experiment from parent
    sample_experiment = experiment_selector.dropdown.value
    if not sample_experiment or sample_experiment == -1:
        print("Sample does not belong to any project yet.")
        return

    sample_object = OPENBIS_SESSION.get_object(selected_sample)

    # Check if sample is being measured for the first time after the preparation or if
    # some measurements were already performed before the next preparation
    measurements_object = None
    for sample_child in sample_object.children or []:
        sample_child_object = OPENBIS_SESSION.get_object(sample_child)
        if sample_child_object.type in ["1D_MEASUREMENT", "2D_MEASUREMENT"]:
            measurements_object = find_first_object_permid(sample_child_object, "MEASUREMENTS")
            break

    # A new MEASUREMENTS object is needed when none exists yet (first measurement of the
    # sample), or when the measurements are registered in a different experiment than the
    # one of the existing object. The None check must come first: dereferencing
    # measurements_object.experiment on a sample without measurements would raise.
    new_measurements = (
        measurements_object is None
        or measurements_object.experiment.permId != sample_experiment
    )

    data_folder = folder_selector.selected_path
    sample_name = sample_object.props["$name"]
    if new_measurements:
        measurements_object = utils.create_openbis_object(
            OPENBIS_SESSION, type = "MEASUREMENTS",
            collection = sample_experiment,
            props = {
                "$name": f"Measurements of {sample_name}",
                "$default_object_view": "IMAGING_GALLERY_VIEW",
                "measurements_folder_path": data_folder
            }
        )

    create_measurements(data_folder, sample_experiment, sample_object, measurements_object)


# Measurement Uploader

## Select experiment

In [None]:
# Render the experiment dropdown created in the setup cell.
display(experiment_selector)

## Select sample

In [None]:
# Wire up the callbacks, then render the sample and instrument dropdowns.
sample_selector.dropdown.observe(load_sample_metadata, names = 'value')
create_button.on_click(upload_measurements_to_openbis)
quit_button.on_click(close_notebook)
display(sample_selector, instrument_selector)

## Select measurements folder

In [None]:
# Render the folder chooser, the save/home buttons and the HTML snippet for the buttons.
display(folder_selector, save_close_buttons_hbox, increase_buttons_size)

['/home/jovyan/prep1/RT008/01_06_RT008',
 '/home/jovyan/prep1/RT008/01_02_RT008',
 '/home/jovyan/prep1/RT008/01_04_RT008']