Merge branch 'main' into feat_sshtunnel
utf committed Dec 5, 2023
2 parents e312d97 + 43f8463 commit e8fa70c
Showing 14 changed files with 324 additions and 169 deletions.
265 changes: 198 additions & 67 deletions docs/tutorials/5-dynamic-flows.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion paper/paper.md
@@ -117,7 +117,7 @@ responses = run_locally(flow)

## Dynamic Workflows

Beyond the typical acyclic graph of jobs, Jobflow fully supports dynamic workflows where the precise number of jobs is unknown until runtime. This is a particularly common requirement in chemistry and materials science workflows and is made possible through the use of a `Response` object that controls the flow execution. For instance, the example below is a `Flow` that will add two numbers (`1 + 2`), construct a list of random length containing the prior result (e.g. `[3, 3, 3]`), and then add an integer to each element of the list (`[3 + 10, 3 + 10, 3 + 10]`). The `Response(replace=Flow(jobs))` syntax tells Jobflow to replace the current `Job` with a (sub)workflow after the `Job` completes.
Beyond the typical acyclic graph of jobs, Jobflow fully supports dynamic workflows where the precise number of jobs is unknown until runtime. This is a particularly common requirement in chemistry and materials science workflows and is made possible through the use of a `Response` object that controls the flow execution. For instance, the example below is a `Flow` that will add two numbers (`1 + 2`), construct a list of random length containing the prior result (e.g., `[3, 3, 3]`), and then add an integer to each element of the list (`[3 + 10, 3 + 10, 3 + 10]`). The `Response(replace=Flow(jobs))` syntax tells Jobflow to replace the current `Job` with a (sub)workflow after the `Job` completes.

![](figure2.png)

61 changes: 46 additions & 15 deletions paper/refs.bib
@@ -3,15 +3,17 @@ @misc{atomate2
year = {2023},
url = {https://github.com/materialsproject/atomate2}
}
@misc{quacc,
doi = {10.5281/zenodo.7720998},
title = {Quacc -- The Quantum Accelerator},
year = {2023},
url = {https://github.com/Quantum-Accelerators/quacc}
@software{quacc,
author = {Rosen, Andrew S},
title = {quacc – The Quantum Accelerator},
year = {2023},
publisher = {Zenodo},
doi = {10.5281/zenodo.7720998},
url = {https://doi.org/10.5281/zenodo.7720998}
}
@article{materialsproject,
title={Commentary: The Materials Project: A materials genome approach to accelerating materials innovation},
author={Jain, Anubhav and Ong, Shyue Ping and Hautier, Geoffroy and Chen, Wei and Richards, William Davidson and Dacek, Stephen and Cholia, Shreyas and Gunter, Dan and Skinner, David and Ceder, Gerbrand and others},
author={Jain, Anubhav and Ong, Shyue Ping and Hautier, Geoffroy and Chen, Wei and Richards, William Davidson and Dacek, Stephen and Cholia, Shreyas and Gunter, Dan and Skinner, David and Ceder, Gerbrand and Persson, Kristin A},
journal={APL materials},
volume={1},
number={1},
@@ -57,7 +59,7 @@ @misc{nptools
}
@article{fireworks,
title={FireWorks: a dynamic workflow system designed for high-throughput applications},
author={Jain, Anubhav and Ong, Shyue Ping and Chen, Wei and Medasani, Bharat and Qu, Xiaohui and Kocher, Michael and Brafman, Miriam and Petretto, Guido and Rignanese, Gian-Marco and Hautier, Geoffroy and others},
author={Jain, Anubhav and Ong, Shyue Ping and Chen, Wei and Medasani, Bharat and Qu, Xiaohui and Kocher, Michael and Brafman, Miriam and Petretto, Guido and Rignanese, Gian-Marco and Hautier, Geoffroy and Gunter, Daniel and Persson, Kristin A},
journal={Concurrency and Computation: Practice and Experience},
volume={27},
number={17},
@@ -68,7 +70,7 @@ @article{fireworks
}
@article{da2023workflows,
title={Workflows Community Summit 2022: A Roadmap Revolution},
author={da Silva, Rafael Ferreira and Badia, Rosa M and Bala, Venkat and Bard, Debbie and Bremer, Peer-Timo and Buckley, Ian and Caino-Lores, Silvina and Chard, Kyle and Goble, Carole and Jha, Shantenu and others},
author={Da Silva, Rafael Ferreira and Badia, Rosa M. and Bala, Venkat and Bard, Debbie and Bremer, Peer-Timo and Buckley, Ian and Caino-Lores, Silvina and Chard, Kyle and Goble, Carole and Jha, Shantenu and Katz, Daniel S. and Laney, Daniel and Parashar, Manish and Suter, Frederic and Tyler, Nick and Uram, Thomas and Altintas, Ilkay and Andersson, Stefan and Arndt, William and Aznar, Juan and Bader, Jonathan and Balis, Bartosz and Blanton, Chris and Braghetto, Kelly Rosa and Brodutch, Aharon and Brunk, Paul and Casanova, Henri and Lierta, Alba Cervera and Chigu, Justin and Coleman, Taina and Collier, Nick and Colonnelli, Iacopo and Coppens, Frederik and Crusoe, Michael and Cunningham, Will and De Paula Kinoshita, Bruno and Di Tommaso, Paolo and Doutriaux, Charles and Downton, Matthew and Elwasif, Wael and Enders, Bjoern and Erdmann, Chris and Fahringer, Thomas and Figueiredo, Ludmilla and Filgueira, Rosa and Foltin, Martin and Fouilloux, Anne and Gadelha, Luiz and Gallo, Andy and Saez, Artur Garcia and Garijo, Daniel and Gerlach, Roman and Grant, Ryan and Grayson, Samuel and Grubel, Patricia and Gustafsson, Johan and Hayot-Sasson, Valerie and Hernandez, Oscar and Hilbrich, Marcus and Justine, AnnMary and Laflotte, Ian and Lehmann, Fabian and Luckow, Andre and Luettgau, Jakob and Maheshwari, Ketan and Matsuda, Motohiko and Medic, Doriana and Mendygral, Pete and Michalewicz, Marek and Nonaka, Jorji and Pawlik, Maciej and Pottier, Loic and Pouchard, Line and Putz, Mathias and Radha, Santosh Kumar and Ramakrishnan, Lavanya and Ristov, Sashko and Romano, Paul and Rosendo, Daniel and Ruefenacht, Martin and Rycerz, Katarzyna and Saurabh, Nishant and Savchenko, Volodymyr and Schulz, Martin and Simpson, Christine and Sirvent, Raul and Skluzacek, Tyler and Soiland-Reyes, Stian and Souza, Renan and Sukumar, Sreenivas Rangan and Sun, Ziheng and Sussman, Alan and Thain, Douglas and Titov, Mikhail and Tovar, Benjamin and Tripathy, Aalap and Turilli, Matteo and Tuznik, Bartosz and Van Dam, Hubertus and Vivas, Aurelio and Ward, Logan and Widener, Patrick and Wilkinson, Sean and Zawalska, Justyna and Zulfiqar, Mahnoor},
journal={arXiv preprint arXiv:2304.00019},
year={2023}
}
@@ -88,7 +90,7 @@ @misc{wflowsystems
}
@inproceedings{al2021exaworks,
title={Exaworks: Workflows for exascale},
author={Al-Saadi, Aymen and Ahn, Dong H and Babuji, Yadu and Chard, Kyle and Corbett, James and Hategan, Mihael and Herbein, Stephen and Jha, Shantenu and Laney, Daniel and Merzky, Andre and others},
author={Al-Saadi, Aymen and Ahn, Dong H and Babuji, Yadu and Chard, Kyle and Corbett, James and Hategan, Mihael and Herbein, Stephen and Jha, Shantenu and Laney, Daniel and Merzky, Andre and Munson, Todd and Salim, Michael and Titov, Mikhail and Uram, Thomas D and Wozniak, Justin M},
booktitle={2021 IEEE Workshop on Workflows in Support of Large-Scale Science (WORKS)},
pages={50--57},
year={2021},
@@ -97,7 +99,7 @@ @inproceedings{babuji2019parsl
}
@inproceedings{babuji2019parsl,
title={Parsl: Pervasive parallel programming in python},
author={Babuji, Yadu and Woodard, Anna and Li, Zhuozhao and Katz, Daniel S and Clifford, Ben and Kumar, Rohan and Lacinski, Lukasz and Chard, Ryan and Wozniak, Justin M and Foster, Ian and others},
author={Babuji, Yadu and Woodard, Anna and Li, Zhuozhao and Katz, Daniel S and Clifford, Ben and Kumar, Rohan and Lacinski, Lukasz and Chard, Ryan and Wozniak, Justin M and Foster, Ian and Wilde, Michael and Chard, Kyle},
booktitle={Proceedings of the 28th International Symposium on High-Performance Parallel and Distributed Computing},
pages={25--36},
year={2019}
@@ -112,11 +114,40 @@ @misc{redun
year = {2023},
url={https://github.com/insitro/redun}
}
@misc{covalent,
title={Covalent},
year = {2023},
doi={10.5281/zenodo.5903364},
url={https://github.com/AgnostiqHQ/covalent}
@software{covalent,
author = {Cunningham, Will and
Esquivel, Alejandro and
Jao, Casey and
Hasan, Faiyaz and
Bala, Venkat and
Sanand, Sankalp and
Venkatesh, Prasanna and
Tandon, Madhur and
Emmanuel Ochia, Okechukwu and
Rosen, Andrew S and
dwelsch-esi and
jkanem and
Aravind and
HaimHorowitzAgnostiq and
Li, Ruihao and
Neagle, Scott Wyman and
valkostadinov and
Ghukasyan, Ara and
Rao, Poojith U and
Dutta, Sayandip and
WingCode and
Hughes, Anna and
RaviPsiog and
Udayan and
Akalanka and
Obasi, Amara and
Singh, Divyanshu and
FilipBolt},
title = {Covalent},
year = {2023},
publisher = {Zenodo},
doi = {10.5281/zenodo.5903364},
url = {https://doi.org/10.5281/zenodo.5903364}
}
@misc{maggma,
title={Maggma},
14 changes: 7 additions & 7 deletions pyproject.toml
@@ -39,26 +39,26 @@ dependencies = [
docs = [
"autodoc_pydantic==2.0.1",
"furo==2023.9.10",
"ipython==8.17.2",
"ipython==8.18.1",
"myst_parser==2.0.0",
"nbsphinx==0.9.3",
"sphinx-copybutton==0.5.2",
"sphinx==7.2.6",
]
dev = ["pre-commit>=2.12.1"]
tests = ["moto==4.2.7", "pytest-cov==4.1.0", "pytest==7.4.3"]
tests = ["moto==4.2.11", "pytest-cov==4.1.0", "pytest==7.4.3"]
vis = ["matplotlib", "pydot"]
fireworks = ["FireWorks"]
strict = [
"FireWorks==2.0.3",
"PyYAML==6.0.1",
"maggma==0.57.5",
"matplotlib==3.8.1",
"maggma==0.58.0",
"matplotlib==3.8.2",
"monty==2023.11.3",
"moto==4.2.7",
"moto==4.2.11",
"networkx==3.2.1",
"pydantic-settings==2.0.3",
"pydantic==2.4.2",
"pydantic-settings==2.1.0",
"pydantic==2.5.2",
"pydash==7.0.6",
"pydot==1.4.2",
"typing-extensions==4.8.0",
3 changes: 2 additions & 1 deletion src/jobflow/core/flow.py
@@ -794,7 +794,8 @@ def add_jobs(self, jobs: Job | Flow | Sequence[Flow | Job]) -> None:
f"current Flow ({self.uuid})"
)
job_ids.add(job.uuid)
job.add_hosts_uuids(hosts)
if job.host != self.uuid:
job.add_hosts_uuids(hosts)
self._jobs += tuple(jobs)

def remove_jobs(self, indices: int | list[int]):
2 changes: 1 addition & 1 deletion src/jobflow/core/job.py
@@ -10,7 +10,6 @@
from monty.json import MSONable, jsanitize

from jobflow.core.reference import OnMissing, OutputReference
from jobflow.schemas.job_output_schema import JobStoreDocument
from jobflow.utils.uuid import suuid

if typing.TYPE_CHECKING:
@@ -560,6 +559,7 @@ def run(self, store: jobflow.JobStore) -> Response:

from jobflow import CURRENT_JOB
from jobflow.core.flow import get_flow
from jobflow.core.schemas import JobStoreDocument

index_str = f", {self.index}" if self.index != 1 else ""
logger.info(f"Starting job - {self.name} ({self.uuid}{index_str})")
6 changes: 5 additions & 1 deletion src/jobflow/core/reference.py
@@ -177,7 +177,11 @@ def resolve(

for attr_type, attr in self.attributes:
# i means index else use attribute access
data = data[attr] if attr_type == "i" else getattr(data, attr)
data = (
data[attr]
if attr_type == "i" or isinstance(data, dict)
else getattr(data, attr)
)

return data
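Read in isolation, the updated branch behaves like this stand-alone sketch (`resolve_attributes` is a hypothetical helper name, not jobflow's actual method):

```python
def resolve_attributes(data, attributes):
    """Walk a chain of (type, name) accessors over nested output data.

    Item access is used for index-type ("i") attributes or whenever the
    current value is a dict; attribute access is used otherwise.
    """
    for attr_type, attr in attributes:
        data = (
            data[attr]
            if attr_type == "i" or isinstance(data, dict)
            else getattr(data, attr)
        )
    return data
```

With the change, a dict-valued job output such as `{"energy": -1.5}` resolves through key lookup even when the reference was written with attribute syntax.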

34 changes: 34 additions & 0 deletions src/jobflow/core/schemas.py
@@ -0,0 +1,34 @@
"""A Pydantic model for Jobstore document."""

from typing import Any

from pydantic import BaseModel, Field


class JobStoreDocument(BaseModel):
"""A Pydantic model for Jobstore document."""

uuid: str = Field(
        None, description="A unique identifier for the job. Generated automatically."
)
index: int = Field(
None,
description="The index of the job (number of times the job has been replaced).",
)
output: Any = Field(
None,
description="This is a reference to the future job output.",
)
completed_at: str = Field(None, description="The time the job was completed.")
metadata: dict = Field(
None,
description="Metadata information supplied by the user.",
)
hosts: list[str] = Field(
None,
description="The list of UUIDs of the hosts containing the job.",
)
name: str = Field(
None,
description="The name of the job.",
)
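Because pydantic v2 does not validate default values unless `validate_default=True`, the model above can be constructed with only the fields at hand. A condensed, runnable restatement (trimmed to three fields for brevity):

```python
from typing import Any

from pydantic import BaseModel, Field


class JobStoreDocument(BaseModel):
    """Condensed restatement of the model above (three fields only)."""

    uuid: str = Field(None, description="A unique identifier for the job.")
    index: int = Field(None, description="Number of times the job was replaced.")
    output: Any = Field(None, description="Reference to the future job output.")


# Unset fields simply stay None.
doc = JobStoreDocument(uuid="abc123", index=1)
```
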
2 changes: 1 addition & 1 deletion src/jobflow/core/store.py
@@ -18,7 +18,7 @@

from maggma.core import Sort

from jobflow.schemas.job_output_schema import JobStoreDocument
from jobflow.core.schemas import JobStoreDocument

obj_type = Union[str, Enum, type[MSONable], list[Union[Enum, str, type[MSONable]]]]
save_type = Optional[dict[str, obj_type]]
27 changes: 17 additions & 10 deletions src/jobflow/managers/local.py
@@ -6,8 +6,9 @@
import typing

if typing.TYPE_CHECKING:
import jobflow
from pathlib import Path

import jobflow

logger = logging.getLogger(__name__)

@@ -17,6 +18,7 @@ def run_locally(
log: bool = True,
store: jobflow.JobStore = None,
create_folders: bool = False,
root_dir: str | Path | None = None,
ensure_success: bool = False,
allow_external_references: bool = False,
) -> dict[str, dict[int, jobflow.Response]]:
@@ -25,25 +27,29 @@
Parameters
----------
flow
flow : Flow | Job | list[Job]
A job or flow.
log
log : bool
Whether to print log messages.
store
store : JobStore
A job store. If a job store is not specified then
:obj:`JobflowSettings.JOB_STORE` will be used. By default this is a maggma
``MemoryStore`` but can be customised by setting the jobflow configuration file.
create_folders
create_folders : bool
Whether to run each job in a new folder.
ensure_success
root_dir : str | Path | None
The root directory to run the jobs in or where to create new subfolders if
``create_folders`` is True. If None then the current working
directory will be used.
ensure_success : bool
Raise an error if the flow was not executed successfully.
allow_external_references
allow_external_references : bool
If False all the references to other outputs should be from other Jobs
of the Flow.
Returns
-------
Dict[str, Dict[int, Response]]
dict[str, dict[int, Response]]
The responses of the jobs, as a dict of ``{uuid: {index: response}}``.
"""
from collections import defaultdict
@@ -60,6 +66,9 @@
if store is None:
store = SETTINGS.JOB_STORE

root_dir = Path.cwd() if root_dir is None else Path(root_dir).resolve()
root_dir.mkdir(exist_ok=True)

store.connect()

if log:
@@ -72,8 +81,6 @@
responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
stop_jobflow = False

root_dir = Path.cwd()

def _run_job(job: jobflow.Job, parents):
nonlocal stop_jobflow
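The new `root_dir` handling reduces to a few lines of path logic; a stand-alone sketch (the helper name `resolve_root_dir` is illustrative, not part of jobflow):

```python
from pathlib import Path


def resolve_root_dir(root_dir=None):
    # Default to the current working directory; otherwise resolve the
    # given path to an absolute one and create it if it does not exist.
    root_dir = Path.cwd() if root_dir is None else Path(root_dir).resolve()
    root_dir.mkdir(exist_ok=True)
    return root_dir
```

Jobs (and, with `create_folders=True`, their per-job subfolders) are then created beneath this directory rather than the raw current working directory.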

63 changes: 0 additions & 63 deletions src/jobflow/schemas/job_output_schema.py

This file was deleted.

1 change: 1 addition & 0 deletions tests/core/test_flow.py
@@ -497,6 +497,7 @@ def test_serialization():
decoded_flow = MontyDecoder().process_decoded(encoded_flow)

assert decoded_flow[0].host == host_uuid
assert flow_host.jobs[0].hosts == decoded_flow.jobs[0].hosts


def test_update_kwargs():
@@ -5,7 +5,7 @@

@pytest.fixture()
def sample_data():
from jobflow.schemas.job_output_schema import JobStoreDocument
from jobflow.core.schemas import JobStoreDocument

return JobStoreDocument(
uuid="abc123",
@@ -33,7 +33,7 @@ def test_job_store_document_model(sample_data):

def test_job_store_update(memory_jobstore, sample_data):
# Storing document as a JobStoreDocument
from jobflow.schemas.job_output_schema import JobStoreDocument
from jobflow.core.schemas import JobStoreDocument

d = {
"index": 1,
