# pyiron_workflow concepts

In [None]:
%%time
%config IPCompleter.evaluation='unsafe'

import matplotlib.pylab as plt
from pyiron_workflow.workflow import Workflow
from pyiron_workflow.function import single_value_node

## Phonopy example

In [None]:
%%time
wf = Workflow('phonopy')
wf.register('pyiron_workflow.node_library.atomistic', domain='atomistic')

In [None]:
wf = Workflow('test')
wf.structure = wf.create.atomistic.structure.build.bulk('Al')

In [None]:
@Workflow.wrap_as.macro_node("structure")
def bulk_rotation(wf, name='Al', cubic: bool=True, repeat_cell=2, angle=0, axis=[0,0,1]):
    """Macro that chains three structure nodes: build bulk -> repeat -> rotate.

    The child nodes are attached to ``wf`` under the labels ``structure``,
    ``repeat`` and ``rotate``; the ``rotate`` node is returned and exposed as
    the macro output labelled "structure" (see the decorator argument).

    Args:
        wf: Macro/workflow object the child nodes are attached to.
        name: Forwarded as ``name`` to ``structure.build.bulk``.
        cubic: Forwarded as ``cubic`` to ``structure.build.bulk``.
        repeat_cell: Forwarded as ``repeat_scalar`` to ``transform.repeat``.
        angle: Forwarded as ``angle`` to ``transform.rotate_axis_angle``.
        axis: Forwarded as ``axis`` to ``transform.rotate_axis_angle``.
    """
    # NOTE(review): ``axis`` has a mutable list default, which is shared
    # between calls. It is only forwarded here, never mutated; switching to a
    # tuple should be checked against the type hints of rotate_axis_angle.
    wf.structure = wf.create.atomistic.structure.build.bulk(name=name, cubic=cubic)
    wf.repeat = wf.create.atomistic.structure.transform.repeat(structure=wf.structure, repeat_scalar=repeat_cell)
    wf.rotate = wf.create.atomistic.structure.transform.rotate_axis_angle(structure=wf.repeat, angle=angle, axis=axis)
    return wf.rotate

In [None]:
br = bulk_rotation()
br.structure.inputs.name.value, br.structure.inputs.cubic.value

In [None]:
br.draw()

In [None]:
br.inputs.name

In [None]:
br = bulk_rotation(name='Fe')
br.structure.inputs.name = 'Al'
br.inputs.name = 'Al'
br.run()

In [None]:
br.draw()

In [None]:
from pyiron_workflow.node_library.atomistic.property.phonons import InputPhonopyGenerateSupercells

### Create phonopy workflow macro

Select the input and output parameters that users are most likely to care about: the inputs over which you want to run a parameter study, and the output quantities you want to analyse.

