Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a07eb78
Introduce coupling between input channels
liamhuber Oct 18, 2023
cf81bbd
Format black
pyiron-runner Oct 18, 2023
e120a57
Improve docstring and type hinting
liamhuber Oct 18, 2023
2493495
Refactor: change signature
liamhuber Oct 18, 2023
9a59975
Refactor: rename variable
liamhuber Oct 18, 2023
20f5ce2
Break out a line of the IO creation into its own method
liamhuber Oct 18, 2023
36b70bb
:bug: apply the channel getter to _both_ mapped and unmapped cases
liamhuber Oct 18, 2023
c3ef618
:bug: actually use the key you try-excepted to get
liamhuber Oct 18, 2023
4d6ae7e
Revert change to io panel creation
liamhuber Oct 18, 2023
b1db7f7
Revert change to io panel creation
liamhuber Oct 18, 2023
2f33746
Build IO by _value_ in macro
liamhuber Oct 18, 2023
ca37c30
Update macro tests
liamhuber Oct 18, 2023
402ea7d
Move the value receiver up to DataChannel
liamhuber Oct 18, 2023
604fc56
Always keep macro output values synchronized
liamhuber Oct 18, 2023
08c74cc
Rebuild connections when you rebuild IO
liamhuber Oct 18, 2023
9146b00
Reexecute demo notebook
liamhuber Oct 18, 2023
80c5e5a
Hint how to run with executor
liamhuber Oct 18, 2023
92b00a9
Swap labels and return the replaced node for easier reversion
liamhuber Oct 19, 2023
2753081
Revert IO reconstruction if it fails
liamhuber Oct 19, 2023
3fe7da0
Revert replacement in macros if IO construction fails
liamhuber Oct 19, 2023
84496d5
Commit the tests used to actually work out the last three commits
liamhuber Oct 19, 2023
1d57fc3
Format black
pyiron-runner Oct 19, 2023
01525b8
Fix type hint typo
liamhuber Oct 19, 2023
ca97ad2
Make executor a bool
liamhuber Oct 19, 2023
d6df8b7
Refactor composite's run processing
liamhuber Oct 19, 2023
31deb97
Allow composite to update its executor value
liamhuber Oct 19, 2023
8a83feb
Write macro tests to define expected behaviour
liamhuber Oct 19, 2023
5796227
Mess with __getstate__ and __setstate__ until they work
liamhuber Oct 19, 2023
f119489
:bug: remove debug print
liamhuber Oct 19, 2023
b1a5551
:bug: switch IsNot to Is then comment out the test
liamhuber Oct 19, 2023
7f2d74b
Test that the workflow will work with an executor as well
liamhuber Oct 19, 2023
ba4030a
Tidy imports
liamhuber Oct 19, 2023
5debae1
Update node docstring
liamhuber Oct 19, 2023
99806cc
Fail hard if a function node tries to use self with an executor
liamhuber Oct 19, 2023
bbab1cf
Add back the warning about executors and self
liamhuber Oct 19, 2023
c6ee4ac
Format black
pyiron-runner Oct 19, 2023
7cc3e19
Implement __getstate__ for compatibility with python <3.11
liamhuber Oct 19, 2023
21459a5
Merge remote-tracking branch 'origin/executors_for_composite' into ex…
liamhuber Oct 19, 2023
ba9a9a2
Disable node package registration and simplify atomistics and standard
liamhuber Oct 20, 2023
13f3d29
Things seem fine but add a note
liamhuber Oct 20, 2023
b9a9e7e
Format black
pyiron-runner Oct 20, 2023
8f84103
Merge pull request #39 from pyiron/executors_for_composite
liamhuber Oct 20, 2023
857eb94
Manually set __get/setstate__ in DotDict for python <3.11
liamhuber Oct 22, 2023
69d15c3
:bug: fix typo in method name
liamhuber Oct 22, 2023
a64dc95
Exhaustively set __get/setstate__ everywhere we override __getattr__
liamhuber Oct 22, 2023
5d0f39b
:bug: have __setstate__ play nicely with __setattr__
liamhuber Oct 22, 2023
4809a6e
Merge pull request #37 from pyiron/no_io_by_reference_for_macros
liamhuber Oct 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 62 additions & 65 deletions notebooks/workflow_example.ipynb

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions pyiron_workflow/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,54 @@ def __init__(
node: Node,
default: typing.Optional[typing.Any] = NotData,
type_hint: typing.Optional[typing.Any] = None,
value_receiver: typing.Optional[InputData] = None,
):
super().__init__(label=label, node=node)
self._value = NotData
self._value_receiver = None
self.default = default
self.value = default
self.type_hint = type_hint
self.value_receiver = value_receiver

