From 8833f5b085c5c49485d20e89514031f4b3d1a513 Mon Sep 17 00:00:00 2001 From: Liam Huber Date: Mon, 6 May 2024 16:08:06 -0700 Subject: [PATCH] [minor] Introduce for-loop (#309) * Change paradigm to whether or not the node uses __reduced__ and a constructor Instead of "Meta" nodes * Allow direct use of Constructed children * Move and update constructed stuff * Add new singleton behaviour so factory-produced classes can pass is-tests * PEP8 newline * Remove unnecessary __getstate__ The object isn't holding instance level state and older versions of python bork here. * Add constructed __*state__ compatibility for older versions * :bug: add missing `return` * Format black * Revert singleton * Remove constructed It's superceded by the snippets.factory stuff * Format black * Let the factory clear method take specific names * Don't override __module__ to the factory function If it was explicitly set downstream, leave that. But if the user left it empty, still default it back to the factory function's module * Clean up storage if job tests fail * Make tinybase the default storage backend * Switch Function and Macro over to using classfactory With this, everything is pickleable (unless you slap something unpickleable on top, or define it in a place that can't be reached by pickle like inside a local function scope). The big downside is that `h5io` storage is now basically useless, since all our nodes come from custom reconstructors. Similarly, for the node job `DataContainer` can no longer store the input node. The `tinybase` backend is still working ok, so I made it the default, and I got the node job working again by forcing it to cloudpickle the input node on saving. These are some ugly hacks, but since storage is an alpha feature right now anyhow, I'd prefer to push ahead with pickleability. * Remove unused decorator And reformat tests in the vein of usage in Function and Macro * Format black * Expose concurrent.futures executors on the creator * Only expose the base Executor from pympipool Doesn't hurt us now and prepares for the version bump * Extend `Runnable` to use a non-static method This is significant. `on_run` is no longer a property returning a staticmethod that will be shipped off, but we directly ship off `self.on_run` so `self` goes with it to remote processes. Similarly, `run_args` gets extended to be `tuple[tuple, dict]` so positional arguments can be sent too. Stacked on top of pickleability, this means we can now use standard `concurrent.futures.ProcessPoolExecutor` -- as long as the nodes are all defined somewhere importable, i.e. not in `__main__`. Since working in notebooks is pretty common, the more flexible `pympipool.Executor` is left as the default `Workflow.create.Executor`. This simplifies some stuff under the hood too, e.g. `Function` and `Composite` now just directly do their thing in `on_run` instead of needing the misdirection of returning their own static methods. * Format black * Expose concurrent.futures executors on the creator * Only expose the base Executor from pympipool Doesn't hurt us now and prepares for the version bump * Extend `Runnable` to use a non-static method This is significant. `on_run` is no longer a property returning a staticmethod that will be shipped off, but we directly ship off `self.on_run` so `self` goes with it to remote processes. Similarly, `run_args` gets extended to be `tuple[tuple, dict]` so positional arguments can be sent too. Stacked on top of pickleability, this means we can now use standard `concurrent.futures.ProcessPoolExecutor` -- as long as the nodes are all defined somewhere importable, i.e. not in `__main__`. Since working in notebooks is pretty common, the more flexible `pympipool.Executor` is left as the default `Workflow.create.Executor`. This simplifies some stuff under the hood too, e.g. `Function` and `Composite` now just directly do their thing in `on_run` instead of needing the misdirection of returning their own static methods. * Format black * Compute qualname if not provided * Fail early if there is a function in the factory made hierarchy * Skip the factory fanciness if you see This enables _FactoryMade objects to be cloudpickled, even when they can't be pickled, while still not letting the mere fact that they are dynamic classes stand in the way of pickling. Nicely lifts our constraint on the node job interaction with pyiron base, which was leveraging cloudpickle * Format black * Test ClassFactory this way too * Test existing list nodes * Rename length base class * Refactor transformers to use on_run and run_args more directly * Introduce an inputs-to-dict transformer * Preview IO as a separate step To guarantee IO construction happens as early as possible in case it fails * Add dataframe transformer * Remove prints :facepalm: * Add dataframe transformer tests * Add transformers to the create menu * Format black * :broom: be more consistent in caching/shortcuts Instead of always defining private holders by hand * Introduce a dataclass node * Give the dataclass node a simpler name Since we can inject attribute access, I don't anticipate ever needing the reverse dataclass-to-outputs node, so let's simplify the naming here. * Remove unused import * Set the output type hint automatically * Add docs * Add tests * Format black * PEP8 newline * Introduce for-loop * Refactor: break _build_body into smaller functions * Resolve dataframe column name conflicts When a body node has the same labels for looped input as for output * Update docstrings * Refactor: rename file * Don't break when one of iter or zipped is empty * :bug: pass body hint, not hint and default, to row collector input hint * Silence disconnection warning Since(?) disconnection is reciprocal it was firing left right and centre * Update deepdive * Remove old for loop * Remove unused import * Add tests * Remove unused attributes * Add a shortcut for assigning an executor to all body nodes * Format black * Format black --------- Co-authored-by: pyiron-runner --- notebooks/deepdive.ipynb | 760 +++++++++++++++++++++-------- pyiron_workflow/channels.py | 6 - pyiron_workflow/create.py | 10 +- pyiron_workflow/for_loop.py | 479 ++++++++++++++++++ pyiron_workflow/loops.py | 113 ----- tests/integration/test_workflow.py | 25 +- tests/unit/test_for_loop.py | 320 ++++++++++++ 7 files changed, 1364 insertions(+), 349 deletions(-) create mode 100644 pyiron_workflow/for_loop.py create mode 100644 tests/unit/test_for_loop.py diff --git a/notebooks/deepdive.ipynb b/notebooks/deepdive.ipynb index f1190821c..6484ad357 100644 --- a/notebooks/deepdive.ipynb +++ b/notebooks/deepdive.ipynb @@ -702,16 +702,6 @@ "tags": [] }, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel ran was not connected to run, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", - " warn(\n" - ] - }, { "data": { "text/plain": [ @@ -1034,17 +1024,9 @@ "id": "6569014a-815b-46dd-8b47-4e1cd4584b3b", "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", - " warn(\n" - ] - }, { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -1719,7 +1701,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 41, @@ -1756,7 +1738,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "d43198bb314e4436a40aa9add084fed6", + "model_id": "a15ca91c407b4d2fadbae54e51621a30", "version_major": 2, "version_minor": 0 }, @@ -1775,7 +1757,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "009ad311f3c548f8ae2786dfd6a935e6", + "model_id": "bfc8738882d54fc791ee117bb509481f", "version_major": 2, "version_minor": 0 }, @@ -1789,7 +1771,7 @@ { "data": { "text/plain": [ - "" + "" ] }, "execution_count": 42, @@ -2049,7 +2031,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 43, @@ -2103,16 +2085,6 @@ "id": "2b9bb21a-73cd-444e-84a9-100e202aa422", "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to x, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", - " warn(\n" - ] - }, { "data": { "text/plain": [ @@ -2285,20 +2257,7 @@ "execution_count": 49, "id": "a832e552-b3cc-411a-a258-ef21574fc439", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to name, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to crystalstructure, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to a, andthus could not disconnect from it.\n", - " warn(\n" - ] - } - ], + "outputs": [], "source": [ "wf = Workflow(\"phase_preference\")\n", "wf.element = wf.create.standard.UserInput()\n", @@ -3296,7 +3255,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 50, @@ -3324,7 +3283,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "d42f0d45153c43d08c00b1be09a5d371", + "model_id": "c2d3d353f02f469cbb9bdf4fd8249962", "version_major": 2, "version_minor": 0 }, @@ -3345,7 +3304,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "9d3a8ad5fad943ca84a31ef0447ed921", + "model_id": "e3cb6c8a60ba4488976c563b03615dbd", "version_major": 2, "version_minor": 0 }, @@ -3375,14 +3334,6 @@ "id": "091e2386-0081-436c-a736-23d019bd9b91", "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel ran was not connected to accumulate_and_run, andthus could not disconnect from it.\n", - " warn(\n" - ] - }, { "name": "stdout", "output_type": "stream", @@ -3393,7 +3344,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "1022ff0dc68a42649273a16d4783738c", + "model_id": "f1f215c871884c07a807522324b5d468", "version_major": 2, "version_minor": 0 }, @@ -3414,7 +3365,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "d702dbfc432d4f58847d9d13dcc3caf0", + "model_id": "484cb214c9aa42e7bf7e0c15a71dc7bf", "version_major": 2, "version_minor": 0 }, @@ -3455,18 +3406,7 @@ "execution_count": 53, "id": "4cdffdca-48d3-4486-9045-48102c7e5f31", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel job was not connected to job, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel accumulate_and_run was not connected to ran, andthus could not disconnect from it.\n", - " warn(\n" - ] - } - ], + "outputs": [], "source": [ "replacee = wf.min_phase1.calc \n", "wf.min_phase1.calc = Workflow.create.pyiron_atomistics.CalcStatic" @@ -3488,14 +3428,6 @@ "id": "ed4a3a22-fc3a-44c9-9d4f-c65bc1288889", "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel ran was not connected to accumulate_and_run, andthus could not disconnect from it.\n", - " warn(\n" - ] - }, { "name": "stdout", "output_type": "stream", @@ -3506,7 +3438,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "764286716ac94a30ba87c095e11d29a5", + "model_id": "3eda5eaed97d4ef5bd05db0a1aaa6808", "version_major": 2, "version_minor": 0 }, @@ -3527,7 +3459,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "c7c4f447758a4106b3ea7463796aed44", + "model_id": "e218374bcdcf4c818ce88b2410368050", "version_major": 2, "version_minor": 0 }, @@ -3558,14 +3490,6 @@ "id": "5a985cbf-c308-4369-9223-b8a37edb8ab1", "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel ran was not connected to accumulate_and_run, andthus could not disconnect from it.\n", - " warn(\n" - ] - }, { "name": "stdout", "output_type": "stream", @@ -3576,7 +3500,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "6d864a43e64f44aca3717b3911d4a2f9", + "model_id": "963e16ec783846d1885ed72dfe26c3c5", "version_major": 2, "version_minor": 0 }, @@ -3597,7 +3521,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "0530268ddab7455584ba8f85f7da783c", + "model_id": "d13b542a098544c5a50e869948ccd5f5", "version_major": 2, "version_minor": 0 }, @@ -3663,7 +3587,7 @@ "output_type": "stream", "text": [ "None 1\n", - " 5\n" + " 5\n" ] } ], @@ -3749,7 +3673,7 @@ "output_type": "stream", "text": [ "None 1\n", - " 5\n", + " 5\n", "Finally 5\n", "b (Add):\n", "Inputs ['obj', 'other']\n", @@ -3810,7 +3734,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "6.009681912953965\n" + "6.0116940710067865\n" ] } ], @@ -3842,7 +3766,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "3.0886348159983754\n" + "2.92550689199561\n" ] } ], @@ -3889,6 +3813,501 @@ "Unfortunately, _nested_ executors are not yet working. So if you set a macro to use an executor, none of its (grand...)children may specify an executor." ] }, + { + "cell_type": "markdown", + "id": "4d3f2d37-9e35-425b-93a1-2c327685bbf4", + "metadata": {}, + "source": [ + "# For-loops\n", + "\n", + "Any node with an IO signature that is fixed at the class level (i.e. every `StaticNode`, which is all the standard ones except for a `Workflow` instance) can be transformed into a macro that loops over that node using the `Workflow.create.for_node` interface. Any input that is not explicity scattered using the `iter_on` or `zip_on` gets gets broadcast to _all_ copies of the body node. The result is a dataframe coupling looped input to body node output:" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "id": "e3538139-f814-43ba-aad2-f35be0dc2721", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Al: [0. 0. 0.]\n", + "tags: \n", + " indices: [0]\n", + "pbc: [ True True True]\n", + "cell: \n", + "Cell([[0.0, 2.05, 2.05], [2.05, 0.0, 2.05], [2.05, 2.05, 0.0]])" + ] + }, + "execution_count": 63, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "n = Workflow.create.pyiron_atomistics.Bulk(name=\"Al\", a=4.1)\n", + "n()" + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "id": "0b373764-b389-4c24-8086-f3d33a4f7fd7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
astructure
03.90[Atom('Al', [0.0, 0.0, 0.0], index=0)]
13.95[Atom('Al', [0.0, 0.0, 0.0], index=0)]
24.00[Atom('Al', [0.0, 0.0, 0.0], index=0)]
34.05[Atom('Al', [0.0, 0.0, 0.0], index=0)]
44.10[Atom('Al', [0.0, 0.0, 0.0], index=0)]
\n", + "
" + ], + "text/plain": [ + " a structure\n", + "0 3.90 [Atom('Al', [0.0, 0.0, 0.0], index=0)]\n", + "1 3.95 [Atom('Al', [0.0, 0.0, 0.0], index=0)]\n", + "2 4.00 [Atom('Al', [0.0, 0.0, 0.0], index=0)]\n", + "3 4.05 [Atom('Al', [0.0, 0.0, 0.0], index=0)]\n", + "4 4.10 [Atom('Al', [0.0, 0.0, 0.0], index=0)]" + ] + }, + "execution_count": 64, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bulk_loop = Workflow.create.for_node(\n", + " Workflow.create.pyiron_atomistics.Bulk,\n", + " iter_on=(\"a\",),\n", + " name=\"Al\",\n", + " a=np.linspace(3.9, 4.1, 5).tolist()\n", + ")\n", + "\n", + "out = bulk_loop()\n", + "out.df" + ] + }, + { + "cell_type": "markdown", + "id": "c8481efb-d7a5-4395-9e46-aab6a0e004eb", + "metadata": {}, + "source": [ + "Any number of input channels can be specified to make a nested list over, and/or zipped over by passing the channel labels as tuples to the `iter_on` and `zip_on` arguments respectively. In case the body node uses the same labels for both (looped) input channels _and_ output channels, you will need to provide a map to the for-loop to prevent the resulting dataframe from having degenerate column names:" + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "id": "6f486c87-f3d4-405f-a759-2ada12cb45e2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abcdout_aout_bout_cout_de
013791379e
11381013810e
214791479e
31481014810e
415791579e
51581015810e
616791679e
71681016810e
823792379e
92381023810e
1024792479e
112481024810e
1225792579e
132581025810e
1426792679e
152681026810e
\n", + "
" + ], + "text/plain": [ + " a b c d out_a out_b out_c out_d e\n", + "0 1 3 7 9 1 3 7 9 e\n", + "1 1 3 8 10 1 3 8 10 e\n", + "2 1 4 7 9 1 4 7 9 e\n", + "3 1 4 8 10 1 4 8 10 e\n", + "4 1 5 7 9 1 5 7 9 e\n", + "5 1 5 8 10 1 5 8 10 e\n", + "6 1 6 7 9 1 6 7 9 e\n", + "7 1 6 8 10 1 6 8 10 e\n", + "8 2 3 7 9 2 3 7 9 e\n", + "9 2 3 8 10 2 3 8 10 e\n", + "10 2 4 7 9 2 4 7 9 e\n", + "11 2 4 8 10 2 4 8 10 e\n", + "12 2 5 7 9 2 5 7 9 e\n", + "13 2 5 8 10 2 5 8 10 e\n", + "14 2 6 7 9 2 6 7 9 e\n", + "15 2 6 8 10 2 6 8 10 e" + ] + }, + "execution_count": 65, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "@Workflow.wrap.as_function_node()\n", + "def FiveApart(a: int, b: int, c: int, d: int, e: str = \"foobar\"):\n", + " return a, b, c, d, e,\n", + "\n", + "for_instance = Workflow.create.for_node(\n", + " FiveApart,\n", + " iter_on=(\"a\", \"b\"),\n", + " zip_on=(\"c\", \"d\"),\n", + " a=[1, 2],\n", + " b=[3, 4, 5, 6],\n", + " c=[7, 8],\n", + " d=[9, 10, 11],\n", + " e=\"e\",\n", + " output_column_map={\n", + " \"a\": \"out_a\",\n", + " \"b\": \"out_b\",\n", + " \"c\": \"out_c\",\n", + " \"d\": \"out_d\"\n", + " }\n", + ")\n", + "\n", + "out = for_instance()\n", + "out.df" + ] + }, + { + "cell_type": "markdown", + "id": "22b688e2-d203-4795-a409-dfeaa978b595", + "metadata": {}, + "source": [ + "Once set, these inputs will _always_ be iterated on, and thus require list input, but the length of the input can be varied between runs of the node. Under the hood, the macro is destroying and recreating (many of) its subgraph nodes at each runtime -- so the interface is fixed, but the internal structure can vary. Note that we use the same standard as python, and zipped input is always truncated to the shortest zipping partner:" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "id": "10f284c8-9210-465f-b4d6-9aa4c0909b08", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abcdout_aout_bout_cout_de
013791379e
\n", + "
" + ], + "text/plain": [ + " a b c d out_a out_b out_c out_d e\n", + "0 1 3 7 9 1 3 7 9 e" + ] + }, + "execution_count": 66, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "for_instance(a=[1], b=[3], c=[7]).df" + ] + }, { "cell_type": "markdown", "id": "f447531e-3e8c-4c7e-a579-5f9c56b75a5b", @@ -3933,7 +4352,7 @@ }, { "cell_type": "code", - "execution_count": 63, + "execution_count": 67, "id": "c8196054-aff3-4d39-a872-b428d329dac9", "metadata": {}, "outputs": [], @@ -3943,7 +4362,7 @@ }, { "cell_type": "code", - "execution_count": 64, + "execution_count": 68, "id": "ffd741a3-b086-4ed0-9a62-76143a3705b2", "metadata": {}, "outputs": [], @@ -3960,7 +4379,7 @@ }, { "cell_type": "code", - "execution_count": 65, + "execution_count": 69, "id": "3a22c622-f8c1-449b-a910-c52beb6a09c3", "metadata": {}, "outputs": [ @@ -3968,15 +4387,15 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:373: UserWarning: A saved file was found for the node save_demo -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:376: UserWarning: A saved file was found for the node save_demo -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", " warnings.warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:373: UserWarning: A saved file was found for the node inp -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:376: UserWarning: A saved file was found for the node inp -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", " warnings.warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:373: UserWarning: A saved file was found for the node middle -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:376: UserWarning: A saved file was found for the node middle -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", " warnings.warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:373: UserWarning: A saved file was found for the node end -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:376: UserWarning: A saved file was found for the node end -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", " warnings.warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:373: UserWarning: A saved file was found for the node out -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/node.py:376: UserWarning: A saved file was found for the node out -- attempting to load it...(To delete the saved file instead, use `overwrite_save=True`)\n", " warnings.warn(\n" ] } @@ -3999,7 +4418,7 @@ }, { "cell_type": "code", - "execution_count": 66, + "execution_count": 70, "id": "0999d3e8-3a5a-451d-8667-a01dae7c1193", "metadata": {}, "outputs": [], @@ -4008,100 +4427,23 @@ " reloaded.storage.delete()" ] }, - { - "cell_type": "markdown", - "id": "1f012460-19af-45f7-98aa-a0ad5b8e6faa", - "metadata": {}, - "source": [ - "## Meta-nodes and flow control\n", - "\n", - "A meta-node is a function that produces a node _class_ instedad of a node _instance_.\n", - "Right now, these are used to produce parameterized flow-control nodes, which take an node class as input and return a new macro class that builds some graph using the passed node class, e.g. for- and while-loops.\n", - "\n", - "Note: The body (and condition) node classes passed to for- and while-loops must be importable, i.e. they can come from a node package, or be defined here in the notebook (importable from `__main__`), but you can't use, e.g., a node defined _inside_ the scope of some other function.\n", - "\n", - "### For-loops\n", - "\n", - "One meta node is a for-loop builder, which creates a macro with $n$ internal instances of the \"loop body\" node class, and a new IO interface.\n", - "The new input allows you to specify which input channels are being looped over -- such that the macro input for this channel is interpreted as list-like and distributed to all the copies of the nodes separately --, and which is _not_ being looped over -- and thus interpreted as the loop body node would normally interpret the input and passed to all copies equally.\n", - "All of the loop body outputs are then collected as a list of length $n$.\n", - "\n", - "We follow a convention that inputs and outputs being looped over are indicated by their channel labels being ALL CAPS.\n", - "\n", - "In the example below, we loop over the bulk structure node to create structures with different lattice constants:" - ] - }, - { - "cell_type": "code", - "execution_count": 67, - "id": "0b373764-b389-4c24-8086-f3d33a4f7fd7", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/io_preview.py:253: OutputLabelsNotValidated: Could not find the source code to validate BulkForA5 output labels against the number of returned values -- proceeding without validation\n", - " warnings.warn(\n" - ] - }, - { - "data": { - "text/plain": [ - "[14.829749999999995,\n", - " 15.407468749999998,\n", - " 15.999999999999998,\n", - " 16.60753125,\n", - " 17.230249999999995]" - ] - }, - "execution_count": 67, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "n = 5\n", - "\n", - "bulk_loop = Workflow.create.meta.for_loop(\n", - " Workflow.create.pyiron_atomistics.Bulk,\n", - " n,\n", - " iterate_on=(\"a\",),\n", - ")()\n", - "\n", - "out = bulk_loop(\n", - " name=\"Al\", # Sent equally to each body node\n", - " A=np.linspace(3.9, 4.1, n).tolist(), # Scattered across body nodes\n", - " # The rest of the values need to be filled\n", - " # (We don't currently pass body defaults to the loop node)\n", - " # but just get broadcast to each body node\n", - " crystalstructure=None,\n", - " c=None,\n", - " covera=None,\n", - " u=None,\n", - " orthorhombic=False,\n", - " cubic=False\n", - ")\n", - "\n", - "[struct.cell.volume for struct in out.STRUCTURE] \n", - "# output is a list collected from copies of the body node, as indicated by CAPS label" - ] - }, { "cell_type": "markdown", "id": "4e7ed210-dbc2-4afa-825e-b91168baff25", "metadata": {}, "source": [ - "## While-loops\n", + "# While-loops\n", + "\n", + "Similar to for-loops, we can also create a while-loop, which takes both a body node and a condition node. The condition node must be a single-output `Function` node returning a `bool` type. Instead of creating copies of the body node, the body node gets re-run until the condition node returns `False`.\n", "\n", - "We can also create a while-loop, which takes both a body node and a condition node. The condition node must be a single-output `Function` node returning a `bool` type. Instead of creating copies of the body node, the body node gets re-run until the condition node returns `False`.\n", + "You _must_ specify the data connection so that the body node passes information to the condition node. You may optionally also loop output of the body node back to input of the body node to change the input at each iteration. Right now this is done with horribly ugly string tuples, but we're working on improving this interface and making it more like the for-loop.\n", "\n", - "You _must_ specify the data connection so that the body node passes information to the condition node. You may optionally also loop output of the body node back to input of the body node to change the input at each iteration. Right now this is done with horribly ugly string tuples, but we're still working on it." + "Note: The body (and condition) node classes passed to while-loops must be importable, i.e. they can come from a node package, or be defined here in the notebook (importable from `__main__`), but you can't use, e.g., a node defined _inside_ the scope of some other function." ] }, { "cell_type": "code", - "execution_count": 68, + "execution_count": 71, "id": "0dd04b4c-e3e7-4072-ad34-58f2c1e4f596", "metadata": {}, "outputs": [ @@ -4109,18 +4451,8 @@ "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/io_preview.py:253: OutputLabelsNotValidated: Could not find the source code to validate AddWhileLessThan_m3900845772041641930 output labels against the number of returned values -- proceeding without validation\n", - " warnings.warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to a, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to b, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to other, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel run was not connected to true, andthus could not disconnect from it.\n", - " warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel run was not connected to ran, andthus could not disconnect from it.\n", - " warn(\n" + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/io_preview.py:261: OutputLabelsNotValidated: Could not find the source code to validate AddWhileLessThan_6300416345671079692 output labels against the number of returned values -- proceeding without validation\n", + " warnings.warn(\n" ] } ], @@ -4164,7 +4496,7 @@ }, { "cell_type": "code", - "execution_count": 69, + "execution_count": 72, "id": "2dfb967b-41ac-4463-b606-3e315e617f2a", "metadata": {}, "outputs": [ @@ -4188,7 +4520,7 @@ }, { "cell_type": "code", - "execution_count": 70, + "execution_count": 73, "id": "2e87f858-b327-4f6b-9237-c8a557f29aeb", "metadata": {}, "outputs": [ @@ -4196,23 +4528,23 @@ "name": "stdout", "output_type": "stream", "text": [ - "0.540 > 0.2\n", - "0.285 > 0.2\n", - "0.946 > 0.2\n", - "0.437 > 0.2\n", - "0.558 > 0.2\n", - "0.010 <= 0.2\n", - "Finally 0.010\n" + "0.717 > 0.2\n", + "0.384 > 0.2\n", + "0.334 > 0.2\n", + "0.223 > 0.2\n", + "0.219 > 0.2\n", + "0.333 > 0.2\n", + "0.834 > 0.2\n", + "0.167 <= 0.2\n", + "Finally 0.167\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/io_preview.py:253: OutputLabelsNotValidated: Could not find the source code to validate RandomWhileGreaterThan_m6305331635963844247 output labels against the number of returned values -- proceeding without validation\n", - " warnings.warn(\n", - "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/channels.py:176: UserWarning: The channel user_input was not connected to threshold, andthus could not disconnect from it.\n", - " warn(\n" + "/Users/huber/work/pyiron/pyiron_workflow/pyiron_workflow/io_preview.py:261: OutputLabelsNotValidated: Could not find the source code to validate RandomWhileGreaterThan_715325418919625042 output labels against the number of returned values -- proceeding without validation\n", + " warnings.warn(\n" ] } ], diff --git a/pyiron_workflow/channels.py b/pyiron_workflow/channels.py index 81be6e02b..c7d975175 100644 --- a/pyiron_workflow/channels.py +++ b/pyiron_workflow/channels.py @@ -11,7 +11,6 @@ import typing from abc import ABC, abstractmethod import inspect -from warnings import warn from pyiron_workflow.has_interface_mixins import HasChannel, HasLabel, UsesState from pyiron_workflow.has_to_dict import HasToDict @@ -172,11 +171,6 @@ def disconnect(self, *others: Channel) -> list[tuple[Channel, Channel]]: self.connections.remove(other) other.disconnect(self) destroyed_connections.append((self, other)) - else: - warn( - f"The channel {self.label} was not connected to {other.label}, and" - f"thus could not disconnect from it." - ) return destroyed_connections def disconnect_all(self) -> list[tuple[Channel, Channel]]: diff --git a/pyiron_workflow/create.py b/pyiron_workflow/create.py index 183e062a3..75e077f3a 100644 --- a/pyiron_workflow/create.py +++ b/pyiron_workflow/create.py @@ -64,6 +64,13 @@ def __init__(self): # this if-clause and just letting users of python <3.10 hit an error. self.register("pyiron_workflow.node_library.standard", "standard") + @property + @lru_cache(maxsize=1) + def for_node(self): + from pyiron_workflow.for_loop import for_node + + return for_node + @property @lru_cache(maxsize=1) def macro_node(self): @@ -82,12 +89,11 @@ def Workflow(self): @lru_cache(maxsize=1) def meta(self): from pyiron_workflow.transform import inputs_to_list, list_to_outputs - from pyiron_workflow.loops import for_loop, while_loop + from pyiron_workflow.loops import while_loop from pyiron_workflow.snippets.dotdict import DotDict return DotDict( { - for_loop.__name__: for_loop, inputs_to_list.__name__: inputs_to_list, list_to_outputs.__name__: list_to_outputs, while_loop.__name__: while_loop, diff --git a/pyiron_workflow/for_loop.py b/pyiron_workflow/for_loop.py new file mode 100644 index 000000000..a9a672291 --- /dev/null +++ b/pyiron_workflow/for_loop.py @@ -0,0 +1,479 @@ +from __future__ import annotations + +from abc import ABC +from concurrent.futures import Executor +from functools import lru_cache +import itertools +import math +from typing import Any, ClassVar, Literal, Optional + +from pandas import DataFrame + +from pyiron_workflow.channels import NOT_DATA +from pyiron_workflow.composite import Composite +from pyiron_workflow.io_preview import StaticNode +from pyiron_workflow.snippets.factory import classfactory +from pyiron_workflow.transform import inputs_to_dict, inputs_to_dataframe, InputsToDict + + +def dictionary_to_index_maps( + data: dict, + nested_keys: Optional[list[str] | tuple[str, ...]] = None, + zipped_keys: Optional[list[str] | tuple[str, ...]] = None, +): + """ + Given a dictionary where some data is iterable, and list(s) of keys over + which to make a nested and/or zipped loop, return dictionaries mapping + these keys to all the indices of the data they hold. Zipped loops are + nested outside the nesting loops. + + Args: + data (dict): The dictionary of data, some of which must me iterable. + nested_keys (tuple[str, ...] | None): The keys whose data to make a + nested for-loop over. + zipped_keys (tuple[str, ...] | None): The keys whose data to make a + zipped for-loop over. + + Returns: + (tuple[dict[..., int], ...]): A tuple of dictionaries where each item + maps the dictionary key to an index for that key's value. + + Raises: + (KeyError): If any of the provided keys are not keys of the provided + dictionary. + (TypeError): If any of the data held in a provided key does cannot be + operated on with `len`. + (ValueError): If neither set of keys to iterate on is provided, or if + all values being iterated over have a length of zero. + """ + + try: + nested_data_lengths = ( + [] + if (nested_keys is None or len(nested_keys) == 0) + else list(len(data[key]) for key in nested_keys) + ) + except TypeError as e: + raise TypeError( + f"Could not parse nested lengths -- Does one of the keys {nested_keys} " + f"have non-iterable data?" + ) from e + n_nest = math.prod(nested_data_lengths) if len(nested_data_lengths) > 0 else 0 + + try: + n_zip = ( + 0 + if (zipped_keys is None or len(zipped_keys) == 0) + else min(len(data[key]) for key in zipped_keys) + ) + except TypeError as e: + raise TypeError( + f"Could not parse zipped lengths -- Does one of the keys {zipped_keys} " + f"have non-iterable data?" + ) from e + + def nested_generator(): + return itertools.product(*[range(n) for n in nested_data_lengths]) + + def nested_index_map(nested_indices): + return { + nested_keys[i_key]: nested_index + for i_key, nested_index in enumerate(nested_indices) + } + + def zipped_generator(): + return range(n_zip) + + def zipped_index_map(zipped_index): + return {key: zipped_index for key in zipped_keys} + + def merge(d1, d2): + d1.update(d2) + return d1 + + if n_nest > 0 and n_zip > 0: + key_index_maps = tuple( + merge(nested_index_map(nested_indices), zipped_index_map(zipped_index)) + for nested_indices, zipped_index in itertools.product( + nested_generator(), zipped_generator() + ) + ) + elif n_nest > 0: + key_index_maps = tuple( + nested_index_map(nested_indices) for nested_indices in nested_generator() + ) + elif n_zip > 0: + key_index_maps = tuple( + zipped_index_map(zipped_index) for zipped_index in zipped_generator() + ) + else: + if nested_keys is None and zipped_keys is None: + raise ValueError( + "At least one of `nested_keys` or `zipped_keys` must be specified." + ) + else: + raise ValueError( + "Received keys to iterate over, but all values had length 0." + ) + + return key_index_maps + + +class UnmappedConflictError(ValueError): + """ + When a for-node gets a body whose output label conflicts with looped a input + label and no map was provided to avoid this. + """ + + +class MapsToNonexistentOutputError(ValueError): + """ + When a for-node tries to map body node output channels that don't exist. + """ + + +class For(Composite, StaticNode, ABC): + """ + Specifies fixed fields of some other node class to iterate over, but allows the + length of looped input to vary by dynamically destroying and recreating (most of) + its subgraph at run-time. + + Collects looped output and collates them with looped input values in a dataframe. + + The :attr:`body_node_executor` gets applied to each body node instance on each + run. + """ + + _body_node_class: ClassVar[type[StaticNode]] + _iter_on: ClassVar[tuple[str, ...]] = () + _zip_on: ClassVar[tuple[str, ...]] = () + + def __init_subclass__(cls, output_column_map=None, **kwargs): + super().__init_subclass__(**kwargs) + + unmapped_conflicts = ( + set(cls._body_node_class.preview_inputs().keys()) + .intersection(cls._iter_on + cls._zip_on) + .intersection(cls._body_node_class.preview_outputs().keys()) + .difference(() if output_column_map is None else output_column_map.keys()) + ) + if len(unmapped_conflicts) > 0: + raise UnmappedConflictError( + f"The body node {cls._body_node_class.__name__} has channel labels " + f"{unmapped_conflicts} that appear as both (looped) input _and_ output " + f"for {cls.__name__}. All such channels require a map to produce new, " + f"unique column names for the output." + ) + + maps_to_nonexistent_output = set( + {} if output_column_map is None else output_column_map.keys() + ).difference(cls._body_node_class.preview_outputs().keys()) + if len(maps_to_nonexistent_output) > 0: + raise MapsToNonexistentOutputError( + f"{cls.__name__} tried to map body node output(s) " + f"{maps_to_nonexistent_output} to new column names, but " + f"{cls._body_node_class.__name__} has no such outputs." + ) + + cls._output_column_map = output_column_map + + @classmethod + @property + @lru_cache(maxsize=1) + def output_column_map(cls) -> dict[str, str]: + """ + How to transform body node output labels to dataframe column names. + """ + map_ = {k: k for k in cls._body_node_class.preview_outputs().keys()} + overrides = {} if cls._output_column_map is None else cls._output_column_map + for body_label, column_name in overrides.items(): + map_[body_label] = column_name + return map_ + + def __init__( + self, + *args, + label: Optional[str] = None, + parent: Optional[Composite] = None, + overwrite_save: bool = False, + run_after_init: bool = False, + storage_backend: Optional[Literal["h5io", "tinybase"]] = None, + save_after_run: bool = False, + strict_naming: bool = True, + body_node_executor: Optional[Executor] = None, + **kwargs, + ): + super().__init__( + *args, + label=label, + parent=parent, + overwrite_save=overwrite_save, + run_after_init=run_after_init, + storage_backend=storage_backend, + save_after_run=save_after_run, + strict_naming=strict_naming, + **kwargs, + ) + self.body_node_executor = None + + def _setup_node(self) -> None: + super()._setup_node() + input_nodes = [] + for channel in self.inputs: + n = self.create.standard.UserInput( + channel.default, label=channel.label, parent=self + ) + n.inputs.user_input.type_hint = channel.type_hint + channel.value_receiver = n.inputs.user_input + input_nodes.append(n) + self.starting_nodes = input_nodes + self._input_node_labels = tuple(n.label for n in input_nodes) + + def on_run(self): + self._build_body() + return super().on_run() + + def _build_body(self): + """ + Construct instances of the body node based on input length, and wire them to IO. + """ + iter_maps = dictionary_to_index_maps( + self.inputs.to_value_dict(), + nested_keys=self._iter_on, + zipped_keys=self._zip_on, + ) + + self._clean_existing_subgraph() + + self.dataframe = inputs_to_dataframe(len(iter_maps)) + self.dataframe.outputs.df.value_receiver = self.outputs.df + + for n, channel_map in enumerate(iter_maps): + body_node = self._body_node_class(label=f"body_{n}", parent=self) + body_node.executor = self.body_node_executor + row_collector = self._build_collector_node(n) + + self._connect_broadcast_input(body_node) + for label, i in channel_map.items(): + self._connect_looped_input(body_node, row_collector, label, i) + + self._collect_output_from_body(body_node, row_collector) + + self.dataframe.inputs[f"row_{n}"] = row_collector + + self.set_run_signals_to_dag_execution() + + def _clean_existing_subgraph(self): + for label in self.child_labels: + if label not in self._input_node_labels: + self.remove_child(label) + else: + # Re-run the user input node so it has up-to-date output, otherwise + # when we inject a getitem node -- which will try to run automatically + # -- it will see data it can work with, but if that data happens to + # have the wrong length it may successfully auto-run on the wrong thing + # and throw an error! + self.children[label]() + # TODO: Instead of deleting _everything_ each time, try and re-use stuff + + def _build_collector_node(self, row_number): + # Iterated inputs + row_specification = { + key: (self._body_node_class.preview_inputs()[key][0], NOT_DATA) + for key in self._iter_on + self._zip_on + } + # Outputs + row_specification.update( + { + self.output_column_map[key]: (hint, NOT_DATA) + for key, hint in self._body_node_class.preview_outputs().items() + } + ) + return inputs_to_dict( + row_specification, parent=self, label=f"row_collector_{row_number}" + ) + + def _connect_broadcast_input(self, body_node: StaticNode) -> None: + """Connect broadcast macro input to each body node.""" + for broadcast_label in set(self.preview_inputs().keys()).difference( + self._iter_on + self._zip_on + ): + self.inputs[broadcast_label].value_receiver = body_node.inputs[ + broadcast_label + ] + + def _connect_looped_input( + self, + body_node: StaticNode, + row_collector: InputsToDict, + looped_input_label: str, + i: int, + ) -> None: + """Get item from macro input and connect it to body and collector nodes.""" + index_node = self.children[looped_input_label][i] # Inject getitem node + body_node.inputs[looped_input_label] = index_node + row_collector.inputs[looped_input_label] = index_node + + def _collect_output_from_body( + self, body_node: StaticNode, row_collector: InputsToDict + ) -> None: + """Pass body node output to the collector node.""" + for label, body_out in body_node.outputs.items(): + row_collector.inputs[self.output_column_map[label]] = body_out + + @classmethod + @lru_cache(maxsize=1) + def _build_inputs_preview(cls) -> dict[str, tuple[Any, Any]]: + preview = {} + for label, (hint, default) in cls._body_node_class.preview_inputs().items(): + # TODO: Leverage hint and default, listing if it's looped on + if label in cls._zip_on + cls._iter_on: + hint = list if hint is None else list[hint] + default = NOT_DATA # TODO: Figure out a generator pattern to get lists + preview[label] = (hint, default) + return preview + + @classmethod + def _build_outputs_preview(cls) -> dict[str, Any]: + return {"df": DataFrame} + + +def _for_node_class_name( + body_node_class: type[StaticNode], iter_on: tuple[str, ...], zip_on: tuple[str, ...] +): + iter_fields = ( + "" if len(iter_on) == 0 else "Iter" + "".join(k.title() for k in iter_on) + ) + zip_fields = "" if len(zip_on) == 0 else "Zip" + "".join(k.title() for k in zip_on) + return f"{For.__name__}{body_node_class.__name__}{iter_fields}{zip_fields}" + + +@classfactory +def for_node_factory( + body_node_class: type[StaticNode], + iter_on: tuple[str, ...] = (), + zip_on: tuple[str, ...] = (), + output_column_map: dict | None = None, + /, +): + return ( + _for_node_class_name(body_node_class, iter_on, zip_on), + (For,), + { + "_body_node_class": body_node_class, + "_iter_on": iter_on, + "_zip_on": zip_on, + }, + {"output_column_map": output_column_map}, + ) + + +def for_node( + body_node_class, + *node_args, + iter_on=(), + zip_on=(), + output_column_map: Optional[dict[str, str]] = None, + **node_kwargs, +): + """ + Makes a new :class:`For` node which internally creates instances of the + :param:`body_node_class` and loops input onto them in nested and/or zipped loop(s). + + Output is a single channel, `"df"`, which holds a :class:`pandas.DataFrame` whose + rows couple (looped) input to their respective body node outputs. + + The internal node structure gets re-created each run, so the same inputs must + consistently be iterated over, but their lengths can change freely. + + An executor can be applied to all body node instances at run-time by assigning it + to the :attr:`body_node_executor` attribute of the for-node. + + Args: + body_node_class type[StaticNode]: The class of node to loop on. + *node_args: Regular positional node arguments. + iter_on (tuple[str, ...]): Input labels in the :param:`body_node_class` to + nested-loop on. + zip_on (tuple[str, ...]): Input labels in the :param:`body_node_class` to + zip-loop on. + output_column_map (dict[str, str] | None): A map for generating dataframe + column names (values) from body node output channel labels (keys). + Necessary iff the body node has the same label for an output channel and + an input channel being looped over. (Default is None, just use the output + channel labels as columb names.) + **node_kwargs: Regular keyword node arguments. + + Returns: + (For): An instance of a dynamically-subclassed :class:`For` node. + + Examples: + >>> from pyiron_workflow import Workflow + >>> + >>> @Workflow.wrap.as_function_node("together") + ... def FiveTogether(a: int, b: int, c: int, d: int, e: str = "foobar"): + ... return (a, b, c, d, e), + >>> + >>> for_instance = Workflow.create.for_node( + ... FiveTogether, + ... iter_on=("a", "b"), + ... zip_on=("c", "d"), + ... a=[1, 2], + ... b=[3, 4, 5, 6], + ... c=[7, 8], + ... d=[9, 10, 11], + ... e="e" + ... ) + >>> + >>> out = for_instance() + >>> type(out.df) + + + Internally, the loop node has made a bunch of body nodes, as well as nodes to + index and collect data + >>> len(for_instance) + 48 + + We get one dataframe row for each possible combination of looped input + >>> len(out.df) + 16 + + We are stuck iterating on the fields we defined, but we can change the length + of the input and the loop node's body will get reconstructed at run-time to + accommodate this + >>> out = for_instance(a=[1], b=[3], d=[7]) + >>> len(for_instance), len(out) + (12, 1) + + Note that if we had simply returned each input individually, without any output + labels on the node, we'd need to specify a map on the for-node so that the + (looped) input and output columns on the resulting dataframe are all unique: + >>> @Workflow.wrap.as_function_node() + ... def FiveApart(a: int, b: int, c: int, d: int, e: str = "foobar"): + ... return a, b, c, d, e, + >>> + >>> for_instance = Workflow.create.for_node( + ... FiveApart, + ... iter_on=("a", "b"), + ... zip_on=("c", "d"), + ... a=[1, 2], + ... b=[3, 4, 5, 6], + ... c=[7, 8], + ... d=[9, 10, 11], + ... e="e", + ... output_column_map={ + ... "a": "out_a", + ... "b": "out_b", + ... "c": "out_c", + ... "d": "out_d" + ... } + ... ) + >>> + >>> out = for_instance() + >>> out.df.columns + Index(['a', 'b', 'c', 'd', 'out_a', 'out_b', 'out_c', 'out_d', 'e'], dtype='object') + + """ + for_node_factory.clear(_for_node_class_name(body_node_class, iter_on, zip_on)) + cls = for_node_factory(body_node_class, iter_on, zip_on, output_column_map) + cls.preview_io() + return cls(*node_args, **node_kwargs) diff --git a/pyiron_workflow/loops.py b/pyiron_workflow/loops.py index e7f9028d1..ec303db54 100644 --- a/pyiron_workflow/loops.py +++ b/pyiron_workflow/loops.py @@ -12,119 +12,6 @@ from pyiron_workflow.node import Node -def for_loop( - loop_body_class: type[Node], - length: int, - iterate_on: str | tuple[str] | list[str], -) -> type[Macro]: - """ - An _extremely rough_ second draft of a for-loop meta-node. - - Takes a node class, how long the loop should be, and which input(s) of the provided - node class should be looped over (given as strings of the channel labels) and - builds a macro that scatters some input and broadcasts the rest, then operates on - a zip of all the scattered input (so it had better be the same length). - - - Makes copies of the provided node class, i.e. the "body node" - - Labels in :param:`iterate_on` must correspond to `loop_body_class` input channels, - and the for-loop node then expects list-like input for these with ALL CAPS - labeling, and this gets scattered to the children. - - All other input simply gets broadcast to each child. - - Output channels correspond to input channels, but are lists of the children and - labeled in ALL CAPS - - Warnings: - The loop body class must be importable. E.g. it can come from a node package or - be defined in `__main__`, but not defined inside the scope of some other - function. - - Examples: - - >>> from pyiron_workflow import Workflow - >>> - >>> denominators = list(range(1, 5)) - >>> bulk_loop = Workflow.create.meta.for_loop( - ... Workflow.create.standard.Divide, - ... len(denominators), - ... iterate_on = ("other",), - ... )() - >>> bulk_loop.inputs.obj = 1 - >>> bulk_loop.inputs.OTHER = denominators - >>> bulk_loop().TRUEDIV - [1.0, 0.5, 0.3333333333333333, 0.25] - - TODO: - - - Refactor like crazy, it's super hard to read and some stuff is too hard-coded - - How to handle passing executors to the children? Maybe this is more - generically a Macro question? - - Is it possible to somehow dynamically adapt the held graph depending on the - length of the input values being iterated over? E.g. rebuilding the graph - every run call. - - Allow a different mode, or make a different meta node, that makes all possible - pairs of body nodes given the input being looped over instead of just - :param:`length` - - Provide enter and exit magic methods so we can `for` or `with` this fancy-like - """ - input_preview = loop_body_class.preview_inputs() - output_preview = loop_body_class.preview_outputs() - - # Ensure `iterate_on` is in the input - iterate_on = [iterate_on] if isinstance(iterate_on, str) else iterate_on - incommensurate_input = set(iterate_on).difference(input_preview.keys()) - if len(incommensurate_input) > 0: - raise ValueError( - f"Cannot loop on {incommensurate_input}, as it is not an input channel " - f"of {loop_body_class.__name__}; please choose from among " - f"{list(input_preview)}" - ) - - # Build code components that need an f-string, slash, etc. - output_labels = ", ".join(f'"{l.upper()}"' for l in output_preview.keys()).rstrip( - " " - ) - macro_args = ", ".join( - l.upper() if l in iterate_on else l for l in input_preview.keys() - ).rstrip(" ") - body_label = 'f"body{n}"' - item_access = "[{n}]" - body_kwargs = ", ".join( - f"{l}={l.upper()}[n]" if l in iterate_on else f"{l}={l}" - for l in input_preview.keys() - ).rstrip(" ") - input_label = 'f"item_{n}"' - returns = ", ".join( - f'self.children["{label.upper()}"]' for label in output_preview.keys() - ) - node_name = f'{loop_body_class.__name__}For{"".join([l.title() for l in sorted(iterate_on)])}{length}' - - # Assemble components into a decorated for-loop macro - for_loop_code = dedent( - f""" - @Macro.wrap.as_macro_node({output_labels}) - def {node_name}(self, {macro_args}): - from {loop_body_class.__module__} import {loop_body_class.__name__} - - for label in [{output_labels}]: - inputs_to_list({length}, label=label, parent=self) - - for n in range({length}): - body_node = {loop_body_class.__name__}( - {body_kwargs}, - label={body_label}, - parent=self - ) - for label in {list(output_preview.keys())}: - self.children[label.upper()].inputs[{input_label}] = body_node.outputs[label] - - return {returns} - """ - ) - - exec(for_loop_code) - return locals()[node_name] - - def while_loop( loop_body_class: type[Node], condition_class: type[Function], diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 9e44e1cd4..71e080deb 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -105,26 +105,23 @@ def sqrt(value=0): def test_for_loop(self): Workflow.register("static.demo_nodes", "demo") - n = 5 - - bulk_loop = pyiron_workflow.loops.for_loop( - Workflow.create.demo.OptionallyAdd, - n, - iterate_on=("y",), - )() - base = 42 - to_add = list(range(n)) - out = bulk_loop( - x=base, # Sent equally to each body node - Y=to_add, # Distributed across body nodes + to_add = list(range(5)) + bulk_loop = Workflow.create.for_node( + Workflow.create.demo.OptionallyAdd, + iter_on=("y",), + x=base, # Broadcast + y=to_add # Scattered ) + out = bulk_loop() - for output, expectation in zip(out.SUM, [base + v for v in to_add]): + for output, expectation in zip( + out.df["sum"].values.tolist(), + [base + v for v in to_add] + ): self.assertAlmostEqual( output, expectation, - msg="Output should be list result of each individiual result" ) def test_while_loop(self): diff --git a/tests/unit/test_for_loop.py b/tests/unit/test_for_loop.py new file mode 100644 index 000000000..61296e067 --- /dev/null +++ b/tests/unit/test_for_loop.py @@ -0,0 +1,320 @@ +from concurrent.futures import ThreadPoolExecutor +from itertools import product +from time import perf_counter +import unittest + +from pandas import DataFrame + +from pyiron_workflow.for_loop import ( + dictionary_to_index_maps, + for_node, + UnmappedConflictError, + MapsToNonexistentOutputError +) +from pyiron_workflow.function import as_function_node +from pyiron_workflow.node_library.standard import Sleep + + +class TestDictionaryToIndexMaps(unittest.TestCase): + + def test_no_keys(self): + data = {"key": 5} + with self.assertRaises(ValueError): + dictionary_to_index_maps(data) + + def test_empty_nested_keys(self): + data = {"key1": [1, 2, 3], "key2": [4, 5, 6]} + with self.assertRaises(ValueError): + dictionary_to_index_maps(data, nested_keys=()) + + def test_empty_zipped_keys(self): + data = {"key1": [1, 2, 3], "key2": [4, 5, 6]} + with self.assertRaises(ValueError): + dictionary_to_index_maps(data, zipped_keys=()) + + def test_nested_non_iterable_data(self): + data = {"key1": [1, 2, 3], "key2": 5} + with self.assertRaises(TypeError): + dictionary_to_index_maps(data, nested_keys=("key1", "key2")) + + def test_zipped_non_iterable_data(self): + data = {"key1": [1, 2, 3], "key2": 5} + with self.assertRaises(TypeError): + dictionary_to_index_maps(data, zipped_keys=("key1", "key2")) + + def test_valid_data_nested_only(self): + data = {"key1": [1, 2, 3], "key2": [4, 5]} + nested_keys = ("key1", "key2") + expected_maps = tuple( + {nested_keys[i]: idx for i, idx in enumerate(indices)} + for indices in product(range(len(data["key1"])), range(len(data["key2"]))) + ) + self.assertEqual( + expected_maps, + dictionary_to_index_maps(data, nested_keys=nested_keys), + ) + + def test_valid_data_zipped_only(self): + data = {"key1": [1, 2, 3], "key2": [4, 5]} + zipped_keys = ("key1", "key2") + expected_maps = tuple( + {key: idx for key in zipped_keys} + for idx in range(min(len(data["key1"]), len(data["key2"]))) + ) + self.assertEqual( + expected_maps, + dictionary_to_index_maps(data, zipped_keys=zipped_keys), + ) + + def test_valid_data_nested_and_zipped(self): + data = { + "nested1": [2, 3], + "nested2": [4, 5, 6], + "zipped1": [7, 8, 9, 10], + "zipped2": [11, 12, 13, 14, 15] + } + nested_keys = ("nested1", "nested2") + zipped_keys = ("zipped1", "zipped2") + expected_maps = tuple( + { + nested_keys[0]: n_idx, + nested_keys[1]: n_idx2, + zipped_keys[0]: z_idx, + zipped_keys[1]: z_idx2 + } + for n_idx, n_idx2 in product( + range(len(data["nested1"])), + range(len(data["nested2"])) + ) + for z_idx, z_idx2 in zip( + range(len(data["zipped1"])), + range(len(data["zipped2"])) + ) + ) + self.assertEqual( + expected_maps, + dictionary_to_index_maps(data, nested_keys=nested_keys, zipped_keys=zipped_keys), + ) + + +@as_function_node("together") +def FiveTogether( + a: int = 0, + b: int = 1, + c: int = 2, + d: int = 3, + e: str = "foobar", +): + return (a, b, c, d, e,), + + +class TestForNode(unittest.TestCase): + def test_iter_only(self): + for_instance = for_node( + FiveTogether, + iter_on=("a", "b",), + a=[42, 43, 44], + b=[13, 14], + ) + out = for_instance(e="iter") + self.assertIsInstance(out.df, DataFrame,) + self.assertEqual( + len(out.df), + 3 * 2, + msg="Expect nested loops" + ) + self.assertListEqual( + out.df.columns.tolist(), + ["a", "b", "together"], + msg="Dataframe should only hold output and _looped_ input" + ) + self.assertTupleEqual( + out.df["together"][1], + ((42, 14, 2, 3, "iter"),), + msg="Iter should get nested, broadcast broadcast, else take default" + ) + + def test_zip_only(self): + for_instance = for_node( + FiveTogether, + zip_on=("c", "d",), + e="zip" + ) + out = for_instance(c=[100, 101], d=[-1, -2, -3]) + self.assertEqual( + len(out.df), + 2, + msg="Expect zipping with the python convention of truncating to shortest" + ) + self.assertListEqual( + out.df.columns.tolist(), + ["c", "d", "together"], + msg="Dataframe should only hold output and _looped_ input" + ) + self.assertTupleEqual( + out.df["together"][1], + ((0, 1, 101, -2, "zip"),), + msg="Zipped should get zipped, broadcast broadcast, else take default" + ) + + def test_iter_and_zip(self): + for_instance = for_node( + FiveTogether, + iter_on=("a", "b",), + a=[42, 43, 44], + b=[13, 14], + zip_on=("c", "d",), + e="both" + ) + out = for_instance(c=[100, 101], d=[-1, -2, -3]) + self.assertEqual( + len(out.df), + 3 * 2 * 2, + msg="Zipped stuff is nested with the individually nested fields" + ) + self.assertListEqual( + out.df.columns.tolist(), + ["a", "b", "c", "d", "together"], + msg="Dataframe should only hold output and _looped_ input" + ) + # We don't actually care if the order of nesting changes, but make sure the + # iters are getting nested and zipped stay together + self.assertTupleEqual( + out.df["together"][0], + ((42, 13, 100, -1, "both"),), + msg="All start" + ) + self.assertTupleEqual( + out.df["together"][1], + ((42, 13, 101, -2, "both"),), + msg="Bump zipped together" + ) + self.assertTupleEqual( + out.df["together"][2], + ((42, 14, 100, -1, "both"),), + msg="Back to start of zipped, bump _one_ iter" + ) + + def test_dynamic_length(self): + for_instance = for_node( + FiveTogether, + iter_on=("a", "b",), + a=[42, 43, 44], + b=[13, 14], + zip_on=("c", "d",), + c=[100, 101], + d=[-1, -2, -3] + ) + self.assertEqual( + 3 * 2 * 2, + len(for_instance().df), + msg="Sanity check" + ) + self.assertEqual( + 1, + len(for_instance(a=[0], b=[1], c=[2]).df), + msg="Should be able to re-run with different input lengths" + ) + + def test_column_mapping(self): + @as_function_node() + def FiveApart( + a: int = 0, + b: int = 1, + c: int = 2, + d: int = 3, + e: str = "foobar", + ): + return a, b, c, d, e, + + with self.subTest("Successful map"): + for_instance = for_node( + FiveApart, + iter_on=("a", "b"), + zip_on=("c", "d"), + a=[1, 2], + b=[3, 4, 5], + c=[7, 8], + d=[9, 10, 11], + e="e", + output_column_map={ + "a": "out_a", + "b": "out_b", + "c": "out_c", + "d": "out_d" + } + ) + self.assertEqual( + 4 + 5, # loop inputs + outputs + len(for_instance().df.columns), + msg="When all conflicting names are remapped, we should have no trouble" + ) + + with self.subTest("Insufficient map"): + with self.assertRaises( + UnmappedConflictError, + msg="Leaving conflicting channels unmapped should raise an error" + ): + for_node( + FiveApart, + iter_on=("a", "b"), + zip_on=("c", "d"), + a=[1, 2], + b=[3, 4, 5], + c=[7, 8], + d=[9, 10, 11], + e="e", + output_column_map={ + # "a": "out_a", + "b": "out_b", + "c": "out_c", + "d": "out_d" + } + ) + + with self.subTest("Excessive map"): + with self.assertRaises( + MapsToNonexistentOutputError, + msg="Trying to map something that isn't there should raise an error" + ): + for_node( + FiveApart, + iter_on=("a", "b"), + zip_on=("c", "d"), + a=[1, 2], + b=[3, 4, 5], + c=[7, 8], + d=[9, 10, 11], + e="e", + output_column_map={ + "a": "out_a", + "b": "out_b", + "c": "out_c", + "d": "out_d", + "not_a_key_on_the_body_node_outputs": "anything" + } + ) + + def test_body_node_executor(self): + t_sleep = 2 + for_parallel = for_node( + Sleep, + iter_on=("t",) + ) + t_start = perf_counter() + n_procs = 4 + with ThreadPoolExecutor(max_workers=n_procs) as exe: + for_parallel.body_node_executor = exe + for_parallel(t=n_procs*[t_sleep]) + dt = perf_counter() - t_start + grace = 1.1 + self.assertLess( + dt, + grace * t_sleep, + msg=f"Parallelization over children should result in faster completion. " + f"Expected limit {grace} x {t_sleep} = {grace * t_sleep} -- got {dt}" + ) + + +if __name__ == "__main__": + unittest.main()