In [None]:
@Workflow.wrap_as.macro_node(
    "imaginary_modes",
    "total_dos",
    "energy_relaxed",
    "energy_initial",
    "energy_displaced",
)
def run_phonopy(
    wf,
    element: str,
    cell_size: int = 2,
    vacancy_index: int | None = None,
    displacement: float = 0.01,
    max_workers: int = 1
):
    """Macro wiring a phonopy workflow: build cell -> minimize -> phonopy.

    Child nodes are attached to ``wf`` as ``engine``, ``structure``,
    ``relaxed_structure``, ``phonopy_input``, ``phonopy``,
    ``check_consistency`` and ``total_dos``.

    Args:
        wf: Macro/workflow object the child nodes are attached to.
        element: Forwarded to ``structure.build.cubic_bulk_cell``.
        cell_size: Forwarded to ``structure.build.cubic_bulk_cell``.
        vacancy_index: Forwarded to ``structure.build.cubic_bulk_cell``.
        displacement: Passed as ``distance`` to ``PhonopyParameters``.
        max_workers: Forwarded to ``create_phonopy``.

    Returns:
        Five outputs, labelled in order by the decorator arguments:
        ``imaginary_modes`` (the ``check_consistency`` node), ``total_dos``
        (the ``total_dos`` node), ``energy_relaxed`` and ``energy_initial``
        (final and initial energy of the minimization output) and
        ``energy_displaced`` (the "energies" entry of the phonopy output).
    """

    # Alternative engine, currently disabled:
    # wf.engine = wf.create.engine.ase.M3GNet()
    wf.engine = wf.create.atomistic.engine.ase.EMT()
    
    wf.structure = wf.create.atomistic.structure.build.cubic_bulk_cell(
        element=element, cell_size=cell_size, vacancy_index=vacancy_index
    )
    # explicit output needed since macro and not single_value_node (we should have also a single_value_macro)
    wf.relaxed_structure = wf.create.atomistic.calculator.ase.minimize(
        atoms=wf.structure.outputs.structure,
        engine=wf.engine,
    )
    
    wf.phonopy_input = wf.create.atomistic.property.phonons.PhonopyParameters(distance=displacement)

    # The phonopy node receives the relaxed structure and the same engine
    # that was used for the minimization.
    wf.phonopy = wf.create.atomistic.property.phonons.create_phonopy(
        structure=wf.relaxed_structure.outputs.structure,
        parameters=wf.phonopy_input,
        engine=wf.engine,
        max_workers=max_workers,
    )
    # print ('test: ', displacement.run())

    # Both post-processing nodes read the same phonopy object.
    wf.check_consistency = wf.create.atomistic.property.phonons.check_consistency(
        phonopy=wf.phonopy.outputs.phonopy
    )
    wf.total_dos = wf.create.atomistic.property.phonons.get_total_dos(phonopy=wf.phonopy.outputs.phonopy)

    # TODO: iterate over all nodes, extract the log_output and store it in hdf5
    # TODO: control the amount of output via log_level

    # The order must match the output labels given to the decorator.
    return (
        wf.check_consistency,
        wf.total_dos,
        wf.relaxed_structure.outputs.out.final.energy,
        wf.relaxed_structure.outputs.out.initial.energy,
        wf.phonopy.outputs.out["energies"],
    )

#### Run for a single parameter set

In [None]:
%%time
wf = run_phonopy(element='Al', cell_size=3, vacancy_index=0, displacement=0.1, max_workers=1)
out = wf.run()

In [None]:
wf.draw()

In [None]:
import pandas as pd

In [None]:
wf.phonopy.outputs.out.value['df']

#### Run iteratively over several input parameters and construct a pandas table

In [None]:
%%time
df = wf.iter(cell_size=list(range(1,4)), 
             element=['Al'], 
             vacancy_index=[None, 0], 
             displacement=[0.01, 0.1]
            ) #, Cu, Pd, Ag, Pt and Au])

In [None]:
df

### Elasticity

In [None]:
%%time
%config IPCompleter.evaluation='unsafe'

import matplotlib.pylab as plt
from pyiron_workflow.workflow import Workflow

import matplotlib.pylab as plt
import numpy as np

In [None]:
%time
wf = Workflow('phonopy')
wf.register('pyiron_workflow.node_library.atomistic', domain='atomistic')
wf.register('pyiron_workflow.node_library.databases', domain='databases')

In [None]:
# Get the source data
# from Ref. de Jong et al. https://www.nature.com/articles/sdata20159#MOESM77

import requests

url = 'https://datadryad.org/stash/downloads/file_stream/88988'
destination_file = 'ec.json'

# A timeout keeps the cell from hanging forever on a stalled connection;
# network errors are reported the same way as a non-200 status.
try:
    response = requests.get(url, timeout=60)
except requests.RequestException as err:
    print('Failed to download file:', err)
else:
    if response.status_code == 200:
        with open(destination_file, 'wb') as f:
            f.write(response.content)
        print('File downloaded successfully')
    else:
        print('Failed to download file:', response.status_code)

In [None]:
wf = Workflow('elastic')
wf.data = wf.create.databases.elasticity.de_jong()