@property
def value(self):
    """The datum currently held by this channel."""
    return self._value

@value.setter
def value(self, new_value):
    """
    Update the stored datum.

    When a value receiver is coupled, the new datum is pushed to it before being
    recorded locally, so the pair stays synchronized.
    """
    receiver = self.value_receiver
    if receiver is not None:
        receiver.value = new_value
    self._value = new_value

@property
def value_receiver(self) -> InputData | OutputData | None:
    """
    Another data channel of the same class to whom new values are always pushed
    (without any type-hint checking, not even when forming the couple -- although
    the receiver must be a channel of the same class and may not be this channel
    itself).

    Useful for macros, so that the IO of owned nodes and IO at the macro level can
    be kept synchronized.
    """
    return self._value_receiver

@value_receiver.setter
def value_receiver(self, new_partner: InputData | OutputData | None):
    """
    Couple to a new receiver (or decouple by passing `None`), pushing the current
    value to it immediately.

    Raises:
        TypeError: If the new receiver is not a channel of the same class.
        ValueError: If the new receiver is this channel itself.
    """
    if new_partner is not None:
        if not isinstance(new_partner, self.__class__):
            raise TypeError(
                f"The {self.__class__.__name__} {self.label} got a coupling "
                f"partner {new_partner} but requires something of the same type"
            )

        if new_partner is self:
            raise ValueError(
                f"{self.__class__.__name__} {self.label} cannot couple to itself"
            )

        # Synchronize the receiver immediately; note that no type-hint check is
        # performed on the pushed value
        new_partner.value = self.value

    self._value_receiver = new_partner

@property
def generic_type(self) -> type[Channel]:
Expand Down Expand Up @@ -375,13 +418,15 @@ def __init__(
node: Node,
default: typing.Optional[typing.Any] = NotData,
type_hint: typing.Optional[typing.Any] = None,
value_receiver: typing.Optional[InputData] = None,
strict_connections: bool = True,
):
super().__init__(
label=label,
node=node,
default=default,
type_hint=type_hint,
value_receiver=value_receiver,
)
self.strict_connections = strict_connections

Expand Down
129 changes: 101 additions & 28 deletions pyiron_workflow/composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from __future__ import annotations

from abc import ABC
from abc import ABC, abstractmethod
from functools import partial
from typing import Literal, Optional, TYPE_CHECKING

Expand All @@ -19,7 +19,7 @@
from pyiron_workflow.util import logger, DotDict, SeabornColors

if TYPE_CHECKING:
from pyiron_workflow.channels import Channel
from pyiron_workflow.channels import Channel, InputData, OutputData


