Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ A simple dataflow engine with scalable semantics.
The goal of pydra is to provide a lightweight Python dataflow engine for DAG construction, manipulation, and distributed execution.

Feature list:
1. Python 3.7+ using type annotation and dataclasses
1. Python 3.7+ using type annotation and [attrs](https://www.attrs.org/en/stable/)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still use 3.7+ features, or can we relax to 3.6? (I know that at this point 3.7 isn't cutting edge, so maybe it doesn't matter.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My current intent was to get this merged, then do additional trimming/testing/relaxing. I did mention the 3.6 expansion to Dorota when we met.

2. Composable dataflows with simple node semantics. A dataflow can be a node of another dataflow.
3. `splitter` and `combiner` provides many ways of compressing complex loop semantics
4. Cached execution with support for a global cache across dataflows and users
Expand Down
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
attrs
cloudpickle
filelock
git+https://github.com/AleksandarPetrov/napoleon.git@0dc3f28a309ad602be5f44a9049785a1026451b3#egg=sphinxcontrib-napoleon
Expand Down
1 change: 1 addition & 0 deletions min-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Auto-generated by tools/update_min_requirements.py
attrs
cloudpickle == 0.8.0
filelock == 3.0.0
etelemetry
4 changes: 2 additions & 2 deletions pydra/engine/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from pathlib import Path
import json
import dataclasses as dc
import attr
from ..utils.messenger import send_message, make_message, gen_uuid, now, AuditFlag
from .helpers import ensure_list, gather_runtime_info

Expand Down Expand Up @@ -84,7 +84,7 @@ def finalize_audit(self, result):
)
# audit resources/runtime information
self.eid = "uid:{}".format(gen_uuid())
entity = dc.asdict(result.runtime)
entity = attr.asdict(result.runtime)
entity.update(
**{
"@id": self.eid,
Expand Down
66 changes: 44 additions & 22 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Basic processing graph elements."""
import abc
import dataclasses as dc
import attr
import json
import logging
import os
Expand All @@ -15,7 +15,16 @@

from . import state
from . import helpers_state as hlpst
from .specs import File, BaseSpec, RuntimeSpec, Result, SpecInfo, LazyField, TaskHook
from .specs import (
File,
BaseSpec,
RuntimeSpec,
Result,
SpecInfo,
LazyField,
TaskHook,
attr_fields,
)
from .helpers import (
make_klass,
create_checksum,
Expand All @@ -28,6 +37,7 @@
output_from_inputfields,
output_names_from_inputfields,
)
from .helpers_file import copyfile_input, template_update
from .graph import DiGraph
from .audit import Audit
from ..utils.messenger import AuditFlag
Expand Down Expand Up @@ -122,13 +132,15 @@ def __init__(
# todo should be used to input_check in spec??
self.inputs = klass(
**{
f.name: (None if f.default is dc.MISSING else f.default)
for f in dc.fields(klass)
(f.name[1:] if f.name.startswith("_") else f.name): (
None if f.default == attr.NOTHING else f.default
)
for f in attr.fields(klass)
}
)
self.input_names = [
field.name
for field in dc.fields(klass)
for field in attr.fields(klass)
if field.name not in ["_func", "_graph_checksums"]
]
# dictionary to save the connections with lazy fields
Expand All @@ -149,7 +161,7 @@ def __init__(
if self._input_sets is None or inputs not in self._input_sets:
raise ValueError("Unknown input set {!r}".format(inputs))
inputs = self._input_sets[inputs]
self.inputs = dc.replace(self.inputs, **inputs)
self.inputs = attr.evolve(self.inputs, **inputs)
self.inputs.check_metadata()
self.state_inputs = inputs

Expand All @@ -174,7 +186,12 @@ def __getstate__(self):
state = self.__dict__.copy()
state["input_spec"] = cp.dumps(state["input_spec"])
state["output_spec"] = cp.dumps(state["output_spec"])
state["inputs"] = dc.asdict(state["inputs"])
inputs = {}
for k, v in attr.asdict(state["inputs"]).items():
if k.startswith("_"):
k = k[1:]
inputs[k] = v
state["inputs"] = inputs
return state

def __setstate__(self, state):
Expand Down Expand Up @@ -272,7 +289,7 @@ def set_state(self, splitter, combiner=None):
@property
def output_names(self):
"""Get the names of the parameters generated by the task."""
output_spec_names = [f.name for f in dc.fields(make_klass(self.output_spec))]
output_spec_names = [f.name for f in attr.fields(make_klass(self.output_spec))]
from_input_spec_names = output_names_from_inputfields(self.inputs)
return output_spec_names + from_input_spec_names

Expand Down Expand Up @@ -342,7 +359,7 @@ def __call__(self, submitter=None, plugin=None, **kwargs):
return res

def _run(self, **kwargs):
self.inputs = dc.replace(self.inputs, **kwargs)
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()
checksum = self.checksum
lockfile = self.cache_dir / (checksum + ".lock")
Expand All @@ -359,8 +376,11 @@ def _run(self, **kwargs):
shutil.rmtree(odir)
cwd = os.getcwd()
odir.mkdir(parents=False, exist_ok=True if self.can_resume else False)
self.inputs.copyfile_input(self.output_dir)
self.inputs.template_update()
orig_inputs = attr.asdict(self.inputs)
map_copyfiles = copyfile_input(self.inputs, self.output_dir)
modified_inputs = template_update(self.inputs, map_copyfiles)
if modified_inputs is not None:
self.inputs = attr.evolve(self.inputs, **modified_inputs)
self.audit.start_audit(odir)
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
Expand All @@ -376,6 +396,8 @@ def _run(self, **kwargs):
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result)
save(odir, result=result, task=self)
for k, v in orig_inputs.items():
setattr(self.inputs, k, v)
os.chdir(cwd)
self.hooks.post_run(self, result)
return result
Expand All @@ -384,11 +406,11 @@ def _collect_outputs(self):
run_output = self.output_
self.output_spec = output_from_inputfields(self.output_spec, self.inputs)
output_klass = make_klass(self.output_spec)
output = output_klass(**{f.name: None for f in dc.fields(output_klass)})
output = output_klass(**{f.name: None for f in attr.fields(output_klass)})
other_output = output.collect_additional_outputs(
self.input_spec, self.inputs, self.output_dir
)
return dc.replace(output, **run_output, **other_output)
return attr.evolve(output, **run_output, **other_output)

def split(self, splitter, overwrite=False, **kwargs):
"""
Expand All @@ -410,7 +432,7 @@ def split(self, splitter, overwrite=False, **kwargs):
"if you want to overwrite it - use overwrite=True"
)
if kwargs:
self.inputs = dc.replace(self.inputs, **kwargs)
self.inputs = attr.evolve(self.inputs, **kwargs)
self.state_inputs = kwargs
if not self.state or splitter != self.state.splitter:
self.set_state(splitter)
Expand Down Expand Up @@ -482,7 +504,7 @@ def to_job(self, ind):
# dj might be needed
# el._checksum = None
_, inputs_dict = self.get_input_el(ind)
el.inputs = dc.replace(el.inputs, **inputs_dict)
el.inputs = attr.evolve(el.inputs, **inputs_dict)
return el

@property
Expand Down Expand Up @@ -561,7 +583,7 @@ def result(self, state_index=None):

def _reset(self):
"""Reset the connections between inputs and LazyFields."""
for field in dc.fields(self.inputs):
for field in attr_fields(self.inputs):
if field.name in self.inp_lf:
setattr(self.inputs, field.name, self.inp_lf[field.name])
if is_workflow(self):
Expand Down Expand Up @@ -618,11 +640,11 @@ def __init__(
+ [
(
nm,
ty.Any,
dc.field(
attr.ib(
type=ty.Any,
metadata={
"help_string": f"{nm} input from {name} workflow"
}
},
),
)
for nm in input_spec
Expand Down Expand Up @@ -716,7 +738,7 @@ def create_connections(self, task):

"""
other_states = {}
for field in dc.fields(task.inputs):
for field in attr_fields(task.inputs):
val = getattr(task.inputs, field.name)
if isinstance(val, LazyField):
# saving all connections with LazyFields
Expand Down Expand Up @@ -826,14 +848,14 @@ def set_output(self, connections):

def _collect_outputs(self):
output_klass = make_klass(self.output_spec)
output = output_klass(**{f.name: None for f in dc.fields(output_klass)})
output = output_klass(**{f.name: None for f in attr.fields(output_klass)})
# collecting outputs from tasks
output_wf = {}
for name, val in self._connections:
if not isinstance(val, LazyField):
raise ValueError("all connections must be lazy")
output_wf[name] = val.get_value(self)
return dc.replace(output, **output_wf)
return attr.evolve(output, **output_wf)


def is_task(obj):
Expand Down
38 changes: 27 additions & 11 deletions pydra/engine/helpers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
"""Administrative support for the engine framework."""
import asyncio
import asyncio.subprocess as asp
import dataclasses as dc
import attr
import cloudpickle as cp
from pathlib import Path
import os
import sys
from hashlib import sha256
import subprocess as sp

from .specs import Runtime, File
from .specs import Runtime, File, attr_fields


def ensure_list(obj, tuple2list=False):
Expand Down Expand Up @@ -45,21 +45,21 @@ def print_help(obj):
"""Visit a task object and print its input/output interface."""
lines = ["Help for {}".format(obj.__class__.__name__)]
input_klass = make_klass(obj.input_spec)
if dc.fields(input_klass):
if attr.fields(input_klass):
lines += ["Input Parameters:"]
for f in dc.fields(input_klass):
for f in attr.fields(input_klass):
default = ""
if f.default is not dc.MISSING and not f.name.startswith("_"):
if f.default != attr.NOTHING and not f.name.startswith("_"):
default = " (default: {})".format(f.default)
try:
name = f.type.__name__
except AttributeError:
name = str(f.type)
lines += ["- {}: {}{}".format(f.name, name, default)]
output_klass = make_klass(obj.output_spec)
if dc.fields(output_klass):
if attr.fields(output_klass):
lines += ["Output Parameters:"]
for f in dc.fields(output_klass):
for f in attr.fields(output_klass):
try:
name = f.type.__name__
except AttributeError:
Expand Down Expand Up @@ -183,7 +183,23 @@ def make_klass(spec):
"""
if spec is None:
return None
return dc.make_dataclass(spec.name, spec.fields, bases=spec.bases)
fields = spec.fields
if fields:
newfields = dict()
for item in fields:
if len(item) == 2:
if isinstance(item[1], attr._make._CountingAttr):
newfields[item[0]] = item[1]
else:
newfields[item[0]] = attr.ib(type=item[1])
else:
if isinstance(item[2], attr._make._CountingAttr):
raise ValueError("Three part should not have attr")
# newfields[item[0]] = item[2]
else:
newfields[item[0]] = attr.ib(item[2], type=item[1])
fields = newfields
return attr.make_class(spec.name, fields, bases=spec.bases, kw_only=True)


async def read_stream_and_display(stream, display):
Expand Down Expand Up @@ -391,7 +407,7 @@ def output_names_from_inputfields(inputs):

"""
output_names = []
for fld in dc.fields(inputs):
for fld in attr_fields(inputs):
if "output_file_template" in fld.metadata:
if "output_field_name" in fld.metadata:
field_name = fld.metadata["output_field_name"]
Expand All @@ -413,14 +429,14 @@ def output_from_inputfields(output_spec, inputs):
TODO

"""
for fld in dc.fields(inputs):
for fld in attr_fields(inputs):
if "output_file_template" in fld.metadata:
value = getattr(inputs, fld.name)
if "output_field_name" in fld.metadata:
field_name = fld.metadata["output_field_name"]
else:
field_name = fld.name
output_spec.fields.append(
(field_name, File, dc.field(metadata={"value": value}))
(field_name, attr.ib(type=File, metadata={"value": value}))
)
return output_spec
Loading