In [None]:
%%time
df_data = wf.run()['data__dataframe']

In [None]:
df_data

In [None]:
unaries = df_data[df_data.formula.str.len() == 2]
K_Reuss = unaries.K_Reuss.values
K_Voigt = unaries.K_Voigt
structures = unaries.atoms.values

Note: 
- Different symbols/terms for identical physical quantities would be a nice example for the application of an ontology.
- ChatGPT could be used to analyze the data (where the agreement is good and where it is not).

In [None]:
table_M3GNet = Workflow.create.atomistic.structure.calc.volume().iter(structure=structures)  # TODO: load rather than run

In [None]:
plt.scatter(unaries.volume, table_M3GNet.volume);

In [None]:
%%time
import warnings
warnings.filterwarnings("ignore")

out = Workflow.create.atomistic.property.elastic.elastic_constants().iter(structure=structures)

In [None]:
x = np.linspace(0, 500, 101)

plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.scatter(out.BR, unaries.K_Reuss)
plt.scatter(out.BV, unaries.K_Voigt, c='g', marker = 'x')
plt.xlabel('$B_{Reuss}$ (M3GNet)')
plt.ylabel('$B_{Reuss}$ (DFT)')
plt.title('Bulk Modulus')
plt.plot(x, x, 'k--');

plt.subplot(1,2,2)
x = np.linspace(0, 300, 101)

plt.scatter(out.GR, unaries.G_Reuss)
plt.scatter(out.GV, unaries.G_Voigt, c='g', marker = 'x')
plt.xlabel('$G_{Reuss}$ (M3GNet)')
plt.ylabel('$G_{Reuss}$ (DFT)')
plt.title('Shear Modulus')
plt.plot(x, x, 'k--');

In [None]:
unaries.keys()

### Test universal ML potential

In [None]:
from ase import units
from ase.build import bulk
from atomistics.calculators import calc_molecular_dynamics_langevin_with_ase

In [None]:
import matgl
from matgl.ext.ase import M3GNetCalculator

In [None]:
%%time
structure = bulk("Al", cubic=True).repeat([3, 3, 3])
ase_calculator = M3GNetCalculator(matgl.load_model("M3GNet-MP-2021.2.8-PES"))
result_dict = calc_molecular_dynamics_langevin_with_ase(
    structure=structure,
    ase_calculator=ase_calculator,
    run=1000,
    thermo=10,
    timestep=1 * units.fs,
    temperature=1000,
    friction=0.002,
)

In [None]:
result_dict.keys()

In [None]:
plt.plot(result_dict['energy_pot']);

In [None]:
plt.plot(result_dict['positions'][:,:,0]);

In [None]:
from pyiron_workflow.node_library.dev_tools import Output

In [None]:
# Example usage:
def get_energy(arg1, arg2):
    """Return ``arg1 + arg2`` (toy stand-in for an energy evaluation)."""
    total = arg1 + arg2
    return total

def function2(arg1, arg2):
    """Announce the call on stdout and return a greeting built from both arguments."""
    print(f"Function 2 called with arguments: {arg1}, {arg2}")
    greeting = f"Hello, {arg1} {arg2}!"
    return greeting

def function3(arg):
    """Announce the call on stdout and return the list ``[0, 1, ..., arg - 1]``."""
    print(f"Function 3 called with argument: {arg}")
    return list(range(arg))

def to_dict(func, args=(), kwargs=None):
    """Bundle a callable with its arguments as a ``(func, args, kwargs)`` tuple.

    Despite the name, a tuple is returned, not a dict.

    Args:
        func: Callable to be invoked later.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func``. When omitted, a fresh empty
            dict is created per call, so the returned dict is never shared
            between unrelated bundles.

    Returns:
        The tuple ``(func, args, kwargs)``.
    """
    return (func, args, {} if kwargs is None else kwargs)