class Composite(Node, ABC):
Expand Down Expand Up @@ -106,7 +106,7 @@ def __init__(
self._outputs_map = None
self.inputs_map = inputs_map
self.outputs_map = outputs_map
self.nodes: DotDict[str:Node] = DotDict()
self.nodes: DotDict[str, Node] = DotDict()
self.starting_nodes: list[Node] = []
self._creator = self.create
self.create = self._owned_creator # Override the create method from the class
Expand Down Expand Up @@ -138,17 +138,6 @@ def _owned_creator(self):
"""
return OwnedCreator(self, self._creator)

@property
def executor(self) -> None:
return None

@executor.setter
def executor(self, new_executor):
if new_executor is not None:
raise NotImplementedError(
"Running composite nodes with an executor is not yet supported"
)

def to_dict(self):
return {
"label": self.label,
Expand All @@ -170,10 +159,21 @@ def run_args(self) -> dict:
return {"_nodes": self.nodes, "_starting_nodes": self.starting_nodes}

def process_run_result(self, run_output):
# self.nodes = run_output
# Running on an executor will require a more sophisticated idea than above
if run_output is not self.nodes:
# Then we probably ran on a parallel process and have an unpacked future
self._update_children(run_output)
return DotDict(self.outputs.to_value_dict())

def _update_children(self, children_from_another_process: DotDict[str, Node]):
    """
    Adopt a replacement set of children, e.g. one unpacked from a futures object
    after our own children were computed on another process: each incoming child
    gets this composite set as its parent, and the incoming dictionary replaces
    our current nodes wholesale.
    """
    for incoming in children_from_another_process.values():
        incoming.parent = self
    self.nodes = children_from_another_process

def disconnect_run(self) -> list[tuple[Channel, Channel]]:
"""
Disconnect all `signals.input.run` connections on all child nodes.
Expand Down Expand Up @@ -260,29 +260,76 @@ def get_data_digraph(self) -> dict[str, set[str]]:

def _build_io(
self,
io: Inputs | Outputs,
target: Literal["inputs", "outputs"],
key_map: dict[str, str] | None,
i_or_o: Literal["inputs", "outputs"],
key_map: dict[str, str | None] | None,
) -> Inputs | Outputs:
"""
Build an IO panel for exposing child node IO to the outside world at the level
of the composite node's IO.

Args:
i_or_o (Literal["inputs", "outputs"]): Whether to build the inputs or outputs panel.
key_map (dict[str, str | None] | None): A map between the default convention for
mapping child IO to composite IO (`"{node.label}__{channel.label}"`) and
whatever label you actually want to expose to the composite user. Also
allows non-standards channel exposure, i.e. exposing
internally-connected channels (which would not normally be exposed) by
providing a string-to-string map, or suppressing unconnected channels
(which normally would be exposed) by providing a string-None map.

Returns:
(Inputs|Outputs): The populated panel.
"""
key_map = {} if key_map is None else key_map
io = Inputs() if i_or_o == "inputs" else Outputs()
for node in self.nodes.values():
panel = getattr(node, target)
panel = getattr(node, i_or_o)
for channel_label in panel.labels:
channel = panel[channel_label]
default_key = f"{node.label}__{channel_label}"
try:
if key_map[default_key] is not None:
io[key_map[default_key]] = channel
io_panel_key = key_map[default_key]
if io_panel_key is not None:
io[io_panel_key] = self._get_linking_channel(
channel, io_panel_key
)
except KeyError:
if not channel.connected:
io[default_key] = channel
io[default_key] = self._get_linking_channel(
channel, default_key
)
return io

@abstractmethod
def _get_linking_channel(
    self,
    child_reference_channel: InputData | OutputData,
    composite_io_key: str,
) -> InputData | OutputData:
    """
    Returns the channel that will be the link between the provided child channel,
    and the composite's IO at the given key.

    The returned channel should be fully compatible with the provided child channel,
    i.e. same type, same type hint... (For instance, the child channel itself is a
    valid return, which would create a composite IO panel that works by reference.)

    Args:
        child_reference_channel (InputData | OutputData): The child channel.
        composite_io_key (str): The key under which this channel will be stored on
            the composite's IO.

    Returns:
        (InputData | OutputData): A channel with the same type, type hint, etc. as
            the reference channel passed in.
    """
    pass

def _build_inputs(self) -> Inputs:
return self._build_io(Inputs(), "inputs", self.inputs_map)
return self._build_io("inputs", self.inputs_map)

def _build_outputs(self) -> Outputs:
return self._build_io(Outputs(), "outputs", self.outputs_map)
return self._build_io("outputs", self.outputs_map)

def add(self, node: Node, label: Optional[str] = None) -> None:
"""
Expand Down Expand Up @@ -377,14 +424,20 @@ def remove(self, node: Node | str) -> list[tuple[Channel, Channel]]:
del self.nodes[node.label]
return disconnected

def replace(self, owned_node: Node | str, replacement: Node | type[Node]):
def replace(self, owned_node: Node | str, replacement: Node | type[Node]) -> Node:
"""
Replaces a node currently owned with a new node instance.
The replacement must not belong to any other parent or have any connections.
The IO of the new node must be a perfect superset of the replaced node, i.e.
channel labels need to match precisely, but additional channels may be present.
After replacement, the new node will have the old node's connections, label,
and belong to this composite.
The labels are swapped, such that the replaced node gets the name of its
replacement (which might be silly, but is useful in case you want to revert the
change and swap the replaced node back in!)

If replacement fails for some reason, the replacement and replacing node are
both returned to their original state, and the composite is left unchanged.

Args:
owned_node (Node|str): The node to replace or its label.
Expand Down Expand Up @@ -420,13 +473,17 @@ def replace(self, owned_node: Node | str, replacement: Node | type[Node]):
f"got {replacement}"
)

replacement.copy_io(owned_node)
replacement.label = owned_node.label
replacement.copy_io(owned_node) # If the replacement is incompatible, we'll
# fail here before we've changed the parent at all. Since the replacement was
# first guaranteed to be an unconnected orphan, there is not yet any permanent
# damage
is_starting_node = owned_node in self.starting_nodes
self.remove(owned_node)
replacement.label, owned_node.label = owned_node.label, replacement.label
self.add(replacement)
if is_starting_node:
self.starting_nodes.append(replacement)
return owned_node

