
Add and test wrappers for sticking nodes in a pyiron job #189

Merged
merged 5 commits into simple_storage from as_pyiron_job on Feb 15, 2024

Conversation

liamhuber
Member

@liamhuber liamhuber commented Feb 1, 2024

In the long run we want to have queue submission fully integrated with nodes; in the short term, we can get that by making a pyiron_base job out of a node. This isn't a permanent solution as it doesn't allow, e.g., sending just one node from a graph off to the queue -- only entire graphs (even if the graph is just a single node).

Here I introduce two solutions: one defines a new subclass of TemplateJob and relies on the node's own storage capabilities to serialize itself to (and deserialize itself from) HDF inside the job's working directory. The only true input on the job is a string saying which storage backend the node should use ("h5io"/"tinybase", per #160); the user then supplies a node instance to the job's .node attribute. On reloading, the executed node can be found in the same place on the job.

The other exploits the new pyiron_base.Project.wrap_python_function, which takes a node in job.input["node"] and returns the executed version in job.output["result"], using the wrapper's cloudpickle-based storage and ignoring the node's own storage.

Here's an example of the first approach copied from the docstring:

        >>> from pyiron_base import Project
        >>> from pyiron_workflow import Workflow
        >>> import pyiron_workflow.job  # To get the job registered in JOB_CLASS_DICT
        >>> 
        >>> @Workflow.wrap_as.single_value_node("t")
        ... def Sleep(t):
        ...     from time import sleep
        ...     sleep(t)
        ...     return t
        >>> 
        >>> wf = Workflow("pyiron_node", overwrite_save=True)
        >>> wf.sleep = Sleep(0)
        >>> wf.out = wf.create.standard.UserInput(wf.sleep)
        >>> 
        >>> pr = Project("test")
        >>> 
        >>> nj = pr.create.job.NodeJob("my_node")
        >>> nj.node = wf
        >>> nj.run()
        >>> print(nj.node.outputs.to_value_dict())
        {'out__user_input': 0}

        >>> lj = pr.load(nj.job_name)
        >>> print(lj.node.outputs.to_value_dict())
        {'out__user_input': 0}

        >>> pr.remove_jobs(recursive=True, silently=True)
        >>> pr.remove(enable=True)
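
For comparison, here's a rough sketch of how the second (wrap_python_function) approach could look. The run_node helper and the exact input/output key handling are illustrative assumptions based on the description above, not necessarily the final API in this PR:

```python
from pyiron_base import Project
from pyiron_workflow import Workflow


def run_node(node):
    # Execute the node and hand it back so the wrapper can cloudpickle the result
    node.run()
    return node


wf = Workflow("wrapped_node")
wf.out = wf.create.standard.UserInput(42)

pr = Project("test_wrap")
pj = pr.wrap_python_function(run_node)
pj.input["node"] = wf  # the node travels in as ordinary job input
pj.run()
print(pj.output["result"].outputs.to_value_dict())  # the executed node comes back in the output
```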

The docstrings and tests are relatively thorough, but I'm not keen to document it in the tutorial notebooks or expose the functionality directly on the Workflow class yet -- I'd rather if some people at MPIE played around with it a bit and let me know what would be helpful, which interface they prefer, etc.

The one big constraint in this PR is that the nodes will still only run on a single core, even when shipped off to a queue. This is because the node's .executor attribute is (so far) either None or an instance of an executor -- and the latter can't be easily serialized. @jan-janssen is working on this for the underlying PythonFunctionContainerJob, but we may also be able to solve it independently for the NodeJob here using the same line of attack -- namely, supplying information on how to instantiate a fresh executor on the far side instead of passing along a live instance. We already have the infrastructure in place to do this in the Node._parse_executor method; it's simply a matter of populating that method with the logic to parse a class name and args/kwargs for the new executor.
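
A minimal sketch of that "recipe instead of instance" idea, purely as an illustration -- the parse_executor helper and the (class, args, kwargs) tuple format are assumptions, not the actual Node._parse_executor implementation:

```python
from concurrent.futures import Executor, ProcessPoolExecutor


def parse_executor(spec):
    """Turn a picklable executor specification into a live executor instance."""
    if spec is None or isinstance(spec, Executor):
        return spec  # nothing to do: absent, or already a live executor
    executor_class, args, kwargs = spec
    return executor_class(*args, **kwargs)


# A node could carry this picklable recipe instead of a live (unpicklable) instance
executor = parse_executor((ProcessPoolExecutor, (), {"max_workers": 4}))
```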

@ligerzero-ai, IIRC you were going to look at running multi-core Vasp jobs in a workflow? This should let you do it on the queue, so basically you just need to write parser nodes and a shell node (preferably a generic one, plus a more specific one that executes VASP in particular), then put them all together in a macro. @JNmpi has examples of these things for other codes over on his branch (#33), so we're not too far off. I don't expect you to do anything with this immediately; I just want to stay mutually well informed so we don't wind up doubling up on work. If you do want to work on it, maybe you could first/also open a PR to extend _parse_executor?

Note that this PR depends on pyiron_contrib and h5io code that isn't merged to main yet, much less released, so none of it will run unless you clone those branches locally. This is also why the CI tests all fail.


github-actions bot commented Feb 1, 2024

Binder 👈 Launch a binder notebook on branch pyiron/pyiron_workflow/as_pyiron_job

@liamhuber liamhuber marked this pull request as ready for review February 1, 2024 19:24
@ligerzero-ai

Cheers, I'll take a careful look over the next week. If it's urgent (stopping other work), feel free to merge w/o review, we can always revert :)

@liamhuber
Member Author

Cheers, I'll take a careful look over the next week. If it's urgent (stopping other work), feel free to merge w/o review, we can always revert :)

Super! No, this is pretty standalone so I should be fine leaving it open for O(week).

@niklassiemer niklassiemer self-assigned this Feb 4, 2024
Saving now requires that the saved nodes be importable, which would be the case if you copied and pasted the example code into a notebook, but is not the case in the dynamic namespace where doctest runs things. So don't define a custom "Sleep" node here; use a standard node instead.
@liamhuber liamhuber added the format_black trigger the Black formatting bot label Feb 7, 2024
Comment on lines +107 to +114
elif self._node_working_directory_already_there(new_node):
self.raise_working_directory_error()
else:
self._node = new_node
self.input._class_type = (
f"{new_node.__class__.__module__}." f"{new_node.__class__.__name__}"
)
self.input._label = new_node.label

[not a blocker for this PR]

My main concern/feedback is that there are many cases where a user may not necessarily want to create a new folder for every single step of their workflow. Am I correct in thinking that this will raise an error if a node's working directory is reused? In a lot of cases a workflow will fail at some point, and then won't it be extremely clunky to reuse early computations from the earlier successful workflow steps?

This is mostly a philosophical thing, and need not be addressed in this PR.

Member Author

a user may not necessarily want to create a new folder for every single step of their workflow.

In this case each NodeJob is related 1:1 with a Node; that may be a single Function node, or a whole complex Workflow. In this respect, running a NodeJob is guaranteed to make at least one directory (for the job) and populate at least one file (the node's HDF file). So far we're no different from a regular pyiron job in this respect. If the Node is a graph and some of its children produce files, they'll be nested inside the job's directory with a directory structure that mirrors the graph topology; this is normal for the Composite nodes.

Am I correct in thinking that this will raise an error if this reuses a working dir from a node?

What it's doing is making sure that the node is both fresh (not reloaded) and that it hasn't peeked at its working directory -- which would set it to ./{node_label}, when we want to force it to be nested inside the job at ./{job_name}/{node_label}. So a workflow that has been saved previously will surely fail; some nodes may still be assignable post-execution, but not if they have already peeked at their working directories, e.g. if it's some macro running Lammps and the Lammps node has written IO files to the node's directory.

In a lot of cases, a workflow will fail at some point, and then will it not be extremely clunky to reuse early-computations from the earlier successful workflow steps.

This is mostly a philosophical thing, and need not be addressed in this PR.

These last two I'll take together. Yes. Yep, definitely still clunky. In general the current implementation of file storage is not yet prepared for restarting from failed nodes -- it's a run-and-done feature right now. I agree the issue shouldn't be blocking as right now I'd prioritize getting some version of this capability running to build familiarity with the pain points, but I am fully on board with making sure we get philosophical regularly as we go along!

Ultimately I see wrapping the nodes in a job as an intermediate step for database and queue-access functionality. Ideally, I'd like to see each node be able to make these connections individually, e.g. so you could have some Workflow where only the Vasp node got shipped off to the queue.

This would involve more sophisticated treatment of the storage, e.g. having remotely run nodes serialize to their own HDF5 file, then merging this into the main root-most node's HDF5 file at reload time. Some more thoughts on this are here.

Similarly, a key feature for recovering partially executed graphs is to re-load output instead of re-running -- i.e. we should check whether there is saved data and whether the current input matches the saved input, and if so just reload the old output instead of re-running. Discussed here but also not implemented yet.
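
A conceptual sketch of that reload-instead-of-rerun check; the attribute and method names (has_saved_data, saved_input, load_output) are placeholders for functionality that doesn't exist yet:

```python
def run_or_reload(node):
    # Only reuse stored results when they were produced from the same input
    if node.has_saved_data and node.inputs.to_value_dict() == node.saved_input:
        return node.load_output()  # recover the earlier successful result
    return node.run()  # otherwise (re)execute the node
```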

@ligerzero-ai

Some minor feedback - but otherwise lgtm. Let's get this merged :)

@liamhuber liamhuber self-assigned this Feb 15, 2024
@liamhuber liamhuber merged commit 92a0b17 into simple_storage Feb 15, 2024
2 of 12 checks passed
@liamhuber liamhuber deleted the as_pyiron_job branch February 15, 2024 22:05