diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000..3ed8cd175f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,16 @@
+pydra.egg-info
+build
+dist
+
+__pycache__
+*.pyc
+
+.ipynb_checkpoints
+*.ipynb
+
+.coverage*
+!.coveragerc
+cov.xml
+
+.*.swp
+*~
diff --git a/.style.yapf b/.style.yapf
new file mode 100644
index 0000000000..9d209b37a8
--- /dev/null
+++ b/.style.yapf
@@ -0,0 +1,260 @@
+[style]
+# Align closing bracket with visual indentation.
+align_closing_bracket_with_visual_indent=True
+
+# Allow dictionary keys to exist on multiple lines. For example:
+#
+#   x = {
+#       ('this is the first element of a tuple',
+#        'this is the second element of a tuple'):
+#           value,
+#   }
+allow_multiline_dictionary_keys=False
+
+# Allow lambdas to be formatted on more than one line.
+allow_multiline_lambdas=False
+
+# Allow splits before the dictionary value.
+allow_split_before_dict_value=True
+
+# Number of blank lines surrounding top-level function and class
+# definitions.
+blank_lines_around_top_level_definition=2
+
+# Insert a blank line before a class-level docstring.
+blank_line_before_class_docstring=False
+
+# Insert a blank line before a module docstring.
+blank_line_before_module_docstring=False
+
+# Insert a blank line before a 'def' or 'class' immediately nested
+# within another 'def' or 'class'. For example:
+#
+#   class Foo:
+#                      # <------ this blank line
+#       def method():
+#           ...
+blank_line_before_nested_class_or_def=False
+
+# Do not split consecutive brackets. Only relevant when
+# dedent_closing_brackets is set. For example:
+#
+#    call_func_that_takes_a_dict(
+#        {
+#            'key1': 'value1',
+#            'key2': 'value2',
+#        }
+#    )
+#
+# would reformat to:
+#
+#    call_func_that_takes_a_dict({
+#        'key1': 'value1',
+#        'key2': 'value2',
+#    })
+coalesce_brackets=False
+
+# The column limit.
+column_limit=99
+
+# The style for continuation alignment. Possible values are:
+#
+# - SPACE: Use spaces for continuation alignment. This is default behavior.
+# - FIXED: Use fixed number (CONTINUATION_INDENT_WIDTH) of columns
+#   (ie: CONTINUATION_INDENT_WIDTH/INDENT_WIDTH tabs) for continuation
+#   alignment.
+# - LESS: Slightly left if cannot vertically align continuation lines with
+#   indent characters.
+# - VALIGN-RIGHT: Vertically align continuation lines with indent
+#   characters. Slightly right (one more indent character) if cannot
+#   vertically align continuation lines with indent characters.
+#
+# For options FIXED, and VALIGN-RIGHT are only available when USE_TABS is
+# enabled.
+continuation_align_style=SPACE
+
+# Indent width used for line continuations.
+continuation_indent_width=4
+
+# Put closing brackets on a separate line, dedented, if the bracketed
+# expression can't fit in a single line. Applies to all kinds of brackets,
+# including function definitions and calls. For example:
+#
+#   config = {
+#       'key1': 'value1',
+#       'key2': 'value2',
+#   }  # <--- this bracket is dedented and on a separate line
+#
+#   time_series = self.remote_client.query_entity_counters(
+#       entity='dev3246.region1',
+#       key='dns.query_latency_tcp',
+#       transform=Transformation.AVERAGE(window=timedelta(seconds=60)),
+#       start_ts=now()-timedelta(days=3),
+#       end_ts=now(),
+#   )  # <--- this bracket is dedented and on a separate line
+dedent_closing_brackets=False
+
+# Disable the heuristic which places each list element on a separate line
+# if the list is comma-terminated.
+disable_ending_comma_heuristic=False
+
+# Place each dictionary entry onto its own line.
+each_dict_entry_on_separate_line=True
+
+# The regex for an i18n comment. The presence of this comment stops
+# reformatting of that line, because the comments are required to be
+# next to the string they translate.
+i18n_comment=
+
+# The i18n function call names. The presence of this function stops
+# reformatting on that line, because the string it has cannot be moved
+# away from the i18n comment.
+i18n_function_call=
+
+# Indent the dictionary value if it cannot fit on the same line as the
+# dictionary key. For example:
+#
+#   config = {
+#       'key1':
+#           'value1',
+#       'key2': value1 +
+#               value2,
+#   }
+indent_dictionary_value=False
+
+# The number of columns to use for indentation.
+indent_width=4
+
+# Join short lines into one line. E.g., single line 'if' statements.
+join_multiple_lines=True
+
+# Do not include spaces around selected binary operators. For example:
+#
+#   1 + 2 * 3 - 4 / 5
+#
+# will be formatted as follows when configured with "*,/":
+#
+#   1 + 2*3 - 4/5
+#
+no_spaces_around_selected_binary_operators=
+
+# Use spaces around default or named assigns.
+spaces_around_default_or_named_assign=False
+
+# Use spaces around the power operator.
+spaces_around_power_operator=False
+
+# The number of spaces required before a trailing comment.
+spaces_before_comment=2
+
+# Insert a space between the ending comma and closing bracket of a list,
+# etc.
+space_between_ending_comma_and_closing_bracket=True
+
+# Split before arguments
+split_all_comma_separated_values=False
+
+# Split before arguments if the argument list is terminated by a
+# comma.
+split_arguments_when_comma_terminated=False
+
+# Set to True to prefer splitting before '&', '|' or '^' rather than
+# after.
+split_before_bitwise_operator=True
+
+# Split before the closing bracket if a list or dict literal doesn't fit on
+# a single line.
+split_before_closing_bracket=True
+
+# Split before a dictionary or set generator (comp_for). For example, note
+# the split before the 'for':
+#
+#   foo = {
+#       variable: 'Hello world, have a nice day!'
+#       for variable in bar if variable != 42
+#   }
+split_before_dict_set_generator=True
+
+# Split before the '.' if we need to split a longer expression:
+#
+#   foo = ('This is a really long string: {}, {}, {}, {}'.format(a, b, c, d))
+#
+# would reformat to something like:
+#
+#   foo = ('This is a really long string: {}, {}, {}, {}'
+#          .format(a, b, c, d))
split_before_dot=False
+
+# Split after the opening paren which surrounds an expression if it doesn't
+# fit on a single line.
+split_before_expression_after_opening_paren=False
+
+# If an argument / parameter list is going to be split, then split before
+# the first argument.
+split_before_first_argument=False
+
+# Set to True to prefer splitting before 'and' or 'or' rather than
+# after.
+split_before_logical_operator=True
+
+# Split named assignments onto individual lines.
+split_before_named_assigns=True
+
+# Set to True to split list comprehensions and generators that have
+# non-trivial expressions and multiple clauses before each of these
+# clauses. For example:
+#
+#   result = [
+#       a_long_var + 100 for a_long_var in xrange(1000)
+#       if a_long_var % 10]
+#
+# would reformat to something like:
+#
+#   result = [
+#       a_long_var + 100
+#       for a_long_var in xrange(1000)
+#       if a_long_var % 10]
+split_complex_comprehension=False
+
+# The penalty for splitting right after the opening bracket.
+split_penalty_after_opening_bracket=30
+
+# The penalty for splitting the line after a unary operator.
+split_penalty_after_unary_operator=10000
+
+# The penalty for splitting right before an if expression.
+split_penalty_before_if_expr=0
+
+# The penalty of splitting the line around the '&', '|', and '^'
+# operators.
+split_penalty_bitwise_operator=300
+
+# The penalty for splitting a list comprehension or generator
+# expression.
+split_penalty_comprehension=80
+
+# The penalty for characters over the column limit.
+split_penalty_excess_character=7000
+
+# The penalty incurred by adding a line split to the unwrapped line. The
+# more line splits added the higher the penalty.
+split_penalty_for_added_line_split=30
+
+# The penalty of splitting a list of "import as" names. For example:
+#
+#   from a_very_long_or_indented_module_name_yada_yad import (long_argument_1,
+#                                                             long_argument_2,
+#                                                             long_argument_3)
+#
+# would reformat to something like:
+#
+#   from a_very_long_or_indented_module_name_yada_yad import (
+#       long_argument_1, long_argument_2, long_argument_3)
+split_penalty_import_names=0
+
+# The penalty of splitting the line around the 'and' and 'or'
+# operators.
+split_penalty_logical_operator=300
+
+# Use the Tab character for indentation.
+use_tabs=False
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000000..bbc1623961
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,25 @@
+install:
+	python setup.py install
+
+dist: clean
+	python setup.py sdist bdist_wheel
+
+clean-pyc:
+	find . -name '*.pyc' -type f -exec rm {} +
+	find . -name '*.pyo' -type f -exec rm {} +
+	find . -name '__pycache__' -type d -exec rm --recursive {} +
+
+clean-build:
+	rm --recursive --force build/
+	rm --recursive --force dist/
+
+clean: clean-pyc clean-build
+
+format:
+	yapf --parallel --in-place --recursive --exclude 'pydra/_version.py' pydra
+
+lint:
+	flake8
+
+test: clean-pyc
+	py.test -vs -n auto --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml --doctest-modules pydra
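# --- reviewer sketch (not part of the patch) --------------------------------
# The Makefile's `format` target is what produces every reflow in this diff:
# yapf picks up the [style] section from .style.yapf in the project root. The
# same style can be exercised from Python; a minimal sketch, assuming yapf is
# installed (FormatCode is yapf's documented API; recent versions return a
# (formatted_code, changed) tuple):
from yapf.yapflib.yapf_api import FormatCode

formatted, changed = FormatCode("x = {  'a':37,'b':42}", style_config='.style.yapf')
print(formatted)  # -> x = {'a': 37, 'b': 42}
# -----------------------------------------------------------------------------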
diff --git a/pydra/__about__.py b/pydra/__about__.py
index ad504098fb..e8a8230245 100644
--- a/pydra/__about__.py
+++ b/pydra/__about__.py
@@ -8,14 +8,10 @@
 del get_versions
 
 CLASSIFIERS = [
-    'Development Status :: 2 - Pre-Alpha',
-    'Environment :: Console',
-    'Intended Audience :: Science/Research',
-    'License :: OSI Approved :: Apache Software License',
-    'Operating System :: MacOS :: MacOS X',
-    'Operating System :: POSIX :: Linux',
-    'Programming Language :: Python :: 3.6',
-    'Programming Language :: Python :: 3.7',
+    'Development Status :: 2 - Pre-Alpha', 'Environment :: Console',
+    'Intended Audience :: Science/Research', 'License :: OSI Approved :: Apache Software License',
+    'Operating System :: MacOS :: MacOS X', 'Operating System :: POSIX :: Linux',
+    'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7',
     'Topic :: Scientific/Engineering'
 ]
 
@@ -45,9 +41,8 @@
 __longdesc__ = long_description
 
 __url__ = 'https://github.com/nipype/pydra'
-DOWNLOAD_URL = (
-    'http://github.com/nipype/{name}/archives/{ver}.tar.gz'.format(
-        name=__packagename__, ver=__version__))
+DOWNLOAD_URL = ('http://github.com/nipype/{name}/archives/{ver}.tar.gz'.format(
+    name=__packagename__, ver=__version__))
 PLATFORMS = 'OS Independent'
 MAJOR = __version__.split('.')[0]
 MINOR = __version__.split('.')[1]
@@ -64,8 +59,7 @@
 SETUP_REQUIRES = ['setuptools>=27.0']
 TESTS_REQUIRES = ['pytest-cov', 'codecov', 'pytest-env', 'pytest-xdist']
 
-LINKS_REQUIRES = [
-]
+LINKS_REQUIRES = []
 
 EXTRA_REQUIRES = {
     'tests': TESTS_REQUIRES,
diff --git a/pydra/__init__.py b/pydra/__init__.py
index 6fda8d2f65..38d0e8dd75 100644
--- a/pydra/__init__.py
+++ b/pydra/__init__.py
@@ -1,12 +1,2 @@
-from .__about__ import (
-    __version__,
-    __author__,
-    __license__,
-    __maintainer__,
-    __email__,
-    __status__,
-    __url__,
-    __packagename__,
-    __description__,
-    __longdesc__
-)
+from .__about__ import (__version__, __author__, __license__, __maintainer__, __email__,
+                        __status__, __url__, __packagename__, __description__, __longdesc__)
diff --git a/pydra/engine/auxiliary.py b/pydra/engine/auxiliary.py
index 5d3bcde9fd..09e2e6fed1 100644
--- a/pydra/engine/auxiliary.py
+++ b/pydra/engine/auxiliary.py
@@ -6,6 +6,7 @@
 
 # dj: might create a new class or move to State
 
+
 # Function to change user provided mapper to "reverse polish notation" used in State
 def mapper2rpn(mapper, other_mappers=None):
     """ Functions that translate mapper to "reverse polish notation."""
@@ -57,11 +58,13 @@ def _ordering(el, i, output_mapper, current_sign=None, other_mappers=None):
 
 def _iterate_list(element, sign, other_mappers, output_mapper):
     """ Used in the mapper2rpn to get recursion. """
     for i, el in enumerate(element):
-        _ordering(el, i, current_sign=sign, other_mappers=other_mappers, output_mapper=output_mapper)
+        _ordering(
+            el, i, current_sign=sign, other_mappers=other_mappers, output_mapper=output_mapper)
 
 
 # functions used in State to know which element should be used for a specific axis
+
 def mapping_axis(state_inputs, mapper_rpn):
     """Having inputs and mapper (in rpn notation), functions returns the axes of output for every input."""
     axis_for_input = {}
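# --- reviewer sketch (not part of the patch) --------------------------------
# A worked example of the mapper -> "reverse polish notation" translation that
# mapper2rpn implements; the cases come straight from the parametrized tests
# in pydra/engine/tests/test_auxiliary.py further down. "." combines inputs
# element-wise (scalar mapper), "*" is the outer product, and "_NA" pulls in
# node NA's own mapper:
from pydra.engine import auxiliary as aux

assert aux.mapper2rpn(("a", "b")) == ["a", "b", "."]
assert aux.mapper2rpn(["a", ("b", "c")]) == ["a", "b", "c", ".", "*"]
assert (aux.mapper2rpn(["a", "_NA"], other_mappers={"NA": ("b", "c")})
        == ["a", "NA.b", "NA.c", ".", "*"])
# -----------------------------------------------------------------------------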
@@ -74,7 +77,8 @@ def mapping_axis(state_inputs, mapper_rpn):
             right = stack.pop()
             left = stack.pop()
             if left == "OUT":
-                if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array?
+                if state_inputs[
+                        right].shape == current_shape:  #todo:should we allow for one-element array?
                     axis_for_input[right] = current_axis
                 else:
                     raise Exception("arrays for scalar operations should have the same size")
@@ -100,27 +104,33 @@ def mapping_axis(state_inputs, mapper_rpn):
             right = stack.pop()
             left = stack.pop()
             if left == "OUT":
-                axis_for_input[right] = [i + 1 + current_axis[-1]
-                                         for i in range(state_inputs[right].ndim)]
+                axis_for_input[right] = [
+                    i + 1 + current_axis[-1] for i in range(state_inputs[right].ndim)
+                ]
                 current_axis = current_axis + axis_for_input[right]
                 current_shape = tuple([i for i in current_shape + state_inputs[right].shape])
             elif right == "OUT":
                 for key in axis_for_input:
-                    axis_for_input[key] = [i + state_inputs[left].ndim
-                                           for i in axis_for_input[key]]
-
-                axis_for_input[left] = [i - len(current_shape) + current_axis[-1] + 1
-                                        for i in range(state_inputs[left].ndim)]
-                current_axis = current_axis + [i + 1 + current_axis[-1]
-                                               for i in range(state_inputs[left].ndim)]
+                    axis_for_input[key] = [
+                        i + state_inputs[left].ndim for i in axis_for_input[key]
+                    ]
+
+                axis_for_input[left] = [
+                    i - len(current_shape) + current_axis[-1] + 1
+                    for i in range(state_inputs[left].ndim)
+                ]
+                current_axis = current_axis + [
+                    i + 1 + current_axis[-1] for i in range(state_inputs[left].ndim)
+                ]
                 current_shape = tuple([i for i in state_inputs[left].shape + current_shape])
             else:
                 axis_for_input[left] = list(range(state_inputs[left].ndim))
-                axis_for_input[right] = [i + state_inputs[left].ndim
-                                         for i in range(state_inputs[right].ndim)]
+                axis_for_input[right] = [
+                    i + state_inputs[left].ndim for i in range(state_inputs[right].ndim)
+                ]
                 current_axis = axis_for_input[left] + axis_for_input[right]
-                current_shape = tuple([i for i in
-                                       state_inputs[left].shape + state_inputs[right].shape])
+                current_shape = tuple(
+                    [i for i in state_inputs[left].shape + state_inputs[right].shape])
             stack.append("OUT")
 
         else:
@@ -159,6 +169,7 @@ def converting_axis2input(state_inputs, axis_for_input, ndim):
 
 # used in the Node to change names in a mapper
 
+
 def change_mapper(mapper, name):
     """changing names of mapper: adding names of the node"""
     if isinstance(mapper, str):
@@ -191,8 +202,10 @@ def _add_name(mlist, name):
 
 #Function interface
 
+
 class FunctionInterface(object):
     """ A new function interface """
+
     def __init__(self, function, output_nm, out_read=False, input_map=None):
         self.function = function
         if type(output_nm) is list:
@@ -208,7 +221,6 @@ def __init__(self, function, output_nm, out_read=False, input_map=None):
         # flags if we want to read the txt file to save in node.output
         self.out_read = out_read
 
-
     def run(self, input):
         self.output = {}
         if self.input_map:
@@ -225,7 +237,7 @@ def run(self, input):
                     self.output[self._output_nm[i]] = out
             else:
                 raise Exception("length of output_nm doesnt match length of the function output")
-        elif len(self._output_nm)==1:
+        elif len(self._output_nm) == 1:
             self.output[self._output_nm[0]] = fun_output
         else:
             raise Exception("output_nm doesnt match length of the function output")
@@ -238,10 +250,12 @@ def run(self, input):
 # https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary
 class DotDict(dict):
     """dot.notation access to dictionary attributes"""
+
     def __getattr__(self, attr):
         return self.get(attr)
-    __setattr__= dict.__setitem__
-    __delattr__= dict.__delitem__
+
+    __setattr__ = dict.__setitem__
+    __delattr__ = dict.__delitem__
 
     def __getstate__(self):
         return self
diff --git a/pydra/engine/node.py b/pydra/engine/node.py
index 9c92d411d9..bede18ac63 100644
--- a/pydra/engine/node.py
+++ b/pydra/engine/node.py
@@ -5,7 +5,6 @@
 import networkx as nx
 import numpy as np
 
-
 from nipype.utils.filemanip import loadpkl
 
 from nipype import logging
@@ -15,8 +14,14 @@
 
 
 class NodeBase(object):
-    def __init__(self, name, mapper=None, inputs=None, other_mappers=None,
-                 write_state=True, *args, **kwargs):
+    def __init__(self,
+                 name,
+                 mapper=None,
+                 inputs=None,
+                 other_mappers=None,
+                 write_state=True,
+                 *args,
+                 **kwargs):
         """A base structure for nodes in the computational graph (i.e. both
         ``Node`` and ``Workflow``).
 
@@ -52,7 +57,8 @@ def __init__(self, name, mapper=None, inputs=None, other_mappers=None,
         self._mapper = mapper
         self._other_mappers = other_mappers
         # create state (takes care of mapper, connects inputs with axes, so we can ask for specifc element)
-        self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
+        self._state = state.State(
+            mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
         self._output = {}
         self._result = {}
         # flag that says if the node/wf is ready to run (has all input)
@@ -79,7 +85,8 @@ def mapper(self):
     def mapper(self, mapper):
         self._mapper = mapper
         # updating state
-        self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
+        self._state = state.State(
+            mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
 
     @property
     def inputs(self):
@@ -88,9 +95,10 @@ def inputs(self):
     @inputs.setter
     def inputs(self, inputs):
         # Massage inputs dict
-        inputs = {".".join((self.name, key)): value
-                  if not isinstance(value, list) else np.array(value)
-                  for key, value in inputs.items()}
+        inputs = {
+            ".".join((self.name, key)): value if not isinstance(value, list) else np.array(value)
+            for key, value in inputs.items()
+        }
         self._inputs.update(inputs)
         self._state_inputs.update(inputs)
 
@@ -102,7 +110,6 @@ def state_inputs(self):
     def state_inputs(self, state_inputs):
         self._state_inputs.update(state_inputs)
 
-
     @property
     def output(self):
         return self._output
@@ -113,11 +120,9 @@ def result(self):
             self._reading_results()
         return self._result
 
-
     def prepare_state_input(self):
         self._state.prepare_state_input(state_inputs=self.state_inputs)
 
-
     def map(self, mapper, inputs=None):
         if self._mapper:
             raise Exception("mapper is already set")
@@ -129,23 +134,21 @@ def map(self, mapper, inputs=None):
             self._state_inputs.update(self.inputs)
         if mapper:
             # updating state if we have a new mapper
-            self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
-
+            self._state = state.State(
+                mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
 
     def join(self, field, node=None):
         # TBD
         pass
 
-
     def checking_input_el(self, ind):
         """checking if all inputs are available (for specific state element)"""
         try:
             self.get_input_el(ind)
             return True
-        except: #TODO specify
+        except:  #TODO specify
             return False
 
-
     # dj: this is not used for a single node
     def get_input_el(self, ind):
         """collecting all inputs required to run the node (for specific state element)"""
@@ -155,19 +158,23 @@ def get_input_el(self, ind):
             state_dict = self.state.state_ind(ind)
         # reading extra inputs that come from previous nodes
         for (from_node, from_socket, to_socket) in self.needed_outputs:
-            dir_nm_el_from = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items())
-                                       if i in list(from_node._state_inputs.keys())])
+            dir_nm_el_from = "_".join([
+                "{}:{}".format(i, j) for i, j in list(state_dict.items())
+                if i in list(from_node._state_inputs.keys())
+            ])
             if not from_node.mapper:
                 dir_nm_el_from = ""
             if is_node(from_node) and is_current_interface(from_node.interface):
-                file_from = self._reading_ci_output(node=from_node, dir_nm_el=dir_nm_el_from, out_nm=from_socket)
+                file_from = self._reading_ci_output(
+                    node=from_node, dir_nm_el=dir_nm_el_from, out_nm=from_socket)
                 if file_from and os.path.exists(file_from):
                     inputs_dict["{}.{}".format(self.name, to_socket)] = file_from
                 else:
                     raise Exception("{} doesnt exist".format(file_from))
-            else: # assuming here that I want to read the file (will not be used with the current interfaces)
-                file_from = os.path.join(from_node.workingdir, dir_nm_el_from, from_socket+".txt")
+            else:  # assuming here that I want to read the file (will not be used with the current interfaces)
+                file_from = os.path.join(from_node.workingdir, dir_nm_el_from,
+                                         from_socket + ".txt")
                 with open(file_from) as f:
                     content = f.readline()
                 try:
@@ -182,7 +189,8 @@ def _reading_ci_output(self, dir_nm_el, out_nm, node=None):
         if not node:
             node = self
         result_pklfile = os.path.join(os.getcwd(), node.workingdir, dir_nm_el,
-                                      node.interface.nn.name, "result_{}.pklz".format(node.interface.nn.name))
+                                      node.interface.nn.name, "result_{}.pklz".format(
+                                          node.interface.nn.name))
         if os.path.exists(result_pklfile):
             out_file = getattr(loadpkl(result_pklfile).outputs, out_nm)
             if os.path.exists(out_file):
@@ -190,7 +198,6 @@ def _reading_ci_output(self, dir_nm_el, out_nm, node=None):
         return False
 
-
     # checking if all outputs are saved
     @property
     def is_complete(self):
@@ -201,7 +208,6 @@ def is_complete(self):
         else:
             return self._check_all_results()
 
-
     def get_output(self):
         raise NotImplementedError
 
@@ -211,7 +217,6 @@ def get_output(self):
     def _check_all_results(self):
         raise NotImplementedError
 
     def _reading_results(self):
         raise NotImplementedError
 
-
     def _dict_tuple2list(self, container):
         if type(container) is dict:
             val_l = [val for (_, val) in container.items()]
@@ -223,12 +228,26 @@ def _dict_tuple2list(self, container):
 
 class Node(NodeBase):
-    def __init__(self, name, interface, inputs=None, mapper=None, join_by=None,
-                 workingdir=None, other_mappers=None,
-                 output_names=None, write_state=True, *args, **kwargs):
-        super(Node, self).__init__(name=name, mapper=mapper, inputs=inputs,
-                                   other_mappers=other_mappers, write_state=write_state,
-                                   *args, **kwargs)
+    def __init__(self,
+                 name,
+                 interface,
+                 inputs=None,
+                 mapper=None,
+                 join_by=None,
+                 workingdir=None,
+                 other_mappers=None,
+                 output_names=None,
+                 write_state=True,
+                 *args,
+                 **kwargs):
+        super(Node, self).__init__(
+            name=name,
+            mapper=mapper,
+            inputs=inputs,
+            other_mappers=other_mappers,
+            write_state=write_state,
+            *args,
+            **kwargs)
 
         # working directory for node, will be change if node is a part of a wf
         self.workingdir = workingdir
@@ -237,7 +256,7 @@ def __init__(self, name, interface, inputs=None, mapper=None, join_by=None,
         if is_function_interface(self.interface):
             # adding node name to the interface's name mapping
             self.interface.input_map = dict((key, "{}.{}".format(self.name, value))
-                                        for (key, value) in self.interface.input_map.items())
+                                            for (key, value) in self.interface.input_map.items())
             # list of output names taken from interface output name
             self.output_names = self.interface._output_nm
         elif is_current_interface(self.interface):
@@ -246,8 +265,6 @@ def __init__(self, name, interface, inputs=None, mapper=None, join_by=None,
         if not self.output_names:
             self.output_names = []
 
-
-
     # dj: not sure if I need it
     # def __deepcopy__(self, memo): # memo is a dict of id's to copies
     #     id_self = id(self) # memoization avoids unnecesary recursion
@@ -275,7 +292,7 @@ def run_interface_el(self, i, ind):
         dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items())])
         print("Run interface el, dict={}".format(state_dict))
         logger.debug("Run interface el, name={}, inputs_dict={}, state_dict={}".format(
-                     self.name, inputs_dict, state_dict))
+            self.name, inputs_dict, state_dict))
         if is_function_interface(self.interface):
             res = self.interface.run(inputs_dict)
             output = self.interface.output
@@ -285,8 +302,10 @@ def run_interface_el(self, i, ind):
         elif is_current_interface(self.interface):
             if not self.mapper:
                 dir_nm_el = ""
-            res = self.interface.run(inputs=inputs_dict, base_dir=os.path.join(os.getcwd(), self.workingdir),
-                                     dir_nm_el=dir_nm_el)
+            res = self.interface.run(
+                inputs=inputs_dict,
+                base_dir=os.path.join(os.getcwd(), self.workingdir),
+                dir_nm_el=dir_nm_el)
 
         # TODO when join
         #if self._joinByKey:
@@ -298,17 +317,15 @@ def run_interface_el(self, i, ind):
         #    dir_nm_el = os.path.join(dir_join, dir_nm_el)
         return res
 
-
     def _writting_results_tmp(self, state_dict, dir_nm_el, output):
         """temporary method to write the results in the files (this is usually part of a interface)"""
         if not self.mapper:
             dir_nm_el = ''
         os.makedirs(os.path.join(self.workingdir, dir_nm_el), exist_ok=True)
         for key_out, val_out in output.items():
-            with open(os.path.join(self.workingdir, dir_nm_el, key_out+".txt"), "w") as fout:
+            with open(os.path.join(self.workingdir, dir_nm_el, key_out + ".txt"), "w") as fout:
                 fout.write(str(val_out))
 
-
     def get_output(self):
         """collecting all outputs and updating self._output"""
         for key_out in self.output_names:
@@ -348,7 +365,6 @@ def get_output(self):
                             (state_dict, self._reading_ci_output(dir_nm_el="", out_nm=key_out))
         return self._output
 
-
     # dj: version without join
     def _check_all_results(self):
         """checking if all files that should be created are present"""
@@ -363,7 +379,8 @@ def _check_all_results(self):
 
             for key_out in self.output_names:
                 if is_function_interface(self.interface):
-                    if not os.path.isfile(os.path.join(self.workingdir, dir_nm_el, key_out+".txt")):
+                    if not os.path.isfile(
+                            os.path.join(self.workingdir, dir_nm_el, key_out + ".txt")):
                         return False
                 elif is_current_interface(self.interface):
                     if not self._reading_ci_output(dir_nm_el, key_out):
@@ -371,7 +388,6 @@ def _check_all_results(self):
         self._is_complete = True
         return True
 
-
     def _reading_results(self):
         """temporary: reading results from output files (that is now just txt)
            should be probably just reading output for self.output_names
@@ -402,10 +418,19 @@ def _reading_results(self):
 
 class Workflow(NodeBase):
-    def __init__(self, name, inputs=None, wf_output_names=None, mapper=None, #join_by=None,
-                 nodes=None, workingdir=None, write_state=True, *args, **kwargs):
-        super(Workflow, self).__init__(name=name, mapper=mapper, inputs=inputs,
-                                       write_state=write_state, *args, **kwargs)
+    def __init__(
+            self,
+            name,
+            inputs=None,
+            wf_output_names=None,
+            mapper=None,  #join_by=None,
+            nodes=None,
+            workingdir=None,
+            write_state=True,
+            *args,
+            **kwargs):
+        super(Workflow, self).__init__(
+            name=name, mapper=mapper, inputs=inputs, write_state=write_state, *args, **kwargs)
 
         self.graph = nx.DiGraph()
         # all nodes in the workflow (probably will be removed)
@@ -444,7 +469,6 @@ def __init__(self, name, inputs=None, wf_output_names=None, mapper=None,  #join_b
 
         #     self.add(name, value)
 
-
     @property
     def nodes(self):
         return self._nodes
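# --- reviewer sketch (not part of the patch) --------------------------------
# What the reformatted inputs setter in NodeBase above actually does: keys are
# prefixed with the node name and list values are coerced to numpy arrays, so
# every state input is addressed as "<node>.<input>". This mirrors test_node_5
# in test_newnode.py further down; the lambda interface is just a stand-in:
import numpy as np
from pydra.engine.node import Node
from pydra.engine.auxiliary import FunctionInterface

interf = FunctionInterface(lambda a: a + 2, ["out"])
nn = Node(name="NA", interface=interf, inputs={"a": [3, 5]}, workingdir="na")
assert (nn.inputs["NA.a"] == np.array([3, 5])).all()
# -----------------------------------------------------------------------------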
@@ -454,7 +478,6 @@ def graph_sorted(self):
         # TODO: should I always update the graph?
         return list(nx.topological_sort(self.graph))
 
-
     def map_node(self, mapper, node=None, inputs=None):
         """this is setting a mapper to the wf's nodes (not to the wf)"""
         if not node:
@@ -464,7 +487,6 @@ def map_node(self, mapper, node=None, inputs=None):
         node.map(mapper=mapper, inputs=inputs)
         self._node_mappers[node.name] = node.mapper
 
-
    def get_output(self):
        # not sure, if I should collecto output of all nodes or only the ones that are used in wf.output
        self.node_outputs = {}
@@ -487,18 +509,22 @@ def get_output(self):
                     for (i, ind) in enumerate(itertools.product(*self.state.all_elements)):
                         if self.write_state:
                             wf_inputs_dict = self.state.state_values(ind)
-                            dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())])
+                            dir_nm_el = "_".join([
+                                "{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())
+                            ])
                         else:
                             wf_ind_dict = self.state.state_ind(ind)
-                            dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_ind_dict.items())])
-                        self._output[out_wf_nm][dir_nm_el] = self.node_outputs[node_nm][i][out_nd_nm]
+                            dir_nm_el = "_".join(
+                                ["{}:{}".format(i, j) for i, j in list(wf_ind_dict.items())])
+                        self._output[out_wf_nm][dir_nm_el] = self.node_outputs[node_nm][i][
+                            out_nd_nm]
                 else:
                     self._output[out_wf_nm] = self.node_outputs[node_nm][out_nd_nm]
             else:
-                raise Exception("the key {} is already used in workflow.result".format(out_wf_nm))
+                raise Exception(
+                    "the key {} is already used in workflow.result".format(out_wf_nm))
         return self._output
 
-
     # dj: version without join
     # TODO: might merge with the function from Node
     def _check_all_results(self):
@@ -513,7 +539,6 @@ def _check_all_results(self):
         self._is_complete = True
         return True
 
-
     # TODO: should try to merge with the function from Node
     def _reading_results(self):
         """reading all results of the workflow
@@ -521,7 +546,7 @@ def _reading_results(self):
         """
         if self.wf_output_names:
             for out in self.wf_output_names:
-                key_out = out[2] if len(out)==3 else out[1]
+                key_out = out[2] if len(out) == 3 else out[1]
                 self._result[key_out] = []
                 if self.mapper:
                     for (i, ind) in enumerate(itertools.product(*self.state.all_elements)):
@@ -529,12 +554,14 @@ def _reading_results(self):
                         if self.write_state:
                             wf_inputs_dict = self.state.state_values(ind)
                         else:
                             wf_inputs_dict = self.state.state_ind(ind)
-                        dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())])
-                        res_l= []
+                        dir_nm_el = "_".join(
+                            ["{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())])
+                        res_l = []
                         val_l = self._dict_tuple2list(self.output[key_out][dir_nm_el])
                         for val in val_l:
                             with open(val[1]) as fout:
-                                logger.debug('Reading Results: file={}, st_dict={}'.format(val[1], val[0]))
+                                logger.debug('Reading Results: file={}, st_dict={}'.format(
+                                    val[1], val[0]))
                                 res_l.append((val[0], eval(fout.readline())))
                         self._result[key_out].append((wf_inputs_dict, res_l))
                 else:
@@ -543,12 +570,12 @@ def _reading_results(self):
                         #TODO: I think that val shouldn't be dict here...
                        # TMP solution
                        if type(val) is dict:
-                            val = [v for k,v in val.items()][0]
+                            val = [v for k, v in val.items()][0]
                        with open(val[1]) as fout:
-                            logger.debug('Reading Results: file={}, st_dict={}'.format(val[1], val[0]))
+                            logger.debug('Reading Results: file={}, st_dict={}'.format(
+                                val[1], val[0]))
                            self._result[key_out].append((val[0], eval(fout.readline())))
 
-
     def add_nodes(self, nodes):
         """adding nodes without defining connections
             most likely it will be removed at the end
@@ -561,37 +588,63 @@ def add_nodes(self, nodes):
             self._node_names[nn.name] = nn
             self._node_mappers[nn.name] = nn.mapper
 
-
     # TODO: workingir shouldn't have None
-    def add(self, runnable, name=None, workingdir=None, inputs=None, output_names=None, mapper=None,
-            write_state=True, out_read=False, **kwargs):
+    def add(self,
+            runnable,
+            name=None,
+            workingdir=None,
+            inputs=None,
+            output_names=None,
+            mapper=None,
+            write_state=True,
+            out_read=False,
+            **kwargs):
         if is_function(runnable):
             if not output_names:
                 output_names = ["out"]
-            interface = aux.FunctionInterface(function=runnable, output_nm=output_names, out_read=out_read)
+            interface = aux.FunctionInterface(
+                function=runnable, output_nm=output_names, out_read=out_read)
             if not name:
                 raise Exception("you have to specify name for the node")
             if not workingdir:
                 workingdir = name
-            node = Node(interface=interface, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper,
-                        other_mappers=self._node_mappers, write_state=write_state)
+            node = Node(
+                interface=interface,
+                workingdir=workingdir,
+                name=name,
+                inputs=inputs,
+                mapper=mapper,
+                other_mappers=self._node_mappers,
+                write_state=write_state)
         elif is_function_interface(runnable) or is_current_interface(runnable):
             if not name:
                 raise Exception("you have to specify name for the node")
             if not workingdir:
                 workingdir = name
-            node = Node(interface=runnable, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper,
-                        other_mappers=self._node_mappers, output_names=output_names,
-                        write_state=write_state)
+            node = Node(
+                interface=runnable,
+                workingdir=workingdir,
+                name=name,
+                inputs=inputs,
+                mapper=mapper,
+                other_mappers=self._node_mappers,
+                output_names=output_names,
+                write_state=write_state)
         elif is_nipype_interface(runnable):
             ci = aux.CurrentInterface(interface=runnable, name=name)
             if not name:
                 raise Exception("you have to specify name for the node")
             if not workingdir:
                 workingdir = name
-            node = Node(interface=ci, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper,
-                        other_mappers=self._node_mappers, output_names=output_names,
-                        write_state=write_state)
+            node = Node(
+                interface=ci,
+                workingdir=workingdir,
+                name=name,
+                inputs=inputs,
+                mapper=mapper,
+                other_mappers=self._node_mappers,
+                output_names=output_names,
+                write_state=write_state)
         elif is_node(runnable):
             node = runnable
         elif is_workflow(runnable):
@@ -607,11 +660,10 @@ def add(self, runnable, name=None, workingdir=None, inputs=None, output_names=No
                 from_node_nm, from_socket = source.split(".")
                 self.connect(from_node_nm, from_socket, node.name, inp)
             # TODO not sure if i need it, just check if from_node_nm is not None??
-            except(ValueError):
+            except (ValueError):
                 self.connect_wf_input(source, node.name, inp)
         return self
 
-
     def connect(self, from_node_nm, from_socket, to_node_nm, to_socket):
         from_node = self._node_names[from_node_nm]
         to_node = self._node_names[to_node_nm]
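# --- reviewer sketch (not part of the patch) --------------------------------
# How the kwargs handling at the end of add() above is meant to be used: a
# keyword such as a="NA.out" is split on "." and turned into a connect() call,
# so NB.a is fed from NA.out (the pattern test_workflow_9 below exercises).
# The lambda interfaces here are stand-ins for the tests' helper functions:
from pydra.engine.node import Workflow
from pydra.engine.auxiliary import FunctionInterface

wf = Workflow(name="wf", workingdir="wf")
wf.add(name="NA", runnable=FunctionInterface(lambda a: a + 2, ["out"]),
       workingdir="na", inputs={"a": [3, 5]}, mapper="a")
wf.add(name="NB", runnable=FunctionInterface(lambda a, b: a + b, ["out"]),
       workingdir="nb", a="NA.out")  # same as wf.connect("NA", "out", "NB", "a")
# -----------------------------------------------------------------------------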
@@ -622,33 +674,38 @@ def connect(self, from_node_nm, from_socket, to_node_nm, to_socket):
         # from_node.sending_output.append((from_socket, to_node, to_socket))
         logger.debug('connecting {} and {}'.format(from_node, to_node))
 
-
     def connect_wf_input(self, inp_wf, node_nm, inp_nd):
         self.needed_inp_wf.append((node_nm, inp_wf, inp_nd))
 
-
     def preparing(self, wf_inputs=None, wf_inputs_ind=None):
         """preparing nodes which are connected: setting the final mapper and state_inputs"""
         #pdb.set_trace()
         for node_nm, inp_wf, inp_nd in self.needed_inp_wf:
             node = self._node_names[node_nm]
             if "{}.{}".format(self.name, inp_wf) in wf_inputs:
-                node.state_inputs.update({"{}.{}".format(node_nm, inp_nd): wf_inputs["{}.{}".format(self.name, inp_wf)]})
-                node.inputs.update({"{}.{}".format(node_nm, inp_nd): wf_inputs["{}.{}".format(self.name, inp_wf)]})
+                node.state_inputs.update({
+                    "{}.{}".format(node_nm, inp_nd):
+                    wf_inputs["{}.{}".format(self.name, inp_wf)]
+                })
+                node.inputs.update({
+                    "{}.{}".format(node_nm, inp_nd):
+                    wf_inputs["{}.{}".format(self.name, inp_wf)]
+                })
             else:
                 raise Exception("{}.{} not in the workflow inputs".format(self.name, inp_wf))
         for nn in self.graph_sorted:
             if self.write_state:
                 dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs.items())])
             else:
-                dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_ind.items())])
+                dir_nm_el = "_".join(
+                    ["{}:{}".format(i, j) for i, j in list(wf_inputs_ind.items())])
             if not self.mapper:
                 dir_nm_el = ""
             nn.workingdir = os.path.join(self.workingdir, dir_nm_el, nn.name)
-            nn._is_complete = False # helps when mp is used
+            nn._is_complete = False  # helps when mp is used
             try:
                 for inp, (out_node, out_var) in self.connected_var[nn].items():
-                    nn.ready2run = False #it has some history (doesnt have to be in the loop)
+                    nn.ready2run = False  #it has some history (doesnt have to be in the loop)
                     nn.state_inputs.update(out_node.state_inputs)
                     nn.needed_outputs.append((out_node, out_var, inp))
                     #if there is no mapper provided, i'm assuming that mapper is taken from the previous node
@@ -657,7 +714,7 @@ def preparing(self, wf_inputs=None, wf_inputs_ind=None):
                     else:
                         pass
                     #TODO: implement inner mapper
-            except(KeyError):
+            except (KeyError):
                 # tmp: we don't care about nn that are not in self.connected_var
                 pass
 
@@ -677,17 +734,22 @@ def preparing(self, wf_inputs=None, wf_inputs_ind=None):
 def is_function(obj):
     return hasattr(obj, '__call__')
 
+
 def is_function_interface(obj):
     return type(obj) is aux.FunctionInterface
 
+
 def is_current_interface(obj):
     return type(obj) is aux.CurrentInterface
 
+
 def is_nipype_interface(obj):
     return hasattr(obj, "_run_interface")
 
+
 def is_node(obj):
     return type(obj) is Node
 
+
 def is_workflow(obj):
     return type(obj) is Workflow
diff --git a/pydra/engine/state.py b/pydra/engine/state.py
index 354b8d80af..534cad23a1 100644
--- a/pydra/engine/state.py
+++ b/pydra/engine/state.py
@@ -4,6 +4,7 @@
 from . import auxiliary as aux
 
+
 class State(object):
     def __init__(self, node_name, mapper=None, other_mappers=None):
         self._mapper = mapper
@@ -17,7 +18,6 @@ def __init__(self, node_name, mapper=None, other_mappers=None):
         self._mapper_rpn = []
         self._input_names_mapper = []
 
-
     def prepare_state_input(self, state_inputs):
         """prepare all inputs, should be called once all input is available"""
 
@@ -36,18 +36,17 @@ def prepare_state_input(self, state_inputs):
         # list of inputs variable for each axis
         # e.g. [['e', 'd'], ['r', 'd']]
         # shape - list, e.g. [2,3]
-        self._input_for_axis, self._shape = aux.converting_axis2input(self.state_inputs,
-                                                                      self._axis_for_input, self._ndim)
+        self._input_for_axis, self._shape = aux.converting_axis2input(
+            self.state_inputs, self._axis_for_input, self._ndim)
 
         # list of all possible indexes in each dim, will be use to iterate
         # e.g. [[0, 1], [0, 1, 2]]
         self.all_elements = [range(i) for i in self._shape]
         self.index_generator = itertools.product(*self.all_elements)
 
-
     def __getitem__(self, ind):
         if type(ind) is int:
-            ind = (ind,)
+            ind = (ind, )
         return self.state_values(ind)
 
     # not used?
@@ -55,17 +54,14 @@ def __getitem__(self, ind):
     #def mapper(self):
     #    return self._mapper
 
-
     @property
     def ndim(self):
         return self._ndim
 
-
     @property
     def shape(self):
         return self._shape
 
-
     def state_values(self, ind):
         """returns state input as a dictionary (input name, value)"""
         if len(ind) > self._ndim:
@@ -73,14 +69,15 @@ def state_values(self, ind):
 
         for ii, index in enumerate(ind):
             if index > self._shape[ii] - 1:
-                raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii]))
+                raise IndexError("index {} is out of bounds for axis {} with size {}".format(
+                    index, ii, self._shape[ii]))
 
         state_dict = {}
         for input, ax in self._axis_for_input.items():
             # checking which axes are important for the input
-            sl_ax = slice(ax[0], ax[-1]+1)
+            sl_ax = slice(ax[0], ax[-1] + 1)
             # taking the indexes for the axes
-            ind_inp = tuple(ind[sl_ax]) #used to be list
+            ind_inp = tuple(ind[sl_ax])  #used to be list
             state_dict[input] = self.state_inputs[input][ind_inp]
         # adding values from input that are not used in the mapper
         for input in set(self._input_names) - set(self._input_names_mapper):
@@ -90,7 +87,6 @@ def state_values(self, ind):
         # returning a named tuple?
         return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0]))
 
-
     def state_ind(self, ind):
         """similar to state value but returns indices (not values)"""
         if len(ind) > self._ndim:
@@ -98,14 +94,15 @@ def state_ind(self, ind):
 
         for ii, index in enumerate(ind):
             if index > self._shape[ii] - 1:
-                raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii]))
+                raise IndexError("index {} is out of bounds for axis {} with size {}".format(
+                    index, ii, self._shape[ii]))
 
         state_dict = {}
         for input, ax in self._axis_for_input.items():
             # checking which axes are important for the input
-            sl_ax = slice(ax[0], ax[-1]+1)
+            sl_ax = slice(ax[0], ax[-1] + 1)
             # taking the indexes for the axes
-            ind_inp = tuple(ind[sl_ax]) #used to be list
+            ind_inp = tuple(ind[sl_ax])  #used to be list
             ind_inp_str = "x".join([str(el) for el in ind_inp])
             state_dict[input] = ind_inp_str
         # adding inputs that are not used in the mapper
@@ -114,4 +111,4 @@ def state_ind(self, ind):
         # in py3.7 we can skip OrderedDict
         # returning a named tuple?
-        return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0]))
\ No newline at end of file
+        return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0]))
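# --- reviewer sketch (not part of the patch) --------------------------------
# How State.prepare_state_input drives the two auxiliary helpers: mapping_axis
# assigns an axis to every mapped input, and converting_axis2input inverts
# that into inputs-per-axis. The axis values below come from test_auxiliary.py;
# reading mapping_axis's second return value as the ndim, and the shape as the
# per-axis input lengths, is my interpretation of the State code above rather
# than something the tests pin down:
import numpy as np
from pydra.engine import auxiliary as aux

state_inputs = {"a": np.array([1, 2]), "b": np.array([3, 4, 1])}
axis_for_input, ndim = aux.mapping_axis(state_inputs, ["a", "b", "*"])
assert axis_for_input == {"a": [0], "b": [1]}  # outer product: separate axes
input_for_axis, shape = aux.converting_axis2input(state_inputs, axis_for_input, ndim)
print(input_for_axis, shape)  # expecting [['a'], ['b']] and (2, 3)
# -----------------------------------------------------------------------------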
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index 1c17c5fde9..e7758ecd2c 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -12,7 +12,7 @@ class Submitter(object):
     def __init__(self, plugin, runnable):
         self.plugin = plugin
         self.node_line = []
-        self._to_finish = [] # used only for wf
+        self._to_finish = []  # used only for wf
         if self.plugin == "mp":
             self.worker = MpWorker()
         elif self.plugin == "serial":
@@ -24,14 +24,13 @@ def __init__(self, plugin, runnable):
         else:
             raise Exception("plugin {} not available".format(self.plugin))
 
-        if hasattr(runnable, 'interface'): # a node
+        if hasattr(runnable, 'interface'):  # a node
             self.node = runnable
-        elif hasattr(runnable, "graph"): # a workflow
+        elif hasattr(runnable, "graph"):  # a workflow
             self.workflow = runnable
         else:
             raise Exception("runnable has to be a Node or Workflow")
 
-
     def run(self):
         """main running method, checks if submitter id for Node or Workflow"""
         if hasattr(self, "node"):
@@ -39,7 +38,6 @@ def run(self):
         elif hasattr(self, "workflow"):
             self.run_workflow()
 
-
     def run_node(self):
         """the main method to run a Node"""
         self.node.prepare_state_input()
@@ -49,7 +47,6 @@ def run_node(self):
             time.sleep(3)
         self.node.get_output()
 
-
     def _submit_node(self, node):
         """submitting nodes's interface for all states"""
         for (i, ind) in enumerate(node.state.index_generator):
@@ -60,7 +57,6 @@ def _submit_node_el(self, node, i, ind):
         logger.debug("SUBMIT WORKER, node: {}, ind: {}".format(node, ind))
         self.worker.run_el(node.run_interface_el, (i, ind))
 
-
     def run_workflow(self, workflow=None, ready=True):
         """the main function to run Workflow"""
         if not workflow:
@@ -81,7 +77,7 @@ def run_workflow(self, workflow=None, ready=True):
         else:
             if ready:
                 if workflow.write_state:
-                    workflow.preparing(wf_inputs=workflow.inputs) 
+                    workflow.preparing(wf_inputs=workflow.inputs)
                 else:
                     inputs_ind = dict((key, None) for (key, _) in workflow.inputs.items())
                     workflow.preparing(wf_inputs=workflow.inputs, wf_inputs_ind=inputs_ind)
@@ -104,7 +100,6 @@ def run_workflow(self, workflow=None, ready=True):
         if workflow is self.workflow:
             workflow.get_output()
 
-
     def _run_workflow_el(self, workflow, i, ind, collect_inp=False):
         """running one internal workflow (if workflow has a mapper)"""
         # TODO: can I simplify and remove collect inp? where should it be?
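# --- reviewer sketch (not part of the patch) --------------------------------
# The typical driver loop for the Submitter above, as the tests further down
# use it: pick a plugin ("serial", "mp", "cf" or "dask"), run, then close the
# worker. The node setup is a stand-in; only the Submitter calls are the point:
from pydra.engine.node import Node
from pydra.engine.auxiliary import FunctionInterface
from pydra.engine.submitter import Submitter

nn = Node(name="NA", interface=FunctionInterface(lambda a: a + 2, ["out"]),
          workingdir="na", mapper="a", inputs={"a": [3, 5]})
sub = Submitter(runnable=nn, plugin="serial")
sub.run()    # dispatches run_interface_el for every state element
sub.close()  # shuts down the worker
# -----------------------------------------------------------------------------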
@@ -119,15 +114,14 @@ def _run_workflow_el(self, workflow, i, ind, collect_inp=False):
             workflow.preparing(wf_inputs=wf_inputs, wf_inputs_ind=wf_inputs_ind)
         self._run_workflow_nd(workflow=workflow)
 
-
     def _run_workflow_nd(self, workflow):
         """iterating over all nodes from a workflow and submitting them or adding to the node_line"""
         for (i_n, node) in enumerate(workflow.graph_sorted):
-            if workflow.parent_wf and workflow.parent_wf.mapper: # for now if parent_wf, parent_wf has to have mapper
+            if workflow.parent_wf and workflow.parent_wf.mapper:  # for now if parent_wf, parent_wf has to have mapper
                 workflow.parent_wf.inner_nodes[node.name].append(node)
             node.prepare_state_input()
             self._to_finish.append(node)
-            # submitting all the nodes who are self sufficient (self.workflow.graph is already sorted)
+            # submitting all the nodes who are self sufficient (self.workflow.graph is already sorted)
             if node.ready2run:
                 if hasattr(node, 'interface'):
                     self._submit_node(node)
@@ -148,10 +142,9 @@ def _run_workflow_nd(self, workflow):
                     for (i, ind) in enumerate(nn.state.index_generator):
                         self._to_finish.append(nn)
                         self.node_line.append((nn, i, ind))
-                else: #wf
+                else:  #wf
                     self.run_workflow(workflow=nn, ready=False)
 
-
     def _nodes_check(self):
         """checking which nodes-states are ready to run and running the ones that are ready"""
         _to_remove = []
@@ -163,7 +156,7 @@ def _nodes_check(self):
                     _to_remove.append((to_node, i, ind))
                 else:
                     pass
-            else: #wf
+            else:  #wf
                 if to_node.checking_input_el(ind):
                     self._run_workflow_el(workflow=to_node, i=i, ind=ind, collect_inp=True)
                     _to_remove.append((to_node, i, ind))
@@ -174,7 +167,6 @@ def _nodes_check(self):
             self.node_line.remove(rn)
         return self.node_line
 
-
     # this I believe can be done for entire node
     def _output_check(self):
         """"checking if all nodes are done"""
@@ -187,6 +179,5 @@ def _output_check(self):
             self._to_finish.remove(rn)
         return self._to_finish
 
-
     def close(self):
         self.worker.close()
diff --git a/pydra/engine/tests/test_auxiliary.py b/pydra/engine/tests/test_auxiliary.py
index 4c5df1df2f..3ece8cb674 100644
--- a/pydra/engine/tests/test_auxiliary.py
+++ b/pydra/engine/tests/test_auxiliary.py
@@ -3,54 +3,90 @@
 import numpy as np
 import pytest
 
-@pytest.mark.parametrize("mapper, rpn",
-                         [
-                             ("a", ["a"]),
-                             (("a", "b"), ["a", "b", "."]),
-                             (["a", "b"], ["a", "b", "*"]),
-                             (["a", ("b", "c")], ["a", "b", "c", ".", "*"]),
-                             ([("a", "b"), "c"], ["a", "b", ".", "c", "*"]),
-                             (["a", ("b", ["c", "d"])], ["a", "b", "c", "d", "*", ".", "*"])
-                         ])
+
+@pytest.mark.parametrize(
+    "mapper, rpn", [("a", ["a"]), (("a", "b"), ["a", "b", "."]), (["a", "b"], ["a", "b", "*"]),
+                    (["a", ("b", "c")], ["a", "b", "c", ".", "*"]),
+                    ([("a", "b"), "c"], ["a", "b", ".", "c", "*"]),
+                    (["a", ("b", ["c", "d"])], ["a", "b", "c", "d", "*", ".", "*"])])
 def test_mapper2rpn(mapper, rpn):
     assert aux.mapper2rpn(mapper) == rpn
 
 
 @pytest.mark.parametrize("mapper, other_mappers, rpn",
-                         [
-                             (["a", "_NA"], {"NA": ("b", "c")}, ["a", "NA.b", "NA.c", ".", "*"]),
-                             (["_NA", "c"], {"NA": ("a", "b")}, ["NA.a", "NA.b", ".", "c", "*"]),
-                             (["a", ("b", "_NA")], {"NA": ["c", "d"]}, ["a", "b", "NA.c", "NA.d", "*", ".", "*"])
-                         ])
-
+                         [(["a", "_NA"], {
+                             "NA": ("b", "c")
+                         }, ["a", "NA.b", "NA.c", ".", "*"]),
+                          (["_NA", "c"], {
+                              "NA": ("a", "b")
+                          }, ["NA.a", "NA.b", ".", "c", "*"]),
+                          (["a", ("b", "_NA")], {
+                              "NA": ["c", "d"]
+                          }, ["a", "b", "NA.c", "NA.d", "*", ".", "*"])])
 def test_mapper2rpn_wf_mapper(mapper, other_mappers, rpn):
     assert aux.mapper2rpn(mapper, other_mappers=other_mappers) == rpn
 
 
 @pytest.mark.parametrize("mapper, mapper_changed",
-                         [
-                             ("a", "Node.a"),
-                             (["a", ("b", "c")], ["Node.a", ("Node.b", "Node.c")]),
-                             (("a", ["b", "c"]), ("Node.a", ["Node.b", "Node.c"]))
-                         ])
+                         [("a", "Node.a"), (["a", ("b", "c")], ["Node.a", ("Node.b", "Node.c")]),
+                          (("a", ["b", "c"]), ("Node.a", ["Node.b", "Node.c"]))])
 def test_change_mapper(mapper, mapper_changed):
     assert aux.change_mapper(mapper, "Node") == mapper_changed
 
 
-@pytest.mark.parametrize("inputs, rpn, expected",
-                         [
-                             ({"a": np.array([1, 2])}, ["a"], {"a": [0]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4])}, ["a", "b", "."], {"a": [0], "b": [0]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, ["a", "b", "*"], {"a": [0], "b": [1]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, ["a", "b", ".", "c", "*"],
-                              {"a": [0], "b": [0], "c": [1]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
-                              ["c", "a", "b", ".", "*"], {"a": [1], "b": [1], "c": [0]}),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])},
-                              ["a", "b", ".", "c", "*"], {"a": [0, 1], "b": [0, 1], "c": [2]}),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
-                               "c": np.array([1, 2, 3])}, ["c", "a", "b", ".", "*"], {"a": [1, 2], "b": [1, 2], "c": [0]})
-                         ])
+@pytest.mark.parametrize("inputs, rpn, expected", [({
+    "a": np.array([1, 2])
+}, ["a"], {
+    "a": [0]
+}), ({
+    "a": np.array([1, 2]),
+    "b": np.array([3, 4])
+}, ["a", "b", "."], {
+    "a": [0],
+    "b": [0]
+}), ({
+    "a": np.array([1, 2]),
+    "b": np.array([3, 4, 1])
+}, ["a", "b", "*"], {
+    "a": [0],
+    "b": [1]
+}),
+                                                   ({
+                                                       "a": np.array([1, 2]),
+                                                       "b": np.array([3, 4]),
+                                                       "c": np.array([1, 2, 3])
+                                                   }, ["a", "b", ".", "c", "*"], {
+                                                       "a": [0],
+                                                       "b": [0],
+                                                       "c": [1]
+                                                   }),
+                                                   ({
+                                                       "a": np.array([1, 2]),
+                                                       "b": np.array([3, 4]),
+                                                       "c": np.array([1, 2, 3])
+                                                   }, ["c", "a", "b", ".", "*"], {
+                                                       "a": [1],
+                                                       "b": [1],
+                                                       "c": [0]
+                                                   }),
+                                                   ({
+                                                       "a": np.array([[1, 2], [1, 2]]),
+                                                       "b": np.array([[3, 4], [3, 3]]),
+                                                       "c": np.array([1, 2, 3])
+                                                   }, ["a", "b", ".", "c", "*"], {
+                                                       "a": [0, 1],
+                                                       "b": [0, 1],
+                                                       "c": [2]
+                                                   }),
+                                                   ({
+                                                       "a": np.array([[1, 2], [1, 2]]),
+                                                       "b": np.array([[3, 4], [3, 3]]),
+                                                       "c": np.array([1, 2, 3])
+                                                   }, ["c", "a", "b", ".", "*"], {
+                                                       "a": [1, 2],
+                                                       "b": [1, 2],
+                                                       "c": [0]
+                                                   })])
 def test_mapping_axis(inputs, rpn, expected):
     res = aux.mapping_axis(inputs, rpn)[0]
     print(res)
@@ -64,21 +100,60 @@ def test_mapping_axis_error():
 
 
 @pytest.mark.parametrize("inputs, axis_inputs, ndim, expected",
-                         [
-                             ({"a": np.array([1, 2])}, {"a": [0]}, 1, [["a"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4])}, {"a": [0], "b": [0]}, 1,
-                              [["a", "b"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, {"a": [0], "b": [1]}, 2,
-                              [["a"], ["b"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
-                              {"a": [0], "b": [0], "c": [1]}, 2, [["a", "b"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
-                              {"a": [1], "b": [1], "c": [0]}, 2, [["c"], ["a", "b"]]),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])},
-                              {"a": [0, 1], "b": [0, 1], "c": [2]}, 3, [["a", "b"], ["a", "b"], ["c"]]),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
-                               "c": np.array([1, 2, 3])}, {"a": [1, 2], "b": [1, 2], "c": [0]}, 3,
-                              [["c"], ["a", "b"], ["a", "b"]])
-                         ])
+                         [({
+                             "a": np.array([1, 2])
+                         }, {
+                             "a": [0]
+                         }, 1, [["a"]]),
+                          ({
+                              "a": np.array([1, 2]),
+                              "b": np.array([3, 4])
+                          }, {
+                              "a": [0],
+                              "b": [0]
+                          }, 1, [["a", "b"]]),
+                          ({
+                              "a": np.array([1, 2]),
+                              "b": np.array([3, 4, 1])
+                          }, {
+                              "a": [0],
+                              "b": [1]
+                          }, 2, [["a"], ["b"]]),
+                          ({
+                              "a": np.array([1, 2]),
+                              "b": np.array([3, 4]),
+                              "c": np.array([1, 2, 3])
+                          }, {
+                              "a": [0],
+                              "b": [0],
+                              "c": [1]
+                          }, 2, [["a", "b"]]),
+                          ({
+                              "a": np.array([1, 2]),
+                              "b": np.array([3, 4]),
+                              "c": np.array([1, 2, 3])
+                          }, {
+                              "a": [1],
+                              "b": [1],
+                              "c": [0]
+                          }, 2, [["c"], ["a", "b"]]),
+                          ({
+                              "a": np.array([[1, 2], [1, 2]]),
+                              "b": np.array([[3, 4], [3, 3]]),
+                              "c": np.array([1, 2, 3])
+                          }, {
+                              "a": [0, 1],
+                              "b": [0, 1],
+                              "c": [2]
+                          }, 3, [["a", "b"], ["a", "b"], ["c"]]),
+                          ({
+                              "a": np.array([[1, 2], [1, 2]]),
+                              "b": np.array([[3, 4], [3, 3]]),
+                              "c": np.array([1, 2, 3])
+                          }, {
+                              "a": [1, 2],
+                              "b": [1, 2],
+                              "c": [0]
+                          }, 3, [["c"], ["a", "b"], ["a", "b"]])])
 def test_converting_axis2input(inputs, axis_inputs, ndim, expected):
     aux.converting_axis2input(inputs, axis_inputs, ndim)[0] == expected
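# --- reviewer sketch (not part of the patch) --------------------------------
# The FunctionInterface contract exercised throughout test_newnode.py below:
# output_nm names the outputs and run() fills interface.output from the
# function's return value. Passing input_map explicitly here (mapping function
# argument -> key in the input dict, as the run() code above reads it) is my
# assumption; Node normally builds that map itself:
from pydra.engine.auxiliary import FunctionInterface

def fun_addtwo(a):
    return a + 2

interf = FunctionInterface(fun_addtwo, ["out"], input_map={"a": "a"})
interf.run({"a": 3})
print(interf.output)  # expecting {"out": 5}
# -----------------------------------------------------------------------------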
diff --git a/pydra/engine/tests/test_newnode.py b/pydra/engine/tests/test_newnode.py
index ac3e45a931..ca36acdb91 100644
--- a/pydra/engine/tests/test_newnode.py
+++ b/pydra/engine/tests/test_newnode.py
@@ -9,8 +9,8 @@
 import numpy as np
 import pytest, pdb
 
-python35_only = pytest.mark.skipif(sys.version_info < (3, 5),
-                                   reason="requires Python>3.4")
+python35_only = pytest.mark.skipif(sys.version_info < (3, 5), reason="requires Python>3.4")
+
 
 @pytest.fixture(scope="module")
 def change_dir(request):
@@ -28,12 +28,14 @@ def move2orig():
 Plugins = ["serial"]
 Plugins = ["serial", "mp", "cf", "dask"]
 
+
 def fun_addtwo(a):
     time.sleep(1)
     if a == 3:
         time.sleep(2)
     return a + 2
 
+
 def fun_addvar(a, b):
     return a + b
 
@@ -104,8 +106,11 @@ def test_node_4a():
 def test_node_5(plugin, change_dir):
     """Node with interface and inputs, no mapper, running interface"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
-    nn = Node(name="NA", inputs={"a": 3}, interface=interf_addtwo,
-              workingdir="test_nd5_{}".format(plugin))
+    nn = Node(
+        name="NA",
+        inputs={"a": 3},
+        interface=interf_addtwo,
+        workingdir="test_nd5_{}".format(plugin))
 
     assert (nn.inputs["NA.a"] == np.array([3])).all()
 
@@ -199,8 +204,19 @@ def test_node_8(plugin, change_dir):
     sub.close()
 
     # checking teh results
-    expected = [({"NA.a": 3, "NA.b": 1}, 4), ({"NA.a": 3, "NA.b": 2}, 5),
-                ({"NA.a": 5, "NA.b": 1}, 6), ({"NA.a": 5, "NA.b": 2}, 7)]
+    expected = [({
+        "NA.a": 3,
+        "NA.b": 1
+    }, 4), ({
+        "NA.a": 3,
+        "NA.b": 2
+    }, 5), ({
+        "NA.a": 5,
+        "NA.b": 1
+    }, 6), ({
+        "NA.a": 5,
+        "NA.b": 2
+    }, 7)]
     # to be sure that there is the same order (not sure if node itself should keep the order)
     key_sort = list(expected[0][0].keys())
     expected.sort(key=lambda t: [t[0][key] for key in key_sort])
@@ -212,6 +228,7 @@
 
 # tests for workflows
 
+
 @python35_only
 def test_workflow_0(plugin="serial"):
     """workflow (without run) with one node with a mapper"""
@@ -226,6 +243,7 @@
     assert (wf.nodes[0].inputs['NA.a'] == np.array([3, 5])).all()
     assert len(wf.graph.nodes) == 1
 
+
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_1(plugin, change_dir):
@@ -365,8 +383,19 @@ def test_workflow_2b(plugin):
         assert wf.nodes[0].result["out"][i][1] == res[1]
 
     # four elements (outer product)
-    expected_B = [({"NA.a": 3, "NB.b": 1}, 6), ({"NA.a": 3, "NB.b": 2}, 7),
-                  ({"NA.a": 5, "NB.b": 1}, 8), ({"NA.a": 5, "NB.b": 2}, 9)]
+    expected_B = [({
+        "NA.a": 3,
+        "NB.b": 1
+    }, 6), ({
+        "NA.a": 3,
+        "NB.b": 2
+    }, 7), ({
+        "NA.a": 5,
+        "NB.b": 1
+    }, 8), ({
+        "NA.a": 5,
+        "NB.b": 2
+    }, 9)]
     key_sort = list(expected_B[0][0].keys())
     expected_B.sort(key=lambda t: [t[0][key] for key in key_sort])
     wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
@@ -377,6 +406,7 @@
 
 # using add method to add nodes
 
+
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_3(plugin, change_dir):
@@ -451,7 +481,6 @@ def test_workflow_3b(plugin, change_dir):
         assert wf.nodes[0].result["out"][i][1] == res[1]
 
 
-
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_4(plugin, change_dir):
@@ -532,9 +561,9 @@ def test_workflow_4a(plugin, change_dir):
         assert wf.nodes[1].result["out"][i][1] == res[1]
 
 
-
 # using map after add method
 
+
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_5(plugin, change_dir):
@@ -699,6 +728,7 @@ def test_workflow_6b(plugin, change_dir):
 
 # tests for a workflow that have its own input
 
+
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_7(plugin, change_dir):
@@ -779,7 +809,6 @@ def test_workflow_8(plugin, change_dir):
         assert wf.nodes[0].result["out"][i][0] == res[0]
         assert wf.nodes[0].result["out"][i][1] == res[1]
 
-
     expected_B = [({"NA.a": 3, "NB.b": 10}, 15), ({"NA.a": 5, "NB.b": 10}, 17)]
     key_sort = list(expected_B[0][0].keys())
     expected_B.sort(key=lambda t: [t[0][key] for key in key_sort])
@@ -791,16 +820,21 @@
 
 # testing if _NA in mapper works, using interfaces in add
 
+
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_9(plugin, change_dir):
     """using add(interface) method and mapper from previous nodes"""
     wf = Workflow(name="wf9", workingdir="test_wf9_{}".format(plugin))
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
-    wf.add(name="NA", runnable=interf_addtwo, workingdir="na").map_node(mapper="a", inputs={"a": [3, 5]})
+    wf.add(
+        name="NA", runnable=interf_addtwo, workingdir="na").map_node(
+            mapper="a", inputs={"a": [3, 5]})
     interf_addvar = FunctionInterface(fun_addvar, ["out"])
     # _NA means that I'm using mapper from the NA node, it's the same as ("NA.a", "b")
-    wf.add(name="NB", runnable=interf_addvar, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]})
+    wf.add(
+        name="NB", runnable=interf_addvar, workingdir="nb", a="NA.out").map_node(
+            mapper=("_NA", "b"), inputs={"b": [2, 1]})
 
     sub = Submitter(runnable=wf, plugin=plugin)
     sub.run()
@@ -829,10 +863,17 @@ def test_workflow_10(plugin, change_dir):
     """using add(interface) method and scalar mapper from previous nodes"""
     wf = Workflow(name="wf10", workingdir="test_wf10_{}".format(plugin))
     interf_addvar1 = FunctionInterface(fun_addvar, ["out"])
-    wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]})
+    wf.add(
+        name="NA", runnable=interf_addvar1, workingdir="na").map_node(
+            mapper=("a", "b"), inputs={
+                "a": [3, 5],
+                "b": [0, 10]
+            })
     interf_addvar2 = FunctionInterface(fun_addvar, ["out"])
     # _NA means that I'm using mapper from the NA node, it's the same as (("NA.a", NA.b), "b")
-    wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]})
+    wf.add(
+        name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(
+            mapper=("_NA", "b"), inputs={"b": [2, 1]})
 
     sub = Submitter(runnable=wf, plugin=plugin)
     sub.run()
@@ -861,17 +902,35 @@ def test_workflow_10a(plugin, change_dir):
     """using add(interface) method and vector mapper from previous nodes"""
     wf = Workflow(name="wf10a", workingdir="test_wf10a_{}".format(plugin))
     interf_addvar1 = FunctionInterface(fun_addvar, ["out"])
-    wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=["a", "b"], inputs={"a": [3, 5], "b": [0, 10]})
+    wf.add(
+        name="NA", runnable=interf_addvar1, workingdir="na").map_node(
+            mapper=["a", "b"], inputs={
+                "a": [3, 5],
+                "b": [0, 10]
+            })
     interf_addvar2 = FunctionInterface(fun_addvar, ["out"])
     # _NA means that I'm using mapper from the NA node, it's the same as (["NA.a", NA.b], "b")
-    wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [[2, 1], [0, 0]]})
+    wf.add(
+        name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(
+            mapper=("_NA", "b"), inputs={"b": [[2, 1], [0, 0]]})
 
     sub = Submitter(runnable=wf, plugin=plugin)
     sub.run()
     sub.close()
 
-    expected = [({"NA.a": 3, "NA.b": 0}, 3), ({"NA.a": 3, "NA.b": 10}, 13),
-                ({"NA.a": 5, "NA.b": 0}, 5), ({"NA.a": 5, "NA.b": 10}, 15)]
+    expected = [({
+        "NA.a": 3,
+        "NA.b": 0
+    }, 3), ({
+        "NA.a": 3,
+        "NA.b": 10
+    }, 13), ({
+        "NA.a": 5,
+        "NA.b": 0
+    }, 5), ({
+        "NA.a": 5,
+        "NA.b": 10
+    }, 15)]
     key_sort = list(expected[0][0].keys())
     expected.sort(key=lambda t: [t[0][key] for key in key_sort])
     wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
@@ -879,8 +938,23 @@ def test_workflow_10a(plugin, change_dir):
         assert wf.nodes[0].result["out"][i][0] == res[0]
         assert wf.nodes[0].result["out"][i][1] == res[1]
 
-    expected_B = [({"NA.a": 3, "NA.b": 0, "NB.b": 2}, 5), ({"NA.a": 3, "NA.b": 10, "NB.b": 1}, 14),
-                  ({"NA.a": 5, "NA.b": 0, "NB.b": 0}, 5), ({"NA.a": 5, "NA.b": 10, "NB.b": 0}, 15)]
+    expected_B = [({
+        "NA.a": 3,
+        "NA.b": 0,
+        "NB.b": 2
+    }, 5), ({
+        "NA.a": 3,
+        "NA.b": 10,
+        "NB.b": 1
+    }, 14), ({
+        "NA.a": 5,
+        "NA.b": 0,
+        "NB.b": 0
+    }, 5), ({
+        "NA.a": 5,
+        "NA.b": 10,
+        "NB.b": 0
+    }, 15)]
     key_sort = list(expected_B[0][0].keys())
     expected_B.sort(key=lambda t: [t[0][key] for key in key_sort])
     wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
@@ -895,12 +969,21 @@ def test_workflow_11(plugin, change_dir):
     """using add(interface) method and vector mapper from previous two nodes"""
     wf = Workflow(name="wf11", workingdir="test_wf11_{}".format(plugin))
     interf_addvar1 = FunctionInterface(fun_addvar, ["out"])
-    wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]})
+    wf.add(
+        name="NA", runnable=interf_addvar1, workingdir="na").map_node(
+            mapper=("a", "b"), inputs={
+                "a": [3, 5],
+                "b": [0, 10]
+            })
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
-    wf.add(name="NB", runnable=interf_addtwo, workingdir="nb").map_node(mapper="a", inputs={"a": [2, 1]})
+    wf.add(
+        name="NB", runnable=interf_addtwo, workingdir="nb").map_node(
+            mapper="a", inputs={"a": [2, 1]})
     interf_addvar2 = FunctionInterface(fun_addvar, ["out"])
     # _NA, _NB means that I'm using mappers from the NA/NB nodes, it's the same as [("NA.a", NA.b), "NB.a"]
-    wf.add(name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out").map_node(mapper=["_NA", "_NB"]) # TODO: this should eb default?
+    wf.add(
+        name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out").map_node(
+            mapper=["_NA", "_NB"])  # TODO: this should be default?
@@ -895,12 +969,21 @@ def test_workflow_11(plugin, change_dir):
     """using add(interface) method and vector mapper from previous two nodes"""
     wf = Workflow(name="wf11", workingdir="test_wf11_{}".format(plugin))
     interf_addvar1 = FunctionInterface(fun_addvar, ["out"])
-    wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]})
+    wf.add(
+        name="NA", runnable=interf_addvar1, workingdir="na").map_node(
+            mapper=("a", "b"), inputs={
+                "a": [3, 5],
+                "b": [0, 10]
+            })
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
-    wf.add(name="NB", runnable=interf_addtwo, workingdir="nb").map_node(mapper="a", inputs={"a": [2, 1]})
+    wf.add(
+        name="NB", runnable=interf_addtwo, workingdir="nb").map_node(
+            mapper="a", inputs={"a": [2, 1]})
     interf_addvar2 = FunctionInterface(fun_addvar, ["out"])
     # _NA, _NB means that I'm using mappers from the NA/NB nodes, it's the same as [("NA.a", "NA.b"), "NB.a"]
-    wf.add(name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out").map_node(mapper=["_NA", "_NB"])  # TODO: this should be default?
+    wf.add(
+        name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out").map_node(
+            mapper=["_NA", "_NB"])  # TODO: this should be default?
 
     sub = Submitter(runnable=wf, plugin=plugin)
     sub.run()
@@ -914,9 +997,23 @@ def test_workflow_11(plugin, change_dir):
         assert wf.nodes[0].result["out"][i][0] == res[0]
         assert wf.nodes[0].result["out"][i][1] == res[1]
 
-
-    expected_C = [({"NA.a": 3, "NA.b": 0, "NB.a": 1}, 6), ({"NA.a": 3, "NA.b": 0, "NB.a": 2}, 7),
-                  ({"NA.a": 5, "NA.b": 10, "NB.a": 1}, 18), ({"NA.a": 5, "NA.b": 10, "NB.a": 2}, 19)]
+    expected_C = [({
+        "NA.a": 3,
+        "NA.b": 0,
+        "NB.a": 1
+    }, 6), ({
+        "NA.a": 3,
+        "NA.b": 0,
+        "NB.a": 2
+    }, 7), ({
+        "NA.a": 5,
+        "NA.b": 10,
+        "NB.a": 1
+    }, 18), ({
+        "NA.a": 5,
+        "NA.b": 10,
+        "NB.a": 2
+    }, 19)]
     key_sort = list(expected_C[0][0].keys())
     expected_C.sort(key=lambda t: [t[0][key] for key in key_sort])
     wf.nodes[2].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
@@ -927,12 +1024,15 @@ def test_workflow_11(plugin, change_dir):
 
 # checking workflow.result
 
+
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_12(plugin, change_dir):
     """testing if wf.result works (the same workflow as in test_workflow_6)"""
-    wf = Workflow(name="wf12", workingdir="test_wf12_{}".format(plugin),
-                  wf_output_names=[("NA", "out", "NA_out"), ("NB", "out")])
+    wf = Workflow(
+        name="wf12",
+        workingdir="test_wf12_{}".format(plugin),
+        wf_output_names=[("NA", "out", "NA_out"), ("NB", "out")])
 
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
     na = Node(name="NA", interface=interf_addtwo, workingdir="na")
@@ -977,8 +1077,10 @@
 @python35_only
 def test_workflow_12a(plugin, change_dir):
     """testing if wf.result raises an exception (the same workflow as in test_workflow_6)"""
-    wf = Workflow(name="wf12a", workingdir="test_wf12a_{}".format(plugin),
-                  wf_output_names=[("NA", "out", "wf_out"), ("NB", "out", "wf_out")])
+    wf = Workflow(
+        name="wf12a",
+        workingdir="test_wf12a_{}".format(plugin),
+        wf_output_names=[("NA", "out", "wf_out"), ("NB", "out", "wf_out")])
 
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
     na = Node(name="NA", interface=interf_addtwo, workingdir="na")
@@ -997,6 +1099,7 @@ def test_workflow_12a(plugin, change_dir):
         sub.run()
     assert str(exinfo.value) == "the key wf_out is already used in workflow.result"
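
Note on wf_output_names: each entry is a (node_name, node_output[, workflow_output]) tuple; with the two-element form the node output name is reused, and two entries that map to the same workflow key trigger the exception asserted above. A hypothetical sketch of that normalization (not the actual pydra code, just the convention the tests rely on):

    def normalize_wf_outputs(wf_output_names):
        # expand (node, out[, alias]) tuples; duplicate result keys are an error
        seen = {}
        for entry in wf_output_names:
            node, out, *alias = entry
            key = alias[0] if alias else out
            if key in seen:
                raise Exception("the key {} is already used in workflow.result".format(key))
            seen[key] = (node, out)
        return seen

    normalize_wf_outputs([("NA", "out", "NA_out"), ("NB", "out")])  # ok
    # normalize_wf_outputs([("NA", "out", "wf_out"), ("NB", "out", "wf_out")]) would raise
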
Workflow(name="wf13a", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13a_{}".format(plugin), - wf_output_names=[("NA", "out", "NA_out")]) + wf = Workflow( + name="wf13a", + inputs={"wfa": [3, 5]}, + mapper="wfa", + workingdir="test_wf13a_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) interf_addvar = FunctionInterface(fun_addvar, ["out"]) - na = Node(name="NA", interface=interf_addvar, workingdir="na", mapper="b", inputs={"b": [10, 20]}) + na = Node( + name="NA", interface=interf_addvar, workingdir="na", mapper="b", inputs={"b": [10, 20]}) wf.add(na) wf.connect_wf_input("wfa", "NA", "a") @@ -1040,8 +1151,23 @@ def test_workflow_13a(plugin, change_dir): sub.close() assert wf.is_complete - expected = [({"wf13a.wfa": 3}, [({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 3, "NA.b": 20}, 23)]), - ({'wf13a.wfa': 5}, [({"NA.a": 5, "NA.b": 10}, 15), ({"NA.a": 5, "NA.b": 20}, 25)])] + expected = [({ + "wf13a.wfa": 3 + }, [({ + "NA.a": 3, + "NA.b": 10 + }, 13), ({ + "NA.a": 3, + "NA.b": 20 + }, 23)]), ({ + 'wf13a.wfa': 5 + }, [({ + "NA.a": 5, + "NA.b": 10 + }, 15), ({ + "NA.a": 5, + "NA.b": 20 + }, 25)])] for i, res in enumerate(expected): assert wf.result["NA_out"][i][0] == res[0] for j in range(len(res[1])): @@ -1053,8 +1179,10 @@ def test_workflow_13a(plugin, change_dir): @python35_only def test_workflow_13c(plugin, change_dir): """using inputs for workflow and connect_wf_input, using wf.map(mapper, inputs)""" - wf = Workflow(name="wf13c", workingdir="test_wf13c_{}".format(plugin), - wf_output_names=[("NA", "out", "NA_out")]) + wf = Workflow( + name="wf13c", + workingdir="test_wf13c_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) na = Node(name="NA", interface=interf_addtwo, workingdir="na") wf.add(na).map(mapper="wfa", inputs={"wfa": [3, 5]}) @@ -1065,8 +1193,7 @@ def test_workflow_13c(plugin, change_dir): sub.close() assert wf.is_complete - expected = [({"wf13c.wfa": 3}, [({"NA.a": 3}, 5)]), - ({'wf13c.wfa': 5}, [({"NA.a": 5}, 7)])] + expected = [({"wf13c.wfa": 3}, [({"NA.a": 3}, 5)]), ({'wf13c.wfa': 5}, [({"NA.a": 5}, 7)])] for i, res in enumerate(expected): assert wf.result["NA_out"][i][0] == res[0] assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] @@ -1076,9 +1203,11 @@ def test_workflow_13c(plugin, change_dir): @python35_only def test_workflow_13b(plugin, change_dir): """using inputs for workflow and connect_wf_input, using wf.map(mapper)""" - wf = Workflow(name="wf13b", inputs={"wfa": [3, 5]}, - workingdir="test_wf13b_{}".format(plugin), - wf_output_names=[("NA", "out", "NA_out")]) + wf = Workflow( + name="wf13b", + inputs={"wfa": [3, 5]}, + workingdir="test_wf13b_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) na = Node(name="NA", interface=interf_addtwo, workingdir="na") wf.add(na).map(mapper="wfa") @@ -1089,8 +1218,7 @@ def test_workflow_13b(plugin, change_dir): sub.close() assert wf.is_complete - expected = [({"wf13b.wfa": 3}, [({"NA.a": 3}, 5)]), - ({'wf13b.wfa': 5}, [({"NA.a": 5}, 7)])] + expected = [({"wf13b.wfa": 3}, [({"NA.a": 3}, 5)]), ({'wf13b.wfa': 5}, [({"NA.a": 5}, 7)])] for i, res in enumerate(expected): assert wf.result["NA_out"][i][0] == res[0] assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] @@ -1099,18 +1227,20 @@ def test_workflow_13b(plugin, change_dir): # workflow as a node + @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_14(plugin, change_dir): """workflow with a 
@@ -1099,18 +1227,20 @@ def test_workflow_13b(plugin, change_dir):
 
 # workflow as a node
 
+
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_workflow_14(plugin, change_dir):
     """workflow with a workflow as a node (no mapper)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
     na = Node(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3})
-    wfa = Workflow(name="wfa", workingdir="test_wfa",
-                   wf_output_names=[("NA", "out", "NA_out")])
+    wfa = Workflow(name="wfa", workingdir="test_wfa", wf_output_names=[("NA", "out", "NA_out")])
     wfa.add(na)
 
-    wf = Workflow(name="wf14", workingdir="test_wf14_{}".format(plugin),
-                  wf_output_names=[("wfa", "NA_out", "wfa_out")])
+    wf = Workflow(
+        name="wf14",
+        workingdir="test_wf14_{}".format(plugin),
+        wf_output_names=[("wfa", "NA_out", "wfa_out")])
     wf.add(wfa)
 
     sub = Submitter(runnable=wf, plugin=plugin)
@@ -1130,13 +1260,18 @@ def test_workflow_14a(plugin, change_dir):
     """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
     na = Node(name="NA", interface=interf_addtwo, workingdir="na")
-    wfa = Workflow(name="wfa", workingdir="test_wfa", inputs={"a": 3},
-                   wf_output_names=[("NA", "out", "NA_out")])
+    wfa = Workflow(
+        name="wfa",
+        workingdir="test_wfa",
+        inputs={"a": 3},
+        wf_output_names=[("NA", "out", "NA_out")])
     wfa.add(na)
     wfa.connect_wf_input("a", "NA", "a")
 
-    wf = Workflow(name="wf14a", workingdir="test_wf14a_{}".format(plugin),
-                  wf_output_names=[("wfa", "NA_out", "wfa_out")])
+    wf = Workflow(
+        name="wf14a",
+        workingdir="test_wf14a_{}".format(plugin),
+        wf_output_names=[("wfa", "NA_out", "wfa_out")])
     wf.add(wfa)
 
     sub = Submitter(runnable=wf, plugin=plugin)
@@ -1156,13 +1291,15 @@ def test_workflow_14b(plugin, change_dir):
     """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa and wf)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
     na = Node(name="NA", interface=interf_addtwo, workingdir="na")
-    wfa = Workflow(name="wfa", workingdir="test_wfa",
-                   wf_output_names=[("NA", "out", "NA_out")])
+    wfa = Workflow(name="wfa", workingdir="test_wfa", wf_output_names=[("NA", "out", "NA_out")])
     wfa.add(na)
     wfa.connect_wf_input("a", "NA", "a")
 
-    wf = Workflow(name="wf14b", workingdir="test_wf14b_{}".format(plugin),
-                  wf_output_names=[("wfa", "NA_out", "wfa_out")], inputs={"a": 3})
+    wf = Workflow(
+        name="wf14b",
+        workingdir="test_wf14b_{}".format(plugin),
+        wf_output_names=[("wfa", "NA_out", "wfa_out")],
+        inputs={"a": 3})
     wf.add(wfa)
     wf.connect_wf_input("a", "wfa", "a")
 
@@ -1182,14 +1319,15 @@ def test_workflow_14b(plugin, change_dir):
 def test_workflow_15(plugin, change_dir):
     """workflow with a workflow as a node with mapper (like 14 but with a mapper)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
-    na = Node(name="NA", interface=interf_addtwo, workingdir="na",
-              inputs={"a": [3, 5]}, mapper="a")
-    wfa = Workflow(name="wfa", workingdir="test_wfa",
-                   wf_output_names=[("NA", "out", "NA_out")])
+    na = Node(
+        name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": [3, 5]}, mapper="a")
+    wfa = Workflow(name="wfa", workingdir="test_wfa", wf_output_names=[("NA", "out", "NA_out")])
     wfa.add(na)
 
-    wf = Workflow(name="wf15", workingdir="test_wf15_{}".format(plugin),
-                  wf_output_names=[("wfa", "NA_out", "wfa_out")])
+    wf = Workflow(
+        name="wf15",
+        workingdir="test_wf15_{}".format(plugin),
+        wf_output_names=[("wfa", "NA_out", "wfa_out")])
     wf.add(wfa)
 
     sub = Submitter(runnable=wf, plugin=plugin)
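
Note on nested workflows (test_workflow_14 through 15): the outer workflow's wf_output_names refers to the inner workflow's already renamed output, so the chain is NA.out -> wfa's "NA_out" -> wf's "wfa_out". A plain-Python picture of that renaming chain (the value 5 is hypothetical):

    inner_result = {"NA_out": 5}                        # wfa: ("NA", "out", "NA_out")
    outer_result = {"wfa_out": inner_result["NA_out"]}  # wf: ("wfa", "NA_out", "wfa_out")
    assert outer_result == {"wfa_out": 5}
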
Workflow(name="wf16", workingdir="test_wf16_{}".format(plugin), - wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) + wf = Workflow( + name="wf16", + workingdir="test_wf16_{}".format(plugin), + wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) na = Node(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3}) wf.add(na) @@ -1216,8 +1356,11 @@ def test_workflow_16(plugin, change_dir): # the second node does not have explicit mapper (but keeps the mapper from the NA node) interf_addvar = FunctionInterface(fun_addvar, ["out"]) nb = Node(name="NB", interface=interf_addvar, workingdir="nb") - wfb = Workflow(name="wfb", workingdir="test_wfb", inputs={"b": 10}, - wf_output_names=[("NB", "out", "NB_out")]) + wfb = Workflow( + name="wfb", + workingdir="test_wfb", + inputs={"b": 10}, + wf_output_names=[("NB", "out", "NB_out")]) wfb.add(nb) wfb.connect_wf_input("b", "NB", "b") wfb.connect_wf_input("a", "NB", "a") @@ -1247,8 +1390,10 @@ def test_workflow_16(plugin, change_dir): @python35_only def test_workflow_16a(plugin, change_dir): """workflow with two nodes, and one is a workflow (with mapper)""" - wf = Workflow(name="wf16a", workingdir="test_wf16a_{}".format(plugin), - wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) + wf = Workflow( + name="wf16a", + workingdir="test_wf16a_{}".format(plugin), + wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) na = Node(name="NA", interface=interf_addtwo, workingdir="na") na.map(mapper="a", inputs={"a": [3, 5]}) @@ -1257,8 +1402,11 @@ def test_workflow_16a(plugin, change_dir): # the second node does not have explicit mapper (but keeps the mapper from the NA node) interf_addvar = FunctionInterface(fun_addvar, ["out"]) nb = Node(name="NB", interface=interf_addvar, workingdir="nb") - wfb = Workflow(name="wfb", workingdir="test_wfb", inputs={"b": 10}, - wf_output_names=[("NB", "out", "NB_out")]) + wfb = Workflow( + name="wfb", + workingdir="test_wfb", + inputs={"b": 10}, + wf_output_names=[("NB", "out", "NB_out")]) wfb.add(nb) wfb.connect_wf_input("b", "NB", "b") wfb.connect_wf_input("a", "NB", "a") @@ -1292,15 +1440,24 @@ def test_workflow_16a(plugin, change_dir): # testing CurrentInterface that is a temporary wrapper for current interfaces -@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") + +@pytest.mark.skipif( + not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_current_node_1(change_dir, plugin): """Node with a current interface and inputs, no mapper, running interface""" interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") - nn = Node(name="NA", inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}, interface=interf_bet, - workingdir="test_cnd1_{}".format(plugin), output_names=["out_file"]) + nn = Node( + name="NA", + inputs={ + "in_file": + "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz" + }, + interface=interf_bet, + workingdir="test_cnd1_{}".format(plugin), + output_names=["out_file"]) sub = Submitter(plugin=plugin, runnable=nn) sub.run() @@ -1309,17 +1466,26 @@ def test_current_node_1(change_dir, plugin): assert "out_file" in nn.output.keys() -@pytest.mark.skipif(not 
os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.skipif( + not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_current_node_2(change_dir, plugin): """Node with a current interface and mapper""" interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") - in_file_l = ["/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz", - "/Users/dorota/nipype_workshop/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"] - nn = Node(name="NA", inputs={"in_file": in_file_l}, mapper="in_file", interface=interf_bet, write_state=False, - workingdir="test_cnd2_{}".format(plugin), output_names=["out_file"]) + in_file_l = [ + "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz", + "/Users/dorota/nipype_workshop/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz" + ] + nn = Node( + name="NA", + inputs={"in_file": in_file_l}, + mapper="in_file", + interface=interf_bet, + write_state=False, + workingdir="test_cnd2_{}".format(plugin), + output_names=["out_file"]) sub = Submitter(plugin=plugin, runnable=nn) sub.run() @@ -1330,18 +1496,30 @@ def test_current_node_2(change_dir, plugin): assert "NA.in_file:1" in nn.output["out_file"].keys() -@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.skipif( + not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_current_wf_1(change_dir, plugin): """Wf with a current interface, no mapper""" interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") - nn = Node(name="fsl", inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}, interface=interf_bet, - workingdir="nn", output_names=["out_file"], write_state=False) - - wf = Workflow( workingdir="test_cwf_1_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], - write_state=False) + nn = Node( + name="fsl", + inputs={ + "in_file": + "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz" + }, + interface=interf_bet, + workingdir="nn", + output_names=["out_file"], + write_state=False) + + wf = Workflow( + workingdir="test_cwf_1_{}".format(plugin), + name="cw1", + wf_output_names=[("fsl", "out_file", "fsl_out")], + write_state=False) wf.add_nodes([nn]) sub = Submitter(plugin=plugin, runnable=wf) @@ -1351,18 +1529,30 @@ def test_current_wf_1(change_dir, plugin): assert "fsl_out" in wf.output.keys() -@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.skipif( + not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_current_wf_1a(change_dir, plugin): """Wf with a current interface, no mapper""" interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") - nn = Node(name="fsl", inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}, interface=interf_bet, - workingdir="nn", output_names=["out_file"], write_state=False) - - wf = Workflow(workingdir="test_cwf_1a_{}".format(plugin), name="cw1", wf_output_names=[("fsl", 
"out_file", "fsl_out")], - write_state=False) + nn = Node( + name="fsl", + inputs={ + "in_file": + "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz" + }, + interface=interf_bet, + workingdir="nn", + output_names=["out_file"], + write_state=False) + + wf = Workflow( + workingdir="test_cwf_1a_{}".format(plugin), + name="cw1", + wf_output_names=[("fsl", "out_file", "fsl_out")], + write_state=False) wf.add(runnable=nn) sub = Submitter(plugin=plugin, runnable=wf) @@ -1372,17 +1562,29 @@ def test_current_wf_1a(change_dir, plugin): assert "fsl_out" in wf.output.keys() -@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.skipif( + not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_current_wf_1b(change_dir, plugin): """Wf with a current interface, no mapper; using wf.add(nipype CurrentInterface)""" interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") - wf = Workflow(workingdir="test_cwf_1b_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], - write_state=False) - wf.add(runnable=interf_bet, name="fsl", workingdir="nn", output_names=["out_file"], write_state=False, - inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}) + wf = Workflow( + workingdir="test_cwf_1b_{}".format(plugin), + name="cw1", + wf_output_names=[("fsl", "out_file", "fsl_out")], + write_state=False) + wf.add( + runnable=interf_bet, + name="fsl", + workingdir="nn", + output_names=["out_file"], + write_state=False, + inputs={ + "in_file": + "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz" + }) sub = Submitter(plugin=plugin, runnable=wf) sub.run() @@ -1391,16 +1593,28 @@ def test_current_wf_1b(change_dir, plugin): assert "fsl_out" in wf.output.keys() -@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.skipif( + not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_current_wf_1c(change_dir, plugin): """Wf with a current interface, no mapper; using wf.add(nipype interface) """ - wf = Workflow(workingdir="test_cwf_1c_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], - write_state=False) - wf.add(runnable=fsl.BET(), name="fsl", workingdir="nn", output_names=["out_file"], write_state=False, - inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}) + wf = Workflow( + workingdir="test_cwf_1c_{}".format(plugin), + name="cw1", + wf_output_names=[("fsl", "out_file", "fsl_out")], + write_state=False) + wf.add( + runnable=fsl.BET(), + name="fsl", + workingdir="nn", + output_names=["out_file"], + write_state=False, + inputs={ + "in_file": + "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz" + }) sub = Submitter(plugin=plugin, runnable=wf) sub.run() @@ -1409,21 +1623,33 @@ def test_current_wf_1c(change_dir, plugin): assert "fsl_out" in wf.output.keys() -@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.skipif( + not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") 
@@ -1409,21 +1623,33 @@ def test_current_wf_1c(change_dir, plugin):
     assert "fsl_out" in wf.output.keys()
 
 
-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(
+    not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_current_wf_2(change_dir, plugin):
     """Wf with a current interface and mapper"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
-    in_file_l = ["/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz",
-                 "/Users/dorota/nipype_workshop/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"]
-
-    nn = Node(name="fsl", interface=interf_bet, write_state=False,
-              workingdir="nn", output_names=["out_file"])
-
-    wf = Workflow( workingdir="test_cwf_2_{}".format(plugin), name="cw2", wf_output_names=[("fsl", "out_file", "fsl_out")],
-                  inputs={"in_file": in_file_l}, mapper="in_file", write_state=False)
+    in_file_l = [
+        "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz",
+        "/Users/dorota/nipype_workshop/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"
+    ]
+
+    nn = Node(
+        name="fsl",
+        interface=interf_bet,
+        write_state=False,
+        workingdir="nn",
+        output_names=["out_file"])
+
+    wf = Workflow(
+        workingdir="test_cwf_2_{}".format(plugin),
+        name="cw2",
+        wf_output_names=[("fsl", "out_file", "fsl_out")],
+        inputs={"in_file": in_file_l},
+        mapper="in_file",
+        write_state=False)
     wf.add_nodes([nn])
     wf.connect_wf_input("in_file", "fsl", "in_file")
 
@@ -1436,24 +1662,35 @@ def test_current_wf_2(change_dir, plugin):
     assert 'cw2.in_file:1' in wf.output["fsl_out"].keys()
 
 
-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(
+    not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
 @python35_only
 def test_current_wf_2a(change_dir, plugin):
     """Wf with a current interface and mapper"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
-    in_file_l = ["/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz",
-                 "/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"]
-
-    nn = Node(name="fsl", interface=interf_bet, write_state=False,
-              workingdir="nn", output_names=["out_file"],
-              inputs={"in_file": in_file_l}, mapper="in_file")
-
-    wf = Workflow( workingdir="test_cwf_2a_{}".format(plugin), name="cw2a", wf_output_names=[("fsl", "out_file", "fsl_out")],
-                  write_state=False)
+    in_file_l = [
+        "/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz",
+        "/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"
+    ]
+
+    nn = Node(
+        name="fsl",
+        interface=interf_bet,
+        write_state=False,
+        workingdir="nn",
+        output_names=["out_file"],
+        inputs={"in_file": in_file_l},
+        mapper="in_file")
+
+    wf = Workflow(
+        workingdir="test_cwf_2a_{}".format(plugin),
+        name="cw2a",
+        wf_output_names=[("fsl", "out_file", "fsl_out")],
+        write_state=False)
 
     wf.add_nodes([nn])
-    # wf.connect_wf_input("in_file", "fsl", "in_file")
+    # wf.connect_wf_input("in_file", "fsl", "in_file")
 
     sub = Submitter(plugin=plugin, runnable=wf)
     sub.run()
diff --git a/pydra/engine/tests/test_newnode_neuro.py b/pydra/engine/tests/test_newnode_neuro.py
index 7b8392bcfe..2043bf5fb1 100644
--- a/pydra/engine/tests/test_newnode_neuro.py
+++ b/pydra/engine/tests/test_newnode_neuro.py
@@ -14,6 +14,7 @@
 except ImportError:
     no_fmriprep = True
 
+
 @pytest.fixture()
 def change_dir(request):
     orig_dir = os.getcwd()
@@ -26,26 +27,35 @@ def move2orig():
     request.addfinalizer(move2orig)
 
+
 import pdb
 
 Name = "example"
 DEFAULT_MEMORY_MIN_GB = None
 
 # TODO, adding fields to Inputs (subject_id)
-Inputs = {"subject_id": "sub-01", - "output_spaces": ["fsaverage", "fsaverage5"], - "source_file": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/func_preproc_ses_test_task_fingerfootlips_wf/bold_t1_trans_wf/merge/vol0000_xform-00000_merged.nii", - "t1_preproc": "/fmriprep_test/output1/fmriprep/sub-01/anat/sub-01_T1w_preproc.nii.gz", - "t1_2_fsnative_forward_transform": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/anat_preproc_wf/surface_recon_wf/t1_2_fsnative_xfm/out.lta", - "subjects_dir": "/fmriprep_test/output1/freesurfer/" +Inputs = { + "subject_id": + "sub-01", + "output_spaces": ["fsaverage", "fsaverage5"], + "source_file": + "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/func_preproc_ses_test_task_fingerfootlips_wf/bold_t1_trans_wf/merge/vol0000_xform-00000_merged.nii", + "t1_preproc": + "/fmriprep_test/output1/fmriprep/sub-01/anat/sub-01_T1w_preproc.nii.gz", + "t1_2_fsnative_forward_transform": + "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/anat_preproc_wf/surface_recon_wf/t1_2_fsnative_xfm/out.lta", + "subjects_dir": + "/fmriprep_test/output1/freesurfer/" } Plugins = ["serial"] Plugins = ["serial", "mp", "cf", "dask"] + def select_target(subject_id, space): """ Given a source subject ID and a target space, get the target subject ID """ return subject_id if space == 'fsnative' else space + @pytest.mark.skipif(no_fmriprep, reason="No fmriprep") @pytest.mark.parametrize("plugin", Plugins) def test_neuro(change_dir, plugin): @@ -58,17 +68,18 @@ def test_neuro(change_dir, plugin): # #dj: why do I need outputs? - - wf = Workflow(name=Name, inputs=Inputs, workingdir="test_neuro_{}".format(plugin), write_state=False, - wf_output_names=[("sampler", "out_file", "sampler_out"), ("targets", "out", "target_out")]) + wf = Workflow( + name=Name, + inputs=Inputs, + workingdir="test_neuro_{}".format(plugin), + write_state=False, + wf_output_names=[("sampler", "out_file", "sampler_out"), ("targets", "out", "target_out")]) # @interface # def select_target(subject_id, space): # """ Given a source subject ID and a target space, get the target subject ID """ # return subject_id if space == 'fsnative' else space - - # wf.add('targets', select_target(subject_id=wf.inputs.subject_id)) # .map('space', space=[space for space in wf.inputs.output_spaces # if space.startswith('fs')]) @@ -93,7 +104,6 @@ def test_neuro(change_dir, plugin): .map_node('subject', inputs={"subject": [space for space in Inputs["output_spaces"] if space.startswith("fs")]}) #TODO: now it's only one subject - # wf.add('resampling_xfm', # fs.utils.LTAConvert(in_lta='identity.nofile', # out_lta=True, @@ -113,8 +123,6 @@ def test_neuro(change_dir, plugin): in_lta2="t1_2_fsnative_forward_transform", in_lta1="resampling_xfm.out_lta", output_names=["out_file"], write_state=False) - - # wf.add('sampler', # fs.SampleToSurface(sampling_method='average', sampling_range=(0, 1, 0.2), # sampling_units='frac', interp_method='trilinear', @@ -138,7 +146,6 @@ def test_neuro(change_dir, plugin): target_subject="targets.out", source_file="rename_src.out_file", output_names=["out_file"])\ .map_node(mapper=[('_targets', "_rename_src"), 'hemi'], inputs={"hemi": ['lh', 'rh']}) - sub = Submitter(plugin=plugin, runnable=wf) sub.run() sub.close() diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py index 1014ca712c..d8e7716d11 100644 --- a/pydra/engine/workers.py +++ b/pydra/engine/workers.py @@ -25,7 +25,7 @@ def close(self): class MpWorker(Worker): - def __init__(self, nr_proc=4): #should be none + 
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index 1014ca712c..d8e7716d11 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -25,7 +25,7 @@ def close(self):
 
 
 class MpWorker(Worker):
-    def __init__(self, nr_proc=4): #should be none
+    def __init__(self, nr_proc=4):  # should be None
         self.nr_proc = nr_proc
         self.pool = mp.Pool(processes=self.nr_proc)
         logger.debug('Initialize MpWorker')
@@ -72,10 +72,9 @@ def __init__(self):
         from distributed.deploy.local import LocalCluster
         logger.debug("Initialize Dask Worker")
         #self.cluster = LocalCluster()
-        self.client = Client()#self.cluster)
+        self.client = Client()  #self.cluster)
         #print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(self.client.scheduler_info()["services"]["bokeh"]))
 
-
     def run_el(self, interface, inp):
         print("DASK, run_el: ", interface, inp, time.time())
         # dask doesn't copy the node second time, so it doesn't see that I change input in the meantime (??)
@@ -85,8 +84,6 @@ def run_el(self, interface, inp):
         x.add_done_callback(lambda x: print("DONE ", interface, inp))
         print("res", x.result())
 
-
     def close(self):
         #self.cluster.close()
         self.client.close()
-
diff --git a/setup.cfg b/setup.cfg
index c865f3e421..2924475c85 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -8,3 +8,8 @@ versionfile_source = pydra/_version.py
 versionfile_build = pydra/_version.py
 tag_prefix =
 parentdir_prefix =
+
+[flake8]
+doctests = True
+exclude = **/__init__.py
+max-line-length=99