diff --git a/README.md b/README.md index 65df77347b..0e4d5def64 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ A simple dataflow engine with scalable semantics. The goal of pydra is to provide a lightweight Python dataflow engine for DAG construction, manipulation, and distributed execution. Feature list: -1. Python 3.7+ using type annotation and dataclasses +1. Python 3.7+ using type annotation and [attrs](https://www.attrs.org/en/stable/) 2. Composable dataflows with simple node semantics. A dataflow can be a node of another dataflow. 3. `splitter` and `combiner` provides many ways of compressing complex loop semantics 4. Cached execution with support for a global cache across dataflows and users diff --git a/docs/requirements.txt b/docs/requirements.txt index c37663bce8..ca468ebf9a 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,4 @@ +attrs cloudpickle filelock git+https://github.com/AleksandarPetrov/napoleon.git@0dc3f28a309ad602be5f44a9049785a1026451b3#egg=sphinxcontrib-napoleon diff --git a/min-requirements.txt b/min-requirements.txt index 1a2a645d22..babf634dce 100644 --- a/min-requirements.txt +++ b/min-requirements.txt @@ -1,4 +1,5 @@ # Auto-generated by tools/update_min_requirements.py +attrs cloudpickle == 0.8.0 filelock == 3.0.0 etelemetry diff --git a/pydra/engine/audit.py b/pydra/engine/audit.py index 72ea8a12b5..1b9561c270 100644 --- a/pydra/engine/audit.py +++ b/pydra/engine/audit.py @@ -2,7 +2,7 @@ import os from pathlib import Path import json -import dataclasses as dc +import attr from ..utils.messenger import send_message, make_message, gen_uuid, now, AuditFlag from .helpers import ensure_list, gather_runtime_info @@ -84,7 +84,7 @@ def finalize_audit(self, result): ) # audit resources/runtime information self.eid = "uid:{}".format(gen_uuid()) - entity = dc.asdict(result.runtime) + entity = attr.asdict(result.runtime) entity.update( **{ "@id": self.eid, diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 5421e011c2..111113966b 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -1,6 +1,6 @@ """Basic processing graph elements.""" import abc -import dataclasses as dc +import attr import json import logging import os @@ -15,7 +15,16 @@ from . import state from . import helpers_state as hlpst -from .specs import File, BaseSpec, RuntimeSpec, Result, SpecInfo, LazyField, TaskHook +from .specs import ( + File, + BaseSpec, + RuntimeSpec, + Result, + SpecInfo, + LazyField, + TaskHook, + attr_fields, +) from .helpers import ( make_klass, create_checksum, @@ -28,6 +37,7 @@ output_from_inputfields, output_names_from_inputfields, ) +from .helpers_file import copyfile_input, template_update from .graph import DiGraph from .audit import Audit from ..utils.messenger import AuditFlag @@ -122,13 +132,15 @@ def __init__( # todo should be used to input_check in spec?? 
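# --- reviewer note (illustrative sketch, not part of the patch): the hunk below
# swaps the dataclasses introspection API for its attrs equivalents. A minimal,
# self-contained demonstration of the mapping this change relies on (the class
# names here are made up for illustration):
import attr
import dataclasses as dc

@dc.dataclass
class DCSpec:
    a: int = 1

@attr.s(auto_attribs=True, kw_only=True)
class AttrSpec:
    a: int = 1

# dc.fields / dc.MISSING map onto attr.fields / attr.NOTHING
assert [f.name for f in dc.fields(DCSpec)] == [f.name for f in attr.fields(AttrSpec)]
assert attr.fields(AttrSpec).a.default != attr.NOTHING
# dc.replace maps onto attr.evolve (both return an updated copy)
assert dc.replace(DCSpec(), a=2).a == attr.evolve(AttrSpec(), a=2).a == 2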
self.inputs = klass( **{ - f.name: (None if f.default is dc.MISSING else f.default) - for f in dc.fields(klass) + (f.name[1:] if f.name.startswith("_") else f.name): ( + None if f.default == attr.NOTHING else f.default + ) + for f in attr.fields(klass) } ) self.input_names = [ field.name - for field in dc.fields(klass) + for field in attr.fields(klass) if field.name not in ["_func", "_graph_checksums"] ] # dictionary to save the connections with lazy fields @@ -149,7 +161,7 @@ def __init__( if self._input_sets is None or inputs not in self._input_sets: raise ValueError("Unknown input set {!r}".format(inputs)) inputs = self._input_sets[inputs] - self.inputs = dc.replace(self.inputs, **inputs) + self.inputs = attr.evolve(self.inputs, **inputs) self.inputs.check_metadata() self.state_inputs = inputs @@ -174,7 +186,12 @@ def __getstate__(self): state = self.__dict__.copy() state["input_spec"] = cp.dumps(state["input_spec"]) state["output_spec"] = cp.dumps(state["output_spec"]) - state["inputs"] = dc.asdict(state["inputs"]) + inputs = {} + for k, v in attr.asdict(state["inputs"]).items(): + if k.startswith("_"): + k = k[1:] + inputs[k] = v + state["inputs"] = inputs return state def __setstate__(self, state): @@ -272,7 +289,7 @@ def set_state(self, splitter, combiner=None): @property def output_names(self): """Get the names of the parameters generated by the task.""" - output_spec_names = [f.name for f in dc.fields(make_klass(self.output_spec))] + output_spec_names = [f.name for f in attr.fields(make_klass(self.output_spec))] from_input_spec_names = output_names_from_inputfields(self.inputs) return output_spec_names + from_input_spec_names @@ -342,7 +359,7 @@ def __call__(self, submitter=None, plugin=None, **kwargs): return res def _run(self, **kwargs): - self.inputs = dc.replace(self.inputs, **kwargs) + self.inputs = attr.evolve(self.inputs, **kwargs) self.inputs.check_fields_input_spec() checksum = self.checksum lockfile = self.cache_dir / (checksum + ".lock") @@ -359,8 +376,11 @@ def _run(self, **kwargs): shutil.rmtree(odir) cwd = os.getcwd() odir.mkdir(parents=False, exist_ok=True if self.can_resume else False) - self.inputs.copyfile_input(self.output_dir) - self.inputs.template_update() + orig_inputs = attr.asdict(self.inputs) + map_copyfiles = copyfile_input(self.inputs, self.output_dir) + modified_inputs = template_update(self.inputs, map_copyfiles) + if modified_inputs is not None: + self.inputs = attr.evolve(self.inputs, **modified_inputs) self.audit.start_audit(odir) result = Result(output=None, runtime=None, errored=False) self.hooks.pre_run_task(self) @@ -376,6 +396,8 @@ def _run(self, **kwargs): self.hooks.post_run_task(self, result) self.audit.finalize_audit(result) save(odir, result=result, task=self) + for k, v in orig_inputs.items(): + setattr(self.inputs, k, v) os.chdir(cwd) self.hooks.post_run(self, result) return result @@ -384,11 +406,11 @@ def _collect_outputs(self): run_output = self.output_ self.output_spec = output_from_inputfields(self.output_spec, self.inputs) output_klass = make_klass(self.output_spec) - output = output_klass(**{f.name: None for f in dc.fields(output_klass)}) + output = output_klass(**{f.name: None for f in attr.fields(output_klass)}) other_output = output.collect_additional_outputs( self.input_spec, self.inputs, self.output_dir ) - return dc.replace(output, **run_output, **other_output) + return attr.evolve(output, **run_output, **other_output) def split(self, splitter, overwrite=False, **kwargs): """ @@ -410,7 +432,7 @@ def split(self, splitter, 
overwrite=False, **kwargs): "if you want to overwrite it - use overwrite=True" ) if kwargs: - self.inputs = dc.replace(self.inputs, **kwargs) + self.inputs = attr.evolve(self.inputs, **kwargs) self.state_inputs = kwargs if not self.state or splitter != self.state.splitter: self.set_state(splitter) @@ -482,7 +504,7 @@ def to_job(self, ind): # dj might be needed # el._checksum = None _, inputs_dict = self.get_input_el(ind) - el.inputs = dc.replace(el.inputs, **inputs_dict) + el.inputs = attr.evolve(el.inputs, **inputs_dict) return el @property @@ -561,7 +583,7 @@ def result(self, state_index=None): def _reset(self): """Reset the connections between inputs and LazyFields.""" - for field in dc.fields(self.inputs): + for field in attr_fields(self.inputs): if field.name in self.inp_lf: setattr(self.inputs, field.name, self.inp_lf[field.name]) if is_workflow(self): @@ -618,11 +640,11 @@ def __init__( + [ ( nm, - ty.Any, - dc.field( + attr.ib( + type=ty.Any, metadata={ "help_string": f"{nm} input from {name} workflow" - } + }, ), ) for nm in input_spec @@ -716,7 +738,7 @@ def create_connections(self, task): """ other_states = {} - for field in dc.fields(task.inputs): + for field in attr_fields(task.inputs): val = getattr(task.inputs, field.name) if isinstance(val, LazyField): # saving all connections with LazyFields @@ -826,14 +848,14 @@ def set_output(self, connections): def _collect_outputs(self): output_klass = make_klass(self.output_spec) - output = output_klass(**{f.name: None for f in dc.fields(output_klass)}) + output = output_klass(**{f.name: None for f in attr.fields(output_klass)}) # collecting outputs from tasks output_wf = {} for name, val in self._connections: if not isinstance(val, LazyField): raise ValueError("all connections must be lazy") output_wf[name] = val.get_value(self) - return dc.replace(output, **output_wf) + return attr.evolve(output, **output_wf) def is_task(obj): diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py index f5e1d54c9e..f012fdfb30 100644 --- a/pydra/engine/helpers.py +++ b/pydra/engine/helpers.py @@ -1,7 +1,7 @@ """Administrative support for the engine framework.""" import asyncio import asyncio.subprocess as asp -import dataclasses as dc +import attr import cloudpickle as cp from pathlib import Path import os @@ -9,7 +9,7 @@ from hashlib import sha256 import subprocess as sp -from .specs import Runtime, File +from .specs import Runtime, File, attr_fields def ensure_list(obj, tuple2list=False): @@ -45,11 +45,11 @@ def print_help(obj): """Visit a task object and print its input/output interface.""" lines = ["Help for {}".format(obj.__class__.__name__)] input_klass = make_klass(obj.input_spec) - if dc.fields(input_klass): + if attr.fields(input_klass): lines += ["Input Parameters:"] - for f in dc.fields(input_klass): + for f in attr.fields(input_klass): default = "" - if f.default is not dc.MISSING and not f.name.startswith("_"): + if f.default != attr.NOTHING and not f.name.startswith("_"): default = " (default: {})".format(f.default) try: name = f.type.__name__ @@ -57,9 +57,9 @@ def print_help(obj): name = str(f.type) lines += ["- {}: {}{}".format(f.name, name, default)] output_klass = make_klass(obj.output_spec) - if dc.fields(output_klass): + if attr.fields(output_klass): lines += ["Output Parameters:"] - for f in dc.fields(output_klass): + for f in attr.fields(output_klass): try: name = f.type.__name__ except AttributeError: @@ -183,7 +183,23 @@ def make_klass(spec): """ if spec is None: return None - return dc.make_dataclass(spec.name, 
spec.fields, bases=spec.bases) + fields = spec.fields + if fields: + newfields = dict() + for item in fields: + if len(item) == 2: + if isinstance(item[1], attr._make._CountingAttr): + newfields[item[0]] = item[1] + else: + newfields[item[0]] = attr.ib(type=item[1]) + else: + if isinstance(item[2], attr._make._CountingAttr): + raise ValueError("Three part should not have attr") + # newfields[item[0]] = item[2] + else: + newfields[item[0]] = attr.ib(item[2], type=item[1]) + fields = newfields + return attr.make_class(spec.name, fields, bases=spec.bases, kw_only=True) async def read_stream_and_display(stream, display): @@ -391,7 +407,7 @@ def output_names_from_inputfields(inputs): """ output_names = [] - for fld in dc.fields(inputs): + for fld in attr_fields(inputs): if "output_file_template" in fld.metadata: if "output_field_name" in fld.metadata: field_name = fld.metadata["output_field_name"] @@ -413,7 +429,7 @@ def output_from_inputfields(output_spec, inputs): TODO """ - for fld in dc.fields(inputs): + for fld in attr_fields(inputs): if "output_file_template" in fld.metadata: value = getattr(inputs, fld.name) if "output_field_name" in fld.metadata: @@ -421,6 +437,6 @@ def output_from_inputfields(output_spec, inputs): else: field_name = fld.name output_spec.fields.append( - (field_name, File, dc.field(metadata={"value": value})) + (field_name, attr.ib(type=File, metadata={"value": value})) ) return output_spec diff --git a/pydra/engine/helpers_file.py b/pydra/engine/helpers_file.py index b2aa3a202a..68f47c53fb 100644 --- a/pydra/engine/helpers_file.py +++ b/pydra/engine/helpers_file.py @@ -1,4 +1,5 @@ """Functions ported from Nipype 1, after removing parts that were related to py2.""" +import attr import subprocess as sp from hashlib import sha256 import os @@ -8,6 +9,7 @@ import posixpath from builtins import str, bytes, open import logging +from pathlib import Path related_filetype_sets = [(".hdr", ".img", ".mat"), (".nii", ".mat"), (".BRIK", ".HEAD")] """List of neuroimaging file types that are to be interpreted together.""" @@ -458,3 +460,60 @@ def ensure_list(filename): return [x for x in filename] return None + + +# not sure if this might be useful for Function Task +def copyfile_input(inputs, output_dir): + """Implement the base class method.""" + from .specs import attr_fields, File + + map_copyfiles = {} + for fld in attr_fields(inputs): + copy = fld.metadata.get("copyfile") + if copy is not None and fld.type is not File: + raise Exception( + f"if copyfile set, field has to be a File " f"but {fld.type} provided" + ) + if copy in [True, False]: + file = getattr(inputs, fld.name) + newfile = output_dir.joinpath(Path(getattr(inputs, fld.name)).name) + copyfile(file, newfile, copy=copy) + map_copyfiles[fld.name] = str(newfile) + return map_copyfiles or None + + +# not sure if this might be useful for Function Task +def template_update(inputs, map_copyfiles=None): + """ + Update all templates that are present in the input spec. + + Should be run when all inputs used in the templates are already set. 
+ + """ + dict_ = attr.asdict(inputs) + if map_copyfiles is not None: + dict_.update(map_copyfiles) + + from .specs import attr_fields + + fields = attr_fields(inputs) + # TODO: Create a dependency graph first and then traverse it + for fld in fields: + if getattr(inputs, fld.name) is not None: + continue + if fld.metadata.get("output_file_template"): + if fld.type is str: + value = fld.metadata["output_file_template"].format(**dict_) + dict_[fld.name] = str(value) + else: + raise Exception( + f"output_file_template metadata for " + "{fld.name} should be a string" + ) + return {k: v for k, v in dict_.items() if getattr(inputs, k) != v} + + +def is_local_file(f): + from .specs import File + + return f.type is File and "container_path" not in f.metadata diff --git a/pydra/engine/helpers_state.py b/pydra/engine/helpers_state.py index 9e5d3dd4ed..65a580de72 100644 --- a/pydra/engine/helpers_state.py +++ b/pydra/engine/helpers_state.py @@ -1,5 +1,6 @@ """Additional functions used mostly by the State class.""" +import attr import itertools from functools import reduce from copy import deepcopy @@ -839,7 +840,7 @@ def _left_right_check(splitter_part, other_states): def inputs_types_to_dict(name, inputs): """Convert type.Inputs to dictionary.""" # dj: any better option? - input_names = [nm for nm in inputs.__dataclass_fields__.keys() if nm != "_func"] + input_names = [field for field in attr.asdict(inputs) if field != "_func"] inputs_dict = {} for field in input_names: inputs_dict["{}.{}".format(name, field)] = getattr(inputs, field) diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index 240438544d..bf1ef43803 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -1,34 +1,34 @@ """Task I/O specifications.""" -import dataclasses as dc +import attr from pathlib import Path -import os import typing as ty -from copy import deepcopy -from .helpers_file import copyfile +def attr_fields(x): + return x.__attrs_attrs__ -class File(Path): + +class File: """An :obj:`os.pathlike` object, designating a file.""" -class Directory(Path): +class Directory: """An :obj:`os.pathlike` object, designating a folder.""" -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class SpecInfo: """Base data structure for metadata of specifications.""" name: str """A name for the specification.""" - fields: ty.List[ty.Tuple] = dc.field(default_factory=list) + fields: ty.List[ty.Tuple] = attr.ib(factory=list) """List of names of fields (inputs or outputs).""" - bases: ty.Tuple[dc.dataclass] = dc.field(default_factory=tuple) + bases: ty.Tuple[ty.Type] = attr.ib(factory=tuple) """Keeps track of this specification inheritance.""" -@dc.dataclass(order=True) +@attr.s(auto_attribs=True, kw_only=True) class BaseSpec: """The base dataclass specs for all inputs and outputs.""" @@ -42,16 +42,18 @@ def hash(self): from .helpers import hash_function from .helpers_file import hash_file - inp_dict = { - field.name: hash_file(getattr(self, field.name)) - if field.type == File - else getattr(self, field.name) - for field in dc.fields(self) - if ( - field.name not in ["_graph_checksums", "bindings"] - and not field.metadata.get("output_file_template") - ) - } + inp_dict = {} + for field in attr_fields(self): + if field.name in ["_graph_checksums", "bindings"] or field.metadata.get( + "output_file_template" + ): + continue + if field.type == File and "container_path" not in field.metadata: + inp_dict[field.name] = hash_file(getattr(self, field.name)) + elif isinstance(getattr(self, field.name), tuple): + 
inp_dict[field.name] = list(getattr(self, field.name)) + else: + inp_dict[field.name] = getattr(self, field.name) inp_hash = hash_function(inp_dict) if hasattr(self, "_graph_checksums"): return hash_function((inp_hash, self._graph_checksums)) @@ -61,7 +63,7 @@ def hash(self): def retrieve_values(self, wf, state_index=None): """Get values contained by this spec.""" temp_values = {} - for field in dc.fields(self): + for field in attr_fields(self): value = getattr(self, field.name) if isinstance(value, LazyField): value = value.get_value(wf, state_index=state_index) @@ -82,7 +84,7 @@ def copyfile_input(self, output_dir): """Copy the file pointed by a :class:`File` input.""" -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class Runtime: """Represent run time metadata.""" @@ -94,7 +96,7 @@ class Runtime: """Peak in cpu consumption.""" -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class Result: """Metadata regarding the outputs of processing.""" @@ -105,21 +107,23 @@ class Result: def __getstate__(self): state = self.__dict__.copy() if state["output"] is not None: - fields = tuple((el.name, el.type) for el in dc.fields(state["output"])) + fields = tuple((el.name, el.type) for el in attr_fields(state["output"])) state["output_spec"] = (state["output"].__class__.__name__, fields) - state["output"] = dc.asdict(state["output"]) + state["output"] = attr.asdict(state["output"]) return state def __setstate__(self, state): if "output_spec" in state: spec = list(state["output_spec"]) del state["output_spec"] - klass = dc.make_dataclass(spec[0], list(spec[1])) + klass = attr.make_class( + spec[0], {k: attr.ib(type=v) for k, v in list(spec[1])} + ) state["output"] = klass(**state["output"]) self.__dict__.update(state) -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class RuntimeSpec: """ Specification for a task. @@ -144,27 +148,28 @@ class RuntimeSpec: network: bool = False -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class ShellSpec(BaseSpec): """Specification for a process invoked from a shell.""" - executable: ty.Union[str, ty.List[str]] = dc.field( + executable: ty.Union[str, ty.List[str]] = attr.ib( metadata={ "help_string": "the first part of the command, can be a string, " "e.g. 'ls', or a list, e.g. ['ls', '-l', 'dirname']" } ) - args: ty.Union[str, ty.List[str]] = dc.field( + args: ty.Union[str, ty.List[str], None] = attr.ib( + None, metadata={ "help_string": "the last part of the command, can be a string, " "e.g. 
, or a list" - } + }, ) def retrieve_values(self, wf, state_index=None): """Parse output results.""" temp_values = {} - for field in dc.fields(self): + for field in attr_fields(self): # retrieving values that do not have templates if not field.metadata.get("output_file_template"): value = getattr(self, field.name) @@ -200,7 +205,7 @@ def check_metadata(self): # special inputs, don't have to follow rules for standard inputs special_input = ["_func", "_graph_checksums"] - fields = [fld for fld in dc.fields(self) if fld.name not in special_input] + fields = [fld for fld in attr_fields(self) if fld.name not in special_input] for fld in fields: mdata = fld.metadata # checking keys from metadata @@ -215,7 +220,7 @@ def check_metadata(self): # fld.default can't be different than default_value when both set if ( - not isinstance(fld.default, dc._MISSING_TYPE) + not fld.default == attr.NOTHING and mdata.get("default_value") is not None and mdata.get("default_value") != fld.default ): @@ -224,7 +229,7 @@ def check_metadata(self): ) # assuming that fields with output_file_template shouldn't have default if ( - not isinstance(fld.default, dc._MISSING_TYPE) + not fld.default == attr.NOTHING or mdata.get("default_value") is not None ) and mdata.get("output_file_template"): raise Exception( @@ -232,64 +237,19 @@ def check_metadata(self): ) # not allowing for default if the field is mandatory if ( - not isinstance(fld.default, dc._MISSING_TYPE) + not fld.default == attr.NOTHING or mdata.get("default_value") is not None ) and mdata.get("mandatory"): raise Exception( "default value should not be set when the field is mandatory" ) # setting default if value not provided and default is available - if dc.asdict(self)[fld.name] is None: + if getattr(self, fld.name) is None: if mdata.get("default_value") is not None: setattr(self, fld.name, mdata["default_value"]) - elif not isinstance(fld.default, dc._MISSING_TYPE): + elif not fld.default == attr.NOTHING: setattr(self, fld.name, fld.default) - # not sure if this might be useful for Function Task - def template_update(self): - """ - Update all templates that are present in the input spec. - - Should be run when all inputs used in the templates are already set. - - """ - dict_ = deepcopy(self.__dict__) - dict_.update(self.map_copyfiles) - - fields = dc.fields(self) - for fld in fields: - if fld.metadata.get("output_file_template"): - if fld.type is str: - value = fld.metadata["output_file_template"].format(**dict_) - setattr(self, fld.name, value) - elif fld.type is tuple: - name, ext = os.path.splitext( - fld.metadata["output_file_template"][0].format(**dict_) - ) - value = f"{name}{fld.metadata['output_file_template'][1]}{ext}" - setattr(self, fld.name, value) - else: - raise Exception( - "output names should be a string or a tuple of two strings" - ) - - # not sure if this might be useful for Function Task - def copyfile_input(self, output_dir): - """Implement the base class method.""" - self.map_copyfiles = {} - for fld in dc.fields(self): - copy = fld.metadata.get("copyfile") - if copy is not None and fld.type is not File: - raise Exception( - f"if copyfile set, field has to be a File " - f"but {fld.type} provided" - ) - if copy in [True, False]: - file = getattr(self, fld.name) - newfile = output_dir.joinpath(Path(getattr(self, fld.name)).name) - copyfile(file, newfile, copy=copy) - self.map_copyfiles[fld.name] = newfile - def check_fields_input_spec(self): """ Check fields from input spec based on the medatada. 
@@ -297,13 +257,13 @@ def check_fields_input_spec(self): e.g., if xor, requires are fulfilled, if value provided when mandatory. """ - fields = dc.fields(self) + fields = attr_fields(self) names = [] require_to_check = {} for fld in fields: mdata = fld.metadata # checking if the mandatory field is provided - if dc.asdict(self)[fld.name] is None: + if getattr(self, fld.name) is None: if mdata.get("mandatory"): raise Exception(f"{fld.name} is mandatory, but no value provided") elif mdata.get("default_value") is not None: @@ -341,15 +301,19 @@ def _file_check(self, field): raise Exception(f"the file from the {field.name} input does not exist") def _type_checking(self): - """Use fld.type to check the types TODO.""" - fields = dc.fields(self) + """Use fld.type to check the types TODO. + + This may be done through attr validators. + + """ + fields = attr_fields(self) allowed_keys = ["min_val", "max_val", "range", "enum"] # noqa for fld in fields: # TODO pass -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class ShellOutSpec(BaseSpec): """Output specification of a generic shell process.""" @@ -363,19 +327,16 @@ class ShellOutSpec(BaseSpec): def collect_additional_outputs(self, input_spec, inputs, output_dir): """Collect additional outputs from shelltask output_spec.""" additional_out = {} - for fld in dc.fields(self): + for fld in attr_fields(self): if fld.name not in ["return_code", "stdout", "stderr"]: if fld.type is File: # assuming that field should have either default or metadata, but not both if ( - not ( - fld.default is None - or isinstance(fld.default, dc._MISSING_TYPE) - ) + not (fld.default is None or fld.default == attr.NOTHING) and fld.metadata ): raise Exception("File has to have default value or metadata") - elif not isinstance(fld.default, dc._MISSING_TYPE): + elif not fld.default == attr.NOTHING: additional_out[fld.name] = self._field_defaultvalue( fld, output_dir ) @@ -395,26 +356,25 @@ def _field_defaultvalue(self, fld, output_dir): f"should be a string or a Path, " f"{fld.default} provided" ) - if isinstance(fld.default, str): - fld.default = Path(fld.default) + default = fld.default + if isinstance(default, str): + default = Path(default) - fld.default = output_dir / fld.default + default = output_dir / default - if "*" not in fld.default.name: - if fld.default.exists(): - return fld.default + if "*" not in default.name: + if default.exists(): + return default else: - raise Exception(f"file {fld.default.name} does not exist") + raise Exception(f"file {default} does not exist") else: - all_files = list( - Path(fld.default.parent).expanduser().glob(fld.default.name) - ) + all_files = list(Path(default.parent).expanduser().glob(default.name)) if len(all_files) > 1: return all_files elif len(all_files) == 1: return all_files[0] else: - raise Exception(f"no file matches {fld.default.name}") + raise Exception(f"no file matches {default.name}") def _field_metadata(self, fld, inputs, output_dir): """Collect output file if metadata specified.""" @@ -430,18 +390,18 @@ def _field_metadata(self, fld, inputs, output_dir): raise Exception("not implemented") -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class ContainerSpec(ShellSpec): """Refine the generic command-line specification to container execution.""" - image: ty.Union[File, str] = dc.field(metadata={"help_string": "image"}) + image: ty.Union[File, str] = attr.ib(metadata={"help_string": "image"}) """The image to be containerized.""" - container: ty.Union[File, str, None] = dc.field( + container: ty.Union[File, 
str, None] = attr.ib( metadata={"help_string": "container"} ) """The container.""" - container_xargs: ty.Optional[ty.List[str]] = dc.field( - metadata={"help_string": "todo"} + container_xargs: ty.Optional[ty.List[str]] = attr.ib( + default=None, metadata={"help_string": "todo"} ) """Execution arguments to run the image.""" bindings: ty.Optional[ @@ -452,15 +412,16 @@ class ContainerSpec(ShellSpec): ty.Optional[str], # mount mode ] ] - ] = dc.field(metadata={"help_string": "bindings"}) + ] = attr.ib(default=None, metadata={"help_string": "bindings"}) """Mount points to be bound into the container.""" def _file_check(self, field): file = Path(getattr(self, field.name)) if field.metadata.get("container_path"): # if the path is in a container the input should be treated as a str (hash as a str) - field.type = "str" - setattr(self, field.name, str(file)) + # field.type = "str" + # setattr(self, field.name, str(file)) + pass # if this is a local path, checking if the path exists elif file.exists(): if self.bindings is None: @@ -474,22 +435,18 @@ def _file_check(self, field): ) -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class DockerSpec(ContainerSpec): """Particularize container specifications to the Docker engine.""" - container: str = dc.field( - metadata={"default_value": "docker", "help_string": "container"} - ) + container: str = attr.ib("docker", metadata={"help_string": "container"}) -@dc.dataclass +@attr.s(auto_attribs=True, kw_only=True) class SingularitySpec(ContainerSpec): """Particularize container specifications to Singularity.""" - container: str = dc.field( - metadata={"default_value": "singularity", "help_string": "container type"} - ) + container: str = attr.ib("singularity", metadata={"help_string": "container type"}) class LazyField: @@ -542,7 +499,7 @@ def get_value(self, wf, state_index=None): results_new = [] for res_l in result: if self.field == "all_": - res_l_new = [dc.asdict(res.output) for res in res_l] + res_l_new = [attr.asdict(res.output) for res in res_l] else: res_l_new = [ getattr(res.output, self.field) for res in res_l @@ -551,36 +508,37 @@ def get_value(self, wf, state_index=None): return results_new else: if self.field == "all_": - return [dc.asdict(res.output) for res in result] + return [attr.asdict(res.output) for res in result] else: return [getattr(res.output, self.field) for res in result] else: if self.field == "all_": - return dc.asdict(result.output) + return attr.asdict(result.output) else: return getattr(result.output, self.field) -@dc.dataclass +def donothing(*args, **kwargs): + return None + + +@attr.s(auto_attribs=True, kw_only=True) class TaskHook: """Callable task hooks.""" - def none(*args, **kwargs): - """Return ``None``.""" - return None - - pre_run_task: ty.Callable = none - post_run_task: ty.Callable = none - pre_run: ty.Callable = none - post_run: ty.Callable = none + pre_run_task: ty.Callable = donothing + post_run_task: ty.Callable = donothing + pre_run: ty.Callable = donothing + post_run: ty.Callable = donothing def __setattr__(cls, attr, val): - if not hasattr(cls, attr): + if attr not in ["pre_run_task", "post_run_task", "pre_run", "post_run"]: raise AttributeError("Cannot set unknown hook") super().__setattr__(attr, val) def reset(self): - self.__dict__ = TaskHook().__dict__ + for val in ["pre_run_task", "post_run_task", "pre_run", "post_run"]: + setattr(self, val, donothing) def path_to_string(value): diff --git a/pydra/engine/task.py b/pydra/engine/task.py index 31176b0a0d..8701ae3f56 100644 --- 
a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -38,8 +38,8 @@ `__ """ +import attr import cloudpickle as cp -import dataclasses as dc import inspect import typing as ty from pathlib import Path @@ -55,8 +55,10 @@ ContainerSpec, DockerSpec, SingularitySpec, + attr_fields, ) from .helpers import ensure_list, execute +from .helpers_file import template_update, is_local_file class FunctionTask(TaskBase): @@ -106,9 +108,9 @@ def __init__( fields=[ ( val.name, - val.annotation, - dc.field( + attr.ib( default=val.default, + type=val.annotation, metadata={ "help_string": f"{val.name} parameter from {func.__name__}" }, @@ -117,16 +119,19 @@ def __init__( if val.default is not inspect.Signature.empty else ( val.name, - val.annotation, - dc.field(metadata={"help_string": val.name}), + attr.ib( + type=val.annotation, metadata={"help_string": val.name} + ), ) for val in inspect.signature(func).parameters.values() ] - + [("_func", str, cp.dumps(func))], + + [("_func", attr.ib(default=cp.dumps(func), type=str))], bases=(BaseSpec,), ) else: - input_spec.fields.append(("_func", str, cp.dumps(func))) + input_spec.fields.append( + ("_func", attr.ib(default=cp.dumps(func), type=str)) + ) self.input_spec = input_spec if name is None: name = func.__name__ @@ -183,7 +188,7 @@ def __init__( self.output_spec = output_spec def _run_task(self): - inputs = dc.asdict(self.inputs) + inputs = attr.asdict(self.inputs) del inputs["_func"] self.output_ = None output = cp.loads(self.inputs._func)(**inputs) @@ -262,7 +267,7 @@ def __init__( def command_args(self): """Get command line arguments.""" pos_args = [] # list for (position, command arg) - for f in dc.fields(self.inputs): + for f in attr_fields(self.inputs): if f.name == "executable": pos = 0 # executable should be the first el. 
of the command elif f.name == "args": @@ -280,12 +285,14 @@ def command_args(self): cmd_add = [] if "argstr" in f.metadata: cmd_add.append(f.metadata["argstr"]) - if f.metadata.get("copyfile") in [True, False]: - value = str(self.inputs.map_copyfiles[f.name]) - else: - value = getattr(self.inputs, f.name) + # if f.metadata.get("copyfile") in [True, False]: + # value = str(self.inputs.map_copyfiles[f.name]) + # else: + value = getattr(self.inputs, f.name) + if is_local_file(f): + value = str(value) # changing path to the cpath (the directory should be mounted) - if getattr(self, "bind_paths", None) and f.type is File: + if getattr(self, "bind_paths", None) and is_local_file(f): lpath = Path(value) cdir = self.bind_paths[lpath.parent][0] cpath = cdir.joinpath(lpath.name) @@ -310,12 +317,18 @@ def command_args(self): @command_args.setter def command_args(self, args: ty.Dict): - self.inputs = dc.replace(self.inputs, **args) + self.inputs = attr.evolve(self.inputs, **args) @property def cmdline(self): """Get the actual command line that will be submitted.""" - return " ".join(self.command_args) + orig_inputs = attr.asdict(self.inputs) + modified_inputs = template_update(self.inputs) + if modified_inputs is not None: + self.inputs = attr.evolve(self.inputs, **modified_inputs) + cmdline = " ".join(self.command_args) + self.inputs = attr.evolve(self.inputs, **orig_inputs) + return cmdline def _run_task(self,): self.output_ = None @@ -324,6 +337,8 @@ def _run_task(self,): keys = ["return_code", "stdout", "stderr"] values = execute(args, strip=self.strip) self.output_ = dict(zip(keys, values)) + if self.output_["return_code"]: + raise RuntimeError(self.output_["stderr"]) class ContainerTask(ShellCommandTask): @@ -385,7 +400,13 @@ def __init__( @property def cmdline(self): """Get the actual command line that will be submitted.""" - return " ".join(self.container_args + self.command_args) + orig_inputs = attr.asdict(self.inputs) + modified_inputs = template_update(self.inputs) + if modified_inputs is not None: + self.inputs = attr.evolve(self.inputs, **modified_inputs) + cmdline = " ".join(self.container_args + self.command_args) + self.inputs = attr.evolve(self.inputs, **orig_inputs) + return cmdline @property def container_args(self): @@ -408,7 +429,7 @@ def bind_paths(self): if len(binding) == 3: lpath, cpath, mode = binding elif len(binding) == 2: - lpath, cpath, mode = binding + ("rw",) + lpath, cpath, mode = binding + ["rw"] else: raise Exception( f"binding should have length 2, 3, or 4, it has {len(binding)}" @@ -445,6 +466,8 @@ def _run_task(self): keys = ["return_code", "stdout", "stderr"] values = execute(args, strip=self.strip) self.output_ = dict(zip(keys, values)) + if self.output_["return_code"]: + raise RuntimeError(self.output_["stderr"]) class DockerTask(ContainerTask): @@ -503,6 +526,7 @@ def __init__( output_cpath=output_cpath, **kwargs, ) + self.inputs.container_xargs = ["--rm"] @property def container_args(self): diff --git a/pydra/engine/tests/test_dockertask.py b/pydra/engine/tests/test_dockertask.py index b3d2d800d3..eaaa048efd 100644 --- a/pydra/engine/tests/test_dockertask.py +++ b/pydra/engine/tests/test_dockertask.py @@ -3,7 +3,7 @@ import os, shutil import subprocess as sp import pytest -import dataclasses as dc +import attr from ..task import DockerTask from ..submitter import Submitter @@ -32,7 +32,7 @@ def test_docker_1_nosubm(): assert docky.inputs.container == "docker" assert ( docky.cmdline - == f"docker run -v {docky.output_dir}:/output_pydra:rw -w /output_pydra 
{docky.inputs.image} {cmd}" + == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd}" ) res = docky() @@ -70,7 +70,7 @@ def test_docker_2_nosubm(): docky = DockerTask(name="docky", executable=cmd, image="busybox") assert ( docky.cmdline - == f"docker run -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {' '.join(cmd)}" + == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {' '.join(cmd)}" ) res = docky() @@ -90,7 +90,7 @@ def test_docker_2(plugin): docky = DockerTask(name="docky", executable=cmd, image="busybox") assert ( docky.cmdline - == f"docker run -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {' '.join(cmd)}" + == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {' '.join(cmd)}" ) with Submitter(plugin=plugin) as sub: @@ -116,7 +116,7 @@ def test_docker_2a_nosubm(): assert docky.inputs.executable == "echo" assert ( docky.cmdline - == f"docker run -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd_exec} {' '.join(cmd_args)}" + == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd_exec} {' '.join(cmd_args)}" ) res = docky() @@ -141,7 +141,7 @@ def test_docker_2a(plugin): assert docky.inputs.executable == "echo" assert ( docky.cmdline - == f"docker run -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd_exec} {' '.join(cmd_args)}" + == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd_exec} {' '.join(cmd_args)}" ) with Submitter(plugin=plugin) as sub: @@ -304,6 +304,27 @@ def test_wf_docker_1(plugin, tmpdir): assert res.output.out == "message from the previous task: hello from pydra" +@need_docker +@pytest.mark.parametrize("plugin", Plugins) +def test_wf_docker_2pre(plugin, tmpdir): + """ a workflow with two connected task that run python scripts + the first one creates a text file and the second one reads the file + """ + + scripts_dir = os.path.join(os.path.dirname(__file__), "data_tests") + + cmd1 = ["python", "/scripts/saving.py", "-f", "/outputs/tmp.txt"] + dt = DockerTask( + name="save", + image="python:3.7-alpine", + executable=cmd1, + bindings=[(str(tmpdir), "/outputs"), (scripts_dir, "/scripts", "ro")], + strip=True, + ) + res = dt(plugin=plugin) + assert res.output.stdout == "/outputs/tmp.txt" + + @need_docker @pytest.mark.parametrize("plugin", Plugins) def test_wf_docker_2(plugin, tmpdir): @@ -390,13 +411,13 @@ def test_docker_inputspec_1(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", - } + }, ), ) ], @@ -433,8 +454,8 @@ def test_docker_inputspec_1a(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, default=filename, metadata={"position": 1, "help_string": "input file"}, ), @@ -474,13 +495,14 @@ def test_docker_inputspec_2(plugin, tmpdir): fields=[ ( "file1", - File, - dc.field(metadata={"position": 1, "help_string": "input file 1"}), + attr.ib( + type=File, metadata={"position": 1, "help_string": "input file 1"} + ), ), ( "file2", - File, - dc.field( + attr.ib( + type=File, default=filename_2, metadata={"position": 2, "help_string": "input file 2"}, ), @@ -523,31 +545,34 @@ def test_docker_inputspec_2a_except(plugin, tmpdir): fields=[ ( "file1", - File, - dc.field( + attr.ib( + type=File, 
default=filename_1, metadata={"position": 1, "help_string": "input file 1"}, ), ), ( "file2", - File, - dc.field(metadata={"position": 2, "help_string": "input file 2"}), + attr.ib( + type=File, metadata={"position": 2, "help_string": "input file 2"} + ), ), ], bases=(DockerSpec,), ) - with pytest.raises(TypeError) as excinfo: - docky = DockerTask( - name="docky", - image="busybox", - executable=cmd, - file2=filename_2, - input_spec=my_input_spec, - strip=True, - ) - assert "non-default argument 'file2' follows default argument" == str(excinfo.value) + docky = DockerTask( + name="docky", + image="busybox", + executable=cmd, + file2=filename_2, + input_spec=my_input_spec, + strip=True, + ) + assert docky.inputs.file2 == filename_2 + + res = docky() + assert res.output.stdout == "hello from pydra\nhave a nice one" @need_docker @@ -572,19 +597,20 @@ def test_docker_inputspec_2a(plugin, tmpdir): fields=[ ( "file1", - File, - dc.field( + attr.ib( + type=File, metadata={ "default_value": filename_1, "position": 1, "help_string": "input file 1", - } + }, ), ), ( "file2", - File, - dc.field(metadata={"position": 2, "help_string": "input file 2"}), + attr.ib( + type=File, metadata={"position": 2, "help_string": "input file 2"} + ), ), ], bases=(DockerSpec,), @@ -617,14 +643,14 @@ def test_docker_inputspec_3(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", "container_path": True, - } + }, ), ) ], @@ -640,8 +666,10 @@ def test_docker_inputspec_3(plugin, tmpdir): strip=True, ) + cmdline = docky.cmdline res = docky() assert "docker" in res.output.stdout + assert cmdline == docky.cmdline @need_docker @@ -660,13 +688,13 @@ def test_docker_inputspec_3a(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", - } + }, ), ) ], @@ -705,24 +733,24 @@ def test_docker_cmd_inputspec_copyfile_1(plugin, tmpdir): fields=[ ( "orig_file", - File, - dc.field( + attr.ib( + type=File, metadata={ "position": 1, "help_string": "orig file", "mandatory": True, "copyfile": True, - } + }, ), ), ( "out_file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{orig_file}", "help_string": "output file", - } + }, ), ), ], @@ -770,13 +798,13 @@ def test_docker_inputspec_state_1(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", - } + }, ), ) ], @@ -819,13 +847,13 @@ def test_docker_inputspec_state_1b(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", - } + }, ), ) ], @@ -861,13 +889,13 @@ def test_docker_wf_inputspec_1(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", - } + }, ), ) ], @@ -916,13 +944,13 @@ def test_docker_wf_state_inputspec_1(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", - } + }, ), ) ], @@ -973,13 +1001,13 @@ def test_docker_wf_ndst_inputspec_1(plugin, tmpdir): fields=[ ( "file", - File, - dc.field( + attr.ib( + type=File, metadata={ "mandatory": True, "position": 1, "help_string": "input file", - } + }, ), ) ], diff --git a/pydra/engine/tests/test_shelltask.py 
b/pydra/engine/tests/test_shelltask.py index 993cc54ea8..0a5d654160 100644 --- a/pydra/engine/tests/test_shelltask.py +++ b/pydra/engine/tests/test_shelltask.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- +import attr import typing as ty import os, shutil -import dataclasses as dc import pytest from pathlib import Path @@ -289,9 +289,9 @@ def test_shell_cmd_inputspec_1(plugin, results_function): fields=[ ( "opt_n", - bool, - dc.field( - metadata={"position": 1, "argstr": "-n", "help_string": "option"} + attr.ib( + type=bool, + metadata={"position": 1, "argstr": "-n", "help_string": "option"}, ), ) ], @@ -330,14 +330,13 @@ def test_shell_cmd_inputspec_2(plugin, results_function): fields=[ ( "opt_hello", - str, - dc.field(metadata={"position": 3, "help_string": "todo"}), + attr.ib(type=str, metadata={"position": 3, "help_string": "todo"}), ), ( "opt_n", - bool, - dc.field( - metadata={"position": 1, "help_string": "todo", "argstr": "-n"} + attr.ib( + type=bool, + metadata={"position": 1, "help_string": "todo", "argstr": "-n"}, ), ), ], @@ -371,9 +370,9 @@ def test_shell_cmd_inputspec_3(plugin, results_function): fields=[ ( "text", - str, - dc.field( - metadata={"position": 1, "help_string": "text", "mandatory": True} + attr.ib( + type=str, + metadata={"position": 1, "help_string": "text", "mandatory": True}, ), ) ], @@ -401,9 +400,9 @@ def test_shell_cmd_inputspec_3a(plugin, results_function): fields=[ ( "text", - str, - dc.field( - metadata={"position": 1, "help_string": "text", "mandatory": True} + attr.ib( + type=str, + metadata={"position": 1, "help_string": "text", "mandatory": True}, ), ) ], @@ -430,9 +429,9 @@ def test_shell_cmd_inputspec_3b_exception(plugin): fields=[ ( "text", - str, - dc.field( - metadata={"position": 1, "help_string": "text", "mandatory": True} + attr.ib( + type=str, + metadata={"position": 1, "help_string": "text", "mandatory": True}, ), ) ], @@ -457,9 +456,9 @@ def test_shell_cmd_inputspec_3c(plugin, results_function): fields=[ ( "text", - str, - dc.field( - metadata={"position": 1, "help_string": "text", "mandatory": False} + attr.ib( + type=str, + metadata={"position": 1, "help_string": "text", "mandatory": False}, ), ) ], @@ -486,13 +485,13 @@ def test_shell_cmd_inputspec_4(plugin, results_function): fields=[ ( "text", - str, - dc.field( + attr.ib( + type=str, metadata={ "position": 1, "help_string": "text", "default_value": "Hello", - } + }, ), ) ], @@ -521,8 +520,11 @@ def test_shell_cmd_inputspec_4a(plugin, results_function): fields=[ ( "text", - str, - dc.field(default="Hi", metadata={"position": 1, "help_string": "text"}), + attr.ib( + type=str, + default="Hi", + metadata={"position": 1, "help_string": "text"}, + ), ) ], bases=(ShellSpec,), @@ -549,8 +551,8 @@ def test_shell_cmd_inputspec_4b_exception(plugin): fields=[ ( "text", - str, - dc.field( + attr.ib( + type=str, default="Hi", metadata={ "position": 1, @@ -583,14 +585,14 @@ def test_shell_cmd_inputspec_4c_exception(plugin): fields=[ ( "text", - str, - dc.field( + attr.ib( + type=str, metadata={ "position": 1, "help_string": "text", "mandatory": True, "default_value": "Hello", - } + }, ), ) ], @@ -617,14 +619,14 @@ def test_shell_cmd_inputspec_4d_exception(plugin): fields=[ ( "text", - str, - dc.field( + attr.ib( + type=str, metadata={ "position": 1, "help_string": "text", "output_file_template": "exception", "default_value": "Hello", - } + }, ), ) ], @@ -653,26 +655,26 @@ def test_shell_cmd_inputspec_5_nosubm(plugin, results_function): fields=[ ( "opt_t", - bool, - dc.field( + attr.ib( + type=bool, 
metadata={ "position": 1, "help_string": "opt t", "argstr": "-t", "xor": ["opt_S"], - } + }, ), ), ( "opt_S", - bool, - dc.field( + attr.ib( + type=bool, metadata={ "position": 2, "help_string": "opt S", "argstr": "-S", "xor": ["opt_t"], - } + }, ), ), ], @@ -699,26 +701,26 @@ def test_shell_cmd_inputspec_5a_exception(plugin): fields=[ ( "opt_t", - bool, - dc.field( + attr.ib( + type=bool, metadata={ "position": 1, "help_string": "opt t", "argstr": "-t", "xor": ["opt_S"], - } + }, ), ), ( "opt_S", - bool, - dc.field( + attr.ib( + type=bool, metadata={ "position": 2, "help_string": "opt S", "argstr": "-S", "xor": ["opt_t"], - } + }, ), ), ], @@ -751,21 +753,21 @@ def test_shell_cmd_inputspec_6(plugin, results_function): fields=[ ( "opt_t", - bool, - dc.field( + attr.ib( + type=bool, metadata={ "position": 2, "help_string": "opt t", "argstr": "-t", "requires": ["opt_l"], - } + }, ), ), ( "opt_l", - bool, - dc.field( - metadata={"position": 1, "help_string": "opt l", "argstr": "-l"} + attr.ib( + type=bool, + metadata={"position": 1, "help_string": "opt l", "argstr": "-l"}, ), ), ], @@ -797,21 +799,21 @@ def test_shell_cmd_inputspec_6a_exception(plugin): fields=[ ( "opt_t", - bool, - dc.field( + attr.ib( + type=bool, metadata={ "position": 2, "help_string": "opt t", "argstr": "-t", "requires": ["opt_l"], - } + }, ), ), ( "opt_l", - bool, - dc.field( - metadata={"position": 1, "help_string": "opt l", "argstr": "-l"} + attr.ib( + type=bool, + metadata={"position": 1, "help_string": "opt l", "argstr": "-l"}, ), ), ], @@ -840,21 +842,21 @@ def test_shell_cmd_inputspec_6b(plugin, results_function): fields=[ ( "opt_t", - bool, - dc.field( + attr.ib( + type=bool, metadata={ "position": 2, "help_string": "opt t", "argstr": "-t", "requires": ["opt_l"], - } + }, ), ), ( "opt_l", - bool, - dc.field( - metadata={"position": 1, "help_string": "opt l", "argstr": "-l"} + attr.ib( + type=bool, + metadata={"position": 1, "help_string": "opt l", "argstr": "-l"}, ), ), ], @@ -890,12 +892,12 @@ def test_shell_cmd_inputspec_7(plugin, results_function): fields=[ ( "out1", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{args}", "help_string": "output file", - } + }, ), ) ], @@ -927,13 +929,13 @@ def test_shell_cmd_inputspec_7a(plugin, results_function): fields=[ ( "out1", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{args}", "output_field_name": "out1_changed", "help_string": "output file", - } + }, ), ) ], @@ -967,24 +969,24 @@ def test_shell_cmd_inputspec_copyfile_1(plugin, results_function, tmpdir): fields=[ ( "orig_file", - File, - dc.field( + attr.ib( + type=File, metadata={ "position": 1, "help_string": "orig file", "mandatory": True, "copyfile": True, - } + }, ), ), ( "out_file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{orig_file}", "help_string": "output file", - } + }, ), ), ], @@ -1025,24 +1027,24 @@ def test_shell_cmd_inputspec_copyfile_1a(plugin, results_function, tmpdir): fields=[ ( "orig_file", - File, - dc.field( + attr.ib( + type=File, metadata={ "position": 1, "help_string": "orig file", "mandatory": True, "copyfile": False, - } + }, ), ), ( "out_file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{orig_file}", "help_string": "output file", - } + }, ), ), ], @@ -1098,23 +1100,23 @@ def test_shell_cmd_inputspec_copyfile_1b(plugin, results_function, tmpdir): fields=[ ( "orig_file", - File, - dc.field( + attr.ib( + type=File, metadata={ "position": 1, "help_string": 
"orig file", "mandatory": True, - } + }, ), ), ( "out_file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{orig_file}", "help_string": "output file", - } + }, ), ), ], @@ -1145,9 +1147,9 @@ def test_shell_cmd_inputspec_state_1(plugin, results_function): fields=[ ( "text", - str, - dc.field( - metadata={"position": 1, "help_string": "text", "mandatory": True} + attr.ib( + type=str, + metadata={"position": 1, "help_string": "text", "mandatory": True}, ), ) ], @@ -1180,12 +1182,12 @@ def test_shell_cmd_inputspec_state_2(plugin, results_function): fields=[ ( "out1", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{args}", "help_string": "output file", - } + }, ), ) ], @@ -1222,9 +1224,9 @@ def test_shell_cmd_inputspec_state_3(plugin, results_function, tmpdir): fields=[ ( "file", - File, - dc.field( - metadata={"position": 1, "help_string": "files", "mandatory": True} + attr.ib( + type=File, + metadata={"position": 1, "help_string": "files", "mandatory": True}, ), ) ], @@ -1264,24 +1266,24 @@ def test_shell_cmd_inputspec_copyfile_state_1(plugin, results_function, tmpdir): fields=[ ( "orig_file", - File, - dc.field( + attr.ib( + type=File, metadata={ "position": 1, "help_string": "orig file", "mandatory": True, "copyfile": True, - } + }, ), ), ( "out_file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{orig_file}", "help_string": "output file", - } + }, ), ), ], @@ -1324,12 +1326,12 @@ def test_wf_shell_cmd_2(plugin): fields=[ ( "out1", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{args}", "help_string": "output file", - } + }, ), ) ], @@ -1370,12 +1372,12 @@ def test_wf_shell_cmd_2a(plugin): fields=[ ( "out1", - tuple, - dc.field( + attr.ib( + type=str, metadata={ - "output_file_template": ("{args}", ""), + "output_file_template": "{args}", "help_string": "output file", - } + }, ), ) ], @@ -1418,12 +1420,12 @@ def test_wf_shell_cmd_3(plugin): fields=[ ( "file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{args}", "help_string": "output file", - } + }, ), ) ], @@ -1435,18 +1437,19 @@ def test_wf_shell_cmd_3(plugin): fields=[ ( "orig_file", - File, - dc.field(metadata={"position": 1, "help_string": "output file"}), + attr.ib( + type=File, metadata={"position": 1, "help_string": "output file"} + ), ), ( "out_file", - str, - dc.field( + attr.ib( + type=str, metadata={ "position": 2, - "output_file_template": "{orig_file}", + "output_file_template": "{orig_file}_copy", "help_string": "output file", - } + }, ), ), ], @@ -1506,12 +1509,12 @@ def test_wf_shell_cmd_3a(plugin): fields=[ ( "file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{args}", "help_string": "output file", - } + }, ), ) ], @@ -1523,18 +1526,19 @@ def test_wf_shell_cmd_3a(plugin): fields=[ ( "orig_file", - str, - dc.field(metadata={"position": 1, "help_string": "output file"}), + attr.ib( + type=str, metadata={"position": 1, "help_string": "output file"} + ), ), ( "out_file", - tuple, - dc.field( + attr.ib( + type=str, metadata={ "position": 2, - "output_file_template": ("{orig_file}", "_cp"), + "output_file_template": "{orig_file}_cp", "help_string": "output file", - } + }, ), ), ], @@ -1594,12 +1598,12 @@ def test_wf_shell_cmd_state_1(plugin): fields=[ ( "file", - str, - dc.field( + attr.ib( + type=str, metadata={ "output_file_template": "{args}", "help_string": "output file", - } + }, ), ) ], @@ -1611,18 +1615,19 @@ def 
test_wf_shell_cmd_state_1(plugin):
         fields=[
             (
                 "orig_file",
-                str,
-                dc.field(metadata={"position": 1, "help_string": "output file"}),
+                attr.ib(
+                    type=str, metadata={"position": 1, "help_string": "output file"}
+                ),
             ),
             (
                 "out_file",
-                str,
-                dc.field(
+                attr.ib(
+                    type=str,
                     metadata={
                         "position": 2,
-                        "output_file_template": "{orig_file}",
+                        "output_file_template": "{orig_file}_copy",
                         "help_string": "output file",
-                    }
+                    },
                 ),
             ),
         ],
@@ -1683,12 +1688,12 @@ def test_wf_shell_cmd_ndst_1(plugin):
         fields=[
             (
                 "file",
-                str,
-                dc.field(
+                attr.ib(
+                    type=str,
                     metadata={
                         "output_file_template": "{args}",
                         "help_string": "output file",
-                    }
+                    },
                 ),
             )
         ],
@@ -1700,18 +1705,19 @@ def test_wf_shell_cmd_ndst_1(plugin):
         fields=[
             (
                 "orig_file",
-                str,
-                dc.field(metadata={"position": 1, "help_string": "output file"}),
+                attr.ib(
+                    type=str, metadata={"position": 1, "help_string": "output file"}
+                ),
             ),
             (
                 "out_file",
-                str,
-                dc.field(
+                attr.ib(
+                    type=str,
                     metadata={
                         "position": 2,
-                        "output_file_template": "{orig_file}",
+                        "output_file_template": "{orig_file}_copy",
                         "help_string": "output file",
-                    }
+                    },
                 ),
             ),
         ],
@@ -1785,7 +1791,7 @@ def test_shell_cmd_outputspec_1a(plugin, results_function):
     cmd = ["touch", "newfile_tmp.txt"]
     my_output_spec = SpecInfo(
         name="Output",
-        fields=[("newfile", File, dc.field(default="newfile_tmp.txt"))],
+        fields=[("newfile", attr.ib(type=File, default="newfile_tmp.txt"))],
         bases=(ShellOutSpec,),
     )
     shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec)
@@ -1891,7 +1897,7 @@ def gather_output(keyname, output_dir):

     my_output_spec = SpecInfo(
         name="Output",
-        fields=[("newfile", File, dc.field(metadata={"callable": gather_output}))],
+        fields=[("newfile", attr.ib(type=File, metadata={"callable": gather_output}))],
         bases=(ShellOutSpec,),
     )
     shelly = ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec)
@@ -1918,12 +1924,12 @@ def test_shell_cmd_outputspec_5(plugin, results_function):
         fields=[
             (
                 "out1",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "output_file_template": "{args}",
                         "help_string": "output file",
-                    }
+                    },
                 ),
             )
         ],
@@ -1954,12 +1960,12 @@ def test_shell_cmd_state_outputspec_1(plugin, results_function):
         fields=[
             (
                 "out1",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "output_file_template": "{args}",
                         "help_string": "output file",
-                    }
+                    },
                 ),
             )
         ],
@@ -2041,207 +2047,211 @@ def change_name(file):
         fields=[
             (
                 "in_file",
-                ty.Union[File, str],
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "help_string": "input file to skull strip",
                         "position": 1,
                         "mandatory": True,
-                    }
+                    },
                 ),
             ),
             (
                 "out_file",
-                tuple,
-                dc.field(
+                attr.ib(
+                    type=str,
                     metadata={
                         "help_string": "name of output skull stripped image",
                         "position": 2,
-                        "output_file_template": ("{in_file}", "_brain"),
-                    }
+                        "output_file_template": "{in_file}_brain",
+                    },
                 ),
             ),
             (
                 "outline",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "help_string": "create surface outline image",
                         "argstr": "-o",
-                    }
+                    },
                 ),
             ),
             (
                 "mask",
-                bool,
-                dc.field(
-                    metadata={"help_string": "create binary mask image", "argstr": "-m"}
+                attr.ib(
+                    type=bool,
+                    metadata={
+                        "help_string": "create binary mask image",
+                        "argstr": "-m",
+                    },
                 ),
             ),
             (
                 "skull",
-                bool,
-                dc.field(
-                    metadata={"help_string": "create skull image", "argstr": "-s"}
+                attr.ib(
+                    type=bool,
+                    metadata={"help_string": "create skull image", "argstr": "-s"},
                 ),
             ),
             (
                 "no_output",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "help_string": "Don't generate segmented output",
                         "argstr": "-n",
-                    }
+                    },
                 ),
             ),
             (
                 "frac",
-                float,
-                dc.field(
+                attr.ib(
+                    type=float,
                     metadata={
                         "help_string": "fractional intensity threshold",
                         "argstr": "-f",
-                    }
+                    },
                 ),
             ),
             (
                 "vertical_gradient",
-                float,
-                dc.field(
+                attr.ib(
+                    type=float,
                     metadata={
                         "help_string": "vertical gradient in fractional intensity threshold (-1, 1)",
                         "argstr": "-g",
                         "allowed_values": {"min_val": -1, "max_val": 1},
-                    }
+                    },
                 ),
             ),
             (
                 "radius",
-                int,
-                dc.field(metadata={"argstr": "-r", "help_string": "head radius"}),
+                attr.ib(
+                    type=int, metadata={"argstr": "-r", "help_string": "head radius"}
+                ),
             ),
             (
                 "center",
-                ty.List[int],
-                dc.field(
+                attr.ib(
+                    type=ty.List[int],
                     metadata={
                         "help_string": "center of gravity in voxels",
                         "argstr": "-c",
                         "allowed_values": {"min_value": 0, "max_value": 3},
-                    }
+                    },
                 ),
             ),
             (
                 "threshold",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "argstr": "-t",
                         "help_string": "apply thresholding to segmented brain image and mask",
-                    }
+                    },
                 ),
             ),
             (
                 "mesh",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "argstr": "-e",
-                        "help_sting": "generate a vtk mesh brain surface",
-                    }
+                        "help_string": "generate a vtk mesh brain surface",
+                    },
                 ),
             ),
             (
                 "robust",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "help_string": "robust brain centre estimation (iterates BET several times)",
                         "argstr": "-R",
                         "xor": _xor_inputs,
-                    }
+                    },
                 ),
             ),
             (
                 "padding",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "help_string": "improve BET if FOV is very small in Z (by temporarily padding end slices",
                         "argstr": "-Z",
                         "xor": _xor_inputs,
-                    }
+                    },
                 ),
             ),
             (
                 "remove_eyes",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "help_string": "eye & optic nerve cleanup (can be useful in SIENA)",
                         "argstr": "-S",
                         "xor": _xor_inputs,
-                    }
+                    },
                 ),
             ),
             (
                 "surfaces",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "help_string": "run bet2 and then betsurf to get additional skull and scalp surfaces (includes registrations)",
                         "argstr": "-A",
                         "xor": _xor_inputs,
-                    }
+                    },
                 ),
             ),
             (
                 "t2_guided",
-                ty.Union[File, str],
-                dc.field(
+                attr.ib(
+                    type=ty.Union[File, str],
                     metadata={
                         "help_string": "as with creating surfaces, when also feeding in non-brain-extracted T2 (includes registrations)",
                         "argstr": "-A2",
                         "xor": _xor_inputs,
-                    }
+                    },
                 ),
             ),
             (
                 "functional",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "argstr": "-F",
                         "xor": _xor_inputs,
                         "help_string": "apply to 4D fMRI data",
-                    }
+                    },
                 ),
             ),
             (
                 "reduce_bias",
-                bool,
-                dc.field(
+                attr.ib(
+                    type=bool,
                     metadata={
                         "argstr": "-B",
                         "xor": _xor_inputs,
                         "help_string": "bias field and neck cleanup",
-                    }
+                    },
                 ),
             )
-            # ("number_classes", int, dc.field(metadata={"help_string": 'number of tissue-type classes', "argstr": '-n',
+            # ("number_classes", int, attr.ib(metadata={"help_string": 'number of tissue-type classes', "argstr": '-n',
             #     "allowed_values": {"min_val": 1, "max_val": 10}})),
             # ("output_biasfield", bool,
-            #     dc.field(metadata={"help_string": 'output estimated bias field', "argstr": '-b'})),
+            #     attr.ib(metadata={"help_string": 'output estimated bias field', "argstr": '-b'})),
             # ("output_biascorrected", bool,
-            #     dc.field(metadata={"help_string": 'output restored image (bias-corrected image)', "argstr": '-B'})),
+            #     attr.ib(metadata={"help_string": 'output restored image (bias-corrected image)', "argstr": '-B'})),
         ],
         bases=(ShellSpec,),
     )

     # TODO: not sure why this has to be string
-    in_file = str(Path(os.path.dirname(os.path.abspath(__file__))) / "data" / "foo.nii")
-    out_file = str(
+    in_file = Path(os.path.dirname(os.path.abspath(__file__))) / "data" / "foo.nii"
+    out_file = (
         Path(os.path.dirname(os.path.abspath(__file__))) / "data" / "foo_brain.nii"
     )
     # separate command into exec + args
@@ -2249,5 +2259,5 @@ def change_name(file):
         name="bet_task", executable="bet", in_file=in_file, input_spec=bet_input_spec
     )
     assert shelly.inputs.executable == "bet"
-    assert shelly.cmdline == f"bet {in_file} {out_file}"
-    res = shelly(plugin="cf")
+    assert shelly.cmdline == f"bet {in_file} {in_file}_brain"
+    # res = shelly(plugin="cf")
diff --git a/pydra/engine/tests/test_singularity.py b/pydra/engine/tests/test_singularity.py
index 3a718cbf3c..7f7cabe303 100644
--- a/pydra/engine/tests/test_singularity.py
+++ b/pydra/engine/tests/test_singularity.py
@@ -3,7 +3,7 @@
 import os, shutil
 import subprocess as sp
 import pytest
-import dataclasses as dc
+import attr

 from ..task import SingularityTask, DockerTask
 from ..submitter import Submitter
@@ -26,13 +26,13 @@


 @need_singularity
-def test_singularity_1_nosubm():
+def test_singularity_1_nosubm(tmpdir):
     """ simple command in a container; default bindings and a working directory are added
        no submitter
    """
     cmd = "pwd"
     image = "library://sylabsed/linux/alpine"
-    singu = SingularityTask(name="singu", executable=cmd, image=image)
+    singu = SingularityTask(name="singu", executable=cmd, image=image, cache_dir=tmpdir)
     assert singu.inputs.image == "library://sylabsed/linux/alpine"
     assert singu.inputs.container == "singularity"
     assert (
@@ -46,13 +46,13 @@ def test_singularity_1_nosubm():


 @need_singularity
-def test_singularity_2_nosubm():
+def test_singularity_2_nosubm(tmpdir):
     """ a command with arguments, cmd and args given as executable
        no submitter
    """
     cmd = ["echo", "hail", "pydra"]
     image = "library://sylabsed/linux/alpine"
-    singu = SingularityTask(name="singu", executable=cmd, image=image)
+    singu = SingularityTask(name="singu", executable=cmd, image=image, cache_dir=tmpdir)
     assert (
         singu.cmdline
         == f"singularity exec -B {singu.output_dir}:/output_pydra:rw {image} {' '.join(cmd)}"
@@ -65,13 +65,13 @@ def test_singularity_2_nosubm():

 @need_singularity
 @pytest.mark.parametrize("plugin", Plugins)
-def test_singularity_2(plugin):
+def test_singularity_2(plugin, tmpdir):
     """ a command with arguments, cmd and args given as executable
        using submitter
    """
     cmd = ["echo", "hail", "pydra"]
     image = "library://sylabsed/linux/alpine"
-    singu = SingularityTask(name="singu", executable=cmd, image=image)
+    singu = SingularityTask(name="singu", executable=cmd, image=image, cache_dir=tmpdir)
     assert (
         singu.cmdline
         == f"singularity exec -B {singu.output_dir}:/output_pydra:rw {image} {' '.join(cmd)}"
@@ -86,7 +86,7 @@ def test_singularity_2(plugin):

 @need_singularity
 @pytest.mark.parametrize("plugin", Plugins)
-def test_singularity_2a(plugin):
+def test_singularity_2a(plugin, tmpdir):
     """ a command with arguments, using executable and args
        using submitter
    """
@@ -95,7 +95,7 @@
     # separate command into exec + args
     image = "library://sylabsed/linux/alpine"
     singu = SingularityTask(
-        name="singu", executable=cmd_exec, args=cmd_args, image=image
+        name="singu", executable=cmd_exec, args=cmd_args, image=image, cache_dir=tmpdir
     )
     assert (
         singu.cmdline
@@ -119,7 +119,7 @@ def test_singularity_3(plugin, tmpdir):
     tmpdir.mkdir("new_dir")
     cmd = ["ls", "/tmp_dir"]
     image = "library://sylabsed/linux/alpine"
-    singu = SingularityTask(name="singu", executable=cmd, image=image)
+    singu = SingularityTask(name="singu", executable=cmd, image=image, cache_dir=tmpdir)
     # binding tmp directory to the container
     singu.inputs.bindings = [(str(tmpdir), "/tmp_dir", "ro")]
@@ -127,7 +127,7 @@ def test_singularity_3(plugin, tmpdir):
         singu(submitter=sub)

     res = singu.result()
-    assert res.output.stdout == "new_dir\n"
+    assert "new_dir\n" in res.output.stdout
     assert res.output.return_code == 0
@@ -136,15 +136,15 @@ def test_singularity_3(plugin, tmpdir):

 @need_singularity
 @pytest.mark.parametrize("plugin", Plugins)
-def test_singularity_st_1(plugin):
+def test_singularity_st_1(plugin, tmpdir):
     """ commands without arguments in container
        splitter = executable
    """
     cmd = ["pwd", "ls"]
     image = "library://sylabsed/linux/alpine"
-    singu = SingularityTask(name="singu", executable=cmd, image=image).split(
-        "executable"
-    )
+    singu = SingularityTask(
+        name="singu", executable=cmd, image=image, cache_dir=tmpdir
+    ).split("executable")
     assert singu.state.splitter == "singu.executable"

     res = singu(plugin=plugin)
@@ -155,13 +155,15 @@ def test_singularity_st_1(plugin):

 @need_singularity
 @pytest.mark.parametrize("plugin", Plugins)
-def test_singularity_st_2(plugin):
+def test_singularity_st_2(plugin, tmpdir):
     """ command with arguments in singularity, checking the distribution
        splitter = image
    """
     cmd = ["cat", "/etc/issue"]
     image = ["library://sylabsed/linux/alpine", "library://sylabsed/examples/lolcow"]
-    singu = SingularityTask(name="singu", executable=cmd, image=image).split("image")
+    singu = SingularityTask(
+        name="singu", executable=cmd, image=image, cache_dir=tmpdir
+    ).split("image")
     assert singu.state.splitter == "singu.image"

     res = singu(plugin=plugin)
@@ -172,14 +174,14 @@ def test_singularity_st_2(plugin):

 @need_singularity
 @pytest.mark.parametrize("plugin", Plugins)
-def test_singularity_st_3(plugin):
+def test_singularity_st_3(plugin, tmpdir):
     """ outer splitter image and executable
    """
     cmd = ["pwd", ["cat", "/etc/issue"]]
     image = ["library://sylabsed/linux/alpine", "library://sylabsed/examples/lolcow"]
-    singu = SingularityTask(name="singu", executable=cmd, image=image).split(
-        ["image", "executable"]
-    )
+    singu = SingularityTask(
+        name="singu", executable=cmd, image=image, cache_dir=tmpdir
+    ).split(["image", "executable"])
     assert singu.state.splitter == ["singu.image", "singu.executable"]

     res = singu(plugin=plugin)
@@ -200,7 +202,7 @@ def test_wf_singularity_1(plugin, tmpdir):
         f.write("hello from pydra")

     image = "library://sylabsed/linux/alpine"
-    wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"])
+    wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"], cache_dir=tmpdir)
     wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"]
     wf.inputs.cmd2 = ["echo", "message from the previous task:"]
     wf.add(
@@ -243,7 +245,7 @@ def test_wf_singularity_1a(plugin, tmpdir):
     image_sing = "library://sylabsed/linux/alpine"
     image_doc = "ubuntu"
-    wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"])
+    wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"], cache_dir=tmpdir)
     wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"]
     wf.inputs.cmd2 = ["echo", "message from the previous task:"]
     wf.add(
@@ -292,7 +294,11 @@ def test_singularity_outputspec_1(plugin, tmpdir):
         bases=(ShellOutSpec,),
     )
     singu = SingularityTask(
-        name="singu", image=image, executable=cmd, output_spec=my_output_spec
+        name="singu",
+        image=image,
+        executable=cmd,
+        output_spec=my_output_spec,
+        cache_dir=tmpdir,
     )

     with Submitter(plugin=plugin) as sub:
@@ -322,13 +328,13 @@ def test_singularity_inputspec_1(plugin, tmpdir):
         fields=[
             (
                 "file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "mandatory": True,
                         "position": 1,
                         "help_string": "input file",
-                    }
+                    },
                 ),
             )
         ],
@@ -342,6 +348,7 @@ def test_singularity_inputspec_1(plugin, tmpdir):
         file=filename,
         input_spec=my_input_spec,
         strip=True,
+        cache_dir=tmpdir,
     )

     res = singu()
@@ -366,8 +373,8 @@ def test_singularity_inputspec_1a(plugin, tmpdir):
         fields=[
             (
                 "file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     default=filename,
                     metadata={"position": 1, "help_string": "input file"},
                 ),
@@ -377,7 +384,12 @@
     )

     singu = SingularityTask(
-        name="singu", image=image, executable=cmd, input_spec=my_input_spec, strip=True
+        name="singu",
+        image=image,
+        executable=cmd,
+        input_spec=my_input_spec,
+        strip=True,
+        cache_dir=tmpdir,
     )

     res = singu()
@@ -404,13 +416,14 @@ def test_singularity_inputspec_2(plugin, tmpdir):
         fields=[
             (
                 "file1",
-                File,
-                dc.field(metadata={"position": 1, "help_string": "input file 1"}),
+                attr.ib(
+                    type=File, metadata={"position": 1, "help_string": "input file 1"}
+                ),
             ),
             (
                 "file2",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     default=filename_2,
                     metadata={"position": 2, "help_string": "input file 2"},
                 ),
             ),
@@ -426,6 +439,7 @@ def test_singularity_inputspec_2(plugin, tmpdir):
         file1=filename_1,
         input_spec=my_input_spec,
         strip=True,
+        cache_dir=tmpdir,
     )

     res = singu()
@@ -454,31 +468,33 @@ def test_singularity_inputspec_2a_except(plugin, tmpdir):
         fields=[
             (
                 "file1",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     default=filename_1,
                     metadata={"position": 1, "help_string": "input file 1"},
                 ),
             ),
             (
                 "file2",
-                File,
-                dc.field(metadata={"position": 2, "help_string": "input file 2"}),
+                attr.ib(
+                    type=File, metadata={"position": 2, "help_string": "input file 2"}
+                ),
             ),
         ],
         bases=(SingularitySpec,),
     )

-    with pytest.raises(TypeError) as excinfo:
-        singu = SingularityTask(
-            name="singu",
-            image=image,
-            executable=cmd,
-            file2=filename_2,
-            input_spec=my_input_spec,
-            strip=True,
-        )
-    assert "non-default argument 'file2' follows default argument" == str(excinfo.value)
+    singu = SingularityTask(
+        name="singu",
+        image=image,
+        executable=cmd,
+        file2=filename_2,
+        input_spec=my_input_spec,
+        strip=True,
+        cache_dir=tmpdir,
+    )
+    res = singu()
+    assert res.output.stdout == "hello from pydra\nhave a nice one"


 @need_singularity
@@ -504,19 +520,20 @@ def test_singularity_inputspec_2a(plugin, tmpdir):
         fields=[
             (
                 "file1",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "default_value": filename_1,
                         "position": 1,
                         "help_string": "input file 1",
-                    }
+                    },
                 ),
             ),
             (
                 "file2",
-                File,
-                dc.field(metadata={"position": 2, "help_string": "input file 2"}),
+                attr.ib(
+                    type=File, metadata={"position": 2, "help_string": "input file 2"}
+                ),
             ),
         ],
         bases=(SingularitySpec,),
@@ -529,6 +546,7 @@ def test_singularity_inputspec_2a(plugin, tmpdir):
         file2=filename_2,
         input_spec=my_input_spec,
         strip=True,
+        cache_dir=tmpdir,
     )

     res = singu()
@@ -554,24 +572,24 @@ def test_singularity_cmd_inputspec_copyfile_1(plugin, tmpdir):
         fields=[
             (
                 "orig_file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "position": 1,
                         "help_string": "orig file",
                         "mandatory": True,
                         "copyfile": True,
-                    }
+                    },
                 ),
             ),
             (
                 "out_file",
-                str,
-                dc.field(
+                attr.ib(
+                    type=str,
                     metadata={
                         "output_file_template": "{orig_file}",
                         "help_string": "output file",
-                    }
+                    },
                 ),
             ),
         ],
@@ -584,6 +602,7 @@ def test_singularity_cmd_inputspec_copyfile_1(plugin, tmpdir):
         executable=cmd,
         input_spec=my_input_spec,
         orig_file=str(file),
+        cache_dir=tmpdir,
     )

     res = singu()
@@ -620,13 +639,13 @@ def test_singularity_inputspec_state_1(plugin, tmpdir):
         fields=[
             (
                 "file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "mandatory": True,
                         "position": 1,
                         "help_string": "input file",
-                    }
+                    },
                 ),
             )
         ],
@@ -640,6 +659,7 @@ def test_singularity_inputspec_state_1(plugin, tmpdir):
         file=filename,
         input_spec=my_input_spec,
         strip=True,
+        cache_dir=tmpdir,
     ).split("file")

     res = singu()
@@ -670,13 +690,13 @@ def test_singularity_inputspec_state_1b(plugin, tmpdir):
         fields=[
             (
                 "file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "mandatory": True,
                         "position": 1,
                         "help_string": "input file",
-                    }
+                    },
                 ),
             )
         ],
@@ -690,6 +710,7 @@ def test_singularity_inputspec_state_1b(plugin, tmpdir):
         file=filename,
         input_spec=my_input_spec,
         strip=True,
+        cache_dir=tmpdir,
     ).split("file")

     res = singu()
@@ -713,20 +734,20 @@ def test_singularity_wf_inputspec_1(plugin, tmpdir):
         fields=[
             (
                 "file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "mandatory": True,
                         "position": 1,
                         "help_string": "input file",
-                    }
+                    },
                 ),
             )
         ],
         bases=(SingularitySpec,),
     )

-    wf = Workflow(name="wf", input_spec=["cmd", "file"])
+    wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmpdir)
     wf.inputs.cmd = cmd
     wf.inputs.file = filename
@@ -769,20 +790,20 @@ def test_singularity_wf_state_inputspec_1(plugin, tmpdir):
         fields=[
             (
                 "file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "mandatory": True,
                         "position": 1,
                         "help_string": "input file",
-                    }
+                    },
                 ),
             )
         ],
         bases=(SingularitySpec,),
     )

-    wf = Workflow(name="wf", input_spec=["cmd", "file"])
+    wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmpdir)
     wf.inputs.cmd = cmd
     wf.inputs.file = filename
@@ -827,20 +848,20 @@ def test_singularity_wf_ndst_inputspec_1(plugin, tmpdir):
         fields=[
             (
                 "file",
-                File,
-                dc.field(
+                attr.ib(
+                    type=File,
                     metadata={
                         "mandatory": True,
                         "position": 1,
                         "help_string": "input file",
-                    }
+                    },
                 ),
             )
         ],
         bases=(SingularitySpec,),
     )

-    wf = Workflow(name="wf", input_spec=["cmd", "file"])
+    wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmpdir)
     wf.inputs.cmd = cmd
     wf.inputs.file = filename
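Beyond the attrs migration, one change runs through every test in this file: tasks and workflows now receive `cache_dir=tmpdir`, so each test caches its results under pytest's per-test temporary directory instead of a shared default cache, keeping runs from seeing each other's cached outputs. A sketch of the same idea outside pytest; this assumes a working pydra install, and the `echoer` task name and `echo hi` command are illustrative only:

    import tempfile

    from pydra.engine.task import ShellCommandTask

    # A throwaway cache directory plays the role of pytest's tmpdir fixture:
    # nothing from an earlier run is picked up, and nothing leaks out.
    with tempfile.TemporaryDirectory() as cache:
        task = ShellCommandTask(
            name="echoer", executable="echo", args="hi", cache_dir=cache
        )
        res = task()
        print(res.output.stdout)  # expected: "hi\n"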
diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py
index 2f08da031c..ff8f3aa0db 100644
--- a/pydra/engine/tests/test_specs.py
+++ b/pydra/engine/tests/test_specs.py
@@ -43,7 +43,7 @@ def test_result():
 def test_shellspec():
     with pytest.raises(TypeError):
         spec = ShellSpec()
-    spec = ShellSpec("ls", None)  # (executable, args)
+    spec = ShellSpec(executable="ls")  # (executable, args)
     assert hasattr(spec, "executable")
     assert hasattr(spec, "args")

@@ -51,38 +51,28 @@ def test_shellspec():
 container_attrs = ["image", "container", "container_xargs", "bindings"]


-@pytest.mark.xfail(
-    reason="won't work after changes in input_spec, "
-    "all fields would have to be provided"
-)
 def test_container():
     with pytest.raises(TypeError):
         spec = ContainerSpec()
-    spec = ContainerSpec("ls", None, "busybox", None)  # (execut., args, image, cont)
+    spec = ContainerSpec(
+        executable="ls", image="busybox", container="docker"
+    )  # (execut., args, image, cont)
     assert all([hasattr(spec, attr) for attr in container_attrs])
     assert hasattr(spec, "executable")


-@pytest.mark.xfail(
-    reason="won't work after changes in input_spec, "
-    "all fields would have to be provided"
-)
 def test_docker():
     with pytest.raises(TypeError):
-        spec = DockerSpec("ls", None)
-    spec = DockerSpec("ls", None, "busybox")
+        spec = DockerSpec(executable="ls")
+    spec = DockerSpec(executable="ls", image="busybox")
     assert all(hasattr(spec, attr) for attr in container_attrs)
     assert getattr(spec, "container") == "docker"


-@pytest.mark.xfail(
-    reason="won't work after changes in input_spec, "
-    "all fields would have to be provided"
-)
 def test_singularity():
     with pytest.raises(TypeError):
         spec = SingularitySpec()
-    spec = SingularitySpec("ls", None, "busybox")
+    spec = SingularitySpec(executable="ls", image="busybox")
     assert all(hasattr(spec, attr) for attr in container_attrs)
     assert getattr(spec, "container") == "singularity"

@@ -170,7 +160,7 @@ def test_file_hash(tmpdir):
     input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,))
     inputs = make_klass(input_spec)
     assert (
-        inputs(str(outfile)).hash
+        inputs(in_file=outfile).hash
         == "1384a1eb11cd94a5b826a82b948313b9237a0956d406ccff59e79ec92b3c935f"
     )
     with open(outfile, "wt") as fp:
@@ -179,6 +169,6 @@ def test_file_hash(tmpdir):
     input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,))
     inputs = make_klass(input_spec)
     assert (
-        inputs(outfile).hash
+        inputs(in_file=outfile).hash
         == "088625131e6718a00170ad445a9c295244dffd4e5d847c8ee4b1606d623dacb1"
     )
diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py
index 672e2cf614..ce78efe695 100644
--- a/pydra/engine/tests/test_task.py
+++ b/pydra/engine/tests/test_task.py
@@ -365,7 +365,7 @@ def test_docker_cmd(tmpdir):
     docky = DockerTask(name="docky", executable="pwd", image="busybox")
     assert (
         docky.cmdline
-        == f"docker run -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd"
+        == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd"
     )
     docky.inputs.container_xargs = ["--rm -it"]
     assert (
diff --git a/setup.cfg b/setup.cfg
index 35a2c9a085..dbd321a7c9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -23,6 +23,7 @@ classifiers =
 [options]
 python_requires = >= 3.7
 install_requires =
+    attrs
     cloudpickle >= 0.8.0
     filelock >= 3.0.0
     etelemetry
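The reworked spec tests lean on how attrs-generated classes behave: an attribute without a default is mandatory, so bare construction raises `TypeError`, and the old positional style gives way to keyword construction, which is why the xfail markers can be dropped. A standalone sketch with plain attrs; `ToySpec` is a made-up class, and `kw_only=True` is an assumption mirroring the keyword-style calls in the tests, not something this patch shows pydra setting:

    import attr
    import pytest


    @attr.s(auto_attribs=True, kw_only=True)
    class ToySpec:
        executable: str  # no default -> mandatory, like ShellSpec.executable
        args: str = None


    # Omitting a mandatory attribute fails, matching the
    # `pytest.raises(TypeError)` checks in the spec tests above.
    with pytest.raises(TypeError):
        ToySpec()

    spec = ToySpec(executable="ls")
    assert spec.args is None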