In [None]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}
document.title = 'AiiDAlab ATMOSPEC app'

In [None]:
# External imports
import ipywidgets as ipw
from jinja2 import Environment
from importlib_resources import files

from aiida.plugins import DataFactory
from aiida.orm import StructureData, TrajectoryData
from aiida.orm import load_node

from aiidalab_widgets_base import WizardAppWidget, StructureManagerWidget
from aiidalab_widgets_base import StructureBrowserWidget, SmilesWidget, StructureUploadWidget

StructureData = DataFactory('structure')
TrajectoryData = DataFactory('array.trajectory')

In [None]:
# Temporarily here until merged into aiidalab-widgets-base
import io
import ase
from traitlets import Instance, Int, List, Unicode, Union, default, dlink, link, observe
from aiida.orm import CifData

def get_formula(data_node):
    """A simple wrapper for Data.get_formula, for TrajectoryData we return the formula of the first structure"""
    if isinstance(data_node, TrajectoryData):
        stepid = data_node.get_stepids()[0]
        return data_node.get_step_structure(stepid).get_formula()

    return data_node.get_formula()

class TrajectoryManagerWidget(StructureManagerWidget):
    SUPPORTED_DATA_FORMATS = {
        "CifData": "cif",
        "StructureData": "structure",
        "TrajectoryData": "array.trajectory",
    }
    
    def __init__(
        self,
        importers,
        viewer=None,
        editors=None,
        storable=True,
        node_class=None,
        **kwargs,
    ):

        # History of modifications
        self.history = []

        # Undo functionality.
        btn_undo = ipw.Button(description="Undo", button_style="success")
        btn_undo.on_click(self.undo)
        self.structure_set_by_undo = False

        # To keep track of last inserted structure object
        self._inserted_structure = None

        # Structure viewer.
        if viewer:
            self.viewer = viewer
        else:
            self.viewer = StructureDataViewer(**kwargs)

        if node_class == "TrajectoryData":
            dlink((self, "structure_node"), (self.viewer, "trajectory"))
        else:
            dlink((self, "structure_node"), (self.viewer, "structure"))

        # Store button.
        self.btn_store = ipw.Button(description="Store in AiiDA", disabled=True)
        self.btn_store.on_click(self.store_structure)

        # Label and description that are stored along with the new structure.
        self.structure_label = ipw.Text(description="Label")
        self.structure_description = ipw.Text(description="Description")

        # Store format selector.
        data_format = ipw.RadioButtons(
            options=self.SUPPORTED_DATA_FORMATS, description="Data type:"
        )
        link((data_format, "label"), (self, "node_class"))

        # Store button, store class selector, description.
        store_and_description = [self.btn_store] if storable else []
        
        if node_class is None:
            store_and_description.append(data_format)
        elif node_class in self.SUPPORTED_DATA_FORMATS:
            self.node_class = node_class
        else:
            raise ValueError(
                "Unknown data format '{}'. Options: {}".format(
                    node_class, list(self.SUPPORTED_DATA_FORMATS.keys())
                )
            ) 
    
        self.output = ipw.HTML("")

        children = [
            self._structure_importers(importers),
            self.viewer,
            ipw.HBox(
                store_and_description
                + [self.structure_label, self.structure_description]
            ),
        ]

        super(ipw.VBox, self).__init__(children=children + [self.output], **kwargs)


    def _convert_to_structure_node(self, structure):
        """Convert structure of any type to the StructureNode object."""
        if structure is None:
            return None
        structure_node_type = DataFactory(
            self.SUPPORTED_DATA_FORMATS[self.node_class]
        )  # pylint: disable=invalid-name

        # If the input_structure trait is set to Atoms object, structure node must be created from it.
        if isinstance(structure, Atoms):
            if structure_node_type == TrajectoryData:
                structure_node = structure_node_type(
                    structurelist=(StructureData(ase=structure),)
                )
            else:
                structure_node = structure_node_type(ase=structure)

            # If the Atoms object was created by SmilesWidget,
            # attach its SMILES code as an extra.
            if "smiles" in structure.info:
                structure_node.set_extra("smiles", structure.info["smiles"])
            return structure_node

        # If the input_structure trait is set to AiiDA node, check what type
        elif isinstance(structure, Data):
            # Transform the structure to the structure_node_type if needed.
            if isinstance(structure, structure_node_type):
                return structure
            # TrajectoryData cannot be created from Atoms object
            if structure_node_type == TrajectoryData:
                if isinstance(structure, StructureData):
                    return structure_node_type(structurelist=(structure,))
                elif isinstance(structure, CifData):
                    return structure_node_type(structurelist=(StructureData(ase=structure.get_ase()),))
                else:
                    raise ValueError(f"Unexpected node type {type(structure)}")

        # Using self.structure, as it was already converted to the ASE Atoms object.
        return structure_node_type(ase=self.structure)


    @observe("structure_node")
    def _observe_structure_node(self, change):
        """Modify structure label and description when a new structure is provided."""
        struct = change["new"]
        if struct is None:
            self.btn_store.disabled = True
            self.structure_label.value = ""
            self.structure_label.disabled = True
            self.structure_description.value = ""
            self.structure_description.disabled = True
            return
        if struct.is_stored:
            self.btn_store.disabled = True
            self.structure_label.value = struct.label
            self.structure_label.disabled = True
            self.structure_description.value = struct.description
            self.structure_description.disabled = True
        else:
            self.btn_store.disabled = False
            self.structure_label.value = get_formula(struct)
            self.structure_label.disabled = False
            self.structure_description.value = ""
            self.structure_description.disabled = False
        
    @observe("input_structure")
    def _observe_input_structure(self, change):
        """Returns ASE atoms object and sets structure_node trait."""
        # If the `input_structure` trait is set to Atoms object, then the `structure` trait should be set to it as well.
        self.history = []

        if isinstance(change["new"], Atoms):
            self.structure = change["new"]

        # If the `input_structure` trait is set to AiiDA node, then the `structure` trait should
        # be converted to an ASE Atoms object.
        elif isinstance(
            change["new"], CifData
        ):  # Special treatement of the CifData object
            str_io = io.StringIO(change["new"].get_content())
            self.structure = ase.io.read(
                str_io, format="cif", reader="ase", store_tags=True
            )
        elif isinstance(change["new"], StructureData):
            self.structure = change["new"].get_ase()

        elif isinstance(change["new"], TrajectoryData):
            # self.structure is essentially used for editing purposes.
            # We're currently not allowing editing TrajectoryData,
            # so we don't even attempt to set self.structure,
            # instead we update the structure_node directly here
            self.set_trait("structure_node", change["new"])

        else:
            self.structure = None

