diff --git a/pydra/engine/auxiliary.py b/pydra/engine/auxiliary.py
index a34463bc2f..68a4bbc6bb 100644
--- a/pydra/engine/auxiliary.py
+++ b/pydra/engine/auxiliary.py
@@ -1,9 +1,11 @@
-import pdb
-import inspect, os
+# import pdb
+import os
+import inspect
 import logging
-logger = logging.getLogger('nipype.workflow')
 from nipype import Node
+logger = logging.getLogger('nipype.workflow')
+
 
 # dj: might create a new class or move to State
@@ -50,7 +52,7 @@ def _ordering(el, i, output_mapper, current_sign=None, other_mappers=None):
             output_mapper.append(el)
     else:
         raise Exception("mapper has to be a string, a tuple or a list")
-    
+
     if i > 0:
         output_mapper.append(current_sign)
@@ -58,24 +60,26 @@
 def _iterate_list(element, sign, other_mappers, output_mapper):
     """ Used in the mapper2rpn to get recursion. """
     for i, el in enumerate(element):
-        _ordering(el, i, current_sign=sign, other_mappers=other_mappers, output_mapper=output_mapper)
+        _ordering(el, i, current_sign=sign, other_mappers=other_mappers,
+                  output_mapper=output_mapper)
 
 
 # functions used in State to know which element should be used for a specific axis
 def mapping_axis(state_inputs, mapper_rpn):
-    """Having inputs and mapper (in rpn notation), functions returns the axes of output for every input."""
+    """Given inputs and mapper (in rpn notation), return the axes of output of each input."""
     axis_for_input = {}
     stack = []
     current_axis = None
     current_shape = None
-    #pdb.set_trace()
+    # pdb.set_trace()
     for el in mapper_rpn:
         if el == ".":
             right = stack.pop()
             left = stack.pop()
             if left == "OUT":
-                if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array?
+                # todo:should we allow for one-element array?
+                if state_inputs[right].shape == current_shape:
                     axis_for_input[right] = current_axis
                 else:
                     raise Exception("arrays for scalar operations should have the same size")
@@ -94,7 +98,7 @@ def mapping_axis(state_inputs, mapper_rpn):
                     axis_for_input[right] = current_axis
                 else:
                     raise Exception("arrays for scalar operations should have the same size")
-            
+
             stack.append("OUT")
 
         elif el == "*":
@@ -120,7 +124,7 @@
                 axis_for_input[right] = [i + state_inputs[left].ndim for i in range(state_inputs[right].ndim)]
             current_axis = axis_for_input[left] + axis_for_input[right]
-            current_shape = tuple([i for i in 
+            current_shape = tuple([i for i in
                                    state_inputs[left].shape + state_inputs[right].shape])
             stack.append("OUT")
@@ -149,12 +153,12 @@
     for i in range(ndim):
         input_for_axis.append([])
         shape.append(0)
-    
+
     for inp, axis in axis_for_input.items():
         for (i, ax) in enumerate(axis):
             input_for_axis[ax].append(inp)
             shape[ax] = state_inputs[inp].shape[i]
-    
+
     return input_for_axis, shape
@@ -190,8 +194,6 @@ def _add_name(mlist, name):
     return mlist
 
 
-#Function interface
-
 class FunctionInterface(object):
     """ A new function interface """
     def __init__(self, function, output_nm, out_read=False, input_map=None):
@@ -209,7 +211,6 @@ def __init__(self, function, output_nm, out_read=False, input_map=None):
         # flags if we want to read the txt file to save in node.output
         self.out_read = out_read
 
-
     def run(self, input):
         self.output = {}
         if self.input_map:
@@ -226,7 +227,7 @@
                     self.output[self._output_nm[i]] = out
             else:
                 raise Exception("length of output_nm doesnt match length of the function output")
-        elif len(self._output_nm)==1:
+        elif len(self._output_nm) == 1:
             self.output[self._output_nm[0]] = fun_output
         else:
             raise Exception("output_nm doesnt match length of the function output")
@@ -241,8 +242,8 @@ class DotDict(dict):
     """dot.notation access to dictionary attributes"""
     def __getattr__(self, attr):
         return self.get(attr)
-    __setattr__= dict.__setitem__
-    __delattr__= dict.__delitem__
+    __setattr__ = dict.__setitem__
+    __delattr__ = dict.__delitem__
 
     def __getstate__(self):
         return self
@@ -262,7 +263,7 @@ def run(self, inputs, base_dir, dir_nm_el):
         for key, val in inputs.items():
             key = key.split(".")[-1]
             setattr(self.nn.inputs, key, val)
-        #have to set again self._output_dir in case of mapper
+        # have to set again self._output_dir in case of mapper
         self.nn._output_dir = os.path.join(self.nn.base_dir, self.nn.name)
         res = self.nn.run()
-        return res
\ No newline at end of file
+        return res
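For reference, a minimal sketch of the mapper algebra that auxiliary.py implements, mirroring the expected values in pydra/engine/tests/test_auxiliary.py further down in this diff (assumes this revision of pydra is importable):

```python
import numpy as np
from pydra.engine import auxiliary as aux

# mapper2rpn turns the nested mapper syntax into reverse polish notation:
# "." is the scalar (zip-like) operator, "*" the outer-product operator
assert aux.mapper2rpn(("a", "b")) == ["a", "b", "."]
assert aux.mapper2rpn(["a", ("b", "c")]) == ["a", "b", "c", ".", "*"]

# mapping_axis assigns an output axis to every input: with "*" the two
# inputs vary along separate axes of the state space
axis_for_input = aux.mapping_axis({"a": np.array([1, 2]),
                                   "b": np.array([3, 4, 1])},
                                  ["a", "b", "*"])[0]
assert axis_for_input == {"a": [0], "b": [1]}
```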
diff --git a/pydra/engine/newengine.py b/pydra/engine/newengine.py
index f0ed18c406..6a3bebd8f4 100644
--- a/pydra/engine/newengine.py
+++ b/pydra/engine/newengine.py
@@ -5,24 +5,24 @@
 from . import state
 from . import auxiliary as aux
-from . import submitter as sub
 from nipype.utils.filemanip import loadpkl
 
 import logging
 logger = logging.getLogger('nipype.workflow')
-import pdb
 
 
 # dj ??: should I use EngineBase?
 class NewBase(object):
     def __init__(self, name, mapper=None, inputs=None, other_mappers=None, mem_gb=None,
                  cache_location=None, print_val=True, *args, **kwargs):
         self.name = name
-        #dj TODO: I should think what is needed in the __init__ (I redefine some of rhe attributes anyway)
+        # dj TODO: I should think what is needed in the __init__ (I redefine some of the attributes
+        # anyway)
         if inputs:
             # adding name of the node to the input name
-            self._inputs = dict(("{}.{}".format(self.name, key), value) for (key, value) in inputs.items())
+            self._inputs = dict(("{}.{}".format(self.name, key), value)
+                                for key, value in inputs.items())
             self._inputs = dict((key, np.array(val)) if type(val) is list else (key, val)
                                 for (key, val) in self._inputs.items())
             self._state_inputs = self._inputs.copy()
@@ -33,10 +33,13 @@ def __init__(self, name, mapper=None, inputs=None, other_mappers=None, mem_gb=No
             # adding name of the node to the input name within the mapper
             mapper = aux.change_mapper(mapper, self.name)
         self._mapper = mapper
-        # information about other nodes' mappers from workflow (in case the mapper from previous node is used)
+        # information about other nodes' mappers from workflow
+        # (in case the mapper from previous node is used)
         self._other_mappers = other_mappers
-        # create state (takes care of mapper, connects inputs with axes, so we can ask for specifc element)
-        self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
+        # create state (takes care of mapper, connects inputs with axes, so we can ask for specific
+        # element)
+        self._state = state.State(mapper=self._mapper, node_name=self.name,
+                                  other_mappers=self._other_mappers)
         self._output = {}
         self._result = {}
         # flag that says if the node/wf is ready to run (has all input)
@@ -45,14 +48,14 @@ def __init__(self, name, mapper=None, inputs=None, other_mappers=None, mem_gb=No
         self.needed_outputs = []
         # flag that says if node finished all jobs
         self._is_complete = False
-        # flag that says if value of state input should be printed in output and directories (otherwise indices)
+        # flag that says if value of state input should be printed in output and directories
+        # (otherwise indices)
         self.print_val = print_val
         # TODO: don't use it yet
         self.mem_gb = mem_gb
         self.cache_location = cache_location
 
-
     # TBD
     def join(self, field):
         pass
@@ -69,7 +72,8 @@ def mapper(self):
     def mapper(self, mapper):
         self._mapper = mapper
         # updating state
-        self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
+        self._state = state.State(mapper=self._mapper, node_name=self.name,
+                                  other_mappers=self._other_mappers)
 
     @property
     def state_inputs(self):
@@ -79,7 +83,6 @@ def state_inputs(self):
     def state_inputs(self, state_inputs):
         self._state_inputs.update(state_inputs)
 
-
     @property
     def output(self):
         return self._output
@@ -90,11 +93,9 @@ def result(self):
         self._reading_results()
         return self._result
 
-
     def prepare_state_input(self):
         self._state.prepare_state_input(state_inputs=self.state_inputs)
 
-
     def map(self, mapper, inputs=None):
         if self._mapper:
             raise Exception("mapper is already set")
@@ -102,30 +103,25 @@
         self._mapper = aux.change_mapper(mapper, self.name)
 
         if inputs:
-            inputs = dict(("{}.{}".format(self.name, key), value) for (key, value) in inputs.items())
+            inputs = dict(("{}.{}".format(self.name, key), value)
+                          for (key, value) in inputs.items())
             inputs = dict((key, np.array(val)) if type(val) is list else (key, val)
                           for (key, val) in inputs.items())
             self._inputs.update(inputs)
             self._state_inputs.update(inputs)
         if mapper:
             # updating state if we have a new mapper
-            self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers)
-
-
-    def join(self, field, node=None):
-        # TBD
-        pass
-
+            self._state = state.State(mapper=self._mapper, node_name=self.name,
+                                      other_mappers=self._other_mappers)
 
     def checking_input_el(self, ind):
         """checking if all inputs are available (for specific state element)"""
         try:
             self.get_input_el(ind)
             return True
-        except: #TODO specify
+        except Exception:  # TODO specify
            return False
 
-
     # dj: this is not used for a single node
     def get_input_el(self, ind):
         """collecting all inputs required to run the node (for specific state element)"""
@@ -141,12 +137,13 @@
                 dir_nm_el_from = ""
 
             if is_node(from_node) and is_current_interface(from_node.interface):
-                file_from = self._reading_ci_output(node=from_node, dir_nm_el=dir_nm_el_from, out_nm=from_socket)
+                file_from = self._reading_ci_output(node=from_node, dir_nm_el=dir_nm_el_from,
+                                                    out_nm=from_socket)
                 if file_from and os.path.exists(file_from):
                     inputs_dict["{}.{}".format(self.name, to_socket)] = file_from
                 else:
                     raise Exception("{} doesnt exist".format(file_from))
-            else: # assuming here that I want to read the file (will not be used with the current interfaces)
+            else:  # assuming I want to read the file (will not be used with current interfaces)
                 file_from = os.path.join(from_node.workingdir, dir_nm_el_from, from_socket+".txt")
                 with open(file_from) as f:
                     content = f.readline()
@@ -158,11 +155,12 @@
         return state_dict, inputs_dict
 
     def _reading_ci_output(self, dir_nm_el, out_nm, node=None):
-        """used for current interfaces: checking if the output exists and returns the path if it does"""
+        """used for current interfaces: check if output exists and return the path if it does"""
         if not node:
             node = self
         result_pklfile = os.path.join(os.getcwd(), node.workingdir, dir_nm_el,
-                                      node.interface.nn.name, "result_{}.pklz".format(node.interface.nn.name))
+                                      node.interface.nn.name,
+                                      "result_{}.pklz".format(node.interface.nn.name))
         if os.path.exists(result_pklfile):
             out_file = getattr(loadpkl(result_pklfile).outputs, out_nm)
             if os.path.exists(out_file):
@@ -172,7 +170,6 @@
         else:
             return False
 
-
     # checking if all outputs are saved
     @property
     def is_complete(self):
@@ -183,7 +180,6 @@
         else:
             return self._check_all_results()
 
-
     def get_output(self):
         raise NotImplementedError
 
@@ -193,7 +189,6 @@
     def _reading_results(self):
         raise NotImplementedError
 
-
    def _dict_tuple2list(self, container):
        if type(container) is dict:
            val_l = [val for (_, val) in container.items()]
@@ -220,7 +215,7 @@
         if is_function_interface(self.interface):
             # adding node name to the interface's name mapping
             self.interface.input_map = dict((key, "{}.{}".format(self.name, value))
-                                        for (key, value) in self.interface.input_map.items())
+                                            for (key, value) in self.interface.input_map.items())
             # list of output names taken from interface output name
             self.output_names = self.interface._output_nm
         elif is_current_interface(self.interface):
@@ -229,8 +224,6 @@
         if not self.output_names:
             self.output_names = []
 
-
-
     # dj: not sure if I need it
     # def __deepcopy__(self, memo):
     #     memo is a dict of id's to copies
     #     id_self = id(self)  # memoization avoids unnecesary recursion
@@ -249,7 +242,6 @@
     #     memo[id_self] = _copy
     #     return _copy
 
-
     @property
     def inputs(self):
         return self._inputs
@@ -258,7 +250,6 @@
     def inputs(self, inputs):
         self._inputs.update(inputs)
 
-
     def run_interface_el(self, i, ind):
         """ running interface one element generated from node_state."""
         logger.debug("Run interface el, name={}, i={}, ind={}".format(self.name, i, ind))
@@ -278,22 +269,24 @@
         elif is_current_interface(self.interface):
             if not self.mapper:
                 dir_nm_el = ""
-            res = self.interface.run(inputs=inputs_dict, base_dir=os.path.join(os.getcwd(), self.workingdir),
+            res = self.interface.run(inputs=inputs_dict,
+                                     base_dir=os.path.join(os.getcwd(), self.workingdir),
                                      dir_nm_el=dir_nm_el)
 
         # TODO when join
-        #if self._joinByKey:
-        #    dir_join = "join_" + "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items()) if i not in self._joinByKey])
-        #elif self._join:
-        #    dir_join = "join_"
-        #if self._joinByKey or self._join:
-        #    os.makedirs(os.path.join(self.nodedir, dir_join), exist_ok=True)
-        #    dir_nm_el = os.path.join(dir_join, dir_nm_el)
+        # if self._joinByKey:
+        #     dir_join = "join_" + "_".join(["{}.{}".format(i, j)
+        #                                    for i, j in list(state_dict.items())
+        #                                    if i not in self._joinByKey])
+        # elif self._join:
+        #     dir_join = "join_"
+        # if self._joinByKey or self._join:
+        #     os.makedirs(os.path.join(self.nodedir, dir_join), exist_ok=True)
+        #     dir_nm_el = os.path.join(dir_join, dir_nm_el)
         return res
 
-
     def _writting_results_tmp(self, state_dict, dir_nm_el, output):
-        """temporary method to write the results in the files (this is usually part of a interface)"""
+        """temporary method to write results in files (this is usually part of an interface)"""
         if not self.mapper:
             dir_nm_el = ''
         os.makedirs(os.path.join(self.workingdir, dir_nm_el), exist_ok=True)
@@ -301,7 +294,6 @@
         with open(os.path.join(self.workingdir, dir_nm_el, key_out+".txt"), "w") as fout:
             fout.write(str(val_out))
 
-
     def get_output(self):
         """collecting all outputs and updating self._output"""
         for key_out in self.output_names:
@@ -324,8 +316,10 @@
                         output = content
                     self._output[key_out][dir_nm_el] = (state_dict, output)
                 elif is_current_interface(self.interface):
-                    self._output[key_out][dir_nm_el] = \
-                        (state_dict, (state_dict, self._reading_ci_output(dir_nm_el=dir_nm_el, out_nm=key_out)))
+                    self._output[key_out][dir_nm_el] = (state_dict, (state_dict,
+                                                        self._reading_ci_output(
+                                                            dir_nm_el=dir_nm_el,
+                                                            out_nm=key_out)))
             else:
                 if is_function_interface(self.interface):
                     output = os.path.join(self.workingdir, key_out + ".txt")
@@ -341,7 +335,6 @@
                         (state_dict, self._reading_ci_output(dir_nm_el="", out_nm=key_out))
         return self._output
 
-
     # dj: version without join
     def _check_all_results(self):
         """checking if all files that should be created are present"""
@@ -356,7 +349,8 @@
 
             for key_out in self.output_names:
                 if is_function_interface(self.interface):
-                    if not os.path.isfile(os.path.join(self.workingdir, dir_nm_el, key_out+".txt")):
+                    if not os.path.isfile(os.path.join(self.workingdir, dir_nm_el,
+                                                       key_out+".txt")):
                         return False
                 elif is_current_interface(self.interface):
                     if not self._reading_ci_output(dir_nm_el, key_out):
                         return False
@@ -364,14 +358,13 @@
         self._is_complete = True
         return True
 
-
     def _reading_results(self):
         """temporary: reading results from output files (that is now just txt)
            should be probably just reading output for self.output_names
         """
         for key_out in self.output_names:
             self._result[key_out] = []
-            #pdb.set_trace()
+            # pdb.set_trace()
             if self._state_inputs:
                 val_l = self._dict_tuple2list(self._output[key_out])
                 for (st_dict, filename) in val_l:
@@ -395,10 +388,12 @@
 
 class NewWorkflow(NewBase):
-    def __init__(self, name, inputs=None, wf_output_names=None, mapper=None, #join_by=None,
-                 nodes=None, workingdir=None, mem_gb=None, cache_location=None, print_val=True, *args, **kwargs):
+    def __init__(self, name, inputs=None, wf_output_names=None, mapper=None,  # join_by=None,
+                 nodes=None, workingdir=None, mem_gb=None, cache_location=None, print_val=True,
+                 *args, **kwargs):
         super(NewWorkflow, self).__init__(name=name, mapper=mapper, inputs=inputs, mem_gb=mem_gb,
-                                          cache_location=cache_location, print_val=print_val, *args, **kwargs)
+                                          cache_location=cache_location, print_val=print_val,
+                                          *args, **kwargs)
 
         self.graph = nx.DiGraph()
         # all nodes in the workflow (probably will be removed)
@@ -417,25 +412,27 @@ def __init__(self, name, inputs=None, wf_output_names=None, mapper=None, #join_b
         self._node_mappers = {}
         # dj: not sure if this should be different than base_dir
         self.workingdir = os.path.join(os.getcwd(), workingdir)
-        # list of (nodename, output name in the name, output name in wf) or (nodename, output name in the name)
+        # list of (nodename, output name in the name, output name in wf) or
+        # (nodename, output name in the name)
         # dj: using different name than for node, since this one it is defined by a user
         self.wf_output_names = wf_output_names
-        # nodes that are created when the workflow has mapper (key: node name, value: list of nodes)
+        # nodes that are created when the workflow has mapper
+        # (key: node name, value: list of nodes)
         self.inner_nodes = {}
         # in case of inner workflow this points to the main/parent workflow
         self.parent_wf = None
 
         # dj not sure what was the motivation, wf_klasses gives an empty list
-        #mro = self.__class__.mro()
-        #wf_klasses = mro[:mro.index(NewWorkflow)][::-1]
-        #items = {}
-        #for klass in wf_klasses:
-        #    items.update(klass.__dict__)
-        #for name, runnable in items.items():
-        #    if name in ('__module__', '__doc__'):
-        #        continue
+        # mro = self.__class__.mro()
+        # wf_klasses = mro[:mro.index(NewWorkflow)][::-1]
+        # items = {}
+        # for klass in wf_klasses:
+        #     items.update(klass.__dict__)
+        # for name, runnable in items.items():
+        #     if name in ('__module__', '__doc__'):
+        #         continue
 
-        #    self.add(name, value)
+        #     self.add(name, value)
 
     @property
     def inputs(self):
@@ -443,8 +440,8 @@
 
     @inputs.setter
     def inputs(self, inputs):
-        self._inputs.update(dict(("{}.{}".format(self.name, key), value) for (key, value) in inputs.items()))
-
+        self._inputs.update(dict(("{}.{}".format(self.name, key), value)
+                                 for (key, value) in inputs.items()))
 
     @property
     def nodes(self):
@@ -455,7 +452,6 @@ def graph_sorted(self):
         # TODO: should I always update the graph?
         return list(nx.topological_sort(self.graph))
 
-
     def map_node(self, mapper, node=None, inputs=None):
         """this is setting a mapper to the wf's nodes (not to the wf)"""
         if not node:
@@ -465,9 +461,9 @@
             node.map(mapper=mapper, inputs=inputs)
         self._node_mappers[node.name] = node.mapper
 
-
     def get_output(self):
-        # not sure, if I should collecto output of all nodes or only the ones that are used in wf.output
+        # not sure if I should collect output of all nodes or only the ones that are used in
+        # wf.output
         self.node_outputs = {}
         for nn in self.graph:
             if self.mapper:
@@ -488,18 +484,21 @@
                     for (i, ind) in enumerate(itertools.product(*self.state.all_elements)):
                         if self.print_val:
                             wf_inputs_dict = self.state.state_values(ind)
-                            dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())])
+                            dir_nm_el = "_".join(["{}:{}".format(i, j)
+                                                  for i, j in list(wf_inputs_dict.items())])
                         else:
                             wf_ind_dict = self.state.state_ind(ind)
-                            dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_ind_dict.items())])
-                        self._output[out_wf_nm][dir_nm_el] = self.node_outputs[node_nm][i][out_nd_nm]
+                            dir_nm_el = "_".join(["{}:{}".format(i, j)
+                                                  for i, j in list(wf_ind_dict.items())])
+                        self._output[out_wf_nm][dir_nm_el] = \
+                            self.node_outputs[node_nm][i][out_nd_nm]
                 else:
                     self._output[out_wf_nm] = self.node_outputs[node_nm][out_nd_nm]
             else:
-                raise Exception("the key {} is already used in workflow.result".format(out_wf_nm))
+                raise Exception(
+                    "the key {} is already used in workflow.result".format(out_wf_nm))
         return self._output
 
-
     # dj: version without join
     # TODO: might merge with the function from Node
     def _check_all_results(self):
@@ -514,7 +513,6 @@
         self._is_complete = True
         return True
 
-
     # TODO: should try to merge with the function from Node
     def _reading_results(self):
         """reading all results of the workflow
@@ -522,7 +520,7 @@
         """
         if self.wf_output_names:
             for out in self.wf_output_names:
-                key_out = out[2] if len(out)==3 else out[1]
+                key_out = out[2] if len(out) == 3 else out[1]
                 self._result[key_out] = []
                 if self.mapper:
                     for (i, ind) in enumerate(itertools.product(*self.state.all_elements)):
@@ -530,26 +528,28 @@
                             wf_inputs_dict = self.state.state_values(ind)
                         else:
                             wf_inputs_dict = self.state.state_ind(ind)
-                        dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())])
-                        res_l= []
+                        dir_nm_el = "_".join(["{}:{}".format(i, j)
+                                              for i, j in wf_inputs_dict.items()])
+                        res_l = []
                        val_l = self._dict_tuple2list(self.output[key_out][dir_nm_el])
                        for val in val_l:
                            with open(val[1]) as fout:
-                                logger.debug('Reading Results: file={}, st_dict={}'.format(val[1], val[0]))
+                                logger.debug('Reading Results: file={}, st_dict={}'.format(val[1],
+                                                                                           val[0]))
                                res_l.append((val[0], eval(fout.readline())))
                        self._result[key_out].append((wf_inputs_dict, res_l))
                else:
                    val_l = self._dict_tuple2list(self.output[key_out])
                    for val in val_l:
-                        #TODO: I think that val shouldn't be dict here...
+                        # TODO: I think that val shouldn't be dict here...
                        # TMP solution
                        if type(val) is dict:
-                            val = [v for k,v in val.items()][0]
+                            val = [v for k, v in val.items()][0]
                        with open(val[1]) as fout:
-                            logger.debug('Reading Results: file={}, st_dict={}'.format(val[1], val[0]))
+                            logger.debug('Reading Results: file={}, st_dict={}'.format(val[1],
+                                                                                       val[0]))
                            self._result[key_out].append((val[0], eval(fout.readline())))
 
-
     def add_nodes(self, nodes):
         """adding nodes without defining connections
            most likely it will be removed at the end
@@ -557,41 +557,43 @@
         self.graph.add_nodes_from(nodes)
         for nn in nodes:
             self._nodes.append(nn)
-            #self._inputs.update(nn.inputs)
+            # self._inputs.update(nn.inputs)
             self.connected_var[nn] = {}
             self._node_names[nn.name] = nn
             self._node_mappers[nn.name] = nn.mapper
 
-
     # TODO: workingir shouldn't have None
-    def add(self, runnable, name=None, workingdir=None, inputs=None, output_names=None, mapper=None,
-            mem_gb=None, print_val=True, out_read=False, **kwargs):
+    def add(self, runnable, name=None, workingdir=None, inputs=None, output_names=None,
+            mapper=None, mem_gb=None, print_val=True, out_read=False, **kwargs):
         if is_function(runnable):
             if not output_names:
                 output_names = ["out"]
-            interface = aux.FunctionInterface(function=runnable, output_nm=output_names, out_read=out_read)
+            interface = aux.FunctionInterface(function=runnable, output_nm=output_names,
+                                              out_read=out_read)
             if not name:
                 raise Exception("you have to specify name for the node")
             if not workingdir:
                 workingdir = name
-            node = NewNode(interface=interface, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper,
-                           other_mappers=self._node_mappers, mem_gb=mem_gb, print_val=print_val)
+            node = NewNode(interface=interface, workingdir=workingdir, name=name, inputs=inputs,
+                           mapper=mapper, other_mappers=self._node_mappers, mem_gb=mem_gb,
+                           print_val=print_val)
         elif is_function_interface(runnable) or is_current_interface(runnable):
             if not name:
                 raise Exception("you have to specify name for the node")
             if not workingdir:
                 workingdir = name
-            node = NewNode(interface=runnable, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper,
-                           other_mappers=self._node_mappers, mem_gb_node=mem_gb, output_names=output_names,
-                           print_val=print_val)
+            node = NewNode(interface=runnable, workingdir=workingdir, name=name, inputs=inputs,
+                           mapper=mapper, other_mappers=self._node_mappers, mem_gb_node=mem_gb,
+                           output_names=output_names, print_val=print_val)
        elif is_nipype_interface(runnable):
            ci = aux.CurrentInterface(interface=runnable, name=name)
            if not name:
                raise Exception("you have to specify name for the node")
            if not workingdir:
                workingdir = name
-            node = NewNode(interface=ci, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper,
-                           other_mappers=self._node_mappers, mem_gb_node=mem_gb, output_names=output_names,
+            node = NewNode(interface=ci, workingdir=workingdir, name=name, inputs=inputs,
+                           mapper=mapper, other_mappers=self._node_mappers, mem_gb_node=mem_gb,
+                           output_names=output_names,
                            print_val=print_val)
        elif is_node(runnable):
            node = runnable
@@ -612,52 +614,53 @@
                 self.connect_wf_input(source, node.name, inp)
         return self
 
-
     def connect(self, from_node_nm, from_socket, to_node_nm, to_socket):
         from_node = self._node_names[from_node_nm]
         to_node = self._node_names[to_node_nm]
         self.graph.add_edges_from([(from_node, to_node)])
-        if not to_node in self.nodes:
+        if to_node not in self.nodes:
             self.add_nodes(to_node)
         self.connected_var[to_node][to_socket] = (from_node, from_socket)
         # from_node.sending_output.append((from_socket, to_node, to_socket))
         logger.debug('connecting {} and {}'.format(from_node, to_node))
 
-
     def connect_wf_input(self, inp_wf, node_nm, inp_nd):
         self.needed_inp_wf.append((node_nm, inp_wf, inp_nd))
 
-
     def preparing(self, wf_inputs=None, wf_inputs_ind=None):
         """preparing nodes which are connected: setting the final mapper and state_inputs"""
-        #pdb.set_trace()
+        # pdb.set_trace()
         for node_nm, inp_wf, inp_nd in self.needed_inp_wf:
             node = self._node_names[node_nm]
             if "{}.{}".format(self.name, inp_wf) in wf_inputs:
-                node.state_inputs.update({"{}.{}".format(node_nm, inp_nd): wf_inputs["{}.{}".format(self.name, inp_wf)]})
-                node.inputs.update({"{}.{}".format(node_nm, inp_nd): wf_inputs["{}.{}".format(self.name, inp_wf)]})
+                node.state_inputs.update({"{}.{}".format(node_nm, inp_nd):
+                                          wf_inputs["{}.{}".format(self.name, inp_wf)]})
+                node.inputs.update({"{}.{}".format(node_nm, inp_nd):
+                                    wf_inputs["{}.{}".format(self.name, inp_wf)]})
             else:
                 raise Exception("{}.{} not in the workflow inputs".format(self.name, inp_wf))
         for nn in self.graph_sorted:
             if self.print_val:
                 dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs.items())])
             else:
-                dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_ind.items())])
+                dir_nm_el = "_".join(["{}:{}".format(i, j)
+                                      for i, j in list(wf_inputs_ind.items())])
             if not self.mapper:
                 dir_nm_el = ""
             nn.workingdir = os.path.join(self.workingdir, dir_nm_el, nn.name)
-            nn._is_complete = False # helps when mp is used
+            nn._is_complete = False  # helps when mp is used
             try:
                 for inp, (out_node, out_var) in self.connected_var[nn].items():
-                    nn.ready2run = False #it has some history (doesnt have to be in the loop)
+                    nn.ready2run = False  # it has some history (doesn't have to be in the loop)
                     nn.state_inputs.update(out_node.state_inputs)
                     nn.needed_outputs.append((out_node, out_var, inp))
-                    #if there is no mapper provided, i'm assuming that mapper is taken from the previous node
+                    # if there is no mapper provided, I'm assuming that mapper is taken from the
+                    # previous node
                    if (not nn.mapper or nn.mapper == out_node.mapper) and out_node.mapper:
                        nn.mapper = out_node.mapper
                    else:
                        pass
-                        #TODO: implement inner mapper
+                        # TODO: implement inner mapper
            except(KeyError):
                # tmp: we don't care about nn that are not in self.connected_var
                pass
@@ -678,17 +681,22 @@
 def is_function(obj):
     return hasattr(obj, '__call__')
 
+
 def is_function_interface(obj):
     return type(obj) is aux.FunctionInterface
 
+
 def is_current_interface(obj):
     return type(obj) is aux.CurrentInterface
 
+
 def is_nipype_interface(obj):
     return hasattr(obj, "_run_interface")
 
+
 def is_node(obj):
     return type(obj) is NewNode
 
+
 def is_workflow(obj):
     return type(obj) is NewWorkflow
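A minimal end-to-end sketch of the NewWorkflow/Submitter API that newengine.py provides, modeled on the tests in test_newnode.py below (the "serial" plugin is the simplest worker; assumes this revision of pydra is importable):

```python
from pydra.engine.newengine import NewWorkflow
from pydra.engine.auxiliary import FunctionInterface
from pydra.engine.submitter import Submitter

def fun_addtwo(a):
    return a + 2

# workflow output "NA_out" is connected to the "out" output of node NA
wf = NewWorkflow(name="wf", workingdir="test_wf",
                 wf_output_names=[("NA", "out", "NA_out")])
interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
# map_node expands NA into one state element per value of a
wf.add(name="NA", runnable=interf_addtwo, workingdir="na")\
    .map_node(mapper="a", inputs={"a": [3, 5]})

sub = Submitter(runnable=wf, plugin="serial")
sub.run()
sub.close()
# wf.result["NA_out"] now holds (state_dict, value) pairs such as ({"NA.a": 3}, 5)
```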
diff --git a/pydra/engine/state.py b/pydra/engine/state.py
index 354b8d80af..349fad4871 100644
--- a/pydra/engine/state.py
+++ b/pydra/engine/state.py
@@ -1,9 +1,9 @@
 from collections import OrderedDict
 import itertools
-import pdb
 
 from . import auxiliary as aux
 
+
 class State(object):
     def __init__(self, node_name, mapper=None, other_mappers=None):
         self._mapper = mapper
@@ -17,12 +17,11 @@ def __init__(self, node_name, mapper=None, other_mappers=None):
         self._mapper_rpn = []
         self._input_names_mapper = []
 
-
     def prepare_state_input(self, state_inputs):
         """prepare all inputs, should be called once all input is available"""
 
-        # dj TOTHINK: I actually stopped using state_inputs for now, since people wanted to have mapper not only
-        # for state inputs. Might have to come back....
+        # dj TOTHINK: I actually stopped using state_inputs for now, since people wanted to have
+        # mapper not only for state inputs. Might have to come back....
         self.state_inputs = state_inputs
 
         # not all input field have to be use in the mapper, can be an extra scalar
@@ -37,35 +36,32 @@ def prepare_state_input(self, state_inputs):
         # e.g. [['e', 'd'], ['r', 'd']]
         # shape - list, e.g. [2,3]
         self._input_for_axis, self._shape = aux.converting_axis2input(self.state_inputs,
-                                                                      self._axis_for_input, self._ndim)
+                                                                      self._axis_for_input,
+                                                                      self._ndim)
 
         # list of all possible indexes in each dim, will be use to iterate
         # e.g. [[0, 1], [0, 1, 2]]
         self.all_elements = [range(i) for i in self._shape]
         self.index_generator = itertools.product(*self.all_elements)
 
-
     def __getitem__(self, ind):
         if type(ind) is int:
             ind = (ind,)
         return self.state_values(ind)
 
     # not used?
-    #@property
-    #def mapper(self):
-    #    return self._mapper
-
+    # @property
+    # def mapper(self):
+    #     return self._mapper
 
     @property
     def ndim(self):
         return self._ndim
 
-
     @property
     def shape(self):
         return self._shape
 
-
     def state_values(self, ind):
         """returns state input as a dictionary (input name, value)"""
         if len(ind) > self._ndim:
@@ -73,14 +69,16 @@ def state_values(self, ind):
 
         for ii, index in enumerate(ind):
             if index > self._shape[ii] - 1:
-                raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii]))
+                raise IndexError(
+                    "index {} is out of bounds for axis {} with size {}".format(index, ii,
+                                                                                self._shape[ii]))
 
         state_dict = {}
         for input, ax in self._axis_for_input.items():
             # checking which axes are important for the input
             sl_ax = slice(ax[0], ax[-1]+1)
             # taking the indexes for the axes
-            ind_inp = tuple(ind[sl_ax]) #used to be list
+            ind_inp = tuple(ind[sl_ax])  # used to be list
             state_dict[input] = self.state_inputs[input][ind_inp]
         # adding values from input that are not used in the mapper
         for input in set(self._input_names) - set(self._input_names_mapper):
@@ -90,7 +88,6 @@ def state_values(self, ind):
         # returning a named tuple?
         return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0]))
 
-
     def state_ind(self, ind):
         """similar to state value but returns indices (not values)"""
         if len(ind) > self._ndim:
@@ -98,14 +95,16 @@ def state_ind(self, ind):
 
         for ii, index in enumerate(ind):
             if index > self._shape[ii] - 1:
-                raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii]))
+                raise IndexError(
+                    "index {} is out of bounds for axis {} with size {}".format(index, ii,
+                                                                                self._shape[ii]))
 
         state_dict = {}
         for input, ax in self._axis_for_input.items():
             # checking which axes are important for the input
             sl_ax = slice(ax[0], ax[-1]+1)
             # taking the indexes for the axes
-            ind_inp = tuple(ind[sl_ax]) #used to be list
+            ind_inp = tuple(ind[sl_ax])  # used to be list
             ind_inp_str = "x".join([str(el) for el in ind_inp])
             state_dict[input] = ind_inp_str
         # adding inputs that are not used in the mapper
@@ -114,4 +113,4 @@ def state_ind(self, ind):
 
         # in py3.7 we can skip OrderedDict
         # returning a named tuple?
-        return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0]))
\ No newline at end of file
+        return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0]))
diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index cd2e72ed83..a3f91f9dfc 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -1,4 +1,4 @@
-import os, pdb, time
+import time
 from copy import deepcopy
 
 from .workers import MpWorker, SerialWorker, DaskWorker, ConcurrentFuturesWorker
@@ -12,7 +12,7 @@ class Submitter(object):
     def __init__(self, plugin, runnable):
         self.plugin = plugin
         self.node_line = []
-        self._to_finish = [] # used only for wf
+        self._to_finish = []  # used only for wf
         if self.plugin == "mp":
             self.worker = MpWorker()
         elif self.plugin == "serial":
@@ -24,14 +24,13 @@ def __init__(self, plugin, runnable):
         else:
             raise Exception("plugin {} not available".format(self.plugin))
 
-        if hasattr(runnable, 'interface'): # a node
+        if hasattr(runnable, 'interface'):  # a node
             self.node = runnable
-        elif hasattr(runnable, "graph"): # a workflow
+        elif hasattr(runnable, "graph"):  # a workflow
             self.workflow = runnable
         else:
             raise Exception("runnable has to be a Node or Workflow")
 
-
     def run(self):
         """main running method, checks if submitter id for Node or Workflow"""
         if hasattr(self, "node"):
@@ -39,7 +38,6 @@ def run(self):
         elif hasattr(self, "workflow"):
             self.run_workflow()
 
-
     def run_node(self):
         """the main method to run a Node"""
         self.node.prepare_state_input()
@@ -49,7 +47,6 @@ def run_node(self):
             time.sleep(3)
         self.node.get_output()
 
-
     def _submit_node(self, node):
         """submitting nodes's interface for all states"""
         for (i, ind) in enumerate(node.state.index_generator):
@@ -60,7 +57,6 @@ def _submit_node_el(self, node, i, ind):
         logger.debug("SUBMIT WORKER, node: {}, ind: {}".format(node, ind))
         self.worker.run_el(node.run_interface_el, (i, ind))
 
-
     def run_workflow(self, workflow=None, ready=True):
         """the main function to run Workflow"""
         if not workflow:
@@ -104,7 +100,6 @@ def run_workflow(self, workflow=None, ready=True):
         if workflow is self.workflow:
             workflow.get_output()
 
-
     def _run_workflow_el(self, workflow, i, ind, collect_inp=False):
         """running one internal workflow (if workflow has a mapper)"""
         # TODO: can I simplify and remove collect inp? where should it be?
@@ -119,15 +114,16 @@
         workflow.preparing(wf_inputs=wf_inputs, wf_inputs_ind=wf_inputs_ind)
         self._run_workflow_nd(workflow=workflow)
 
-
     def _run_workflow_nd(self, workflow):
-        """iterating over all nodes from a workflow and submitting them or adding to the node_line"""
+        """iterate over all nodes from a workflow and submit or add to the node_line"""
         for (i_n, node) in enumerate(workflow.graph_sorted):
-            if workflow.parent_wf and workflow.parent_wf.mapper: # for now if parent_wf, parent_wf has to have mapper
+            # for now if parent_wf, parent_wf has to have mapper
+            if workflow.parent_wf and workflow.parent_wf.mapper:
                 workflow.parent_wf.inner_nodes[node.name].append(node)
             node.prepare_state_input()
             self._to_finish.append(node)
-            # submitting all the nodes who are self sufficient (self.workflow.graph is already sorted)
+            # submitting all the nodes who are self sufficient
+            # self.workflow.graph is already sorted
             if node.ready2run:
                 if hasattr(node, 'interface'):
                     self._submit_node(node)
@@ -148,10 +144,9 @@
                         for (i, ind) in enumerate(nn.state.index_generator):
                             self._to_finish.append(nn)
                             self.node_line.append((nn, i, ind))
-                    else: #wf
+                    else:  # wf
                         self.run_workflow(workflow=nn, ready=False)
 
-
     def _nodes_check(self):
         """checking which nodes-states are ready to run and running the ones that are ready"""
         _to_remove = []
@@ -163,7 +158,7 @@
                     _to_remove.append((to_node, i, ind))
                 else:
                     pass
-            else: #wf
+            else:  # wf
                 if to_node.checking_input_el(ind):
                     self._run_workflow_el(workflow=to_node, i=i, ind=ind, collect_inp=True)
                     _to_remove.append((to_node, i, ind))
@@ -174,7 +169,6 @@
             self.node_line.remove(rn)
         return self.node_line
 
-
     # this I believe can be done for entire node
     def _output_check(self):
         """"checking if all nodes are done"""
@@ -187,6 +181,5 @@
             self._to_finish.remove(rn)
         return self._to_finish
 
-
     def close(self):
         self.worker.close()
diff --git a/pydra/engine/tests/test_auxiliary.py b/pydra/engine/tests/test_auxiliary.py
index 4c5df1df2f..775e6efeb8 100644
--- a/pydra/engine/tests/test_auxiliary.py
+++ b/pydra/engine/tests/test_auxiliary.py
@@ -3,54 +3,60 @@
 import numpy as np
 import pytest
 
-@pytest.mark.parametrize("mapper, rpn",
-                         [
-                             ("a", ["a"]),
-                             (("a", "b"), ["a", "b", "."]),
-                             (["a", "b"], ["a", "b", "*"]),
-                             (["a", ("b", "c")], ["a", "b", "c", ".", "*"]),
-                             ([("a", "b"), "c"], ["a", "b", ".", "c", "*"]),
-                             (["a", ("b", ["c", "d"])], ["a", "b", "c", "d", "*", ".", "*"])
-                         ])
+
+@pytest.mark.parametrize(
+    "mapper, rpn",
+    [
+        ("a", ["a"]),
+        (("a", "b"), ["a", "b", "."]),
+        (["a", "b"], ["a", "b", "*"]),
+        (["a", ("b", "c")], ["a", "b", "c", ".", "*"]),
+        ([("a", "b"), "c"], ["a", "b", ".", "c", "*"]),
+        (["a", ("b", ["c", "d"])], ["a", "b", "c", "d", "*", ".", "*"])
+    ])
 def test_mapper2rpn(mapper, rpn):
     assert aux.mapper2rpn(mapper) == rpn
 
 
-@pytest.mark.parametrize("mapper, other_mappers, rpn",
-                         [
-                             (["a", "_NA"], {"NA": ("b", "c")}, ["a", "NA.b", "NA.c", ".", "*"]),
-                             (["_NA", "c"], {"NA": ("a", "b")}, ["NA.a", "NA.b", ".", "c", "*"]),
-                             (["a", ("b", "_NA")], {"NA": ["c", "d"]}, ["a", "b", "NA.c", "NA.d", "*", ".", "*"])
-                         ])
-
+@pytest.mark.parametrize(
+    "mapper, other_mappers, rpn",
+    [
+        (["a", "_NA"], {"NA": ("b", "c")}, ["a", "NA.b", "NA.c", ".", "*"]),
+        (["_NA", "c"], {"NA": ("a", "b")}, ["NA.a", "NA.b", ".", "c", "*"]),
+        (["a", ("b", "_NA")], {"NA": ["c", "d"]}, ["a", "b", "NA.c", "NA.d", "*", ".", "*"])
+    ])
 def test_mapper2rpn_wf_mapper(mapper, other_mappers, rpn):
     assert aux.mapper2rpn(mapper, other_mappers=other_mappers) == rpn
 
 
-@pytest.mark.parametrize("mapper, mapper_changed",
-                         [
-                             ("a", "Node.a"),
-                             (["a", ("b", "c")], ["Node.a", ("Node.b", "Node.c")]),
-                             (("a", ["b", "c"]), ("Node.a", ["Node.b", "Node.c"]))
-                         ])
+@pytest.mark.parametrize(
+    "mapper, mapper_changed",
+    [
+        ("a", "Node.a"),
+        (["a", ("b", "c")], ["Node.a", ("Node.b", "Node.c")]),
+        (("a", ["b", "c"]), ("Node.a", ["Node.b", "Node.c"]))
+    ])
 def test_change_mapper(mapper, mapper_changed):
     assert aux.change_mapper(mapper, "Node") == mapper_changed
 
 
-@pytest.mark.parametrize("inputs, rpn, expected",
-                         [
-                             ({"a": np.array([1, 2])}, ["a"], {"a": [0]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4])}, ["a", "b", "."], {"a": [0], "b": [0]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, ["a", "b", "*"], {"a": [0], "b": [1]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, ["a", "b", ".", "c", "*"],
-                              {"a": [0], "b": [0], "c": [1]}),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
-                              ["c", "a", "b", ".", "*"], {"a": [1], "b": [1], "c": [0]}),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])},
-                              ["a", "b", ".", "c", "*"], {"a": [0, 1], "b": [0, 1], "c": [2]}),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
-                               "c": np.array([1, 2, 3])}, ["c", "a", "b", ".", "*"], {"a": [1, 2], "b": [1, 2], "c": [0]})
-                         ])
+@pytest.mark.parametrize(
+    "inputs, rpn, expected",
+    [
+        ({"a": np.array([1, 2])}, ["a"], {"a": [0]}),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4])}, ["a", "b", "."], {"a": [0], "b": [0]}),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, ["a", "b", "*"], {"a": [0], "b": [1]}),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
+         ["a", "b", ".", "c", "*"], {"a": [0], "b": [0], "c": [1]}),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
+         ["c", "a", "b", ".", "*"], {"a": [1], "b": [1], "c": [0]}),
+        ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
+          "c": np.array([1, 2, 3])},
+         ["a", "b", ".", "c", "*"], {"a": [0, 1], "b": [0, 1], "c": [2]}),
+        ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
+          "c": np.array([1, 2, 3])},
+         ["c", "a", "b", ".", "*"], {"a": [1, 2], "b": [1, 2], "c": [0]})
+    ])
 def test_mapping_axis(inputs, rpn, expected):
     res = aux.mapping_axis(inputs, rpn)[0]
     print(res)
@@ -63,22 +69,24 @@ def test_mapping_axis_error():
         aux.mapping_axis({"a": np.array([1, 2]), "b": np.array([3, 4, 5])}, ["a", "b", "."])
 
 
-@pytest.mark.parametrize("inputs, axis_inputs, ndim, expected",
-                         [
-                             ({"a": np.array([1, 2])}, {"a": [0]}, 1, [["a"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4])}, {"a": [0], "b": [0]}, 1,
-                              [["a", "b"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, {"a": [0], "b": [1]}, 2,
-                              [["a"], ["b"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
-                              {"a": [0], "b": [0], "c": [1]}, 2, [["a", "b"]]),
-                             ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
-                              {"a": [1], "b": [1], "c": [0]}, 2, [["c"], ["a", "b"]]),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])},
-                              {"a": [0, 1], "b": [0, 1], "c": [2]}, 3, [["a", "b"], ["a", "b"], ["c"]]),
-                             ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
-                               "c": np.array([1, 2, 3])}, {"a": [1, 2], "b": [1, 2], "c": [0]}, 3,
-                              [["c"], ["a", "b"], ["a", "b"]])
-                         ])
+@pytest.mark.parametrize(
+    "inputs, axis_inputs, ndim, expected",
+    [
+        ({"a": np.array([1, 2])}, {"a": [0]}, 1, [["a"]]),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4])}, {"a": [0], "b": [0]}, 1,
+         [["a", "b"]]),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, {"a": [0], "b": [1]}, 2,
+         [["a"], ["b"]]),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
+         {"a": [0], "b": [0], "c": [1]}, 2, [["a", "b"]]),
+        ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])},
+         {"a": [1], "b": [1], "c": [0]}, 2, [["c"], ["a", "b"]]),
+        ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
+          "c": np.array([1, 2, 3])},
+         {"a": [0, 1], "b": [0, 1], "c": [2]}, 3, [["a", "b"], ["a", "b"], ["c"]]),
+        ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]),
+          "c": np.array([1, 2, 3])}, {"a": [1, 2], "b": [1, 2], "c": [0]}, 3,
+         [["c"], ["a", "b"], ["a", "b"]])
+    ])
 def test_converting_axis2input(inputs, axis_inputs, ndim, expected):
     aux.converting_axis2input(inputs, axis_inputs, ndim)[0] == expected
5])).all() assert len(wf.graph.nodes) == 1 + @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_1(plugin, change_dir): """workflow with one node with a mapper""" wf = NewWorkflow(name="wf1", workingdir="test_wf1_{}".format(plugin)) @@ -250,7 +246,6 @@ def test_workflow_1(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_2(plugin, change_dir): """workflow with two nodes, second node without mapper""" wf = NewWorkflow(name="wf2", workingdir="test_wf2_{}".format(plugin)) @@ -291,7 +286,6 @@ def test_workflow_2(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_2a(plugin, change_dir): """workflow with two nodes, second node with a scalar mapper""" wf = NewWorkflow(name="wf2", workingdir="test_wf2a_{}".format(plugin)) @@ -333,7 +327,6 @@ def test_workflow_2a(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_2b(plugin): """workflow with two nodes, second node with a vector mapper""" wf = NewWorkflow(name="wf2", workingdir="test_wf2b_{}".format(plugin)) @@ -378,7 +371,6 @@ def test_workflow_2b(plugin): # using add method to add nodes @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_3(plugin, change_dir): """using add(node) method""" wf = NewWorkflow(name="wf3", workingdir="test_wf3_{}".format(plugin)) @@ -404,7 +396,6 @@ def test_workflow_3(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_3a(plugin, change_dir): """using add(interface) method""" wf = NewWorkflow(name="wf3a", workingdir="test_wf3a_{}".format(plugin)) @@ -429,7 +420,6 @@ def test_workflow_3a(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_3b(plugin, change_dir): """using add (function) method""" wf = NewWorkflow(name="wf3b", workingdir="test_wf3b_{}".format(plugin)) @@ -451,9 +441,7 @@ def test_workflow_3b(plugin, change_dir): assert wf.nodes[0].result["out"][i][1] == res[1] - @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_4(plugin, change_dir): """ using add(node) method using wf.connect to connect two nodes @@ -495,7 +483,6 @@ def test_workflow_4(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_4a(plugin, change_dir): """ using add(node) method with kwarg arg to connect nodes (instead of wf.connect) """ wf = NewWorkflow(name="wf4a", workingdir="test_wf4a_{}".format(plugin)) @@ -532,11 +519,9 @@ def test_workflow_4a(plugin, change_dir): assert wf.nodes[1].result["out"][i][1] == res[1] - # using map after add method @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_5(plugin, change_dir): """using a map method for one node""" wf = NewWorkflow(name="wf5", workingdir="test_wf5_{}".format(plugin)) @@ -561,14 +546,14 @@ def test_workflow_5(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_5a(plugin, change_dir): """using a map method for one node (using add and map in one chain)""" wf = NewWorkflow(name="wf5a", workingdir="test_wf5a_{}".format(plugin)) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") - wf.add(na).map_node(mapper="a", inputs={"a": [3, 5]}) + wf.add(na)\ + .map_node(mapper="a", inputs={"a": [3, 5]}) sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -584,7 +569,6 @@ def 
test_workflow_5a(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_6(plugin, change_dir): """using a map method for two nodes (using last added node as default)""" wf = NewWorkflow(name="wf6", workingdir="test_wf6_{}".format(plugin)) @@ -622,7 +606,6 @@ def test_workflow_6(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_6a(plugin, change_dir): """using a map method for two nodes (specifying the node)""" wf = NewWorkflow(name="wf6a", workingdir="test_wf6a_{}".format(plugin)) @@ -661,9 +644,8 @@ def test_workflow_6a(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_6b(plugin, change_dir): - """using a map method for two nodes (specifying the node), using kwarg arg instead of connect""" + """use a map method for two nodes (specifying the node), using kwarg instead of connect""" wf = NewWorkflow(name="wf6b", workingdir="test_wf6b_{}".format(plugin)) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") @@ -700,7 +682,6 @@ def test_workflow_6b(plugin, change_dir): # tests for a workflow that have its own input @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_7(plugin, change_dir): """using inputs for workflow and connect_workflow""" # adding inputs to the workflow directly @@ -727,7 +708,6 @@ def test_workflow_7(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_7a(plugin, change_dir): """using inputs for workflow and kwarg arg in add (instead of connect)""" wf = NewWorkflow(name="wf7a", inputs={"wfa": [3, 5]}, workingdir="test_wf7a_{}".format(plugin)) @@ -751,7 +731,6 @@ def test_workflow_7a(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_8(plugin, change_dir): """using inputs for workflow and connect_wf_input for the second node""" wf = NewWorkflow(name="wf8", workingdir="test_wf8_{}".format(plugin), inputs={"b": 10}) @@ -779,7 +758,6 @@ def test_workflow_8(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 10}, 15), ({"NA.a": 5, "NB.b": 10}, 17)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -792,15 +770,17 @@ def test_workflow_8(plugin, change_dir): # testing if _NA in mapper works, using interfaces in add @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_9(plugin, change_dir): """using add(interface) method and mapper from previous nodes""" wf = NewWorkflow(name="wf9", workingdir="test_wf9_{}".format(plugin)) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - wf.add(name="NA", runnable=interf_addtwo, workingdir="na").map_node(mapper="a", inputs={"a": [3, 5]}) + wf.add(name="NA", runnable=interf_addtwo, workingdir="na")\ + .map_node(mapper="a", inputs={"a": [3, 5]}) interf_addvar = FunctionInterface(fun_addvar, ["out"]) # _NA means that I'm using mapper from the NA node, it's the same as ("NA.a", "b") - wf.add(name="NB", runnable=interf_addvar, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]}) + wf.add(name="NB", runnable=interf_addvar, workingdir="nb", + a="NA.out")\ + .map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]}) sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -824,15 +804,16 @@ def test_workflow_9(plugin, 
change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_10(plugin, change_dir): """using add(interface) method and scalar mapper from previous nodes""" wf = NewWorkflow(name="wf10", workingdir="test_wf10_{}".format(plugin)) interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) - wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]}) + wf.add(name="NA", runnable=interf_addvar1, workingdir="na")\ + .map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]}) interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) # _NA means that I'm using mapper from the NA node, it's the same as (("NA.a", NA.b), "b") - wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]}) + wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out")\ + .map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]}) sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -856,15 +837,16 @@ def test_workflow_10(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_10a(plugin, change_dir): """using add(interface) method and vector mapper from previous nodes""" wf = NewWorkflow(name="wf10a", workingdir="test_wf10a_{}".format(plugin)) interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) - wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=["a", "b"], inputs={"a": [3, 5], "b": [0, 10]}) + wf.add(name="NA", runnable=interf_addvar1, workingdir="na")\ + .map_node(mapper=["a", "b"], inputs={"a": [3, 5], "b": [0, 10]}) interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) # _NA means that I'm using mapper from the NA node, it's the same as (["NA.a", NA.b], "b") - wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [[2, 1], [0, 0]]}) + wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out")\ + .map_node(mapper=("_NA", "b"), inputs={"b": [[2, 1], [0, 0]]}) sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -890,17 +872,20 @@ def test_workflow_10a(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_11(plugin, change_dir): """using add(interface) method and vector mapper from previous two nodes""" wf = NewWorkflow(name="wf11", workingdir="test_wf11_{}".format(plugin)) interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) - wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]}) + wf.add(name="NA", runnable=interf_addvar1, workingdir="na")\ + .map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]}) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - wf.add(name="NB", runnable=interf_addtwo, workingdir="nb").map_node(mapper="a", inputs={"a": [2, 1]}) + wf.add(name="NB", runnable=interf_addtwo, workingdir="nb")\ + .map_node(mapper="a", inputs={"a": [2, 1]}) interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) - # _NA, _NB means that I'm using mappers from the NA/NB nodes, it's the same as [("NA.a", NA.b), "NB.a"] - wf.add(name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out").map_node(mapper=["_NA", "_NB"]) # TODO: this should eb default? 
+ # _NA, _NB means that I'm using mappers from the NA/NB nodes + # it's the same as [("NA.a", NA.b), "NB.a"] + wf.add(name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out")\ + .map_node(mapper=["_NA", "_NB"]) # TODO: this should eb default? sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -914,9 +899,10 @@ def test_workflow_11(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - - expected_C = [({"NA.a": 3, "NA.b": 0, "NB.a": 1}, 6), ({"NA.a": 3, "NA.b": 0, "NB.a": 2}, 7), - ({"NA.a": 5, "NA.b": 10, "NB.a": 1}, 18), ({"NA.a": 5, "NA.b": 10, "NB.a": 2}, 19)] + expected_C = [({"NA.a": 3, "NA.b": 0, "NB.a": 1}, 6), + ({"NA.a": 3, "NA.b": 0, "NB.a": 2}, 7), + ({"NA.a": 5, "NA.b": 10, "NB.a": 1}, 18), + ({"NA.a": 5, "NA.b": 10, "NB.a": 2}, 19)] key_sort = list(expected_C[0][0].keys()) expected_C.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[2].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -928,7 +914,6 @@ def test_workflow_11(plugin, change_dir): # checking workflow.result @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_12(plugin, change_dir): """testing if wf.result works (the same workflow as in test_workflow_6)""" wf = NewWorkflow(name="wf12", workingdir="test_wf12_{}".format(plugin), @@ -958,7 +943,7 @@ def test_workflow_12(plugin, change_dir): key_sort = list(expected[0][0].keys()) expected.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.result["NA_out"].sort(key=lambda t: [t[0][key] for key in key_sort]) - #pdb.set_trace() + # pdb.set_trace() assert wf.is_complete for i, res in enumerate(expected): assert wf.result["NA_out"][i][0] == res[0] @@ -974,7 +959,6 @@ def test_workflow_12(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_12a(plugin, change_dir): """testing if wf.result raises exceptione (the same workflow as in test_workflow_6)""" wf = NewWorkflow(name="wf12a", workingdir="test_wf12a_{}".format(plugin), @@ -1001,10 +985,10 @@ def test_workflow_12a(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_13(plugin, change_dir): """using inputs for workflow and connect_wf_input""" - wf = NewWorkflow(name="wf13", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13_{}".format(plugin), + wf = NewWorkflow(name="wf13", inputs={"wfa": [3, 5]}, mapper="wfa", + workingdir="test_wf13_{}".format(plugin), wf_output_names=[("NA", "out", "NA_out")]) interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") @@ -1025,13 +1009,14 @@ def test_workflow_13(plugin, change_dir): @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_workflow_13a(plugin, change_dir): """using inputs for workflow and connect_wf_input (the node has 2 inputs)""" - wf = NewWorkflow(name="wf13a", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13a_{}".format(plugin), + wf = NewWorkflow(name="wf13a", inputs={"wfa": [3, 5]}, mapper="wfa", + workingdir="test_wf13a_{}".format(plugin), wf_output_names=[("NA", "out", "NA_out")]) interf_addvar = FunctionInterface(fun_addvar, ["out"]) - na = NewNode(name="NA", interface=interf_addvar, workingdir="na", mapper="b", inputs={"b": [10, 20]}) + na = NewNode(name="NA", interface=interf_addvar, workingdir="na", mapper="b", + inputs={"b": [10, 20]}) wf.add(na) wf.connect_wf_input("wfa", "NA", "a") @@ -1050,7 +1035,6 @@ def 
@@ -928,7 +914,6 @@ def test_workflow_11(plugin, change_dir):

 # checking workflow.result
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_12(plugin, change_dir):
     """testing if wf.result works (the same workflow as in test_workflow_6)"""
     wf = NewWorkflow(name="wf12", workingdir="test_wf12_{}".format(plugin),
@@ -958,7 +943,7 @@ def test_workflow_12(plugin, change_dir):
     key_sort = list(expected[0][0].keys())
     expected.sort(key=lambda t: [t[0][key] for key in key_sort])
     wf.result["NA_out"].sort(key=lambda t: [t[0][key] for key in key_sort])
-    #pdb.set_trace()
+    # pdb.set_trace()
     assert wf.is_complete
     for i, res in enumerate(expected):
         assert wf.result["NA_out"][i][0] == res[0]
@@ -974,7 +959,6 @@

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_12a(plugin, change_dir):
     """testing if wf.result raises an exception (the same workflow as in test_workflow_6)"""
     wf = NewWorkflow(name="wf12a", workingdir="test_wf12a_{}".format(plugin),
@@ -1001,10 +985,10 @@

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_13(plugin, change_dir):
     """using inputs for workflow and connect_wf_input"""
-    wf = NewWorkflow(name="wf13", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13_{}".format(plugin),
+    wf = NewWorkflow(name="wf13", inputs={"wfa": [3, 5]}, mapper="wfa",
+                     workingdir="test_wf13_{}".format(plugin),
                      wf_output_names=[("NA", "out", "NA_out")])
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
     na = NewNode(name="NA", interface=interf_addtwo, workingdir="na")
@@ -1025,13 +1009,14 @@

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_13a(plugin, change_dir):
     """using inputs for workflow and connect_wf_input (the node has 2 inputs)"""
-    wf = NewWorkflow(name="wf13a", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13a_{}".format(plugin),
+    wf = NewWorkflow(name="wf13a", inputs={"wfa": [3, 5]}, mapper="wfa",
+                     workingdir="test_wf13a_{}".format(plugin),
                      wf_output_names=[("NA", "out", "NA_out")])
     interf_addvar = FunctionInterface(fun_addvar, ["out"])
-    na = NewNode(name="NA", interface=interf_addvar, workingdir="na", mapper="b", inputs={"b": [10, 20]})
+    na = NewNode(name="NA", interface=interf_addvar, workingdir="na", mapper="b",
+                 inputs={"b": [10, 20]})
     wf.add(na)

     wf.connect_wf_input("wfa", "NA", "a")
@@ -1050,7 +1035,6 @@ def test_workflow_13a(plugin, change_dir):

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_13c(plugin, change_dir):
     """using inputs for workflow and connect_wf_input, using wf.map(mapper, inputs)"""
     wf = NewWorkflow(name="wf13c", workingdir="test_wf13c_{}".format(plugin),
@@ -1073,7 +1057,6 @@ def test_workflow_13c(plugin, change_dir):
         assert wf.result["NA_out"][i][1][0][1] == res[1][0][1]

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_13b(plugin, change_dir):
     """using inputs for workflow and connect_wf_input, using wf.map(mapper)"""
     wf = NewWorkflow(name="wf13b", inputs={"wfa": [3, 5]},
@@ -1100,7 +1083,6 @@ def test_workflow_13b(plugin, change_dir):

 # workflow as a node
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_14(plugin, change_dir):
     """workflow with a workflow as a node (no mapper)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
@@ -1125,7 +1107,6 @@ def test_workflow_14(plugin, change_dir):

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_14a(plugin, change_dir):
     """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
@@ -1151,7 +1132,6 @@ def test_workflow_14a(plugin, change_dir):

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_14b(plugin, change_dir):
     """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa and wf)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
@@ -1178,7 +1158,6 @@ def test_workflow_14b(plugin, change_dir):

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_15(plugin, change_dir):
     """workflow with a workflow as a node with mapper (like 14 but with a mapper)"""
     interf_addtwo = FunctionInterface(fun_addtwo, ["out"])
@@ -1204,7 +1183,6 @@ def test_workflow_15(plugin, change_dir):

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_16(plugin, change_dir):
     """workflow with two nodes, and one is a workflow (no mapper)"""
     wf = NewWorkflow(name="wf16", workingdir="test_wf16_{}".format(plugin),
@@ -1244,7 +1222,6 @@ def test_workflow_16(plugin, change_dir):

 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_workflow_16a(plugin, change_dir):
     """workflow with two nodes, and one is a workflow (with mapper)"""
     wf = NewWorkflow(name="wf16a", workingdir="test_wf16a_{}".format(plugin),
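The test_workflow_13 family above moves the mapper from the node onto the workflow:
wf maps over wfa=[3, 5] and connect_wf_input("wfa", "NA", "a") feeds each state's value
into NA's "a" input. Roughly, per workflow state (assuming fun_addtwo returns a + 2;
the state-key format below is illustrative, not the engine's exact naming):

    def fun_addtwo(a):          # stand-in for the test helper
        return a + 2

    wfa = [3, 5]                # workflow input, mapper="wfa"

    # each workflow state runs NA once, with "a" taken from that state's wfa value
    na_out = [({"wfa": a}, fun_addtwo(a)) for a in wfa]
    print(na_out)               # [({'wfa': 3}, 5), ({'wfa': 5}, 7)]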
@@ -1292,9 +1269,9 @@

 # testing CurrentInterface that is a temporary wrapper for current interfaces
-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"),
+                    reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_current_node_1(change_dir, plugin):
     """Node with a current interface and inputs, no mapper, running interface"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
@@ -1309,9 +1286,9 @@ def test_current_node_1(change_dir, plugin):
     assert "out_file" in nn.output.keys()

-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"),
+                    reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_current_node_2(change_dir, plugin):
     """Node with a current interface and mapper"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
@@ -1330,9 +1307,9 @@ def test_current_node_2(change_dir, plugin):
     assert "NA.in_file:1" in nn.output["out_file"].keys()

-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"),
+                    reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_current_wf_1(change_dir, plugin):
     """Wf with a current interface, no mapper"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
@@ -1340,7 +1317,8 @@ def test_current_wf_1(change_dir, plugin):
     nn = NewNode(name="fsl", inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"},
                  interface=interf_bet, workingdir="nn", output_names=["out_file"], print_val=False)

-    wf = NewWorkflow( workingdir="test_cwf_1_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], print_val=False)
+    wf = NewWorkflow(workingdir="test_cwf_1_{}".format(plugin), name="cw1",
+                     wf_output_names=[("fsl", "out_file", "fsl_out")], print_val=False)
     wf.add_nodes([nn])

     sub = Submitter(plugin=plugin, runnable=wf)
@@ -1350,9 +1328,9 @@ def test_current_wf_1(change_dir, plugin):
     assert "fsl_out" in wf.output.keys()

-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"),
+                    reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_current_wf_1a(change_dir, plugin):
     """Wf with a current interface, no mapper"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
@@ -1370,9 +1348,9 @@ def test_current_wf_1a(change_dir, plugin):
     assert "fsl_out" in wf.output.keys()

-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"),
+                    reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_current_wf_1b(change_dir, plugin):
     """Wf with a current interface, no mapper; using wf.add(nipype CurrentInterface)"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
@@ -1388,9 +1366,9 @@ def test_current_wf_1b(change_dir, plugin):
     assert "fsl_out" in wf.output.keys()

-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"),
+                    reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_current_wf_1c(change_dir, plugin):
     """Wf with a current interface, no mapper; using wf.add(nipype interface)
     """
@@ -1405,9 +1383,9 @@ def test_current_wf_1c(change_dir, plugin):
     assert "fsl_out" in wf.output.keys()

-@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data")
+@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"),
+                    reason="adding data")
 @pytest.mark.parametrize("plugin", Plugins)
-@python35_only
 def test_current_wf_2(change_dir, plugin):
     """Wf with a current interface and mapper"""
     interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface")
@@ -1418,8 +1396,9 @@
nn = NewNode(name="fsl", interface=interf_bet, print_val=False, workingdir="nn", output_names=["out_file"]) - wf = NewWorkflow( workingdir="test_cwf_2_{}".format(plugin), name="cw2", wf_output_names=[("fsl", "out_file", "fsl_out")], - inputs={"in_file": in_file_l}, mapper="in_file", print_val=False) + wf = NewWorkflow(workingdir="test_cwf_2_{}".format(plugin), name="cw2", + wf_output_names=[("fsl", "out_file", "fsl_out")], + inputs={"in_file": in_file_l}, mapper="in_file", print_val=False) wf.add_nodes([nn]) wf.connect_wf_input("in_file", "fsl", "in_file") @@ -1432,9 +1411,9 @@ def test_current_wf_2(change_dir, plugin): assert 'cw2.in_file:1' in wf.output["fsl_out"].keys() -@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), + reason="adding data") @pytest.mark.parametrize("plugin", Plugins) -@python35_only def test_current_wf_2a(change_dir, plugin): """Wf with a current interface and mapper""" interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") @@ -1446,10 +1425,10 @@ def test_current_wf_2a(change_dir, plugin): workingdir="nn", output_names=["out_file"], inputs={"in_file": in_file_l}, mapper="in_file") - wf = NewWorkflow( workingdir="test_cwf_2a_{}".format(plugin), name="cw2a", wf_output_names=[("fsl", "out_file", "fsl_out")], - print_val=False) + wf = NewWorkflow(workingdir="test_cwf_2a_{}".format(plugin), name="cw2a", + wf_output_names=[("fsl", "out_file", "fsl_out")], print_val=False) wf.add_nodes([nn]) - # wf.connect_wf_input("in_file", "fsl", "in_file") + # wf.connect_wf_input("in_file", "fsl", "in_file") sub = Submitter(plugin=plugin, runnable=wf) sub.run() diff --git a/pydra/engine/tests/test_newnode_neuro.py b/pydra/engine/tests/test_newnode_neuro.py index a7b6acab85..2369a015fa 100644 --- a/pydra/engine/tests/test_newnode_neuro.py +++ b/pydra/engine/tests/test_newnode_neuro.py @@ -1,10 +1,10 @@ import os import pytest -from ..newengine import NewNode, NewWorkflow +from ..newengine import NewWorkflow from ..submitter import Submitter -#dj niworkflows vs ...?? +# dj niworkflows vs ...?? from nipype.interfaces.utility import Rename import nipype.interfaces.freesurfer as fs @@ -14,6 +14,7 @@ except ImportError: no_fmriprep = True + @pytest.fixture() def change_dir(request): orig_dir = os.getcwd() @@ -26,7 +27,6 @@ def move2orig(): request.addfinalizer(move2orig) -import pdb Name = "example" DEFAULT_MEMORY_MIN_GB = None @@ -37,15 +37,17 @@ def move2orig(): "t1_preproc": "/fmriprep_test/output1/fmriprep/sub-01/anat/sub-01_T1w_preproc.nii.gz", "t1_2_fsnative_forward_transform": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/anat_preproc_wf/surface_recon_wf/t1_2_fsnative_xfm/out.lta", "subjects_dir": "/fmriprep_test/output1/freesurfer/" -} + } Plugins = ["serial"] Plugins = ["serial", "mp", "cf", "dask"] + def select_target(subject_id, space): """ Given a source subject ID and a target space, get the target subject ID """ return subject_id if space == 'fsnative' else space + @pytest.mark.skipif(no_fmriprep, reason="No fmriprep") @pytest.mark.parametrize("plugin", Plugins) def test_neuro(change_dir, plugin): @@ -56,29 +58,28 @@ def test_neuro(change_dir, plugin): # 'mem_gb', 'output_spaces', 'medial_surface_nan'], # outputs='surfaces') # - #dj: why do I need outputs? + # dj: why do I need outputs? 
diff --git a/pydra/engine/tests/test_newnode_neuro.py b/pydra/engine/tests/test_newnode_neuro.py
index a7b6acab85..2369a015fa 100644
--- a/pydra/engine/tests/test_newnode_neuro.py
+++ b/pydra/engine/tests/test_newnode_neuro.py
@@ -1,10 +1,10 @@
 import os
 import pytest

-from ..newengine import NewNode, NewWorkflow
+from ..newengine import NewWorkflow
 from ..submitter import Submitter

-#dj niworkflows vs ...??
+# dj niworkflows vs ...??
 from nipype.interfaces.utility import Rename
 import nipype.interfaces.freesurfer as fs

@@ -14,6 +14,7 @@
 except ImportError:
     no_fmriprep = True

+
 @pytest.fixture()
 def change_dir(request):
     orig_dir = os.getcwd()
@@ -26,7 +27,6 @@ def move2orig():

     request.addfinalizer(move2orig)

-import pdb

 Name = "example"
 DEFAULT_MEMORY_MIN_GB = None
@@ -37,15 +37,17 @@ def move2orig():
     "t1_preproc": "/fmriprep_test/output1/fmriprep/sub-01/anat/sub-01_T1w_preproc.nii.gz",
     "t1_2_fsnative_forward_transform": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/anat_preproc_wf/surface_recon_wf/t1_2_fsnative_xfm/out.lta",
     "subjects_dir": "/fmriprep_test/output1/freesurfer/"
-}
+    }

 Plugins = ["serial"]
 Plugins = ["serial", "mp", "cf", "dask"]

+
 def select_target(subject_id, space):
     """ Given a source subject ID and a target space, get the target subject ID """
     return subject_id if space == 'fsnative' else space

+
 @pytest.mark.skipif(no_fmriprep, reason="No fmriprep")
 @pytest.mark.parametrize("plugin", Plugins)
 def test_neuro(change_dir, plugin):
@@ -56,29 +58,28 @@ def test_neuro(change_dir, plugin):
 #                  'mem_gb', 'output_spaces', 'medial_surface_nan'],
 #                 outputs='surfaces')
 #
-    #dj: why do I need outputs?
+    # dj: why do I need outputs?

-
-    wf = NewWorkflow(name=Name, inputs=Inputs, workingdir="test_neuro_{}".format(plugin), print_val=False,
-                     wf_output_names=[("sampler", "out_file", "sampler_out"), ("targets", "out", "target_out")])
+    wf = NewWorkflow(name=Name, inputs=Inputs, workingdir="test_neuro_{}".format(plugin),
+                     print_val=False,
+                     wf_output_names=[("sampler", "out_file", "sampler_out"),
+                                      ("targets", "out", "target_out")])

 #    @interface
 #    def select_target(subject_id, space):
 #        """ Given a source subject ID and a target space, get the target subject ID """
 #        return subject_id if space == 'fsnative' else space
-
-
 #    wf.add('targets', select_target(subject_id=wf.inputs.subject_id))
 #      .map('space', space=[space for space in wf.inputs.output_spaces
 #                           if space.startswith('fs')])

-    #dj: don't have option in map to connect with wf input
+    # dj: don't have an option in map to connect with wf input
     wf.add(runnable=select_target, name="targets", subject_id="subject_id",
            output_names=["out"], out_read=True, print_val=False)\
         .map_node(mapper="space", inputs={"space": [space for space in Inputs["output_spaces"]
-                                               if space.startswith("fs")]})
+                                                    if space.startswith("fs")]})

 #    wf.add('rename_src', Rename(format_string='%(subject)s',
 #                                keep_ext=True,
@@ -87,12 +88,12 @@ def test_neuro(change_dir, plugin):
     wf.add(name='rename_src', runnable=Rename(format_string='%(subject)s',
                                               keep_ext=True),
-           in_file="source_file",
-           output_names=["out_file"],
+           in_file="source_file",
+           output_names=["out_file"],
            print_val=False)\
-        .map_node('subject', inputs={"subject": [space for space in Inputs["output_spaces"]
-                                          if space.startswith("fs")]})  #TODO: now it's only one subject
-
+        .map_node('subject',
+                  inputs={"subject": [space for space in Inputs["output_spaces"]
+                                      if space.startswith("fs")]})  # TODO: now only one subject

 #    wf.add('resampling_xfm',
 #           fs.utils.LTAConvert(in_lta='identity.nofile',
@@ -103,17 +104,14 @@ def test_neuro(change_dir, plugin):
 #                            in_lta2=wf.inputs.t1_2_fsnative_forward_transform,
 #                            in_lta1=wf.resampling_xfm.out_lta))

-
     wf.add(name='resampling_xfm',
            runnable=fs.utils.LTAConvert(in_lta='identity.nofile', out_lta=True),
            source_file="source_file", target_file="t1_preproc",
            output_names=["out_lta"], print_val=False)\
         .add(name='set_xfm_source', runnable=ConcatenateLTA(out_type='RAS2RAS'),
-            in_lta2="t1_2_fsnative_forward_transform", in_lta1="resampling_xfm.out_lta",
-            output_names=["out_file"], print_val=False)
-
-
+             in_lta2="t1_2_fsnative_forward_transform", in_lta1="resampling_xfm.out_lta",
+             output_names=["out_file"], print_val=False)

 #    wf.add('sampler',
 #           fs.SampleToSurface(sampling_method='average', sampling_range=(0, 1, 0.2),
@@ -128,17 +126,17 @@ def test_neuro(change_dir, plugin):
 #           mem_gb=mem_gb * 3)
 #      .map([('source_file', 'target_subject'), 'hemi'], hemi=['lh', 'rh'])

     wf.add(name='sampler', runnable=fs.SampleToSurface(sampling_method='average', sampling_range=(0, 1, 0.2),
-                                       sampling_units='frac', interp_method='trilinear',
-                                       cortex_mask=True, override_reg_subj=True,
-                                       out_type='gii'), print_val=False,
-           subjects_dir="subjects_dir", subject_id="subject_id", reg_file="set_xfm_source.out_file",
-           target_subject="targets.out", source_file="rename_src.out_file", output_names=["out_file"])\
+                                       sampling_units='frac', interp_method='trilinear',
+                                       cortex_mask=True, override_reg_subj=True,
+                                       out_type='gii'),
+           print_val=False,
+           subjects_dir="subjects_dir", subject_id="subject_id",
+           reg_file="set_xfm_source.out_file", target_subject="targets.out",
+           source_file="rename_src.out_file", output_names=["out_file"])\
         .map_node(mapper=[('_targets', "_rename_src"), 'hemi'], inputs={"hemi": ['lh', 'rh']})

-
     sub = Submitter(plugin=plugin, runnable=wf)
     sub.run()
     sub.close()
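In the neuro test, the targets node maps select_target over the output spaces that
start with "fs". A standalone illustration; the output_spaces list is hypothetical,
since the Inputs dict above elides that key:

    def select_target(subject_id, space):
        """ Given a source subject ID and a target space, get the target subject ID """
        return subject_id if space == 'fsnative' else space

    output_spaces = ["fsaverage5", "fsnative", "template"]   # hypothetical value
    fs_spaces = [space for space in output_spaces if space.startswith("fs")]

    print([select_target("sub-01", space) for space in fs_spaces])
    # ['fsaverage5', 'sub-01'] -- only 'fsnative' maps back to the subject itself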
diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py
index 1014ca712c..4ce8a0421e 100644
--- a/pydra/engine/workers.py
+++ b/pydra/engine/workers.py
@@ -1,9 +1,6 @@
-import re, os, pdb, time
+import time
 import multiprocessing as mp
-#import multiprocess as mp
-import itertools

-#from pycon_utils import make_cluster
 from dask.distributed import Client

 import concurrent.futures as cf
@@ -25,7 +22,7 @@ def close(self):

 class MpWorker(Worker):
-    def __init__(self, nr_proc=4): #should be none
+    def __init__(self, nr_proc=4):  # should be None
         self.nr_proc = nr_proc
         self.pool = mp.Pool(processes=self.nr_proc)
         logger.debug('Initialize MpWorker')
@@ -34,8 +31,9 @@ def run_el(self, interface, inp):
         self.pool.apply_async(interface, (inp[0], inp[1]))

     def close(self):
-        # added this method since I was having somtetimes problem with reading results from (existing) files
-        # i thought that pool.close() should work, but still was getting some errors, so testing terminate
+        # added this method since I was sometimes having problems with reading results from
+        # (existing) files. I thought that pool.close() should work, but still was getting some
+        # errors, so testing terminate
         self.pool.terminate()
@@ -59,9 +57,9 @@ def __init__(self, nr_proc=4):

     def run_el(self, interface, inp):
         x = self.pool.submit(interface, inp[0], inp[1])
-        #print("X, DONE", x.done())
+        # print("X, DONE", x.done())
         x.add_done_callback(lambda x: print("DONE ", interface, inp, x.done))
-        #print("DIR", x.result())
+        # print("DIR", x.result())

     def close(self):
         self.pool.shutdown()
@@ -69,24 +67,23 @@ def close(self):

 class DaskWorker(Worker):
     def __init__(self):
-        from distributed.deploy.local import LocalCluster
+        # from distributed.deploy.local import LocalCluster
         logger.debug("Initialize Dask Worker")
-        #self.cluster = LocalCluster()
-        self.client = Client()#self.cluster)
-        #print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(self.client.scheduler_info()["services"]["bokeh"]))
-
+        # self.cluster = LocalCluster()
+        self.client = Client()  # self.cluster)
+        # print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(
+        #     self.client.scheduler_info()["services"]["bokeh"]))

     def run_el(self, interface, inp):
         print("DASK, run_el: ", interface, inp, time.time())
-        # dask doesn't copy the node second time, so it doesn't see that I change input in the meantime (??)
+        # dask doesn't copy the node a second time, so it doesn't see that I change the input
+        # in the meantime (??)
         x = self.client.submit(interface, inp[0], inp[1])
         print("DASK, status: ", x.status)
         # this important, otherwise dask will not finish the job
         x.add_done_callback(lambda x: print("DONE ", interface, inp))
         print("res", x.result())

-
     def close(self):
-        #self.cluster.close()
+        # self.cluster.close()
         self.client.close()
-
diff --git a/setup.cfg b/setup.cfg
index c865f3e421..dcd4d704c0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -8,3 +8,8 @@ versionfile_source = pydra/_version.py
 versionfile_build = pydra/_version.py
 tag_prefix =
 parentdir_prefix =
+
+[flake8]
+max-line-length = 99
+doctests = True
+exclude = **/__init__.py
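The reworked workers all share one pattern: submit the callable to a pool or a dask
client, then attach a done-callback to the returned future (and, as the DaskWorker
comment notes, consume the future so the job actually finishes). A self-contained
stdlib sketch of that pattern, independent of pydra:

    import concurrent.futures as cf

    def interface(a, b):          # stand-in for a node's runnable
        return a + b

    pool = cf.ThreadPoolExecutor(max_workers=2)
    fut = pool.submit(interface, 3, 5)

    # the callback fires once the future completes (also on error or cancellation)
    fut.add_done_callback(lambda f: print("DONE", f.result()))

    print(fut.result())           # blocks until the job finishes -> 8
    pool.shutdown()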