In [None]:
# General imports
import urllib.parse

import ipywidgets as ipw
import numpy as np
from pathlib import Path
import json
from IPython.display import clear_output, display

# AiiDA imports.
%load_ext aiida
%aiida
import aiidalab_widgets_base as awb
from aiida import common, orm

from surfaces_tools.helpers import HART_2_EV
from surfaces_tools.utils import spm
from surfaces_tools.widgets import series_plotter

from pybis import Openbis

import zipfile
import base64
import io
import copy

# Local imports.

In [None]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}

In [None]:
colormaps = ["gist_heat", "seismic"]

e_arr = None


def load_pk(b):
    global e_arr

    new_version = False
    workcalc = load_node(pk=pk_select.value)
    stm_calc = spm.get_calc_by_label(workcalc, "stm")
    try:
        cp2k_calc = spm.get_calc_by_label(workcalc, "scf_diag")
    except AssertionError:
        try:
            dft_out_params = workcalc.outputs.dft_output_parameters.get_dict()
            new_version = True
        except Exception as exc:
            print("Incorrect pk.")
            print(exc)
            return

    geom_info.value = spm.get_slab_calc_info(workcalc.inputs.structure)

    # Information about the calculation.
    with misc_info:
        clear_output()

    dft_inp_params = dict(workcalc.inputs["dft_params"])
    if not new_version:
        dft_out_params = dict(cp2k_calc.outputs.output_parameters)

    with misc_info:
        if dft_inp_params["uks"]:
            print(f"UKS multiplicity {dft_inp_params['multiplicity']}")
        else:
            print("RKS")

        print(
            f"Energy [au]: {dft_out_params['energy']:.6f}, [eV]: {dft_out_params['energy'] * HART_2_EV:.6f}"
        )

        try:
            spm_params = workcalc.inputs.stm_params
        except common.NotExistentAttributeError:
            spm_params = workcalc.inputs.spm_params

        extrap_plane = float(spm_params["--eval_region"][-1][1:])
        print(f"Extrap. plane [ang]: {extrap_plane:.1f}")

        if "--p_tip_ratios" in dict(spm_params):
            p_tip_ratio = spm_params["--p_tip_ratios"]

    ### Load data.
    with stm_calc.outputs.retrieved.open("stm.npz", mode="rb") as handle:
        loaded_data = np.load(handle.name, allow_pickle=True)
    stm_general_info = loaded_data["stm_general_info"][()]
    stm_series_info = loaded_data["stm_series_info"]
    stm_series_data = loaded_data["stm_series_data"]

    e_arr = stm_general_info["energies"]

    series_plotter_inst.add_series_collection(
        stm_general_info, stm_series_info, stm_series_data
    )

    series_plotter_inst.setup_added_collections(workcalc.pk)

    setup_selection_elements()


style = {"description_width": "50px"}
layout = {"width": "70%"}

pk_select = ipw.IntText(value=0, description="pk", style=style, layout=layout)

load_pk_btn = ipw.Button(description="Load pk", style=style, layout=layout)
load_pk_btn.on_click(load_pk)

geom_info = ipw.HTML()

display(ipw.HBox([ipw.VBox([pk_select, load_pk_btn]), geom_info]))

misc_info = ipw.Output()
display(misc_info)

# Scanning tunneling microscopy

In [None]:
def selected_orbital_indexes():
    if tab.selected_index == 0:
        # Continuous selection.
        min_e, max_e = energy_range_slider.value
        ie_1 = np.abs(e_arr - min_e).argmin()
        ie_2 = np.abs(e_arr - max_e).argmin() + 1
        indexes = np.arange(ie_1, ie_2)

    else:
        # Discrete selection.
        voltages = np.array(voltages_text.value.split(), dtype=float)
        filtered_voltages = []
        for v in voltages:
            if v >= np.min(e_arr) and v <= np.max(e_arr):
                filtered_voltages.append(v)
            else:
                print(f"Voltage {v:.2f} out of range, skipping.")

        indexes = []
        for i_bias, bias in enumerate(filtered_voltages):
            indexes.append(np.abs(e_arr - bias).argmin())

    return indexes

In [None]:
style = {"description_width": "120px"}
layout = {"width": "40%"}

series_plotter_inst = series_plotter.SeriesPlotter(
    select_indexes_function=selected_orbital_indexes, zip_prepend="stm"
)

# Select energies to plot.

energy_range_slider = ipw.FloatRangeSlider(
    value=[0.0, 0.0],
    min=0.0,
    max=0.0,
    step=0.1,
    description="energy range (eV)",
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
    readout_format=".2f",
    style=style,
    layout={"width": "90%"},
)

voltages_text = ipw.Text(
    description="energies (eV)", value="", style=style, layout={"width": "90%"}
)

tab = ipw.Tab(layout={"width": "60%"})

tab.children = [energy_range_slider, voltages_text]
tab.set_title(0, "Continuous selection")
tab.set_title(1, "Discrete selection")