In [None]:
# Detailed documentation
# https://www.rdkit.org/docs/RDKit_Book.html#conformer-generation
# API reference
# https://www.rdkit.org/docs/source/rdkit.Chem.rdDistGeom.html?highlight=embedmultipleconfs#rdkit.Chem.rdDistGeom.EmbedMultipleConfs

# https://sourceforge.net/p/rdkit/mailman/rdkit-discuss/thread/CWLP265MB0818A57240D003F146E910798C680%40CWLP265MB0818.GBRP265.PROD.OUTLOOK.COM/#msg36584689
from traitlets import Union, Instance
from ase import Atoms
from aiida.orm import Data
import numpy as np

# Temporary hack before we can guarantee that xtb-python is installed
DISABLE_XTB = False
try:
    from xtb.interface import Calculator
    from xtb.ase.calculator import XTB
except ImportError:
    DISABLE_XTB = True

from ase.optimize import BFGS, GPMin

class ConformerSmilesWidget(SmilesWidget):
    
    structure = Union([Instance(Atoms), Instance(StructureData), Instance(TrajectoryData)], allow_none=True)

    def _mol_from_smiles(self, smiles, steps=1000):
        """Convert SMILES to ase structure try rdkit then pybel"""
        conformers = None
        # TODO: Make a dropdown menu for algorithm selection
        rdkit_algorithm = "UFF"
        try:
            conformers = self._rdkit_opt(smiles, steps, algo=rdkit_algorithm)
            if conformers is None:
                #conformers = self._pybel_opt(smiles, steps)
                return None
        except ValueError as e:
            self.output.value = str(e)
            return None

        if conformers is None or len(conformers) == 0:
            return None
        
        # Fallback if XTB is not available
        if DISABLE_XTB:
            return self._create_trajectory_node(conformers)
        
        conformers = self.optimize_conformers(conformers)
        conformers = self._filter_and_sort_conformers(conformers)
        return self._create_trajectory_node(conformers)

        
    # TODO: Adjust mux number of steps and relax convergence criteria
    # fmax - maximum force per atom for convergence (0.05 default in ASE)
    # maxstep - maximum atom displacement per iteration (angstrom, 0.04 ASE default)
    def _xtb_opt(self, atoms, xtb_method="GFN2-xTB", max_steps=50, fmax=0.04):
        # https://wiki.fysik.dtu.dk/ase/gettingstarted/tut02_h2o_structure/h2o.html
        # https://xtb-python.readthedocs.io/en/latest/general-api.html
        if not xtb_method:
            return atoms

        atoms.calc = XTB(method=xtb_method)
        #opt = BFGS(atoms, maxstep=0.06, trajectory=None, logfile=None)
        opt = GPMin(atoms, trajectory=None, logfile=None)
        converged = opt.run(steps=max_steps, fmax=fmax)
        if converged:
            print(f"{xtb_method} minimization converged in {opt.get_number_of_steps()} iterations")
        else:
            print(f"{xtb_method} minimization failed to converged in {opt.get_number_of_steps()} iterations")
        return atoms


    def optimize_conformers(self, conformers):
        """Conformer optimization with XTB"""
        #method = "GFN2-xTB"
        #method = "GFNFF"
        method = "GFN2-xTB"
        max_steps = 5
        fmax = 0.15
        
        if len(conformers) == 1:
            self.output.value = f"Optimizing {len(conformers)} conformer with {method}"
        else:
            self.output.value = f"Optimizing {len(conformers)} conformers with {method}"

        opt_structs = []
        for ase_struct in conformers:
            opt_struct = self._xtb_opt(
                ase_struct, 
                xtb_method=method, max_steps=max_steps, fmax=fmax)
            if opt_struct is not None:
                opt_structs.append(opt_struct)
        return opt_structs
    
    def _create_trajectory_node(self, conformers):
        if conformers is None or len(conformers) == 0:
            return None
        
        traj = TrajectoryData(
            structurelist=[StructureData(ase=conformer) for conformer in conformers]
        )
        traj.set_extra("smiles", conformers[0].info["smiles"])
        if DISABLE_XTB:
            return traj
        # TODO: I am not sure whether get_potential_energy triggers computation,
        # or uses already computed energy from the optimization step
        en0 = conformers[0].get_potential_energy()
        energies = np.fromiter((conf.get_potential_energy() - en0 for conf in conformers), count=len(conformers), dtype=float)
        traj.set_array('energies', energies)
        return traj
    
    # TODO: Automatically filter out conformers with high energy
    # Boltzmann criterion: Add conformers until reaching e.g. 95% cumulative Boltzmann population
    # To test Boltzmann population values:
    # https://www.colby.edu/chemistry/PChem/Hartree.html
    def _filter_and_sort_conformers(self, conformers):
        energies = np.fromiter((conf.get_potential_energy() for conf in conformers), count=len(conformers), dtype=float)
        # TODO: There must be a better way to do this sorting...
        sorted_indeces = np.argsort(energies)
        sorted_conformers = []
        for i in sorted_indeces:
            sorted_conformers.append(conformers[i])
        return sorted_conformers
    
    def _rdkit_opt(self, smiles, steps, algo="UFF", num_confs=10):
        """Optimize a molecule using force field and rdkit (needed for complex SMILES)."""
        from rdkit import Chem
        from rdkit.Chem import AllChem  
        self.output.value += f"<br>Using algorithm: {algo}"

        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            # Something is seriously wrong with the SMILES code,
            # just return None and don't attempt anything else.
            msg = "RDkit ERROR: Invalid SMILES string"
            self.output.value = msg
            return None
        
        mol = Chem.AddHs(mol)
        
        if algo == "UFF-single":
            params = AllChem.ETKDG()
            params.maxAttempts = 20
            params.randomSeed =  42
            conf_id = AllChem.EmbedMolecule(mol, params=params)
            if conf_id == -1:
                # This is a more robust setting for larger molecules, per
                # https://sourceforge.net/p/rdkit/mailman/message/21776083/
                self.output.value += "Embedding failed, retrying with random coordinates."
                params.useRandomCoords = True
                conf_id = AllChem.EmbedMolecule(mol, params=params)
            if conf_id == -1:
                msg = " Failed to generate conformer with RDKit. Trying OpenBabel next."
                raise ValueError(msg)
            if not AllChem.UFFHasAllMoleculeParams(mol):
                self.output.value += "RDKit WARNING: Missing UFF parameters"
            else:
                AllChem.UFFOptimizeMolecule(mol, maxIters=steps)
            conf_ids = [conf_id]

        if algo == "UFF":
            params = AllChem.ETKDG()
            params.pruneRmsThresh = 0.1
            params.maxAttempts = 20
            params.randomSeed =  422
            conf_ids = AllChem.EmbedMultipleConfs(mol, numConfs=num_confs, params = params)
            # Not sure what is the fail condition here
            if len(conf_ids) == 0:
                # This is a more robust setting for larger molecules, per
                # https://sourceforge.net/p/rdkit/mailman/message/21776083/
                self.output.value += "Embedding failed, retrying with random coordinates."
                params.useRandomCoords = True
                conf_ids = AllChem.EmbedMultipleConfs(mol, numConfs=num_confs, params = params)
            if len(conf_ids) == -1:
                msg = " Failed to generate conformer with RDKit. Trying OpenBabel next."
                raise ValueError(msg)
            
            if not AllChem.UFFHasAllMoleculeParams(mol):
                self.output.value += " RDKit WARNING: Missing UFF parameters"
            else:
                # https://www.rdkit.org/docs/source/rdkit.Chem.rdForceFieldHelpers.html?highlight=uff#rdkit.Chem.rdForceFieldHelpers.UFFOptimizeMoleculeConfs
                conf_opt = AllChem.UFFOptimizeMoleculeConfs(mol, maxIters=steps, numThreads=1)
                # TODO: I guess we should check the return value somehow?
                #for converged, energy in conf_opt:
                #    print("Converged: %d, Energy: %g" % (converged, energy))
 
            
        elif algo == "ETKDGv2":
            # https://www.rdkit.org/docs/Cookbook.html?highlight=allchem%20embedmultipleconfs#conformer-generation-with-etkdg
            params = AllChem.ETKDGv2()
            params.pruneRmsThresh = 0.1
            params.maxAttempts = 40
            params.randomSeed =  422
            #params.pruneRmsThresh = 0.5
            #params.randomSeed = 424242
            conf_ids = AllChem.EmbedMultipleConfs(mol, numConfs=num_confs, params = params)
            
        else:
            raise ValueError(f"Invalid algorithm '{algo}'")
        
        self.output.value += f"No. conformers = {len(conf_ids)}"

        # TODO: Sort conformers based on UFF energy if available
        ase_structs = []
        for conf_id in conf_ids:
            positions = mol.GetConformer(id=conf_id).GetPositions()
            natoms = mol.GetNumAtoms()
            species = [mol.GetAtomWithIdx(j).GetSymbol() for j in range(natoms)]
            ase_structs.append(self._make_ase(species, positions, smiles))
        return ase_structs

