From 156cf462c1fc80c1c185c2d15bb70d09d493eb1b Mon Sep 17 00:00:00 2001 From: Chris Filo Gorgolewski Date: Fri, 7 Dec 2012 14:10:39 +0100 Subject: [PATCH 1/6] Respect custom execution parameters in graph based models. --- nipype/pipeline/plugins/base.py | 4 ++-- nipype/pipeline/plugins/dagman.py | 22 ++++++++++++++++++---- nipype/pipeline/plugins/pbsgraph.py | 21 ++++++++++++++++++--- nipype/pipeline/plugins/sgegraph.py | 21 ++++++++++++++++++--- 4 files changed, 56 insertions(+), 12 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 9e34f67295..a21730a578 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -562,9 +562,9 @@ def run(self, graph, config, updatehash=False): store_exception=False)) dependencies[idx] = [nodes.index(prevnode) for prevnode in graph.predecessors(node)] - self._submit_graph(pyfiles, dependencies) + self._submit_graph(pyfiles, dependencies, nodes) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): """ pyfiles: list of files corresponding to a topological sort dependencies: dictionary of dependencies based on the toplogical sort diff --git a/nipype/pipeline/plugins/dagman.py b/nipype/pipeline/plugins/dagman.py index d053b15963..e4e646bc50 100644 --- a/nipype/pipeline/plugins/dagman.py +++ b/nipype/pipeline/plugins/dagman.py @@ -49,7 +49,7 @@ def __init__(self, **kwargs): self._dagman_args = plugin_args['dagman_args'] super(CondorDAGManPlugin, self).__init__(**kwargs) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): # location of all scripts, place dagman output in here too batch_dir, _ = os.path.split(pyfiles[0]) # DAG description filename @@ -58,12 +58,26 @@ def _submit_graph(self, pyfiles, dependencies): # loop over all scripts, create submit files, and define them # as jobs in the DAG for idx, pyscript in enumerate(pyfiles): + node = nodes[idx] + template = self._template + submit_specs = self._submit_specs + if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): + if "template" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + template = node.plugin_args["template"] + else: + template += node.plugin_args["template"] + if "submit_specs" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + submit_specs = node.plugin_args["submit_specs"] + else: + submit_specs += node.plugin_args['submit_specs'] # XXX redundant with previous value? or could it change between - # scripts? + # scripts? batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) submitspec = '\n'.join( - (self._template, + (template, 'executable = %s' % sys.executable, 'arguments = %s' % pyscript, 'output = %s' % os.path.join(batch_dir, @@ -73,7 +87,7 @@ def _submit_graph(self, pyfiles, dependencies): 'log = %s' % os.path.join(batch_dir, '%s.log' % name), 'getenv = True', - self._submit_specs, + submit_specs, 'queue' )) # write submit spec for this job diff --git a/nipype/pipeline/plugins/pbsgraph.py b/nipype/pipeline/plugins/pbsgraph.py index c177973d91..cdd32f455d 100644 --- a/nipype/pipeline/plugins/pbsgraph.py +++ b/nipype/pipeline/plugins/pbsgraph.py @@ -36,15 +36,30 @@ def __init__(self, **kwargs): self._qsub_args = plugin_args['qsub_args'] super(PBSGraphPlugin, self).__init__(**kwargs) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): batch_dir, _ = os.path.split(pyfiles[0]) submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh') with open(submitjobsfile, 'wt') as fp: fp.writelines('#!/usr/bin/env sh\n') for idx, pyscript in enumerate(pyfiles): + node = nodes[idx] + template = self._template + qsub_args = self._qsub_args + if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): + if "template" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + template = node.plugin_args["template"] + else: + template += node.plugin_args["template"] + if "qsub_args" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + qsub_args = node.plugin_args["qsub_args"] + else: + qsub_args += (" " + node.plugin_args['qsub_args']) + batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) - batchscript = '\n'.join((self._template, + batchscript = '\n'.join((template, '%s %s' % (sys.executable, pyscript))) batchscriptfile = os.path.join(batch_dir, 'batchscript_%s.sh' % name) @@ -57,7 +72,7 @@ def _submit_graph(self, pyfiles, dependencies): if len(values): deps = '-W depend=afterok:%s' % ':'.join(values) fp.writelines('job%05d=`qsub %s %s %s`\n' % (idx, deps, - self._qsub_args, + qsub_args, batchscriptfile)) cmd = CommandLine('sh', environ=os.environ.data) cmd.inputs.args = '%s' % submitjobsfile diff --git a/nipype/pipeline/plugins/sgegraph.py b/nipype/pipeline/plugins/sgegraph.py index 64cb49ec08..34df9c5b56 100644 --- a/nipype/pipeline/plugins/sgegraph.py +++ b/nipype/pipeline/plugins/sgegraph.py @@ -38,15 +38,30 @@ def __init__(self, **kwargs): self._qsub_args = plugin_args['qsub_args'] super(SGEGraphPlugin, self).__init__(**kwargs) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): batch_dir, _ = os.path.split(pyfiles[0]) submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh') with open(submitjobsfile, 'wt') as fp: fp.writelines('#!/usr/bin/env bash\n') for idx, pyscript in enumerate(pyfiles): + node = nodes[idx] + template = self._template + qsub_args = self._qsub_args + if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): + if "template" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + template = node.plugin_args["template"] + else: + template += node.plugin_args["template"] + if "qsub_args" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + qsub_args = node.plugin_args["qsub_args"] + else: + qsub_args += (" " + node.plugin_args['qsub_args']) + batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) - batchscript = '\n'.join((self._template, + batchscript = '\n'.join((template, '%s %s' % (sys.executable, pyscript))) batchscriptfile = os.path.join(batch_dir, 'batchscript_%s.sh' % name) @@ -77,7 +92,7 @@ def _submit_graph(self, pyfiles, dependencies): jobNm=jobname, outFileOption=stdoutFile, errFileOption=stderrFile, - extraQSubArgs=self._qsub_args, + extraQSubArgs=qsub_args, dependantIndex=deps, batchscript=batchscriptfile) fp.writelines( full_line ) From c0195248608c5a18cfabeed20d56843c42729176 Mon Sep 17 00:00:00 2001 From: Chris Filo Gorgolewski Date: Fri, 7 Dec 2012 15:43:50 +0100 Subject: [PATCH 2/6] Reafactoring - less redundant code between SGE and PBS. --- nipype/pipeline/plugins/pbsgraph.py | 33 ++++--------------------- nipype/pipeline/plugins/sgegraph.py | 38 ++++++++++++++++------------- 2 files changed, 26 insertions(+), 45 deletions(-) diff --git a/nipype/pipeline/plugins/pbsgraph.py b/nipype/pipeline/plugins/pbsgraph.py index cdd32f455d..bbf8c5be72 100644 --- a/nipype/pipeline/plugins/pbsgraph.py +++ b/nipype/pipeline/plugins/pbsgraph.py @@ -7,9 +7,10 @@ from .base import (GraphPluginBase, logger) from ...interfaces.base import CommandLine +from .sgegraph import SGEGraphPlugin -class PBSGraphPlugin(GraphPluginBase): +class PBSGraphPlugin(SGEGraphPlugin): """Execute using PBS/Torque The plugin_args input to run can be used to control the SGE execution. @@ -20,21 +21,9 @@ class PBSGraphPlugin(GraphPluginBase): qsub call """ - - def __init__(self, **kwargs): - self._template = """ + _template = """ #PBS -V - """ - self._qsub_args = None - if 'plugin_args' in kwargs: - plugin_args = kwargs['plugin_args'] - if 'template' in plugin_args: - self._template = plugin_args['template'] - if os.path.isfile(self._template): - self._template = open(self._template).read() - if 'qsub_args' in plugin_args: - self._qsub_args = plugin_args['qsub_args'] - super(PBSGraphPlugin, self).__init__(**kwargs) +""" def _submit_graph(self, pyfiles, dependencies, nodes): batch_dir, _ = os.path.split(pyfiles[0]) @@ -43,19 +32,7 @@ def _submit_graph(self, pyfiles, dependencies, nodes): fp.writelines('#!/usr/bin/env sh\n') for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template = self._template - qsub_args = self._qsub_args - if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): - if "template" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - template = node.plugin_args["template"] - else: - template += node.plugin_args["template"] - if "qsub_args" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - qsub_args = node.plugin_args["qsub_args"] - else: - qsub_args += (" " + node.plugin_args['qsub_args']) + template, qsub_args = self._get_args(node) batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) diff --git a/nipype/pipeline/plugins/sgegraph.py b/nipype/pipeline/plugins/sgegraph.py index 34df9c5b56..889a03d42e 100644 --- a/nipype/pipeline/plugins/sgegraph.py +++ b/nipype/pipeline/plugins/sgegraph.py @@ -20,13 +20,12 @@ class SGEGraphPlugin(GraphPluginBase): qsub call """ - - def __init__(self, **kwargs): - self._template = """ + _template = """ #!/bin/bash #$ -V #$ -S /bin/bash - """ +""" + def __init__(self, **kwargs): self._qsub_args = '' if 'plugin_args' in kwargs: plugin_args = kwargs['plugin_args'] @@ -38,6 +37,23 @@ def __init__(self, **kwargs): self._qsub_args = plugin_args['qsub_args'] super(SGEGraphPlugin, self).__init__(**kwargs) + + def _get_args(self, node): + template = self._template + qsub_args = self._qsub_args + if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): + if "template" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + template = node.plugin_args["template"] + else: + template += node.plugin_args["template"] + if "qsub_args" in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + qsub_args = node.plugin_args["qsub_args"] + else: + qsub_args += " " + node.plugin_args['qsub_args'] + return template, qsub_args + def _submit_graph(self, pyfiles, dependencies, nodes): batch_dir, _ = os.path.split(pyfiles[0]) submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh') @@ -45,19 +61,7 @@ def _submit_graph(self, pyfiles, dependencies, nodes): fp.writelines('#!/usr/bin/env bash\n') for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template = self._template - qsub_args = self._qsub_args - if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): - if "template" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - template = node.plugin_args["template"] - else: - template += node.plugin_args["template"] - if "qsub_args" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - qsub_args = node.plugin_args["qsub_args"] - else: - qsub_args += (" " + node.plugin_args['qsub_args']) + template, qsub_args = self._get_args(node) batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) From 546665cec19e1b3f355d0a0f05b3c651db942ef7 Mon Sep 17 00:00:00 2001 From: Chris Filo Gorgolewski Date: Fri, 7 Dec 2012 16:10:59 +0100 Subject: [PATCH 3/6] Further refactoring --- nipype/pipeline/plugins/base.py | 13 +++++++++++++ nipype/pipeline/plugins/dagman.py | 14 +------------- nipype/pipeline/plugins/pbsgraph.py | 2 +- nipype/pipeline/plugins/sgegraph.py | 19 +------------------ 4 files changed, 16 insertions(+), 32 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index a21730a578..99a2c4296c 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -563,6 +563,19 @@ def run(self, graph, config, updatehash=False): dependencies[idx] = [nodes.index(prevnode) for prevnode in graph.predecessors(node)] self._submit_graph(pyfiles, dependencies, nodes) + + def _get_args(self, node, keywords): + values = () + for keyword in keywords: + value = getattr(self, "_" + keyword) + if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args: + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + value = node.plugin_args[keyword] + else: + value += node.plugin_args[keyword] + else: + values += (value, ) + return values def _submit_graph(self, pyfiles, dependencies, nodes): """ diff --git a/nipype/pipeline/plugins/dagman.py b/nipype/pipeline/plugins/dagman.py index e4e646bc50..b72f1c1b4d 100644 --- a/nipype/pipeline/plugins/dagman.py +++ b/nipype/pipeline/plugins/dagman.py @@ -59,19 +59,7 @@ def _submit_graph(self, pyfiles, dependencies, nodes): # as jobs in the DAG for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template = self._template - submit_specs = self._submit_specs - if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): - if "template" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - template = node.plugin_args["template"] - else: - template += node.plugin_args["template"] - if "submit_specs" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - submit_specs = node.plugin_args["submit_specs"] - else: - submit_specs += node.plugin_args['submit_specs'] + template, submit_specs = self._get_args(node, ["template", "submit_specs"]) # XXX redundant with previous value? or could it change between # scripts? batch_dir, name = os.path.split(pyscript) diff --git a/nipype/pipeline/plugins/pbsgraph.py b/nipype/pipeline/plugins/pbsgraph.py index bbf8c5be72..495ce651fb 100644 --- a/nipype/pipeline/plugins/pbsgraph.py +++ b/nipype/pipeline/plugins/pbsgraph.py @@ -32,7 +32,7 @@ def _submit_graph(self, pyfiles, dependencies, nodes): fp.writelines('#!/usr/bin/env sh\n') for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template, qsub_args = self._get_args(node) + template, qsub_args = self._get_args(node, ["template", "qsub_args"]) batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) diff --git a/nipype/pipeline/plugins/sgegraph.py b/nipype/pipeline/plugins/sgegraph.py index 889a03d42e..6a547ed923 100644 --- a/nipype/pipeline/plugins/sgegraph.py +++ b/nipype/pipeline/plugins/sgegraph.py @@ -37,23 +37,6 @@ def __init__(self, **kwargs): self._qsub_args = plugin_args['qsub_args'] super(SGEGraphPlugin, self).__init__(**kwargs) - - def _get_args(self, node): - template = self._template - qsub_args = self._qsub_args - if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict): - if "template" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - template = node.plugin_args["template"] - else: - template += node.plugin_args["template"] - if "qsub_args" in node.plugin_args: - if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - qsub_args = node.plugin_args["qsub_args"] - else: - qsub_args += " " + node.plugin_args['qsub_args'] - return template, qsub_args - def _submit_graph(self, pyfiles, dependencies, nodes): batch_dir, _ = os.path.split(pyfiles[0]) submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh') @@ -61,7 +44,7 @@ def _submit_graph(self, pyfiles, dependencies, nodes): fp.writelines('#!/usr/bin/env bash\n') for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template, qsub_args = self._get_args(node) + template, qsub_args = self._get_args(node, ["template", "qsub_args"]) batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) From ca8d229126e96f998b6b1b1db63b5fa304eff14b Mon Sep 17 00:00:00 2001 From: Chris Filo Gorgolewski Date: Fri, 7 Dec 2012 16:12:16 +0100 Subject: [PATCH 4/6] PEP8 --- nipype/pipeline/plugins/base.py | 32 ++++++++++++++------------- nipype/pipeline/plugins/dagman.py | 34 ++++++++++++++--------------- nipype/pipeline/plugins/pbsgraph.py | 9 ++++---- nipype/pipeline/plugins/sgegraph.py | 28 ++++++++++++++---------- 4 files changed, 55 insertions(+), 48 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 99a2c4296c..ce099b994d 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -218,7 +218,7 @@ def run(self, graph, config, updatehash=False): # setup polling - TODO: change to threaded model notrun = [] while np.any(self.proc_done == False) | \ - np.any(self.proc_pending == True): + np.any(self.proc_pending == True): toappend = [] # trigger callbacks for any pending results while self.pending_tasks: @@ -297,11 +297,12 @@ def _submit_mapnode(self, jobid): self.procs.extend(mapnodesubids) self.depidx = ssp.vstack((self.depidx, ssp.lil_matrix(np.zeros((numnodes, - self.depidx.shape[1])))), + self.depidx.shape[1])))), 'lil') self.depidx = ssp.hstack((self.depidx, - ssp.lil_matrix(np.zeros((self.depidx.shape[0], - numnodes)))), + ssp.lil_matrix( + np.zeros((self.depidx.shape[0], + numnodes)))), 'lil') self.depidx[-numnodes:, jobid] = 1 self.proc_done = np.concatenate((self.proc_done, @@ -315,7 +316,7 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None): """ while np.any(self.proc_done == False): # Check to see if a job is available - jobids = np.flatnonzero((self.proc_done == False) & \ + jobids = np.flatnonzero((self.proc_done == False) & (self.depidx.sum(axis=0) == 0).__array__()) if len(jobids) > 0: # send all available jobs @@ -336,20 +337,21 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None): self.proc_done[jobid] = True self.proc_pending[jobid] = True # Send job to task manager and add to pending tasks - logger.info('Executing: %s ID: %d' % \ - (self.procs[jobid]._id, jobid)) + logger.info('Executing: %s ID: %d' % + (self.procs[jobid]._id, jobid)) if self._status_callback: self._status_callback(self.procs[jobid], 'start') continue_with_submission = True if str2bool(self.procs[jobid].config['execution']['local_hash_check']): logger.debug('checking hash locally') try: - hash_exists, _, _, _ = self.procs[jobid].hash_exists() + hash_exists, _, _, _ = self.procs[ + jobid].hash_exists() logger.debug('Hash exists %s' % str(hash_exists)) if (hash_exists and - (self.procs[jobid].overwrite == False or - (self.procs[jobid].overwrite == None and - not self.procs[jobid]._interface.always_run))): + (self.procs[jobid].overwrite == False or + (self.procs[jobid].overwrite == None and + not self.procs[jobid]._interface.always_run))): continue_with_submission = False self._task_finished_cb(jobid) self._remove_node_dirs() @@ -385,7 +387,7 @@ def _task_finished_cb(self, jobid): This is called when a job is completed. """ - logger.info('[Job finished] jobname: %s jobid: %d' % \ + logger.info('[Job finished] jobname: %s jobid: %d' % (self.procs[jobid]._id, jobid)) if self._status_callback: self._status_callback(self.procs[jobid], 'end') @@ -431,7 +433,7 @@ def _remove_node_dirs(self): self.refidx[idx, idx] = -1 outdir = self.procs[idx]._output_directory() logger.info(('[node dependencies finished] ' - 'removing node: %s from directory %s') % \ + 'removing node: %s from directory %s') % (self.procs[idx]._id, outdir)) shutil.rmtree(outdir) @@ -563,14 +565,14 @@ def run(self, graph, config, updatehash=False): dependencies[idx] = [nodes.index(prevnode) for prevnode in graph.predecessors(node)] self._submit_graph(pyfiles, dependencies, nodes) - + def _get_args(self, node, keywords): values = () for keyword in keywords: value = getattr(self, "_" + keyword) if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args: if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - value = node.plugin_args[keyword] + value = node.plugin_args[keyword] else: value += node.plugin_args[keyword] else: diff --git a/nipype/pipeline/plugins/dagman.py b/nipype/pipeline/plugins/dagman.py index b72f1c1b4d..0ee20acf4c 100644 --- a/nipype/pipeline/plugins/dagman.py +++ b/nipype/pipeline/plugins/dagman.py @@ -59,28 +59,29 @@ def _submit_graph(self, pyfiles, dependencies, nodes): # as jobs in the DAG for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template, submit_specs = self._get_args(node, ["template", "submit_specs"]) + template, submit_specs = self._get_args( + node, ["template", "submit_specs"]) # XXX redundant with previous value? or could it change between - # scripts? + # scripts? batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) submitspec = '\n'.join( - (template, - 'executable = %s' % sys.executable, - 'arguments = %s' % pyscript, - 'output = %s' % os.path.join(batch_dir, - '%s.out' % name), - 'error = %s' % os.path.join(batch_dir, - '%s.err' % name), - 'log = %s' % os.path.join(batch_dir, - '%s.log' % name), - 'getenv = True', - submit_specs, - 'queue' - )) + (template, + 'executable = %s' % sys.executable, + 'arguments = %s' % pyscript, + 'output = %s' % os.path.join(batch_dir, + '%s.out' % name), + 'error = %s' % os.path.join(batch_dir, + '%s.err' % name), + 'log = %s' % os.path.join(batch_dir, + '%s.log' % name), + 'getenv = True', + submit_specs, + 'queue' + )) # write submit spec for this job submitfile = os.path.join(batch_dir, - '%s.submit' % name) + '%s.submit' % name) with open(submitfile, 'wt') as submitfileprt: submitfileprt.writelines(submitspec) submitfileprt.close() @@ -100,4 +101,3 @@ def _submit_graph(self, pyfiles, dependencies, nodes): self._dagman_args) cmd.run() logger.info('submitted all jobs to Condor DAGMan') - diff --git a/nipype/pipeline/plugins/pbsgraph.py b/nipype/pipeline/plugins/pbsgraph.py index 495ce651fb..710f93a8a1 100644 --- a/nipype/pipeline/plugins/pbsgraph.py +++ b/nipype/pipeline/plugins/pbsgraph.py @@ -32,8 +32,9 @@ def _submit_graph(self, pyfiles, dependencies, nodes): fp.writelines('#!/usr/bin/env sh\n') for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template, qsub_args = self._get_args(node, ["template", "qsub_args"]) - + template, qsub_args = self._get_args( + node, ["template", "qsub_args"]) + batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) batchscript = '\n'.join((template, @@ -45,7 +46,8 @@ def _submit_graph(self, pyfiles, dependencies, nodes): batchfp.close() deps = '' if idx in dependencies: - values = ['$job%05d' % jobid for jobid in dependencies[idx]] + values = ['$job%05d' % + jobid for jobid in dependencies[idx]] if len(values): deps = '-W depend=afterok:%s' % ':'.join(values) fp.writelines('job%05d=`qsub %s %s %s`\n' % (idx, deps, @@ -55,4 +57,3 @@ def _submit_graph(self, pyfiles, dependencies, nodes): cmd.inputs.args = '%s' % submitjobsfile cmd.run() logger.info('submitted all jobs to queue') - diff --git a/nipype/pipeline/plugins/sgegraph.py b/nipype/pipeline/plugins/sgegraph.py index 6a547ed923..11ac4e4477 100644 --- a/nipype/pipeline/plugins/sgegraph.py +++ b/nipype/pipeline/plugins/sgegraph.py @@ -25,6 +25,7 @@ class SGEGraphPlugin(GraphPluginBase): #$ -V #$ -S /bin/bash """ + def __init__(self, **kwargs): self._qsub_args = '' if 'plugin_args' in kwargs: @@ -44,8 +45,9 @@ def _submit_graph(self, pyfiles, dependencies, nodes): fp.writelines('#!/usr/bin/env bash\n') for idx, pyscript in enumerate(pyfiles): node = nodes[idx] - template, qsub_args = self._get_args(node, ["template", "qsub_args"]) - + template, qsub_args = self._get_args( + node, ["template", "qsub_args"]) + batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) batchscript = '\n'.join((template, @@ -67,22 +69,24 @@ def _submit_graph(self, pyfiles, dependencies, nodes): if 'job' in values: values = values.rstrip(',') deps = '-hold_jid%s' % values - jobname = 'job%05d' % ( idx ) + jobname = 'job%05d' % (idx) ## Do not use default output locations if they are set in self._qsub_args stderrFile = '' if self._qsub_args.count('-e ') == 0: - stderrFile='-e {errFile}'.format(errFile=batchscripterrfile) + stderrFile = '-e {errFile}'.format( + errFile=batchscripterrfile) stdoutFile = '' if self._qsub_args.count('-o ') == 0: - stdoutFile='-o {outFile}'.format(outFile=batchscriptoutfile) + stdoutFile = '-o {outFile}'.format( + outFile=batchscriptoutfile) full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript})\n'.format( - jobNm=jobname, - outFileOption=stdoutFile, - errFileOption=stderrFile, - extraQSubArgs=qsub_args, - dependantIndex=deps, - batchscript=batchscriptfile) - fp.writelines( full_line ) + jobNm=jobname, + outFileOption=stdoutFile, + errFileOption=stderrFile, + extraQSubArgs=qsub_args, + dependantIndex=deps, + batchscript=batchscriptfile) + fp.writelines(full_line) cmd = CommandLine('bash', environ=os.environ.data) cmd.inputs.args = '%s' % submitjobsfile From 5c3bc19393cd85844b3b131959a06dc49abcff71 Mon Sep 17 00:00:00 2001 From: Chris Filo Gorgolewski Date: Fri, 7 Dec 2012 16:12:40 +0100 Subject: [PATCH 5/6] API fix for somaworkflow --- nipype/pipeline/plugins/somaflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/somaflow.py b/nipype/pipeline/plugins/somaflow.py index b9847e4910..b54262d4aa 100644 --- a/nipype/pipeline/plugins/somaflow.py +++ b/nipype/pipeline/plugins/somaflow.py @@ -23,7 +23,7 @@ def __init__(self, plugin_args=None): raise ImportError('SomaFlow could not be imported') super(SomaFlowPlugin, self).__init__(plugin_args=plugin_args) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): jobs = [] soma_deps = [] for idx, fname in enumerate(pyfiles): From b67b1dde9fa6414dfdcb0ac54d6554a48d6036bd Mon Sep 17 00:00:00 2001 From: Chris Filo Gorgolewski Date: Fri, 7 Dec 2012 16:32:06 +0100 Subject: [PATCH 6/6] add support for template files --- nipype/pipeline/plugins/base.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index ce099b994d..ff9f428c35 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -570,11 +570,15 @@ def _get_args(self, node, keywords): values = () for keyword in keywords: value = getattr(self, "_" + keyword) + if keyword == "template" and os.path.isfile(value): + value = open(value).read() if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args: + if keyword == "template" and os.path.isfile(node.plugin_args[keyword]): + tmp_value = open(node.plugin_args[keyword]).read() if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: - value = node.plugin_args[keyword] + value = tmp_value else: - value += node.plugin_args[keyword] + value += tmp_value else: values += (value, ) return values