output = Output(keys_to_run=['energy'])
print ('keys to run: ', output._keys_to_run)
output['energy'] = to_dict(get_energy, args=(3, 2))

output.run()

# Iterating over keys and values
for key, value in output.items():
    print(f"{key}: {value}")

In [None]:
'energy' in output._functions

In [None]:
func, args, kwargs = output._functions['energy']

In [None]:
func, args, kwargs = to_dict(get_energy, args=(3, 2))

In [None]:
func(*args)

In [None]:
output._functions['energy']

In [None]:
func(*args, **kwargs)

In [None]:
class LazyDict:
    """Mapping-like container whose values are computed on every access.

    Each key stores a ``(func, args, kwargs)`` triple. Reading ``lazy[key]``
    calls ``func(*args, **kwargs)`` and returns the result. Results are not
    cached, so every access (including ``values()`` and ``items()``) re-runs
    the function.
    """

    def __init__(self, **kwargs):
        """Register one entry per keyword.

        Each value must be a dict with a ``'func'`` key and optional
        ``'args'`` (default ``()``) and ``'kwargs'`` (default ``{}``) keys.
        """
        self._functions = {
            key: (spec['func'], spec.get('args', ()), spec.get('kwargs', {}))
            for key, spec in kwargs.items()
        }

    def __getitem__(self, key):
        """Evaluate and return the value for ``key``; raise KeyError if unknown."""
        try:
            func, args, kwargs = self._functions[key]
        except KeyError:
            raise KeyError(f"Key '{key}' not found.") from None
        return func(*args, **kwargs)

    def __setitem__(self, key, value):
        """Store a ``(func, args, kwargs)`` triple under ``key``."""
        func, args, kwargs = value
        self._functions[key] = (func, args, kwargs)

    def __delitem__(self, key):
        """Remove ``key`` if present; unknown keys are ignored silently."""
        if key in self._functions:
            del self._functions[key]

    # Without these three methods Python falls back to calling
    # __getitem__(0), __getitem__(1), ... for ``in`` and ``for``, which raised
    # KeyError("Key '0' not found.") instead of answering the question.
    def __contains__(self, key):
        """Return True if ``key`` is registered (does not evaluate anything)."""
        return key in self._functions

    def __iter__(self):
        """Iterate over the registered keys (does not evaluate anything)."""
        return iter(self._functions)

    def __len__(self):
        """Return the number of registered keys."""
        return len(self._functions)

    def keys(self):
        """Return the registered keys as a list."""
        return list(self._functions.keys())

    def values(self):
        """Evaluate every entry and return the results as a list."""
        return [self[key] for key in self.keys()]

    def items(self):
        """Evaluate every entry and return ``(key, result)`` pairs as a list."""
        return [(key, self[key]) for key in self.keys()]

    def run(self, keys_to_run):
        """Evaluate the given keys for their side effects; unknown keys are skipped.

        The results are discarded and nothing is returned.
        """
        for key in keys_to_run:
            if key in self._functions:
                self[key]

# Example usage:
def function1(arg1, arg2):
    """Announce the call on stdout and return ``arg1 + arg2``."""
    print(f"Function 1 called with arguments: {arg1}, {arg2}")
    total = arg1 + arg2
    return total

def function2(arg1, arg2):
    """Announce the call on stdout and return a greeting built from both arguments."""
    print(f"Function 2 called with arguments: {arg1}, {arg2}")
    greeting = f"Hello, {arg1} {arg2}!"
    return greeting

def function3(arg):
    """Announce the call on stdout and return the list ``[0, 1, ..., arg - 1]``."""
    print(f"Function 3 called with argument: {arg}")
    return list(range(arg))

# Each entry is a spec dict holding the callable and the arguments to call it with.
lazy_dict = LazyDict(
    key1={'func': function1, 'args': (3, 4), 'kwargs': {}},
    key2={'func': function2, 'args': ('John', 'Doe'), 'kwargs': {}},
    key3={'func': function3, 'args': (5,), 'kwargs': {}},
)