def __setattr__(self, key: str, node: Node):
if isinstance(node, Node) and key != "parent":
Expand Down Expand Up @@ -501,6 +558,16 @@ def __getattr__(self, item):

return value

def __getstate__(self):
    # Compatibility with python <3.11, where object.__getstate__ does not exist;
    # needed because this class overrides __getattr__
    return self.__dict__

def __setstate__(self, state):
    # Because we override getattr, we need to use __dict__ assignment directly in
    # __setstate__
    # NOTE(review): only `_parent` and `_creator` are restored here -- presumably
    # these are the instance's only attributes; confirm against __init__
    self.__dict__["_parent"] = state["_parent"]
    self.__dict__["_creator"] = state["_creator"]


class OwnedNodePackage:
"""
Expand All @@ -517,3 +584,9 @@ def __getattr__(self, item):
if issubclass(value, Node):
value = partial(value, parent=self._parent)
return value

def __getstate__(self):
    # Explicit pickling hooks for compatibility with python <3.11, needed because
    # this class overrides __getattr__
    return self.__dict__

def __setstate__(self, state):
    # Assign __dict__ directly so restoring state never routes through __getattr__
    self.__dict__ = state
9 changes: 4 additions & 5 deletions pyiron_workflow/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,10 @@ def on_run(self):
def run_args(self) -> dict:
kwargs = self.inputs.to_value_dict()
if "self" in self._input_args:
if self.executor is not None:
raise NotImplementedError(
f"The node {self.label} cannot be run on an executor because it "
f"uses the `self` argument and this functionality is not yet "
f"implemented"
if self.executor:
raise ValueError(
f"Function node {self.label} uses the `self` argument, but this "
f"can't yet be run with executors"
)
kwargs["self"] = self
return kwargs
Expand Down
34 changes: 15 additions & 19 deletions pyiron_workflow/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

from pyiron_base.interfaces.singleton import Singleton

# from pyiron_contrib.executors import CloudpickleProcessPoolExecutor as Executor
# from pympipool.mpi.executor import PyMPISingleTaskExecutor as Executor

from pyiron_workflow.executors import CloudpickleProcessPoolExecutor as Executor

from pyiron_workflow.function import (
Expand Down Expand Up @@ -60,23 +58,17 @@ def Workflow(self):

@property
def standard(self):
try:
return self._standard
except AttributeError:
from pyiron_workflow.node_library.standard import nodes
from pyiron_workflow.node_package import NodePackage
from pyiron_workflow.node_library.standard import nodes

self.register("_standard", *nodes)
return self._standard
return NodePackage(*nodes)

@property
def atomistics(self):
try:
return self._atomistics
except AttributeError:
from pyiron_workflow.node_library.atomistics import nodes
from pyiron_workflow.node_package import NodePackage
from pyiron_workflow.node_library.atomistics import nodes

self.register("_atomistics", *nodes)
return self._atomistics
return NodePackage(*nodes)

@property
def meta(self):
Expand All @@ -87,11 +79,15 @@ def meta(self):
return self._meta

def register(self, domain: str, *nodes: list[type[Node]]):
if domain in self.__dir__():
raise AttributeError(f"{domain} is already an attribute of {self}")
from pyiron_workflow.node_package import NodePackage

setattr(self, domain, NodePackage(*nodes))
raise NotImplementedError(
"Registering new node packages is currently not playing well with "
"executors. We hope to return this feature soon."
)
# if domain in self.__dir__():
# raise AttributeError(f"{domain} is already an attribute of {self}")
# from pyiron_workflow.node_package import NodePackage
#
# setattr(self, domain, NodePackage(*nodes))


class Wrappers(metaclass=Singleton):
Expand Down
9 changes: 9 additions & 0 deletions pyiron_workflow/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ def to_dict(self):
"channels": {l: c.to_dict() for l, c in self.channel_dict.items()},
}

def __getstate__(self):
    # Compatibility with python <3.11, where object.__getstate__ does not exist
    return self.__dict__

def __setstate__(self, state):
    # Because we override getattr, we need to use __dict__ assignment directly in
    # __setstate__ the same way we need it in __init__
    # NOTE(review): only `channel_dict` is restored here -- presumably the panel's
    # sole attribute; confirm against __init__
    self.__dict__["channel_dict"] = state["channel_dict"]


class DataIO(IO, ABC):
"""
Expand Down
Loading