display(
    series_plotter_inst.selector_widget,
    tab,
    series_plotter_inst.plot_btn,
    series_plotter_inst.clear_btn,
    series_plotter_inst.plot_output,
)

In [None]:
def setup_selection_elements():
    default_voltages = [-2.0, -1.5, -1.0, -0.5, -0.1, 0.1, 0.5, 1.0, 1.5, 2.0]

    # Filter based on energy limits.
    default_voltages = [
        v for v in default_voltages if v >= np.min(e_arr) and v <= np.max(e_arr)
    ]
    voltages_text.value = " ".join([str(v) for v in default_voltages])

    energy_range_slider.min = np.min(e_arr)
    energy_range_slider.max = np.max(e_arr)
    energy_range_slider.step = e_arr[1] - e_arr[0]
    energy_range_slider.value = (np.min(e_arr), np.max(e_arr))

# Export
Export the currently selected series into a zip file. The raw data in plain txt and IGOR formats are included.

In [None]:
display(
    ipw.HBox([series_plotter_inst.zip_btn, series_plotter_inst.zip_progress]),
    series_plotter_inst.link_out,
)

In [None]:
def clear_tmp(b):
    ! rm -rf tmp && mkdir tmp
    with series_plotter_inst.link_out:
        clear_output()
    series_plotter_inst.zip_progress.value = 0.0

    if series_plotter_inst.series is not None:
        series_plotter_inst.zip_btn.disabled = False


clear_tmp_btn = ipw.Button(description="clear tmp")
clear_tmp_btn.on_click(clear_tmp)
display(clear_tmp_btn)

In [None]:
# Load the URL after everything is set up.
try:
    url = urllib.parse.urlsplit(jupyter_notebook_url)
    pk_select.value = urllib.parse.parse_qs(url.query)["pk"][0]
    load_pk(0)
except Exception as exc:
    print(exc)

In [None]:
def connect_openbis(eln_url, eln_token):
    try:
        session_data = {"url": eln_url, "token": eln_token}
        openbis_session = Openbis(eln_url, verify_certificates = False)
        openbis_session.set_token(eln_token)
    except ValueError:
        print("Session is no longer valid. Please check if the token is still valid.")
        openbis_session = None
        session_data = {}
    
    return openbis_session, session_data

ELN_CONFIG = Path.home() / ".aiidalab" / "aiidalab-eln-config.json"
ELN_CONFIG.parent.mkdir(
    parents=True, exist_ok=True
)  # making sure that the folder exists.

with open(ELN_CONFIG) as file:
    config = json.load(file)
    eln_url = config["default"]
    eln_config = config[eln_url]
    eln_token = eln_config["token"]
    OPENBIS_SESSION, _ = connect_openbis(eln_url, eln_token)