# Accessing values triggers function evaluation with arguments
print(lazy_dict['key1'])  # Output: Function 1 called with arguments: 3, 4 7
print(lazy_dict['key2'])  # Output: Function 2 called with arguments: John, Doe Hello, John Doe!
print(lazy_dict['key3'])  # Output: Function 3 called with argument: 5 [0, 1, 2, 3, 4]

# Adding a new key-value pair with arguments
# Earlier variant (would print 6):
#lazy_dict['key4'] = (lambda x, y: x * y, (2, 3), {})
lazy_dict['key4'] = (function1,  (3, 4),  {})
print(lazy_dict['key4'])  # Output: Function 1 called with arguments: 3, 4 7

# Run specific functions
lazy_dict.run(['key1', 'key2'])
# Output:
# Function 1 called with arguments: 3, 4
# Function 2 called with arguments: John, Doe

# Iterating over keys and values
# (items() evaluates every entry again, so each function prints once more)
for key, value in lazy_dict.items():
    print(f"{key}: {value}")

In [None]:
from phonopy.api_phonopy import Phonopy
from pyiron_workflow.node_library.dev_tools import wf_data_class

from typing import Optional, Union

In [None]:
# Input parameter container built with the project's ``wf_data_class``
# decorator, which is handed ``Phonopy.generate_displacements`` as ``doc_func``.
# NOTE(review): the fields presumably mirror the keyword arguments of
# ``Phonopy.generate_displacements`` - confirm against the phonopy API.
# This definition shadows the class of the same name imported earlier in the
# notebook from ``pyiron_workflow.node_library.atomistic.property.phonons``.
# The following cells use instances via attribute access (``par.distance``),
# item assignment (``par['distance'] = 1``), ``par.select(keys_to_store=...)``
# and ``**par`` unpacking.
@wf_data_class(doc_func=Phonopy.generate_displacements)
class InputPhonopyGenerateSupercells:
    distance: float = 0.01
    is_plusminus: Union[str, bool] = "auto"
    is_diagonal: bool = True
    is_trigonal: bool = False
    number_of_snapshots: Optional[int] = None
    random_seed: Optional[int] = None
    temperature: Optional[float] = None
    cutoff_frequency: Optional[float] = None
    max_distance: Optional[float] = None

In [None]:
par = InputPhonopyGenerateSupercells(max_distance=10)
par['distance'] = 1

In [None]:
par

In [None]:
par.distance

In [None]:
par??

In [None]:
par.distances=0.2

In [None]:
par.select(keys_to_store=['distance'])

In [None]:
def test(**kwargs):
    """Print every keyword argument as a ``(name, value)`` tuple, one per line."""
    for name, value in kwargs.items():
        print((name, value))

In [None]:
test(**par.select(keys_to_store=['distance']))

In [None]:
test(**par)

In [None]:
from ase import Atoms, build

In [None]:
Al = build.bulk('Al', cubic=True)
Al.get_positions(wrap=True)

In [None]:
Al.get_stress()

In [None]:
Al.get_

In [None]:
from dataclasses import dataclass

@dataclass
class Address:
    """Postal address with three required string fields."""

    street: str
    city: str
    zip_code: str

@dataclass
class Person:
    """Person record whose ``address`` defaults to an empty ``Address``."""

    name: str
    age: int
    # NOTE(review): this default ``Address`` instance is created once, when the
    # class body runs, and would be shared by every ``Person`` that does not
    # pass its own address. Python 3.11+ dataclasses reject such unhashable
    # defaults with ``ValueError`` ("mutable default ... use default_factory").
    # The next cell shows the ``field(default_factory=...)`` form.
    address: Address = Address(street='', city='', zip_code='')

# Example usage
person_with_default_address = Person(name='John Doe', age=30)

# Accessing the default address
print(person_with_default_address.address)

In [None]:
from dataclasses import dataclass, field

@dataclass
class Address:
    """Postal address with three required string fields."""

    street: str
    city: str
    zip_code: str