In [None]:
from aiidalab_ispg import WorkChainSelector, TrajectoryDataViewer
from aiidalab_ispg import StructureSelectionStep, SubmitAtmospecAppWorkChainStep
from aiidalab_ispg import ViewAtmospecAppWorkChainStatusAndResultsStep, ViewSpectrumStep
from aiidalab_ispg import static, __version__

WORKCHAIN_LABEL = "AtmospecWorkChain"

structure_manager_widget = TrajectoryManagerWidget(
    importers=[
        ConformerSmilesWidget(title="SMILES"),
        StructureUploadWidget(title="Upload file", allow_trajectories=True),
        StructureBrowserWidget(title="AiiDA database"),        
        #StructureBrowserWidget(title="AiiDA database", query_structure_type=(StructureData, TrajectoryData)),
    ],
    node_class='TrajectoryData',
    viewer=TrajectoryDataViewer(),
    storable=True,
)

structure_selection_description = ipw.Label("Select a structure from one of the following sources and then click \"Confirm\" to go to the next step. ")

structure_selection_step = StructureSelectionStep(
    manager=structure_manager_widget,
    description=structure_selection_description
)
structure_selection_step.auto_advance = True

submit_atmospec_work_chain_step = SubmitAtmospecAppWorkChainStep()
submit_atmospec_work_chain_step.auto_advance = True

