2 changes: 1 addition & 1 deletion .github/workflows/daily.yml
@@ -12,4 +12,4 @@ jobs:
uses: pyiron/actions/.github/workflows/tests-and-coverage.yml@actions-3.2.1
secrets: inherit
with:
- extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For pympipool
+ extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For executorlib
3 changes: 1 addition & 2 deletions .github/workflows/push-pull.yml
@@ -14,8 +14,7 @@ jobs:
with:
docs-env-files: .ci_support/environment.yml
notebooks-env-files: .ci_support/environment.yml
- extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For pympipool
- runner-alt2: 'macos-11' # For pympipool
+ extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For executorlib
python-version-alt3: 'exclude' # No python 3.9
alternate-tests-env-files: .ci_support/lower_bound.yml
alternate-tests-python-version: '3.10'
2 changes: 1 addition & 1 deletion .github/workflows/test-latest-release.yml
@@ -42,7 +42,7 @@ jobs:
local-code-directory: ''
- uses: pyiron/actions/pyiron-config@actions-3.2.1
- uses: pyiron/actions/add-to-python-path@actions-3.2.1
- with: # This is specific to getting the pympipool tests to work
+ with: # This is specific to getting the executorlib tests to work
path-dirs: tests tests/benchmark tests/integration tests/static tests/unit
- name: Test
shell: bash -l {0}
2 changes: 1 addition & 1 deletion docs/README.md
@@ -22,7 +22,7 @@ By allowing (but not demanding, in the case of data DAGs) users to specify the e

By scraping type hints from decorated functions, both new data values and new graph connections are (optionally) required to conform to hints, making workflows strongly typed.

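A minimal sketch of what that strong typing implies (hypothetical functions and node names; the `function_node` creator is the one shown in the tests below, and the exact exception raised on a hint mismatch is an assumption):

```python
from pyiron_workflow import Workflow

def plus_one(x: int = 0) -> int:  # hints are scraped from the signature
    return x + 1

def greet(name: str) -> str:
    return "hello " + name

wf = Workflow("typed")
wf.a = wf.create.function_node(plus_one, x=1)
try:
    # Feeding the int-hinted output of wf.a to the str-hinted input of
    # wf.b should be refused by the (optional) hint check
    wf.b = wf.create.function_node(greet, name=wf.a)
except Exception as error:  # exact exception class is an assumption
    print("connection refused:", error)
```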
- Individual node computations can be shipped off to parallel processes for scalability. (This is a beta-feature at time of writing; the `PyMPIExecutor` executor from [`pympipool`](https://github.com/pyiron/pympipool) is supported and tested; automated execution flows to not yet fully leverage the efficiency possible in parallel execution, and `pympipool`'s more powerful flux- and slurm- based executors have not been tested and may fail.)
+ Individual node computations can be shipped off to parallel processes for scalability. (This is a beta-feature at the time of writing; the `Executor` class from [`executorlib`](https://github.com/pyiron/executorlib) is supported and tested; automated execution flows do not yet fully leverage the efficiency possible in parallel execution, and `executorlib`'s more powerful flux- and slurm-based executors have not been tested and may fail.)
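A hedged sketch of that node-level parallelism, mirroring the `deepdive.ipynb` cells changed below (the `hostname_localhost` keyword is there for macos CI compatibility, per that notebook):

```python
from pyiron_workflow import Workflow

wf = Workflow("parallel_demo")
wf.a1 = wf.create.standard.Add(0, 1)
wf.a2 = wf.create.standard.Add(2, 3)

# Ship wf.a2 off to its own process while the rest runs locally
wf.a2.executor = wf.create.ExecutorlibExecutor(hostname_localhost=True)
wf()

wf.a2.future.result(timeout=120)  # wait for the remote run to finish
wf.executor_shutdown()            # clean up the spawned process
```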

Once you're happy with a workflow, it can easily be turned into a macro for use in other workflows. This allows the clean construction of increasingly complex computation graphs by composing simpler graphs.

10 changes: 5 additions & 5 deletions notebooks/deepdive.ipynb
@@ -3883,7 +3883,7 @@
"\n",
"You can currently run nodes in different process by setting that node's `executor` to an instance of a compliant executor object -- i.e. that takes the standard `submit` method of `concurrent.futures.Executor`, returns a `concurrent.futures.Future` (or sub-class). The built-in `Node` instances (workflows, macros, function nodes, etc.) are `pickle`-compliant, and thus will work with a standard `ProcessPoolExecutor` or `ThreadPoolExecutor` from `concurrent.futures`.\n",
"\n",
"For the case of `ProcessPoolExecutor`, the other process needs to be able to find an import the nodes, so they can't have been created in `__main__` (e.g. here in notebook) but need to be in some importable `.py` file. You might also want to have a node holding un-pickleable data. For these cases, we make a couple more flexible executors available on the creator. There is a toy `CloudpickleProcessPoolExecutor` which is a minimal example of handling dynamically defined/un-picklable data and useful for learning, but we also link to `pympipool.Executor`, which is both flexible and powerful. This is the default `Workflow.create.Executor`.\n",
"For the case of `ProcessPoolExecutor`, the other process needs to be able to find an import the nodes, so they can't have been created in `__main__` (e.g. here in notebook) but need to be in some importable `.py` file. You might also want to have a node holding un-pickleable data. For these cases, we make a couple more flexible executors available on the creator. There is a toy `CloudpickleProcessPoolExecutor` which is a minimal example of handling dynamically defined/un-picklable data and useful for learning, but we also link to `executorlib.Executor`, which is both flexible and powerful. For compatibility with GitHub CI on macos, this currently also requires an extra keyword.\n",
"\n",
"Here's a simple example of executor usage:"
]
@@ -3909,7 +3909,7 @@
"wf.a2 = wf.create.standard.Add(2, 3)\n",
"wf.b = wf.a1 + wf.a2\n",
"\n",
"wf.a2.executor = wf.create.Executor()\n",
"wf.a2.executor = wf.create.ExecutorlibExecutor(hostname_localhost=True)\n",
"wf()\n",
"\n",
"print(wf.a1.future, wf.a1.outputs.add.value)\n",
@@ -3953,7 +3953,7 @@
"id": "4dd4159e-6e01-492a-8986-f07e26e68954",
"metadata": {},
"source": [
"When you're all done, it's best practice to shut down your `pympipool` executor to make sure you don't leave any dangling process (similar to how opened files should be closed). This can be done with a convience method from the parent-most object, which will recursively shut down all executors:"
"When you're all done, it's best practice to shut down your `executorlib` executor to make sure you don't leave any dangling process (similar to how opened files should be closed). This can be done with a convience method from the parent-most object, which will recursively shut down all executors:"
]
},
{
@@ -3996,7 +3996,7 @@
}
],
"source": [
"with Workflow.create.Executor() as executor:\n",
"with Workflow.create.ExecutorlibExecutor(hostname_localhost=True) as executor:\n",
" wf = Workflow(\"with_executor\")\n",
" wf.a1 = wf.create.standard.Add(0, 1)\n",
" wf.a2 = wf.create.standard.Add(2, 3)\n",
@@ -4093,7 +4093,7 @@
"wf.starting_nodes = [wf.a, wf.b, wf.c]\n",
"\n",
"\n",
"with wf.create.Executor(max_workers=3) as executor:\n",
"with wf.create.ExecutorlibExecutor(hostname_localhost=True, max_workers=3) as executor:\n",
" wf.a.executor = executor\n",
" wf.b.executor = executor\n",
" wf.c.executor = executor\n",
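The recursive shutdown described in the cell above also works outside the context-manager pattern; a minimal sketch, assuming the `standard` node package and the macos-CI `hostname_localhost` keyword shown in the changed cells:

```python
from pyiron_workflow import Workflow

wf = Workflow("shutdown_demo")
wf.a = wf.create.standard.Add(0, 1)
wf.b = wf.create.standard.Add(2, 3)
wf.a.executor = wf.create.ExecutorlibExecutor(hostname_localhost=True)
wf.b.executor = wf.create.ExecutorlibExecutor(hostname_localhost=True)
wf()

wf.a.future.result(timeout=120)  # wait for the remote runs to finish
wf.b.future.result(timeout=120)
wf.executor_shutdown()  # the parent-most call recursively shuts down both
```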
2 changes: 1 addition & 1 deletion notebooks/quickstart.ipynb
@@ -1775,7 +1775,7 @@
"To learn more, take a look at the `deepdive.ipynb` notebook, and/or start looking through the class docstrings. Here's a brief map of what you're still missing:\n",
"\n",
"- Distributing node execution onto remote processes\n",
" - Single core parallel python processes is available by setting the `.executor` attribute to a compatible executor instance, e.g. `Workflow.create.Executor()`\n",
" - Parallel computation is available by setting the `.executor` attribute to a compatible executor instance, e.g. `Workflow.create.ProcessPoolExecutor()`\n",
"- Acyclic graphs\n",
" - Execution for graphs whose data flow topology is a DAG happens automatically, but you're always free to specify this manually with `Signals`, and indeed _must_ specify the execution flow manually for cyclic graphs -- but cyclic graphs _are_ possible!\n",
"- Complex flow nodes\n",
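A hedged sketch of the executor bullet above (built-in `standard` nodes are pickle-compliant and importable, so the stdlib `ProcessPoolExecutor` suffices here):

```python
from pyiron_workflow import Workflow

wf = Workflow("quick_parallel")
wf.n = wf.create.standard.Add(1, 2)
# Built-in nodes are pickle-compliant, so a stdlib executor works
wf.n.executor = wf.create.ProcessPoolExecutor()
wf()

wf.n.future.result(timeout=120)  # block until the remote run finishes
wf.executor_shutdown()
```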
2 changes: 1 addition & 1 deletion pyiron_workflow/__init__.py
@@ -22,7 +22,7 @@

Planned:
- Storage of executed workflows, including restarting from a partially executed workflow
- - Support for more complex remote execution, especially leveraging :mod:`pympipool`
+ - Support for more complex remote execution, especially leveraging :mod:`executorlib`
Infrastructure that supports and encourages FAIR principles for node packages and
finished workflows
- Ontological hinting for data channels in order to provide guided workflow design
10 changes: 3 additions & 7 deletions pyiron_workflow/create.py
@@ -16,17 +16,14 @@
from bidict import bidict
from pyiron_snippets.dotdict import DotDict
from pyiron_snippets.singleton import Singleton
- from executorlib import Executor as PyMpiPoolExecutor
+ from executorlib import Executor as ExecutorlibExecutor

from pyiron_workflow.executors import CloudpickleProcessPoolExecutor
from pyiron_workflow.nodes.function import function_node, as_function_node

if TYPE_CHECKING:
from pyiron_workflow.node_package import NodePackage

- # Specify the standard executor
- Executor = PyMpiPoolExecutor


class Creator(metaclass=Singleton):
"""
@@ -47,14 +47,13 @@ def __init__(self):
self._package_access = DotDict()
self._package_registry = bidict()

- self.Executor = Executor
# Standard lib
self.ProcessPoolExecutor = ProcessPoolExecutor
self.ThreadPoolExecutor = ThreadPoolExecutor
# Local cloudpickler
self.CloudpickleProcessPoolExecutor = CloudpickleProcessPoolExecutor
- # pympipool
- self.PyMpiPoolExecutor = PyMpiPoolExecutor
+ # executorlib
+ self.ExecutorlibExecutor = ExecutorlibExecutor

self.function_node = function_node

2 changes: 1 addition & 1 deletion pyiron_workflow/mixin/run.py
@@ -242,7 +242,7 @@ def __getstate__(self):
state["future"] = None
# Don't pass the future -- with the future in the state things work fine for
# the simple pyiron_workflow.executors.CloudpickleProcessPoolExecutor, but for
- # the more complex pympipool.Executor we're getting:
+ # the more complex executorlib.Executor we're getting:
# TypeError: cannot pickle '_thread.RLock' object
if isinstance(self.executor, StdLibExecutor):
state["executor"] = None
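The `TypeError` quoted in this comment is easy to reproduce in isolation; a standalone illustration (independent of `pyiron_workflow`) of why the future must be stripped from the state:

```python
import pickle
import threading

# Futures from the more complex executors hold thread locks internally,
# and locks are not picklable:
try:
    pickle.dumps(threading.RLock())
except TypeError as error:
    print(error)  # cannot pickle '_thread.RLock' object
```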
3 changes: 2 additions & 1 deletion pyiron_workflow/node.py
@@ -115,7 +115,8 @@ class Node(
`concurrent.futures.ProcessPoolExecutor`, but if you define your node
somewhere that it can't be imported (e.g. `__main__` in a jupyter
notebook), or it is otherwise not pickleable (e.g. it holds un-pickleable
- io data), you will need a more powerful executor, e.g. `pympipool.Executor`.
+ io data), you will need a more powerful executor, e.g.
+ `executorlib.Executor`.
- On executing this way, a futures object will be returned instead of the usual
result, this future will also be stored as an attribute, and a callback will
be registered with the executor
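A sketch of the docstring's point about `__main__`-defined nodes, using the `CloudpickleProcessPoolExecutor` imported in `create.py` above (runtime behavior details are an assumption):

```python
from pyiron_workflow import Workflow

def plus_one(x: int = 0) -> int:
    return x + 1  # defined in __main__, so not importable elsewhere

wf = Workflow("main_defined")
wf.n = wf.create.function_node(plus_one, x=1)
# A plain ProcessPoolExecutor would fail to import plus_one in the child
# process; the cloudpickle-based executor ships the definition with the task
wf.n.executor = wf.create.CloudpickleProcessPoolExecutor()
wf()

wf.n.future.result(timeout=120)
wf.executor_shutdown()
```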
4 changes: 2 additions & 2 deletions tests/integration/test_workflow.py
@@ -138,7 +138,7 @@ def test_executor_and_creator_interaction(self):
wf.register("static.demo_nodes", "demo")

wf.before_pickling = wf.create.demo.OptionallyAdd(1)
- wf.before_pickling.executor = wf.create.Executor()
+ wf.before_pickling.executor = wf.create.ProcessPoolExecutor()
wf()
wf.before_pickling.future.result(timeout=120) # Wait for it to finish
wf.executor_shutdown()
@@ -152,7 +152,7 @@ def test_executors(self):
Workflow.create.ProcessPoolExecutor,
Workflow.create.ThreadPoolExecutor,
Workflow.create.CloudpickleProcessPoolExecutor,
- Workflow.create.PyMpiPoolExecutor
+ Workflow.create.ExecutorlibExecutor
]

wf = Workflow("executed")
2 changes: 1 addition & 1 deletion tests/unit/nodes/test_macro.py
@@ -212,7 +212,7 @@ def test_with_executor(self):
# at the downstream output, and none of this is happening in a workflow

original_one = macro.one
- macro.executor = macro.create.Executor()
+ macro.executor = macro.create.ProcessPoolExecutor()

self.assertIs(
NOT_DATA,
8 changes: 3 additions & 5 deletions tests/unit/test_node.py
@@ -1,13 +1,11 @@
- from concurrent.futures import Future
+ from concurrent.futures import Future, ProcessPoolExecutor
import os
import sys
import unittest

from pyiron_snippets.files import DirectoryObject

from pyiron_workflow.channels import InputData, NOT_DATA

- from pyiron_workflow.create import Executor
from pyiron_workflow.mixin.injection import OutputDataWithInjection, OutputsWithInjection
from pyiron_workflow.io import Inputs
from pyiron_workflow.node import Node
@@ -143,7 +141,7 @@ def test_check_readiness(self):
)

def test_force_local_execution(self):
- self.n1.executor = Executor()
+ self.n1.executor = ProcessPoolExecutor()
out = self.n1.run(force_local_execution=False)
with self.subTest("Test running with an executor fulfills promises"):
self.assertIsInstance(
@@ -179,7 +177,7 @@ def test_force_local_execution(self):
"happens"
)

- self.n2.executor = Executor()
+ self.n2.executor = ProcessPoolExecutor()
self.n2.inputs.x = 0
self.assertEqual(
1,
8 changes: 4 additions & 4 deletions tests/unit/test_workflow.py
@@ -197,7 +197,7 @@ def test_with_executor(self):
wf.b = wf.create.function_node(plus_one, x=wf.a)

original_a = wf.a
- wf.executor = wf.create.Executor()
+ wf.executor = wf.create.ProcessPoolExecutor()

self.assertIs(
NOT_DATA,
@@ -243,7 +243,7 @@ def test_parallel_execution(self):
wf.fast = five()
wf.sum = sum(a=wf.fast, b=wf.slow)

- wf.slow.executor = wf.create.Executor()
+ wf.slow.executor = wf.create.ProcessPoolExecutor()

wf.slow.run()
wf.fast.run()
@@ -419,7 +419,7 @@ def add_three_macro(self, one__x):
msg="Sanity check, pulling here should work perfectly fine"
)

- wf.m.one.executor = wf.create.Executor()
+ wf.m.one.executor = wf.create.ProcessPoolExecutor()
with self.assertRaises(
ValueError,
msg="Should not be able to pull with executor in local scope"
@@ -428,7 +428,7 @@ def add_three_macro(self, one__x):
wf.m.one.executor_shutdown() # Shouldn't get this far, but if so, shutdown
wf.m.one.executor = None

- wf.n1.executor = wf.create.Executor()
+ wf.n1.executor = wf.create.ProcessPoolExecutor()
with self.assertRaises(
ValueError,
msg="Should not be able to pull with executor in parent scope"