@dataclass
class Person:
    """Person record whose ``address`` defaults to a fresh, empty ``Address``."""

    name: str
    age: int
    # default_factory is called for each new Person, so instances do not share
    # one Address object.
    address: Address = field(default_factory=lambda: Address(street='', city='', zip_code=''))

# Example usage
person_with_default_address = Person(name='John Doe', age=30)

# Accessing the default address
print(person_with_default_address.address)

### Pseudocode for output class

In [None]:
from typing import Optional, Union
from typing import Callable, TypeVar, Any, TypeAlias
from dataclasses import dataclass, field

import numpy as np

In [None]:
@dataclass
class VarFunc:
    """Describes an output quantity that is obtained by calling a function."""

    # Callable that produces the value; None until one is assigned.
    func: Optional[Callable] = None
    # NOTE(review): presumably a verbosity threshold for storing the output - confirm.
    log_level: int = 0
    # Annotated as str, but later cells pass pint unit objects here.
    unit: str = ''

In [None]:
@dataclass
class VarType:
    """Describes an output quantity that is referenced rather than called."""

    # NOTE(review): the field name shadows the builtin ``property`` inside this
    # class body, and a later cell passes a property object (``toy_job.my_x``)
    # although the annotation says TypeVar - confirm the intended type.
    property: TypeVar = None
    # NOTE(review): presumably a verbosity threshold for storing the output - confirm.
    log_level: int = 0
    unit: str = ''

In [None]:
VarFunc(func=np.sin, log_level=2).func

In [None]:
class toy_job:
    """Minimal stand-in for a job object, used to try out the output classes."""

    def __init__(self, x=0):
        """Store the single input value ``x``."""
        self.x = x

    @property
    def my_x(self):
        """Read-only view of the stored input value."""
        return self.x

    def get_energy(self):
        """Return ``sin(x)`` as a toy energy."""
        energy = np.sin(self.x)
        return energy

    def get_forces(self):
        """Return a length-3 array of ones as toy forces."""
        forces = np.ones(3)
        return forces

In [None]:
job = toy_job(1)
job.get_energy(), job.get_forces()

In [None]:
@dataclass
class wfOutput:
    """Base sketch of an output description that can be evaluated for a job.

    Instance attributes are exposed through ``keys()`` and ``[]``. Calling the
    object with a job evaluates every attribute that is a ``VarFunc``.
    """

    def keys(self):
        """Return the names of the instance attributes."""
        return self.__dict__.keys()

    def __getitem__(self, key):
        """Return the instance attribute called ``key``."""
        return self.__dict__[key]

    def __call__(self, job):
        """Evaluate all ``VarFunc`` attributes for ``job``.

        Returns:
            Dict mapping each ``VarFunc`` attribute name to ``func(job)``.
            Attributes of other types are skipped.
        """
        out_dict = dict()
        for key in self.keys():
            print (key)
            value = self[key]
            if isinstance(value, VarFunc):
                # The original ``job.__getattribute(job)()`` is name-mangled
                # inside a class body and always raised AttributeError; the
                # callable stored on the VarFunc is what has to be invoked.
                out_dict[key] = value.func(job)

        return out_dict

In [None]:
import pint
ureg = pint.UnitRegistry()
ureg.angstrom

In [None]:
# import ase
import pint