view_atmospec_work_chain_status_and_results_step = ViewAtmospecAppWorkChainStatusAndResultsStep()
view_atmospec_work_chain_status_and_results_step.auto_advance = True

view_spectrum_step = ViewSpectrumStep()

# Link the application steps
ipw.dlink((structure_selection_step, 'confirmed_structure'), (submit_atmospec_work_chain_step, 'input_structure'))
ipw.dlink((submit_atmospec_work_chain_step, 'process'), (view_atmospec_work_chain_status_and_results_step, 'process'))
ipw.dlink((view_atmospec_work_chain_status_and_results_step, 'process'), (view_spectrum_step, 'process'))

# Add the application steps to the application
app = WizardAppWidget(
    steps=[
        ('Select structure', structure_selection_step),
        ('Submit workflow', submit_atmospec_work_chain_step),
        ('Status & Detailed outputs', view_atmospec_work_chain_status_and_results_step),
        ('UV/VIS Spectrum', view_spectrum_step),
    ])

# Reset all subsequent steps in case that a new structure is selected
def _observe_structure_selection(change):
    with structure_selection_step.hold_sync():
        if structure_selection_step.confirmed_structure is not None and \
                structure_selection_step.confirmed_structure != change['new']:
            app.reset()
structure_selection_step.observe(_observe_structure_selection, 'structure')