In [None]:
DATASET_PREVIEW_DICTIONARY = {"@id": 1, "@type": "imaging.dto.ImagingDataSetPropertyConfig", "config": {"@id": 2, "@type": "imaging.dto.ImagingDataSetConfig", "inputs": [{"@id": 7, "type": "Dropdown", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "Channel", "range": None, "speeds": None, "values": ["z", "I", "dIdV", "dIdV_Y"], "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": False}, {"@id": 8, "type": "Range", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "X-axis", "range": ["0", "20.0", "0.01"], "speeds": None, "values": None, "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": None}, {"@id": 9, "type": "Range", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "Y-axis", "range": ["0", "20.0", "0.01"], "speeds": None, "values": None, "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": None}, {"@id": 10, "type": "Range", "unit": "nm", "@type": "imaging.dto.ImagingDataSetControl", "label": "Color-scale", "range": ["-61.105", "-61.061314", "0.0001"], "speeds": None, "values": None, "section": None, "metadata": None, "playable": None, "visibility": [{"@id": 11, "unit": "nm", "@type": "imaging.dto.ImagingDataSetControlVisibility", "label": "Channel", "range": ["-61.105", "-61.061314", "0.0001"], "values": ["z"]}, {"@id": 12, "unit": "pA", "@type": "imaging.dto.ImagingDataSetControlVisibility", "label": "Channel", "range": ["49.26677", "50.608734", "0.01"], "values": ["I"]}, {"@id": 13, "unit": "pS", "@type": "imaging.dto.ImagingDataSetControlVisibility", "label": "Channel", "range": ["-0.2710401", "0.35857454", "0.01"], "values": ["dIdV"]}, {"@id": 14, "unit": "pS", "@type": "imaging.dto.ImagingDataSetControlVisibility", "label": "Channel", "range": ["-0.22871183", "0.393872", "0.01"], "values": ["dIdV_Y"]}], "multiselect": None}, {"@id": 15, "type": "Colormap", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "Colormap", "range": None, "speeds": None, "values": ["gray", "YlOrBr", "viridis", "cividis", "inferno", "rainbow", "Spectral", "RdBu", "RdGy"], "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": False}, {"@id": 16, "type": "Dropdown", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "Scaling", "range": None, "speeds": None, "values": ["linear", "logarithmic"], "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": False}], "speeds": [1000, 2000, 5000], "adaptor": None, "exports": [{"@id": 3, "type": "Dropdown", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "include", "range": None, "speeds": None, "values": ["image", "raw data"], "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": True}, {"@id": 4, "type": "Dropdown", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "image-format", "range": None, "speeds": None, "values": ["png", "svg"], "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": False}, {"@id": 5, "type": "Dropdown", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "archive-format", "range": None, "speeds": None, "values": ["zip", "tar"], "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": False}, {"@id": 6, "type": "Dropdown", "unit": None, "@type": "imaging.dto.ImagingDataSetControl", "label": "resolution", "range": None, "speeds": None, "values": ["original", "150dpi", "300dpi"], "section": None, "metadata": None, "playable": None, "visibility": None, "multiselect": False}], "version": 1, "metadata": {}, "playable": True, "resolutions": ["original", "200x200", "2000x2000"]}, "images": [{"@id": 17, "@type": "imaging.dto.ImagingDataSetImage", "index": 0, "config": {}, "metadata": {}, "previews": [{"show": False, "@type": "imaging.dto.ImagingDataSetPreview", "bytes": "", "index": 0, "width": None, "config": {}, "format": "png", "height": None, "metadata": {"file": {"name": "013a.png", "size": 24015, "type": "image/png", "lastModified": 1640070004841, "lastModifiedDate": {}, "webkitRelativePath": ""}}}]}]}
DATASET_PREVIEW_TEMPLATE = {
    "show": False, 
    "@type": "imaging.dto.ImagingDataSetPreview", 
    "bytes": "",
    "index": 0, 
    "width": None, 
    "config": {},
#     "config": {
#         "X-axis": ["0", "20.0"], 
#         "Y-axis": ["0", "20.0"], 
#         "Channel": "z", 
#         "Scaling": "linear", 
#         "Colormap": "gray", 
#         "Color-scale": ["-61.105", "-61.061314"]}, 
    "format": "png", 
    "height": None, 
    "metadata": {
        "file": {
            "name": "013a.png", 
            "size": 24015, 
            "type": "image/png", 
            "lastModified": 1640070004841, 
            "lastModifiedDate": {}, 
            "webkitRelativePath": ""
        }
    }
}

def get_list_images_bytes(zip_file_path):
    # List to store PNG byte strings
    png_byte_strings = []
    png_filenames = []

    # Open the ZIP file
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        # Loop through all files in the ZIP
        for file_name in zip_ref.namelist():
            if file_name.lower().endswith('.png'):
                # Read the PNG file's content as bytes
                with zip_ref.open(file_name) as file:
                    # Convert the PNG file's content to a byte string
                    png_bytes = file.read()
                    # Add the PNG byte string to the list
                    png_byte_strings.append(base64.b64encode(png_bytes).decode('utf-8'))
                    png_filenames.append(file_name)
    
    return png_byte_strings, png_filenames

def export_spm_images(b):
    if workchain_eln_info:
#         eln_url = workchain_eln_info["url"]
        eln_object_uuid = workchain_eln_info["object_uuid"]
        eln_objects = OPENBIS_SESSION.get_samples(eln_object_uuid)
        if eln_objects:
            # Upload dataset
            spm_object = eln_objects[0]
            images_zip_filename = series_plotter_inst.create_zip_link_for_openbis()
            list_images_bytes, list_images_filenames = get_list_images_bytes(images_zip_filename)
            images_previews = []
            
            for idx, image_bytes in enumerate(list_images_bytes):
                template = copy.deepcopy(DATASET_PREVIEW_TEMPLATE)
                template["index"] = idx
                template["bytes"] = image_bytes
                template["metadata"]["file"]["name"] = list_images_filenames[idx]
                images_previews.append(template)
            DATASET_PREVIEW_DICTIONARY["images"][0]["previews"] = images_previews
            images_dataset = OPENBIS_SESSION.new_dataset(
                type = "IMAGING_DATA", 
                files = [images_zip_filename],
                sample = spm_object,
                props = {
                    "$imaging_data_config": json.dumps(DATASET_PREVIEW_DICTIONARY),
                    "$default_dataset_view": "imaging_dataset_viewer"
                }
            )
            images_dataset.save()
        else:
            print("Please export the simulation object to the ELN first.")
    else:
        print("Please export the simulation object to the ELN first.")

# Export to ELN

In [None]:
export_button = ipw.Button(description = "Export to ELN")
pk = urllib.parse.parse_qs(urllib.parse.urlsplit(jupyter_notebook_url).query)["pk"][0]
workcalc = orm.load_node(pk)
workchain_eln_info = workcalc.base.extras.all.get("eln", "")
export_button.on_click(export_spm_images)
display(export_button)
# aiidalab_instance = urllib.parse.urlsplit(jupyter_notebook_url).netloc
# display(awb.ElnExportWidget(node=workcalc, aiidalab_instance=aiidalab_instance))