@dataclass
class OutputEnergyStatic:
    """Output description for a static energy calculation (pseudocode sketch).

    ``energy`` and ``forces`` are ``VarFunc`` entries that are evaluated when
    the object is called with a job; ``distance`` and ``prop`` are skipped by
    ``__call__``.
    """

    distance: float = 0.01
    # default_factory builds a fresh VarFunc/VarType per instance. A dataclass
    # instance used directly as a default is shared between instances and is
    # rejected with ValueError by Python 3.11+ (unhashable default).
    energy: VarFunc = field(
        default_factory=lambda: VarFunc(func=toy_job.get_energy, log_level=0, unit=ureg.eV)
    )
    forces: VarFunc = field(
        default_factory=lambda: VarFunc(
            func=toy_job.get_forces, log_level=1, unit=ureg.eV/ureg.angstrom
        )
    )
    prop: VarType = field(default_factory=lambda: VarType(toy_job.my_x, log_level=2))

    def keys(self):
        """Return the names of the instance attributes (the dataclass fields)."""
        return self.__dict__.keys()

    def __getitem__(self, key):
        """Return the instance attribute called ``key``."""
        return self.__dict__[key]

    def __call__(self, job):
        """Evaluate all ``VarFunc`` fields for ``job``.

        Returns:
            Dict mapping each ``VarFunc`` field name to ``func(job)``.
        """
        out_dict = dict()
        for key in self.keys():
            print (key)
            value = self[key]
            if isinstance(value, VarFunc):
                # The original ``job.__getattribute(job)()`` is name-mangled
                # inside a class body and always raised AttributeError; the
                # callable stored on the VarFunc is what has to be invoked.
                out_dict[key] = value.func(job)

        return out_dict

In [None]:
xx

In [None]:
%%time
df = wf.iter(cell_size=list(range(1,4)), element=['Al'], vacancy_index=[None, 0], displacement=[0.01, 0.1]) #, Cu, Pd, Ag, Pt and Au])

In [None]:
df

In [None]:
df.energy_displaced

### Parallel pooling

In [None]:
@single_value_node('out')
def sleep(time=1, a=None, b=10):
    """Block for ``time`` seconds, then return the inputs as a dict.

    The returned dict has the keys ``times``, ``a2`` and ``b2``.
    """
    # Imported under an alias so it does not reuse this function's own name.
    from time import sleep as pause

    pause(time)
    return {'times': time, 'a2': a, 'b2': b}

In [None]:
import inspect

print(inspect.getsource(sleep.node_function))

In [None]:
%%time
sleep().iter(a=[1,2,3,4,5], max_workers=5, executor=1)

In [None]:
def sort_list_by_first_element(input_list):
    """Return a new list with the items ordered by their first element.

    The sort is stable, and the input list is left unmodified.
    """
    return sorted(input_list, key=lambda item: item[0])

In [None]:
def func(node, **kwargs):
    """Instantiate ``node`` with ``kwargs`` and return the result of its ``run()``."""
    instance = node(**kwargs)
    return instance.run()

Create a list of keyword-argument dictionaries, one per combination of the input values.

In [None]:
def to_list_of_kwargs(**kwargs):
    """Expand keyword arguments holding iterables into their Cartesian product.

    Args:
        **kwargs: Each value is an iterable of candidate values for that key.

    Returns:
        A list of dicts, one per combination, with the last keyword varying
        fastest. ``to_list_of_kwargs(a=[1, 2], b=['x'])`` gives
        ``[{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'x'}]``. If any iterable is
        empty the result is ``[]``; with no keywords at all the result is
        ``[{}]`` (the single empty combination).
    """
    from itertools import product

    keys = list(kwargs)
    # product() replaces the hand-written index/carry loop, which raised
    # IndexError for zero keywords or an empty non-first list.
    return [dict(zip(keys, combination)) for combination in product(*kwargs.values())]

In [None]:
def iter(node, max_workers=5, **kwargs):
    """Run ``node`` for every combination of the given inputs in a thread pool.

    Args:
        node: Callable passed to ``func``, which instantiates and runs it.
        max_workers: Size of the thread pool.
        **kwargs: Iterables of input values, expanded by ``to_list_of_kwargs``.

    Returns:
        A DataFrame with the input columns followed by the output columns,
        one row per input combination.
    """
    import pandas as pd
    from concurrent.futures import ThreadPoolExecutor, as_completed

    parameter_sets = to_list_of_kwargs(**kwargs)
    inputs_frame = pd.DataFrame(parameter_sets)

    results = []
    positions = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which input row each future belongs to, because results
        # are collected in completion order.
        position_of = {
            pool.submit(func, node, **parameters): position
            for position, parameters in enumerate(parameter_sets)
        }
        for finished in as_completed(position_of):
            results.append(finished.result())
            positions.append(position_of[finished])

    # Sorting by the recorded position restores the input order.
    outputs_frame = pd.DataFrame(results, index=positions).sort_index()
    return pd.concat([inputs_frame, outputs_frame], axis=1)
    