# Add process selection header
work_chain_selector = WorkChainSelector(workchain_label=WORKCHAIN_LABEL, layout=ipw.Layout(width='auto'))
def _observe_process_selection(change):
    if change['old'] == change['new']:
        return
    pk = change['new']
    if pk is None:
        app.reset()
        app.selected_index = 0
    else:
        process = load_node(pk)
        with structure_manager_widget.hold_sync():
            with structure_selection_step.hold_sync():
                if process.is_finished_ok:
                    app.selected_index = 3
                else:
                    app.selected_index = 2
                structure_manager_widget.input_structure = process.inputs.structure
                structure_selection_step.structure = process.inputs.structure
                structure_selection_step.confirmed_structure = process.inputs.structure
                submit_atmospec_work_chain_step.process = process

work_chain_selector.observe(_observe_process_selection, 'value')    
ipw.dlink((submit_atmospec_work_chain_step, 'process'), (work_chain_selector, 'value'),
          transform=lambda node: None if node is None else node.pk)

env = Environment()

template = files(static).joinpath("welcome.jinja").read_text()
style = files(static).joinpath("style.css").read_text()
welcome_message = ipw.HTML(env.from_string(template).render(style=style,staticpath=files(static)))
footer = ipw.HTML(f'<p style="text-align:right;">Copyright (c) 2022 ISPG team (University of Bristol)&#8195Version: {__version__}</p>')

app_with_work_chain_selector = ipw.VBox(children=[work_chain_selector, app])

display(welcome_message, app_with_work_chain_selector,footer)