diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index 84f05cbab..e4e5b2787 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -12,4 +12,4 @@ jobs: uses: pyiron/actions/.github/workflows/tests-and-coverage.yml@actions-3.2.1 secrets: inherit with: - extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For pympipool \ No newline at end of file + extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For executorlib \ No newline at end of file diff --git a/.github/workflows/push-pull.yml b/.github/workflows/push-pull.yml index 3bf451176..eb3aae0f7 100644 --- a/.github/workflows/push-pull.yml +++ b/.github/workflows/push-pull.yml @@ -14,8 +14,7 @@ jobs: with: docs-env-files: .ci_support/environment.yml notebooks-env-files: .ci_support/environment.yml - extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For pympipool - runner-alt2: 'macos-11' # For pympipool + extra-python-paths: tests tests/benchmark tests/integration tests/static tests/unit # For executorlib python-version-alt3: 'exclude' # No python 3.9 alternate-tests-env-files: .ci_support/lower_bound.yml alternate-tests-python-version: '3.10' diff --git a/.github/workflows/test-latest-release.yml b/.github/workflows/test-latest-release.yml index 4b46792c8..992a565c6 100644 --- a/.github/workflows/test-latest-release.yml +++ b/.github/workflows/test-latest-release.yml @@ -42,7 +42,7 @@ jobs: local-code-directory: '' - uses: pyiron/actions/pyiron-config@actions-3.2.1 - uses: pyiron/actions/add-to-python-path@actions-3.2.1 - with: # This is specific to getting the pympipool tests to work + with: # This is specific to getting the executorlib tests to work path-dirs: tests tests/benchmark tests/integration tests/static tests/unit - name: Test shell: bash -l {0} diff --git a/docs/README.md b/docs/README.md index 07e1abc16..e220327fc 100644 --- a/docs/README.md +++ b/docs/README.md @@ -22,7 +22,7 @@ By allowing (but not demanding, in the case of data DAGs) users to specify the e By scraping type hints from decorated functions, both new data values and new graph connections are (optionally) required to conform to hints, making workflows strongly typed. -Individual node computations can be shipped off to parallel processes for scalability. (This is a beta-feature at time of writing; the `PyMPIExecutor` executor from [`pympipool`](https://github.com/pyiron/pympipool) is supported and tested; automated execution flows to not yet fully leverage the efficiency possible in parallel execution, and `pympipool`'s more powerful flux- and slurm- based executors have not been tested and may fail.) +Individual node computations can be shipped off to parallel processes for scalability. (This is a beta feature at the time of writing; the `Executor` class from [`executorlib`](https://github.com/pyiron/executorlib) is supported and tested; automated execution flows do not yet fully leverage the efficiency possible in parallel execution, and `executorlib`'s more powerful flux- and slurm-based executors have not been tested and may fail.) Once you're happy with a workflow, it can be easily turned it into a macro for use in other workflows. This allows the clean construction of increasingly complex computation graphs by composing simpler graphs. 
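Reviewer note: the README paragraph above describes the executor hand-off only in prose, so here is a minimal usage sketch assembled from the notebook and test cells changed later in this diff; the workflow and node names are illustrative, not part of this PR.

```python
# Minimal sketch, assuming the renamed creator attribute introduced in this PR.
from pyiron_workflow import Workflow

wf = Workflow("with_executor")
wf.a1 = wf.create.standard.Add(0, 1)
wf.a2 = wf.create.standard.Add(2, 3)
wf.b = wf.a1 + wf.a2

# hostname_localhost=True is the extra keyword the deepdive notebook
# notes for compatibility with GitHub CI on macOS.
wf.a2.executor = wf.create.ExecutorlibExecutor(hostname_localhost=True)
wf()

wf.a2.future.result(timeout=120)  # block until the remote node finishes
wf.executor_shutdown()            # recursively shut down all executors
```

The same pattern works with the standard-library executors (e.g. `wf.create.ProcessPoolExecutor()`), which is what most of the test changes below switch to.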
diff --git a/notebooks/deepdive.ipynb b/notebooks/deepdive.ipynb index 3ee36886f..bed19c32a 100644 --- a/notebooks/deepdive.ipynb +++ b/notebooks/deepdive.ipynb @@ -3883,7 +3883,7 @@ "\n", "You can currently run nodes in different process by setting that node's `executor` to an instance of a compliant executor object -- i.e. that takes the standard `submit` method of `concurrent.futures.Executor`, returns a `concurrent.futures.Future` (or sub-class). The built-in `Node` instances (workflows, macros, function nodes, etc.) are `pickle`-compliant, and thus will work with a standard `ProcessPoolExecutor` or `ThreadPoolExecutor` from `concurrent.futures`.\n", "\n", - "For the case of `ProcessPoolExecutor`, the other process needs to be able to find an import the nodes, so they can't have been created in `__main__` (e.g. here in notebook) but need to be in some importable `.py` file. You might also want to have a node holding un-pickleable data. For these cases, we make a couple more flexible executors available on the creator. There is a toy `CloudpickleProcessPoolExecutor` which is a minimal example of handling dynamically defined/un-picklable data and useful for learning, but we also link to `pympipool.Executor`, which is both flexible and powerful. This is the default `Workflow.create.Executor`.\n", + "For the case of `ProcessPoolExecutor`, the other process needs to be able to find and import the nodes, so they can't have been created in `__main__` (e.g. here in the notebook) but need to be in some importable `.py` file. You might also want to have a node holding un-pickleable data. For these cases, we make a couple more flexible executors available on the creator. There is a toy `CloudpickleProcessPoolExecutor` which is a minimal example of handling dynamically defined/un-picklable data and useful for learning, but we also link to `executorlib.Executor`, which is both flexible and powerful. For compatibility with GitHub CI on macOS, this currently also requires an extra keyword (`hostname_localhost=True`).\n", "\n", "Here's a simple example of executor usage:" ] @@ -3909,7 +3909,7 @@ "wf.a2 = wf.create.standard.Add(2, 3)\n", "wf.b = wf.a1 + wf.a2\n", "\n", - "wf.a2.executor = wf.create.Executor()\n", + "wf.a2.executor = wf.create.ExecutorlibExecutor(hostname_localhost=True)\n", "wf()\n", "\n", "print(wf.a1.future, wf.a1.outputs.add.value)\n", @@ -3953,7 +3953,7 @@ "id": "4dd4159e-6e01-492a-8986-f07e26e68954", "metadata": {}, "source": [ - "When you're all done, it's best practice to shut down your `pympipool` executor to make sure you don't leave any dangling process (similar to how opened files should be closed). This can be done with a convience method from the parent-most object, which will recursively shut down all executors:" + "When you're all done, it's best practice to shut down your `executorlib` executor to make sure you don't leave any dangling process (similar to how opened files should be closed). 
This can be done with a convenience method from the parent-most object, which will recursively shut down all executors:" ] }, { @@ -3996,7 +3996,7 @@ } ], "source": [ - "with Workflow.create.Executor() as executor:\n", + "with Workflow.create.ExecutorlibExecutor(hostname_localhost=True) as executor:\n", " wf = Workflow(\"with_executor\")\n", " wf.a1 = wf.create.standard.Add(0, 1)\n", " wf.a2 = wf.create.standard.Add(2, 3)\n", @@ -4093,7 +4093,7 @@ "wf.starting_nodes = [wf.a, wf.b, wf.c]\n", "\n", "\n", - "with wf.create.Executor(max_workers=3) as executor:\n", + "with wf.create.ExecutorlibExecutor(hostname_localhost=True, max_workers=3) as executor:\n", " wf.a.executor = executor\n", " wf.b.executor = executor\n", " wf.c.executor = executor\n", diff --git a/notebooks/quickstart.ipynb b/notebooks/quickstart.ipynb index 141eade54..5e404655a 100644 --- a/notebooks/quickstart.ipynb +++ b/notebooks/quickstart.ipynb @@ -1775,7 +1775,7 @@ "To learn more, take a look at the `deepdive.ipynb` notebook, and/or start looking through the class docstrings. Here's a brief map of what you're still missing:\n", "\n", "- Distributing node execution onto remote processes\n", - " - Single core parallel python processes is available by setting the `.executor` attribute to a compatible executor instance, e.g. `Workflow.create.Executor()`\n", + " - Parallel computation is available by setting the `.executor` attribute to a compatible executor instance, e.g. `Workflow.create.ProcessPoolExecutor()`\n", "- Acyclic graphs\n", " - Execution for graphs whose data flow topology is a DAG happens automatically, but you're always free to specify this manually with `Signals`, and indeed _must_ specify the execution flow manually for cyclic graphs -- but cyclic graphs _are_ possible!\n", "- Complex flow nodes\n", diff --git a/pyiron_workflow/__init__.py b/pyiron_workflow/__init__.py index cd856745b..43b6030fa 100644 --- a/pyiron_workflow/__init__.py +++ b/pyiron_workflow/__init__.py @@ -22,7 +22,7 @@ Planned: - Storage of executed workflows, including restarting from a partially executed workflow -- Support for more complex remote execution, especially leveraging :mod:`pympipool` +- Support for more complex remote execution, especially leveraging :mod:`executorlib` - Infrastructure that supports and encourages of FAIR principles for node packages and finished workflows - Ontological hinting for data channels in order to provide guided workflow design diff --git a/pyiron_workflow/create.py b/pyiron_workflow/create.py index 4cd6aedb3..d5409848e 100644 --- a/pyiron_workflow/create.py +++ b/pyiron_workflow/create.py @@ -16,7 +16,7 @@ from bidict import bidict from pyiron_snippets.dotdict import DotDict from pyiron_snippets.singleton import Singleton -from executorlib import Executor as PyMpiPoolExecutor +from executorlib import Executor as ExecutorlibExecutor from pyiron_workflow.executors import CloudpickleProcessPoolExecutor from pyiron_workflow.nodes.function import function_node, as_function_node @@ -24,9 +24,6 @@ if TYPE_CHECKING: from pyiron_workflow.node_package import NodePackage -# Specify the standard executor -Executor = PyMpiPoolExecutor - class Creator(metaclass=Singleton): """ @@ -47,14 +44,13 @@ def __init__(self): self._package_access = DotDict() self._package_registry = bidict() - self.Executor = Executor # Standard lib self.ProcessPoolExecutor = ProcessPoolExecutor self.ThreadPoolExecutor = ThreadPoolExecutor # Local cloudpickler self.CloudpickleProcessPoolExecutor = CloudpickleProcessPoolExecutor - # pympipool 
- self.PyMpiPoolExecutor = PyMpiPoolExecutor + # executorlib + self.ExecutorlibExecutor = ExecutorlibExecutor self.function_node = function_node diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index f644dbd64..69fff22d4 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -242,7 +242,7 @@ def __getstate__(self): state["future"] = None # Don't pass the future -- with the future in the state things work fine for # the simple pyiron_workflow.executors.CloudpickleProcessPoolExecutor, but for - # the more complex pympipool.Executor we're getting: + # the more complex executorlib.Executor we're getting: # TypeError: cannot pickle '_thread.RLock' object if isinstance(self.executor, StdLibExecutor): state["executor"] = None diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index f3d0fc4cd..68a4d7b43 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -115,7 +115,8 @@ class Node( `concurrent.futures.ProcessPoolExecutor`, but if you define your node somewhere that it can't be imported (e.g. `__main__` in a jupyter notebook), or it is otherwise not pickleable (e.g. it holds un-pickleable - io data), you will need a more powerful executor, e.g. `pympipool.Executor`. + io data), you will need a more powerful executor, e.g. + `executorlib.Executor`. - On executing this way, a futures object will be returned instead of the usual result, this future will also be stored as an attribute, and a callback will be registered with the executor diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 12886ed38..a87ab335a 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -138,7 +138,7 @@ def test_executor_and_creator_interaction(self): wf.register("static.demo_nodes", "demo") wf.before_pickling = wf.create.demo.OptionallyAdd(1) - wf.before_pickling.executor = wf.create.Executor() + wf.before_pickling.executor = wf.create.ProcessPoolExecutor() wf() wf.before_pickling.future.result(timeout=120) # Wait for it to finish wf.executor_shutdown() @@ -152,7 +152,7 @@ def test_executors(self): Workflow.create.ProcessPoolExecutor, Workflow.create.ThreadPoolExecutor, Workflow.create.CloudpickleProcessPoolExecutor, - Workflow.create.PyMpiPoolExecutor + Workflow.create.ExecutorlibExecutor ] wf = Workflow("executed") diff --git a/tests/unit/nodes/test_macro.py b/tests/unit/nodes/test_macro.py index ae9dbba06..21433a9ec 100644 --- a/tests/unit/nodes/test_macro.py +++ b/tests/unit/nodes/test_macro.py @@ -212,7 +212,7 @@ def test_with_executor(self): # at the downstream output, and none of this is happening in a workflow original_one = macro.one - macro.executor = macro.create.Executor() + macro.executor = macro.create.ProcessPoolExecutor() self.assertIs( NOT_DATA, diff --git a/tests/unit/test_node.py b/tests/unit/test_node.py index 1bed1209c..bb7123b84 100644 --- a/tests/unit/test_node.py +++ b/tests/unit/test_node.py @@ -1,4 +1,4 @@ -from concurrent.futures import Future +from concurrent.futures import Future, ProcessPoolExecutor import os import sys import unittest @@ -6,8 +6,6 @@ from pyiron_snippets.files import DirectoryObject from pyiron_workflow.channels import InputData, NOT_DATA - -from pyiron_workflow.create import Executor from pyiron_workflow.mixin.injection import OutputDataWithInjection, OutputsWithInjection from pyiron_workflow.io import Inputs from pyiron_workflow.node import Node @@ -143,7 +141,7 @@ def test_check_readiness(self): ) def 
test_force_local_execution(self): - self.n1.executor = Executor() + self.n1.executor = ProcessPoolExecutor() out = self.n1.run(force_local_execution=False) with self.subTest("Test running with an executor fulfills promises"): self.assertIsInstance( @@ -179,7 +177,7 @@ def test_force_local_execution(self): "happens" ) - self.n2.executor = Executor() + self.n2.executor = ProcessPoolExecutor() self.n2.inputs.x = 0 self.assertEqual( 1, diff --git a/tests/unit/test_workflow.py b/tests/unit/test_workflow.py index a12b1b1fd..053e0b9a8 100644 --- a/tests/unit/test_workflow.py +++ b/tests/unit/test_workflow.py @@ -197,7 +197,7 @@ def test_with_executor(self): wf.b = wf.create.function_node(plus_one, x=wf.a) original_a = wf.a - wf.executor = wf.create.Executor() + wf.executor = wf.create.ProcessPoolExecutor() self.assertIs( NOT_DATA, @@ -243,7 +243,7 @@ def test_parallel_execution(self): wf.fast = five() wf.sum = sum(a=wf.fast, b=wf.slow) - wf.slow.executor = wf.create.Executor() + wf.slow.executor = wf.create.ProcessPoolExecutor() wf.slow.run() wf.fast.run() @@ -419,7 +419,7 @@ def add_three_macro(self, one__x): msg="Sanity check, pulling here should work perfectly fine" ) - wf.m.one.executor = wf.create.Executor() + wf.m.one.executor = wf.create.ProcessPoolExecutor() with self.assertRaises( ValueError, msg="Should not be able to pull with executor in local scope" @@ -428,7 +428,7 @@ def add_three_macro(self, one__x): wf.m.one.executor_shutdown() # Shouldn't get this far, but if so, shutdown wf.m.one.executor = None - wf.n1.executor = wf.create.Executor() + wf.n1.executor = wf.create.ProcessPoolExecutor() with self.assertRaises( ValueError, msg="Should not be able to pull with executor in parent scope"
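Closing note for reviewers: the deepdive notebook text above defines a compliant executor as anything exposing a `concurrent.futures.Executor`-style `submit` that returns a `concurrent.futures.Future` (or subclass). A minimal sketch of that duck type, using only the standard library (the class name is hypothetical, for illustration only):

```python
from concurrent.futures import Future


class InlineExecutor:
    """Toy compliant executor: runs each submitted callable immediately
    in the current process and returns an already-resolved Future."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future

    def shutdown(self, wait=True, *, cancel_futures=False):
        pass  # nothing to clean up for inline execution
```

Anything shaped like this can be assigned to a node's `executor` attribute; the supported choices remain `ProcessPoolExecutor`, `ThreadPoolExecutor`, `CloudpickleProcessPoolExecutor`, and `ExecutorlibExecutor`, exactly as `tests/integration/test_workflow.py` now enumerates them.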