In [None]:
iter(sleep, max_workers=10, a=[1,2,3,4], b=[1,3])

In [None]:
type(sleep())()

In [None]:
import pandas as pd

In [None]:
pd.DataFrame(dict(a=[1,2,3,4]), index=[2,1,4,3]).sort_index()

In [None]:
%%time
from pyiron_workflow.node_library.atomistic.engine.lammps import Code

In [None]:
%config IPCompleter.evaluation='unsafe'

from pyiron_workflow import Workflow
import numpy as np

In [None]:
wf = Workflow('test')
wf.register('pyiron_workflow.node_library.atomistic', domain='atomistic')

In [None]:
@Workflow.wrap_as.macro_node('energy_pot')
def energy_at_volume(wf, element='Al', cell_size=2, strain=0):
    """Macro: build a cubic bulk cell, strain it, and run a static calculation.

    Child nodes are attached to ``wf`` as ``structure``, ``apply_strain``,
    ``engine`` and ``calc``. The potential energy of the static calculation is
    exposed as the macro output labelled "energy_pot".

    Args:
        wf: Macro/workflow object the child nodes are attached to.
        element: Forwarded to ``cubic_bulk_cell``.
        cell_size: Forwarded to ``cubic_bulk_cell``.
        strain: Forwarded to ``structure.transform.apply_strain``.
    """

    # NOTE(review): other cells in this notebook reach the builders through
    # ``wf.create.atomistic.structure.build``; here the path is
    # ``wf.create.atomistic.build`` - confirm that this namespace exists.
    wf.structure = wf.create.atomistic.build.cubic_bulk_cell(element=element, cubic=True, cell_size=cell_size)
    wf.apply_strain = wf.create.atomistic.structure.transform.apply_strain(structure=wf.structure.outputs.structure, strain=strain)
    wf.engine = wf.create.atomistic.engine.lammps.Code(structure=wf.apply_strain)  # TODO: find a way to avoid structure=wf.structure !
    wf.calc = wf.create.atomistic.calculator.generic.static(structure=wf.apply_strain, engine=wf.engine)
    
    return wf.calc.outputs.generic.energy_pot

In [None]:
df = energy_at_volume(element='Fe').iter(strain=np.linspace(-0.2, 0.2, 11))
df.plot(x='strain', ylabel='Energy (eV)', title='Energy-Volume Curve');

In [None]:
wf = Workflow('test')
wf.register('pyiron_workflow.node_library.atomistic', domain='atomistic')
wf.structure = wf.create.atomistic.structure.build.bulk('Al')
wf.engine = wf.create.atomistic.engine.lammps.Code(structure=wf.structure)  # TODO: find a way to avoid structure=wf.structure !
wf.calc = wf.create.atomistic.calculator.generic.static(structure=wf.structure, engine=wf.engine)

wf.run()

In [None]:
wf.calc.outputs.generic.value.energy_pot

In [None]:
wf.engine.outputs.generic.value

In [None]:
wf.calc

In [None]:
wf.calc.outputs.generic.value.energy_pot

In [None]:
%%time
from pyiron_workflow.node_library.atomistic.calculator.data import InputCalcMinimize, InputCalcStatic

In [None]:
%%time
from pyiron.lammps.base import LammpsControl

In [None]:
InputCalcMinimize(), InputCalcStatic()

In [None]:
from pyiron_atomistics.lammps.control import LammpsControl

In [None]:
LammpsControl._mode