diff --git a/notebooks/workflow_example.ipynb b/notebooks/workflow_example.ipynb index 06e536ed2..f6fd2ab9a 100644 --- a/notebooks/workflow_example.ipynb +++ b/notebooks/workflow_example.ipynb @@ -666,8 +666,8 @@ { "data": { "text/plain": [ - "array([0.1153697 , 0.29712504, 0.22636199, 0.1263152 , 0.00630191,\n", - " 0.64039423, 0.73223408, 0.76977259, 0.62491999, 0.52663026])" + "array([0.6816222 , 0.60285251, 0.31984666, 0.38336884, 0.95586544,\n", + " 0.20915899, 0.73614411, 0.67259937, 0.84499503, 0.10539287])" ] }, "execution_count": 23, @@ -676,7 +676,7 @@ }, { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -932,127 +932,127 @@ "clustersimple\n", "\n", "simple: Workflow\n", - "\n", - "clustersimpleInputs\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "Inputs\n", - "\n", "\n", "clustersimpleOutputs\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "Outputs\n", "\n", "\n", "clustersimplea\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "a: AddOne\n", "\n", "\n", "clustersimpleaInputs\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "Inputs\n", "\n", "\n", "clustersimpleaOutputs\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "Outputs\n", "\n", "\n", "clustersimpleb\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "b: AddOne\n", "\n", "\n", "clustersimplebInputs\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "Inputs\n", "\n", "\n", "clustersimplebOutputs\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "Outputs\n", "\n", "\n", "clustersimplesum\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "sum: AddNode\n", "\n", "\n", "clustersimplesumInputs\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "Inputs\n", "\n", "\n", "clustersimplesumOutputs\n", "\n", - "\n", + "\n", "\n", "\n", "\n", "\n", - "\n", + "\n", "Outputs\n", "\n", + "\n", + "clustersimpleInputs\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Inputs\n", + "\n", "\n", "\n", "clustersimpleInputsrun\n", @@ -1231,7 +1231,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 29, @@ -1262,7 +1262,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "5ee2f89cadea46a8926434ab39f44805", + "model_id": "11fa1336d10a42f4936ce22a299f191d", "version_major": 2, "version_minor": 0 }, @@ -1275,7 +1275,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", " warn(\n" ] }, @@ -1289,7 +1289,7 @@ { "data": { "text/plain": [ - "" + "" ] }, "execution_count": 30, @@ -1541,7 +1541,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 31, @@ -1583,7 +1583,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", " warn(\n" ] }, @@ -1703,6 +1703,7 @@ "\n", "@Workflow.wrap_as.single_value_node()\n", "def per_atom_energy_difference(structure1, energy1, structure2, energy2):\n", + " # The unrelaxed structure is fine, we're just using it to get n_atoms\n", " de = (energy2[-1]/len(structure2)) - (energy1[-1]/len(structure1))\n", " return de" ] @@ -2960,7 +2961,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 37, @@ -3003,7 +3004,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", " warn(\n" ] }, @@ -3044,16 +3045,21 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel job was not connected to job, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel job was not connected to job, andthus could not disconnect from it.\n", + " warn(\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", + " warn(\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel element was not connected to user_input, andthus could not disconnect from it.\n", " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel energy_pot was not connected to energy1, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel structure was not connected to structure1, andthus could not disconnect from it.\n", " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel energy was not connected to energy1, andthus could not disconnect from it.\n", " warn(\n" ] } ], "source": [ + "replacee = wf.min_phase1.calc \n", "wf.min_phase1.calc = Macro.create.atomistics.CalcStatic" ] }, @@ -3085,7 +3091,7 @@ ], "source": [ "# Bad guess\n", - "out = wf(element=\"Al\", phase1=\"fcc\", phase2=\"hcp\", lattice_guess1=3, lattice_guess2=3)\n", + "out = wf(element=\"Al\", phase1=\"fcc\", phase2=\"hcp\", lattice_guess1=3, lattice_guess2=3.1)\n", "print(f\"{wf.inputs.element.value}: E({wf.inputs.phase2.value}) - E({wf.inputs.phase1.value}) = {out.compare__de:.2f} eV/atom\")" ] }, @@ -3099,7 +3105,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", " warn(\n" ] }, @@ -3115,7 +3121,7 @@ ], "source": [ "# Good guess\n", - "out = wf(element=\"Al\", phase1=\"fcc\", phase2=\"hcp\", lattice_guess1=4.05, lattice_guess2=3)\n", + "out = wf(element=\"Al\", phase1=\"fcc\", phase2=\"hcp\", lattice_guess1=4.05, lattice_guess2=3.2)\n", "print(f\"{wf.inputs.element.value}: E({wf.inputs.phase2.value}) - E({wf.inputs.phase1.value}) = {out.compare__de:.2f} eV/atom\")" ] }, @@ -3240,9 +3246,9 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel run was not connected to true, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel run was not connected to true, andthus could not disconnect from it.\n", " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:159: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:158: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", " warn(\n" ] } @@ -3323,21 +3329,12 @@ "name": "stdout", "output_type": "stream", "text": [ - "0.885 > 0.2\n", - "0.790 > 0.2\n", - "0.395 > 0.2\n", - "0.593 > 0.2\n", - "0.220 > 0.2\n", - "0.440 > 0.2\n", - "0.523 > 0.2\n", - "0.407 > 0.2\n", - "0.479 > 0.2\n", - "0.883 > 0.2\n", - "0.607 > 0.2\n", - "0.767 > 0.2\n", - "0.768 > 0.2\n", - "0.012 <= 0.2\n", - "Finally 0.012\n" + "0.406 > 0.2\n", + "0.999 > 0.2\n", + "0.827 > 0.2\n", + "0.417 > 0.2\n", + "0.120 <= 0.2\n", + "Finally 0.120\n" ] } ], diff --git a/pyiron_workflow/channels.py b/pyiron_workflow/channels.py index 9f8a82a91..4cfee805a 100644 --- a/pyiron_workflow/channels.py +++ b/pyiron_workflow/channels.py @@ -279,11 +279,54 @@ def __init__( node: Node, default: typing.Optional[typing.Any] = NotData, type_hint: typing.Optional[typing.Any] = None, + value_receiver: typing.Optional[InputData] = None, ): super().__init__(label=label, node=node) + self._value = NotData + self._value_receiver = None self.default = default self.value = default self.type_hint = type_hint + self.value_receiver = value_receiver + + @property + def value(self): + return self._value + + @value.setter + def value(self, new_value): + if self.value_receiver is not None: + self.value_receiver.value = new_value + self._value = new_value + + @property + def value_receiver(self) -> InputData | OutputData | None: + """ + Another data channel of the same type to whom new values are always pushed + (without type checking of any sort, not even when forming the couple!) + + Useful for macros, so that the IO of owned nodes and IO at the macro level can + be kept synchronized. + """ + return self._value_receiver + + @value_receiver.setter + def value_receiver(self, new_partner: InputData | OutputData | None): + if new_partner is not None: + if not isinstance(new_partner, self.__class__): + raise TypeError( + f"The {self.__class__.__name__} {self.label} got a coupling " + f"partner {new_partner} but requires something of the same type" + ) + + if new_partner is self: + raise ValueError( + f"{self.__class__.__name__} {self.label} cannot couple to itself" + ) + + new_partner.value = self.value + + self._value_receiver = new_partner @property def generic_type(self) -> type[Channel]: @@ -375,6 +418,7 @@ def __init__( node: Node, default: typing.Optional[typing.Any] = NotData, type_hint: typing.Optional[typing.Any] = None, + value_receiver: typing.Optional[InputData] = None, strict_connections: bool = True, ): super().__init__( @@ -382,6 +426,7 @@ def __init__( node=node, default=default, type_hint=type_hint, + value_receiver=value_receiver, ) self.strict_connections = strict_connections diff --git a/pyiron_workflow/composite.py b/pyiron_workflow/composite.py index cd1132593..1bb1abc29 100644 --- a/pyiron_workflow/composite.py +++ b/pyiron_workflow/composite.py @@ -5,7 +5,7 @@ from __future__ import annotations -from abc import ABC +from abc import ABC, abstractmethod from functools import partial from typing import Literal, Optional, TYPE_CHECKING @@ -19,7 +19,7 @@ from pyiron_workflow.util import logger, DotDict, SeabornColors if TYPE_CHECKING: - from pyiron_workflow.channels import Channel + from pyiron_workflow.channels import Channel, InputData, OutputData class Composite(Node, ABC): @@ -106,7 +106,7 @@ def __init__( self._outputs_map = None self.inputs_map = inputs_map self.outputs_map = outputs_map - self.nodes: DotDict[str:Node] = DotDict() + self.nodes: DotDict[str, Node] = DotDict() self.starting_nodes: list[Node] = [] self._creator = self.create self.create = self._owned_creator # Override the create method from the class @@ -138,17 +138,6 @@ def _owned_creator(self): """ return OwnedCreator(self, self._creator) - @property - def executor(self) -> None: - return None - - @executor.setter - def executor(self, new_executor): - if new_executor is not None: - raise NotImplementedError( - "Running composite nodes with an executor is not yet supported" - ) - def to_dict(self): return { "label": self.label, @@ -170,10 +159,21 @@ def run_args(self) -> dict: return {"_nodes": self.nodes, "_starting_nodes": self.starting_nodes} def process_run_result(self, run_output): - # self.nodes = run_output - # Running on an executor will require a more sophisticated idea than above + if run_output is not self.nodes: + # Then we probably ran on a parallel process and have an unpacked future + self._update_children(run_output) return DotDict(self.outputs.to_value_dict()) + def _update_children(self, children_from_another_process: DotDict[str, Node]): + """ + If you receive a new dictionary of children, e.g. from unpacking a futures + object of your own children you sent off to another process for computation, + replace your own nodes with them, and set yourself as their parent. + """ + for child in children_from_another_process.values(): + child.parent = self + self.nodes = children_from_another_process + def disconnect_run(self) -> list[tuple[Channel, Channel]]: """ Disconnect all `signals.input.run` connections on all child nodes. @@ -260,29 +260,76 @@ def get_data_digraph(self) -> dict[str, set[str]]: def _build_io( self, - io: Inputs | Outputs, - target: Literal["inputs", "outputs"], - key_map: dict[str, str] | None, + i_or_o: Literal["inputs", "outputs"], + key_map: dict[str, str | None] | None, ) -> Inputs | Outputs: + """ + Build an IO panel for exposing child node IO to the outside world at the level + of the composite node's IO. + + Args: + target [Literal["inputs", "outputs"]]: Whether this is I or O. + key_map [dict[str, str]|None]: A map between the default convention for + mapping child IO to composite IO (`"{node.label}__{channel.label}"`) and + whatever label you actually want to expose to the composite user. Also + allows non-standards channel exposure, i.e. exposing + internally-connected channels (which would not normally be exposed) by + providing a string-to-string map, or suppressing unconnected channels + (which normally would be exposed) by providing a string-None map. + + Returns: + (Inputs|Outputs): The populated panel. + """ key_map = {} if key_map is None else key_map + io = Inputs() if i_or_o == "inputs" else Outputs() for node in self.nodes.values(): - panel = getattr(node, target) + panel = getattr(node, i_or_o) for channel_label in panel.labels: channel = panel[channel_label] default_key = f"{node.label}__{channel_label}" try: - if key_map[default_key] is not None: - io[key_map[default_key]] = channel + io_panel_key = key_map[default_key] + if io_panel_key is not None: + io[io_panel_key] = self._get_linking_channel( + channel, io_panel_key + ) except KeyError: if not channel.connected: - io[default_key] = channel + io[default_key] = self._get_linking_channel( + channel, default_key + ) return io + @abstractmethod + def _get_linking_channel( + self, + child_reference_channel: InputData | OutputData, + composite_io_key: str, + ) -> InputData | OutputData: + """ + Returns the channel that will be the link between the provided child channel, + and the composite's IO at the given key. + + The returned channel should be fully compatible with the provided child channel, + i.e. same type, same type hint... (For instance, the child channel itself is a + valid return, which would create a composite IO panel that works by reference.) + + Args: + child_reference_channel (InputData | OutputData): The child channel + composite_io_key (str): The key under which this channel will be stored on + the composite's IO. + + Returns: + (Channel): A channel with the same type, type hint, etc. as the reference + channel passed in. + """ + pass + def _build_inputs(self) -> Inputs: - return self._build_io(Inputs(), "inputs", self.inputs_map) + return self._build_io("inputs", self.inputs_map) def _build_outputs(self) -> Outputs: - return self._build_io(Outputs(), "outputs", self.outputs_map) + return self._build_io("outputs", self.outputs_map) def add(self, node: Node, label: Optional[str] = None) -> None: """ @@ -377,7 +424,7 @@ def remove(self, node: Node | str) -> list[tuple[Channel, Channel]]: del self.nodes[node.label] return disconnected - def replace(self, owned_node: Node | str, replacement: Node | type[Node]): + def replace(self, owned_node: Node | str, replacement: Node | type[Node]) -> Node: """ Replaces a node currently owned with a new node instance. The replacement must not belong to any other parent or have any connections. @@ -385,6 +432,12 @@ def replace(self, owned_node: Node | str, replacement: Node | type[Node]): channel labels need to match precisely, but additional channels may be present. After replacement, the new node will have the old node's connections, label, and belong to this composite. + The labels are swapped, such that the replaced node gets the name of its + replacement (which might be silly, but is useful in case you want to revert the + change and swap the replaced node back in!) + + If replacement fails for some reason, the replacement and replacing node are + both returned to their original state, and the composite is left unchanged. Args: owned_node (Node|str): The node to replace or its label. @@ -420,13 +473,17 @@ def replace(self, owned_node: Node | str, replacement: Node | type[Node]): f"got {replacement}" ) - replacement.copy_io(owned_node) - replacement.label = owned_node.label + replacement.copy_io(owned_node) # If the replacement is incompatible, we'll + # fail here before we've changed the parent at all. Since the replacement was + # first guaranteed to be an unconnected orphan, there is not yet any permanent + # damage is_starting_node = owned_node in self.starting_nodes self.remove(owned_node) + replacement.label, owned_node.label = owned_node.label, replacement.label self.add(replacement) if is_starting_node: self.starting_nodes.append(replacement) + return owned_node def __setattr__(self, key: str, node: Node): if isinstance(node, Node) and key != "parent": @@ -501,6 +558,16 @@ def __getattr__(self, item): return value + def __getstate__(self): + # Compatibility with python <3.11 + return self.__dict__ + + def __setstate__(self, state): + # Because we override getattr, we need to use __dict__ assignment directly in + # __setstate__ + self.__dict__["_parent"] = state["_parent"] + self.__dict__["_creator"] = state["_creator"] + class OwnedNodePackage: """ @@ -517,3 +584,9 @@ def __getattr__(self, item): if issubclass(value, Node): value = partial(value, parent=self._parent) return value + + def __getstate__(self): + return self.__dict__ + + def __setstate__(self, state): + self.__dict__ = state diff --git a/pyiron_workflow/function.py b/pyiron_workflow/function.py index ca43c0323..34b85c91c 100644 --- a/pyiron_workflow/function.py +++ b/pyiron_workflow/function.py @@ -474,11 +474,10 @@ def on_run(self): def run_args(self) -> dict: kwargs = self.inputs.to_value_dict() if "self" in self._input_args: - if self.executor is not None: - raise NotImplementedError( - f"The node {self.label} cannot be run on an executor because it " - f"uses the `self` argument and this functionality is not yet " - f"implemented" + if self.executor: + raise ValueError( + f"Function node {self.label} uses the `self` argument, but this " + f"can't yet be run with executors" ) kwargs["self"] = self return kwargs diff --git a/pyiron_workflow/interfaces.py b/pyiron_workflow/interfaces.py index 36cd2c858..02d826de8 100644 --- a/pyiron_workflow/interfaces.py +++ b/pyiron_workflow/interfaces.py @@ -8,9 +8,7 @@ from pyiron_base.interfaces.singleton import Singleton -# from pyiron_contrib.executors import CloudpickleProcessPoolExecutor as Executor # from pympipool.mpi.executor import PyMPISingleTaskExecutor as Executor - from pyiron_workflow.executors import CloudpickleProcessPoolExecutor as Executor from pyiron_workflow.function import ( @@ -60,23 +58,17 @@ def Workflow(self): @property def standard(self): - try: - return self._standard - except AttributeError: - from pyiron_workflow.node_library.standard import nodes + from pyiron_workflow.node_package import NodePackage + from pyiron_workflow.node_library.standard import nodes - self.register("_standard", *nodes) - return self._standard + return NodePackage(*nodes) @property def atomistics(self): - try: - return self._atomistics - except AttributeError: - from pyiron_workflow.node_library.atomistics import nodes + from pyiron_workflow.node_package import NodePackage + from pyiron_workflow.node_library.atomistics import nodes - self.register("_atomistics", *nodes) - return self._atomistics + return NodePackage(*nodes) @property def meta(self): @@ -87,11 +79,15 @@ def meta(self): return self._meta def register(self, domain: str, *nodes: list[type[Node]]): - if domain in self.__dir__(): - raise AttributeError(f"{domain} is already an attribute of {self}") - from pyiron_workflow.node_package import NodePackage - - setattr(self, domain, NodePackage(*nodes)) + raise NotImplementedError( + "Registering new node packages is currently not playing well with " + "executors. We hope to return this feature soon." + ) + # if domain in self.__dir__(): + # raise AttributeError(f"{domain} is already an attribute of {self}") + # from pyiron_workflow.node_package import NodePackage + # + # setattr(self, domain, NodePackage(*nodes)) class Wrappers(metaclass=Singleton): diff --git a/pyiron_workflow/io.py b/pyiron_workflow/io.py index 20e1ca79c..f9b341dbf 100644 --- a/pyiron_workflow/io.py +++ b/pyiron_workflow/io.py @@ -161,6 +161,15 @@ def to_dict(self): "channels": {l: c.to_dict() for l, c in self.channel_dict.items()}, } + def __getstate__(self): + # Compatibility with python <3.11 + return self.__dict__ + + def __setstate__(self, state): + # Because we override getattr, we need to use __dict__ assignment directly in + # __setstate__ the same way we need it in __init__ + self.__dict__["channel_dict"] = state["channel_dict"] + class DataIO(IO, ABC): """ diff --git a/pyiron_workflow/macro.py b/pyiron_workflow/macro.py index cdea3244a..1fb567565 100644 --- a/pyiron_workflow/macro.py +++ b/pyiron_workflow/macro.py @@ -8,6 +8,7 @@ from functools import partialmethod from typing import Optional, TYPE_CHECKING +from pyiron_workflow.channels import InputData, OutputData from pyiron_workflow.composite import Composite from pyiron_workflow.io import Outputs, Inputs @@ -184,6 +185,39 @@ def __init__( self.update_input(**kwargs) + def _get_linking_channel( + self, + child_reference_channel: InputData | OutputData, + composite_io_key: str, + ) -> InputData | OutputData: + """ + Build IO by value: create a new channel just like the child's channel. + + In the case of input data, we also form a value link from the composite channel + down to the child channel, so that the child will stay up-to-date. + """ + composite_channel = child_reference_channel.__class__( + label=composite_io_key, + node=self, + default=child_reference_channel.default, + type_hint=child_reference_channel.type_hint, + ) + composite_channel.value = child_reference_channel.value + + if isinstance(composite_channel, InputData): + composite_channel.strict_connections = ( + child_reference_channel.strict_connections + ) + composite_channel.value_receiver = child_reference_channel + elif isinstance(composite_channel, OutputData): + child_reference_channel.value_receiver = composite_channel + else: + raise TypeError( + "This should not be an accessible state, please contact the developers" + ) + + return composite_channel + @property def inputs(self) -> Inputs: return self._inputs @@ -192,9 +226,47 @@ def inputs(self) -> Inputs: def outputs(self) -> Outputs: return self._outputs + def _update_children(self, children_from_another_process): + super()._update_children(children_from_another_process) + self._rebuild_data_io() + def _rebuild_data_io(self): - self._inputs = self._build_inputs() - self._outputs = self._build_outputs() + """ + Try to rebuild the IO. + + If an error is encountered, revert back to the existing IO then raise it. + """ + old_inputs = self.inputs + old_outputs = self.outputs + connection_changes = [] # For reversion if there's an error + try: + self._inputs = self._build_inputs() + self._outputs = self._build_outputs() + for old, new in [(old_inputs, self.inputs), (old_outputs, self.outputs)]: + for old_channel in old: + if old_channel.connected: + # If the old channel was connected to stuff, we'd better still + # have a corresponding channel and be able to copy these, or we + # should fail hard. + # But, if it wasn't connected, we don't even care whether or not + # we still have a corresponding channel to copy to + new_channel = new[old_channel.label] + new_channel.copy_connections(old_channel) + swapped_conenctions = old_channel.disconnect_all() # Purge old + connection_changes.append( + (new_channel, old_channel, swapped_conenctions) + ) + except Exception as e: + for new_channel, old_channel, swapped_conenctions in connection_changes: + new_channel.disconnect(*swapped_conenctions) + old_channel.connect(*swapped_conenctions) + self._inputs = old_inputs + self._outputs = old_outputs + e.message = ( + f"Unable to rebuild IO for {self.label}; reverting to old IO." + f"{e.message}" + ) + raise e def _configure_graph_execution(self): run_signals = self.disconnect_run() @@ -223,11 +295,17 @@ def _reconnect_run(self, run_signal_pairs_to_restore): pairs[0].connect(pairs[1]) def replace(self, owned_node: Node | str, replacement: Node | type[Node]): - super().replace(owned_node=owned_node, replacement=replacement) - # Make sure node-level IO is pointing to the new node - self._rebuild_data_io() - # This is brute-force overkill since only the replaced node needs to be updated - # but it's not particularly expensive + replaced_node = super().replace(owned_node=owned_node, replacement=replacement) + try: + # Make sure node-level IO is pointing to the new node and that macro-level + # IO gets safely reconstructed + self._rebuild_data_io() + except Exception as e: + # If IO can't be successfully rebuilt using this node, revert changes and + # raise the exception + self.replace(replacement, replaced_node) # Guaranteed to work since + # replacement in the other direction was already a success + raise e def to_workfow(self): raise NotImplementedError diff --git a/pyiron_workflow/meta.py b/pyiron_workflow/meta.py index 948d4bb1d..2f8020e1c 100644 --- a/pyiron_workflow/meta.py +++ b/pyiron_workflow/meta.py @@ -159,7 +159,7 @@ def make_loop(macro): # Connect each body node output to the output interface's respective input for body_node, inp in zip(body_nodes, interface.inputs): inp.connect(body_node.outputs[label]) - if body_node.executor is not None: + if body_node.executor: raise NotImplementedError( "Right now the output interface gets run after each body node," "if the body nodes can run asynchronously we need something " diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index 26c50b128..1199d3802 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -12,6 +12,7 @@ from pyiron_workflow.channels import NotData from pyiron_workflow.draw import Node as GraphvizNode +from pyiron_workflow.executors import CloudpickleProcessPoolExecutor as Executor from pyiron_workflow.files import DirectoryObject from pyiron_workflow.has_to_dict import HasToDict from pyiron_workflow.io import Signals, InputSignal, OutputSignal @@ -103,8 +104,11 @@ class Node(HasToDict, ABC): Their value is controlled automatically in the defined `run` and `finish_run` methods. - Nodes can be run on the main python process that owns them, or by assigning an - appropriate executor to their `executor` attribute. + Nodes can be run on the main python process that owns them, or by setting their + `executor` attribute to `True`, in which case a + `pyiron_workflow.executors.CloudPickleExecutor` will be used to run the node on a + new process on a single core (in the future, the interface will look a little + different and you'll have more options than that). In case they are run with an executor, their `future` attribute will be populated with the resulting future object. WARNING: Executors are currently only working when the node executable function does @@ -182,7 +186,10 @@ def __init__( # TODO: Provide support for actually computing stuff with the executor self.signals = self._build_signal_channels() self._working_directory = None - self.executor = None + self.executor = False + # We call it an executor, but it's just whether to use one. + # This is a simply stop-gap as we work out more sophisticated ways to reference + # (or create) an executor process without ever trying to pickle a `_thread.lock` self.future: None | Future = None @property @@ -291,13 +298,14 @@ def _run(self, finished_callback: callable) -> Any | tuple | Future: Handles the status of the node, and communicating with any remote computing resources. """ - if self.executor is None: + if not self.executor: run_output = self.on_run(**self.run_args) return finished_callback(run_output) else: # Just blindly try to execute -- as we nail down the executor interaction # we'll want to fail more cleanly here. - self.future = self.executor.submit(self.on_run, **self.run_args) + executor = Executor() + self.future = executor.submit(self.on_run, **self.run_args) self.future.add_done_callback(finished_callback) return self.future @@ -605,3 +613,31 @@ def replace_with(self, other: Node | type[Node]): self.parent.replace(self, other) else: warnings.warn(f"Could not replace {self.label}, as it has no parent.") + + def __getstate__(self): + state = self.__dict__ + state["parent"] = None + # I am not at all confident that removing the parent here is the _right_ + # solution. + # In order to run composites on a parallel process, we ship off just the nodes + # and starting nodes. + # When the parallel process returns these, they're obviously different + # instances, so we re-parent them back to the receiving composite. + # At the same time, we want to make sure that the _old_ children get orphaned. + # Of course, we could do that directly in the composite method, but it also + # works to do it here. + # Something I like about this, is it also means that when we ship groups of + # nodes off to another process with cloudpickle, they're definitely not lugging + # along their parent, its connections, etc. with them! + # This is all working nicely as demonstrated over in the macro test suite. + # However, I have a bit of concern that when we start thinking about + # serialization for storage instead of serialization to another process, this + # might introduce a hard-to-track-down bug. + # For now, it works and I'm going to be super pragmatic and go for it, but + # for the record I am admitting that the current shallowness of my understanding + # may cause me/us headaches in the future. + # -Liam + return self.__dict__ + + def __setstate__(self, state): + self.__dict__ = state diff --git a/pyiron_workflow/util.py b/pyiron_workflow/util.py index 61dae6c7c..eff32b27d 100644 --- a/pyiron_workflow/util.py +++ b/pyiron_workflow/util.py @@ -13,6 +13,13 @@ def __setattr__(self, key, value): def __dir__(self): return set(super().__dir__() + list(self.keys())) + def __getstate__(self): + return self.__dict__ + + def __setstate__(self, state): + for k, v in state.items(): + self.__dict__[k] = v + class SeabornColors: """ diff --git a/pyiron_workflow/workflow.py b/pyiron_workflow/workflow.py index 4ee1be006..5817fa7e7 100644 --- a/pyiron_workflow/workflow.py +++ b/pyiron_workflow/workflow.py @@ -15,6 +15,7 @@ if TYPE_CHECKING: from bidict import bidict + from pyiron_workflow.channels import InputData, OutputData from pyiron_workflow.node import Node @@ -184,6 +185,16 @@ def __init__( for node in nodes: self.add(node) + def _get_linking_channel( + self, + child_reference_channel: InputData | OutputData, + composite_io_key: str, + ) -> InputData | OutputData: + """ + Build IO by reference: just return the child's channel itself. + """ + return child_reference_channel + @property def inputs(self) -> Inputs: return self._build_inputs() diff --git a/tests/unit/test_channels.py b/tests/unit/test_channels.py index f747350c5..c6e711fe5 100644 --- a/tests/unit/test_channels.py +++ b/tests/unit/test_channels.py @@ -234,6 +234,44 @@ def test_ready(self): self.ni1.value = "Not numeric at all" self.assertFalse(self.ni1.ready) + def test_input_coupling(self): + self.assertNotEqual( + self.ni2.value, + 2, + msg="Ensure we start from a setup that the next test is meaningful" + ) + self.ni1.value = 2 + self.ni1.value_receiver = self.ni2 + self.assertEqual( + self.ni2.value, + 2, + msg="Coupled value should get updated on coupling" + ) + self.ni1.value = 3 + self.assertEqual( + self.ni2.value, + 3, + msg="Coupled value should get updated after partner update" + ) + self.ni2.value = 4 + self.assertEqual( + self.ni1.value, + 3, + msg="Coupling is uni-directional, the partner should not push values back" + ) + + with self.assertRaises( + TypeError, + msg="Only input data channels are valid partners" + ): + self.ni1.value_receiver = self.no + + with self.assertRaises( + ValueError, + msg="Must not couple to self to avoid infinite recursion" + ): + self.ni1.value_receiver = self.ni1 + class TestSignalChannels(TestCase): def setUp(self) -> None: diff --git a/tests/unit/test_function.py b/tests/unit/test_function.py index 01cf516d4..4f2958996 100644 --- a/tests/unit/test_function.py +++ b/tests/unit/test_function.py @@ -4,11 +4,6 @@ import unittest import warnings -# from pyiron_contrib.executors import CloudpickleProcessPoolExecutor as Executor -# from pympipool.mpi.executor import PyMPISingleTaskExecutor as Executor - -from pyiron_workflow.executors import CloudpickleProcessPoolExecutor as Executor - from pyiron_workflow.channels import NotData, ChannelConnectionError from pyiron_workflow.files import DirectoryObject from pyiron_workflow.function import ( @@ -304,12 +299,14 @@ def with_self(self, x: float) -> float: msg="Function functions should be able to modify attributes on the node object." ) - node.executor = Executor() - with self.assertRaises(NotImplementedError): - # Submitting node_functions that use self is still raising - # TypeError: cannot pickle '_thread.lock' object - # For now we just fail cleanly + node.executor = True + with self.assertRaises( + ValueError, + msg="We haven't implemented any way to update a function node's `self` when" + "it runs on an executor, so trying to do so should fail hard" + ): node.run() + node.executor = False def with_messed_self(x: float, self) -> float: return x + 0.1 @@ -398,7 +395,7 @@ def test_return_value(self): ) with self.subTest("Run on executor"): - node.executor = Executor() + node.executor = True return_on_explicit_run = node.run() self.assertIsInstance( diff --git a/tests/unit/test_macro.py b/tests/unit/test_macro.py index b8d811339..73bae0f59 100644 --- a/tests/unit/test_macro.py +++ b/tests/unit/test_macro.py @@ -1,6 +1,8 @@ +from concurrent.futures import Future from functools import partialmethod -import unittest from sys import version_info +from time import sleep +import unittest from pyiron_workflow.channels import NotData from pyiron_workflow.function import SingleValue @@ -318,7 +320,7 @@ def add_two(x): msg="Replacement should be reflected in the starting nodes" ) self.assertIs( - macro.inputs.one__x, + macro.inputs.one__x.value_receiver, new_starter.inputs.x, msg="Replacement should be reflected in composite IO" ) @@ -367,6 +369,189 @@ def add_two_incompatible_io(not_x): ): macro.two = add_two_incompatible_io + def test_macro_connections_after_replace(self): + # If the macro-level IO is going to change after replacing a child, + # it had better still be able to recreate all the macro-level IO connections + # For macro IO channels that weren't connected, we don't really care + # If it fails to replace, it had better revert to its original state + + macro = Macro(add_three_macro) + downstream = SingleValue(add_one, x=macro.outputs.three__result) + macro > downstream + macro(one__x=0) + # Or once pull exists: macro.one__x = 0; downstream.pull() + self.assertEqual( + 0 + (1 + 1 + 1) + 1, + downstream.outputs.result.value, + msg="Sanity check that our test setup is what we want: macro->single" + ) + + def add_two(x): + result = x + 2 + return result + compatible_replacement = SingleValue(add_two) + + macro.replace(macro.three, compatible_replacement) + macro(one__x=0) + self.assertEqual( + len(downstream.inputs.x.connections), + 1, + msg="After replacement, the downstream node should still have exactly one " + "connection to the macro" + ) + self.assertIs( + downstream.inputs.x.connections[0], + macro.outputs.three__result, + msg="The one connection should be the living, updated macro IO channel" + ) + self.assertEqual( + 0 + (1 + 1 + 2) + 1, + downstream.outputs.result.value, + msg="The whole flow should still function after replacement, but with the " + "new behaviour (and extra 1 added)" + ) + + def different_signature(x): + # When replacing the final node of add_three_macro, the rebuilt IO will + # no longer have three__result, but rather three__changed_output_label, + # which will break existing macro-level IO if the macro output is connected + changed_output_label = x + 3 + return changed_output_label + + incompatible_replacement = SingleValue( + different_signature, + label="original_label" + ) + with self.assertRaises( + AttributeError, + msg="macro.three__result is connected output, but can't be found in the " + "rebuilt IO, so an exception is expected" + ): + macro.replace(macro.three, incompatible_replacement) + self.assertIs( + macro.three, + compatible_replacement, + msg="Failed replacements should get reverted, putting the original node " + "back" + ) + self.assertIs( + macro.three.outputs.result.value_receiver, + macro.outputs.three__result, + msg="Failed replacements should get reverted, restoring the link between " + "child IO and macro IO" + ) + self.assertIs( + downstream.inputs.x.connections[0], + macro.outputs.three__result, + msg="Failed replacements should get reverted, and macro IO should be as " + "it was before" + ) + self.assertFalse( + incompatible_replacement.connected, + msg="Failed replacements should get reverted, leaving the replacement in " + "its original state" + ) + self.assertEqual( + "original_label", + incompatible_replacement.label, + msg="Failed replacements should get reverted, leaving the replacement in " + "its original state" + ) + macro(one__x=1) # Fresh input to make sure updates are actually going through + self.assertEqual( + 1 + (1 + 1 + 2) + 1, + downstream.outputs.result.value, + msg="Final integration test that replacements get reverted, the macro " + "function and downstream results should be the same as before" + ) + + downstream.disconnect() + macro.replace(macro.three, incompatible_replacement) + self.assertIs( + macro.three, + incompatible_replacement, + msg="Since it is only incompatible with the external connections and we " + "broke those first, replacement is expected to work fine now" + ) + macro(one__x=2) + self.assertEqual( + 2 + (1 + 1 + 3), + macro.outputs.three__changed_output_label.value, + msg="For all to be working, we need the result with the new behaviour " + "at its new location" + ) + + def test_with_executor(self): + macro = Macro(add_three_macro) + downstream = SingleValue(add_one, x=macro.outputs.three__result) + macro > downstream # Later we can just pull() instead + + original_one = macro.one + macro.executor = True + + self.assertIs( + NotData, + macro.outputs.three__result.value, + msg="Sanity check that test is in right starting condition" + ) + + result = macro(one__x=0) + self.assertIsInstance( + result, + Future, + msg="Should be running as a parallel process" + ) + self.assertIs( + NotData, + downstream.outputs.result.value, + msg="Downstream events should not yet have triggered either, we should wait" + "for the callback when the result is ready" + ) + + returned_nodes = result.result() # Wait for the process to finish + self.assertIsNot( + original_one, + returned_nodes.one, + msg="Executing in a parallel process should be returning new instances" + ) + # self.assertIs( + # returned_nodes.one, + # macro.nodes.one, + # msg="Returned nodes should be taken as children" + # ) # You can't do this, result.result() is returning new instances each call + self.assertIs( + macro, + macro.nodes.one.parent, + msg="Returned nodes should get the macro as their parent" + # Once upon a time there was some evidence that this test was failing + # stochastically, but I just ran the whole test suite 6 times and this test + # 8 times and it always passed fine, so maybe the issue is resolved... + ) + self.assertIsNone( + original_one.parent, + msg="Original nodes should be orphaned" + # Note: At time of writing, this is accomplished in Node.__getstate__, + # which feels a bit dangerous... + ) + self.assertEqual( + 0 + 3, + macro.outputs.three__result.value, + msg="And of course we expect the calculation to actually run" + ) + self.assertIs( + downstream.inputs.x.connections[0], + macro.outputs.three__result, + msg="The macro should still be connected to " + ) + sleep(0.2) # Give a moment for the ran signal to emit and downstream to run + # I'm a bit surprised this sleep is necessary + self.assertEqual( + 0 + 3 + 1, + downstream.outputs.result.value, + msg="The finishing callback should also fire off the ran signal triggering" + "downstream execution" + ) + if __name__ == '__main__': unittest.main() diff --git a/tests/unit/test_workflow.py b/tests/unit/test_workflow.py index 698cf71d8..0c0e09ae8 100644 --- a/tests/unit/test_workflow.py +++ b/tests/unit/test_workflow.py @@ -1,6 +1,7 @@ -import unittest +from concurrent.futures import Future from sys import version_info from time import sleep +import unittest from bidict import ValueDuplicationError @@ -194,13 +195,50 @@ def test_no_parents(self): # Setting a non-None value to parent raises the type error from the setter wf2.parent = wf - def test_executor(self): + def test_with_executor(self): + wf = Workflow("wf") - with self.assertRaises(NotImplementedError): - # Submitting callables that use self is still raising - # TypeError: cannot pickle '_thread.lock' object - # For now we just fail cleanly - wf.executor = "literally anything other than None should raise the error" + wf.a = wf.create.SingleValue(plus_one) + wf.b = wf.create.SingleValue(plus_one, x=wf.a) + + original_a = wf.a + wf.executor = True + + self.assertIs( + NotData, + wf.outputs.b__y.value, + msg="Sanity check that test is in right starting condition" + ) + + result = wf(a__x=0) + self.assertIsInstance( + result, + Future, + msg="Should be running as a parallel process" + ) + + returned_nodes = result.result() # Wait for the process to finish + self.assertIsNot( + original_a, + returned_nodes.a, + msg="Executing in a parallel process should be returning new instances" + ) + self.assertIs( + wf, + wf.nodes.a.parent, + msg="Returned nodes should get the macro as their parent" + ) + self.assertIsNone( + original_a.parent, + msg="Original nodes should be orphaned" + # Note: At time of writing, this is accomplished in Node.__getstate__, + # which feels a bit dangerous... + ) + self.assertEqual( + 0 + 1 + 1, + wf.outputs.b__y.value, + msg="And of course we expect the calculation to actually run" + ) def test_parallel_execution(self): wf = Workflow("wf")