diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 9e34f67295..ff9f428c35 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -218,7 +218,7 @@ def run(self, graph, config, updatehash=False): # setup polling - TODO: change to threaded model notrun = [] while np.any(self.proc_done == False) | \ - np.any(self.proc_pending == True): + np.any(self.proc_pending == True): toappend = [] # trigger callbacks for any pending results while self.pending_tasks: @@ -297,11 +297,12 @@ def _submit_mapnode(self, jobid): self.procs.extend(mapnodesubids) self.depidx = ssp.vstack((self.depidx, ssp.lil_matrix(np.zeros((numnodes, - self.depidx.shape[1])))), + self.depidx.shape[1])))), 'lil') self.depidx = ssp.hstack((self.depidx, - ssp.lil_matrix(np.zeros((self.depidx.shape[0], - numnodes)))), + ssp.lil_matrix( + np.zeros((self.depidx.shape[0], + numnodes)))), 'lil') self.depidx[-numnodes:, jobid] = 1 self.proc_done = np.concatenate((self.proc_done, @@ -315,7 +316,7 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None): """ while np.any(self.proc_done == False): # Check to see if a job is available - jobids = np.flatnonzero((self.proc_done == False) & \ + jobids = np.flatnonzero((self.proc_done == False) & (self.depidx.sum(axis=0) == 0).__array__()) if len(jobids) > 0: # send all available jobs @@ -336,20 +337,21 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None): self.proc_done[jobid] = True self.proc_pending[jobid] = True # Send job to task manager and add to pending tasks - logger.info('Executing: %s ID: %d' % \ - (self.procs[jobid]._id, jobid)) + logger.info('Executing: %s ID: %d' % + (self.procs[jobid]._id, jobid)) if self._status_callback: self._status_callback(self.procs[jobid], 'start') continue_with_submission = True if str2bool(self.procs[jobid].config['execution']['local_hash_check']): logger.debug('checking hash locally') try: - hash_exists, _, _, _ = self.procs[jobid].hash_exists() + hash_exists, _, _, _ = self.procs[ + jobid].hash_exists() logger.debug('Hash exists %s' % str(hash_exists)) if (hash_exists and - (self.procs[jobid].overwrite == False or - (self.procs[jobid].overwrite == None and - not self.procs[jobid]._interface.always_run))): + (self.procs[jobid].overwrite == False or + (self.procs[jobid].overwrite == None and + not self.procs[jobid]._interface.always_run))): continue_with_submission = False self._task_finished_cb(jobid) self._remove_node_dirs() @@ -385,7 +387,7 @@ def _task_finished_cb(self, jobid): This is called when a job is completed. """ - logger.info('[Job finished] jobname: %s jobid: %d' % \ + logger.info('[Job finished] jobname: %s jobid: %d' % (self.procs[jobid]._id, jobid)) if self._status_callback: self._status_callback(self.procs[jobid], 'end') @@ -431,7 +433,7 @@ def _remove_node_dirs(self): self.refidx[idx, idx] = -1 outdir = self.procs[idx]._output_directory() logger.info(('[node dependencies finished] ' - 'removing node: %s from directory %s') % \ + 'removing node: %s from directory %s') % (self.procs[idx]._id, outdir)) shutil.rmtree(outdir) @@ -562,9 +564,26 @@ def run(self, graph, config, updatehash=False): store_exception=False)) dependencies[idx] = [nodes.index(prevnode) for prevnode in graph.predecessors(node)] - self._submit_graph(pyfiles, dependencies) + self._submit_graph(pyfiles, dependencies, nodes) + + def _get_args(self, node, keywords): + values = () + for keyword in keywords: + value = getattr(self, "_" + keyword) + if keyword == "template" and os.path.isfile(value): + value = open(value).read() + if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args: + if keyword == "template" and os.path.isfile(node.plugin_args[keyword]): + tmp_value = open(node.plugin_args[keyword]).read() + if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']: + value = tmp_value + else: + value += tmp_value + else: + values += (value, ) + return values - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): """ pyfiles: list of files corresponding to a topological sort dependencies: dictionary of dependencies based on the toplogical sort diff --git a/nipype/pipeline/plugins/dagman.py b/nipype/pipeline/plugins/dagman.py index d053b15963..0ee20acf4c 100644 --- a/nipype/pipeline/plugins/dagman.py +++ b/nipype/pipeline/plugins/dagman.py @@ -49,7 +49,7 @@ def __init__(self, **kwargs): self._dagman_args = plugin_args['dagman_args'] super(CondorDAGManPlugin, self).__init__(**kwargs) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): # location of all scripts, place dagman output in here too batch_dir, _ = os.path.split(pyfiles[0]) # DAG description filename @@ -58,27 +58,30 @@ def _submit_graph(self, pyfiles, dependencies): # loop over all scripts, create submit files, and define them # as jobs in the DAG for idx, pyscript in enumerate(pyfiles): + node = nodes[idx] + template, submit_specs = self._get_args( + node, ["template", "submit_specs"]) # XXX redundant with previous value? or could it change between # scripts? batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) submitspec = '\n'.join( - (self._template, - 'executable = %s' % sys.executable, - 'arguments = %s' % pyscript, - 'output = %s' % os.path.join(batch_dir, - '%s.out' % name), - 'error = %s' % os.path.join(batch_dir, - '%s.err' % name), - 'log = %s' % os.path.join(batch_dir, - '%s.log' % name), - 'getenv = True', - self._submit_specs, - 'queue' - )) + (template, + 'executable = %s' % sys.executable, + 'arguments = %s' % pyscript, + 'output = %s' % os.path.join(batch_dir, + '%s.out' % name), + 'error = %s' % os.path.join(batch_dir, + '%s.err' % name), + 'log = %s' % os.path.join(batch_dir, + '%s.log' % name), + 'getenv = True', + submit_specs, + 'queue' + )) # write submit spec for this job submitfile = os.path.join(batch_dir, - '%s.submit' % name) + '%s.submit' % name) with open(submitfile, 'wt') as submitfileprt: submitfileprt.writelines(submitspec) submitfileprt.close() @@ -98,4 +101,3 @@ def _submit_graph(self, pyfiles, dependencies): self._dagman_args) cmd.run() logger.info('submitted all jobs to Condor DAGMan') - diff --git a/nipype/pipeline/plugins/pbsgraph.py b/nipype/pipeline/plugins/pbsgraph.py index c177973d91..710f93a8a1 100644 --- a/nipype/pipeline/plugins/pbsgraph.py +++ b/nipype/pipeline/plugins/pbsgraph.py @@ -7,9 +7,10 @@ from .base import (GraphPluginBase, logger) from ...interfaces.base import CommandLine +from .sgegraph import SGEGraphPlugin -class PBSGraphPlugin(GraphPluginBase): +class PBSGraphPlugin(SGEGraphPlugin): """Execute using PBS/Torque The plugin_args input to run can be used to control the SGE execution. @@ -20,31 +21,23 @@ class PBSGraphPlugin(GraphPluginBase): qsub call """ - - def __init__(self, **kwargs): - self._template = """ + _template = """ #PBS -V - """ - self._qsub_args = None - if 'plugin_args' in kwargs: - plugin_args = kwargs['plugin_args'] - if 'template' in plugin_args: - self._template = plugin_args['template'] - if os.path.isfile(self._template): - self._template = open(self._template).read() - if 'qsub_args' in plugin_args: - self._qsub_args = plugin_args['qsub_args'] - super(PBSGraphPlugin, self).__init__(**kwargs) +""" - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): batch_dir, _ = os.path.split(pyfiles[0]) submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh') with open(submitjobsfile, 'wt') as fp: fp.writelines('#!/usr/bin/env sh\n') for idx, pyscript in enumerate(pyfiles): + node = nodes[idx] + template, qsub_args = self._get_args( + node, ["template", "qsub_args"]) + batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) - batchscript = '\n'.join((self._template, + batchscript = '\n'.join((template, '%s %s' % (sys.executable, pyscript))) batchscriptfile = os.path.join(batch_dir, 'batchscript_%s.sh' % name) @@ -53,14 +46,14 @@ def _submit_graph(self, pyfiles, dependencies): batchfp.close() deps = '' if idx in dependencies: - values = ['$job%05d' % jobid for jobid in dependencies[idx]] + values = ['$job%05d' % + jobid for jobid in dependencies[idx]] if len(values): deps = '-W depend=afterok:%s' % ':'.join(values) fp.writelines('job%05d=`qsub %s %s %s`\n' % (idx, deps, - self._qsub_args, + qsub_args, batchscriptfile)) cmd = CommandLine('sh', environ=os.environ.data) cmd.inputs.args = '%s' % submitjobsfile cmd.run() logger.info('submitted all jobs to queue') - diff --git a/nipype/pipeline/plugins/sgegraph.py b/nipype/pipeline/plugins/sgegraph.py index 64cb49ec08..11ac4e4477 100644 --- a/nipype/pipeline/plugins/sgegraph.py +++ b/nipype/pipeline/plugins/sgegraph.py @@ -20,13 +20,13 @@ class SGEGraphPlugin(GraphPluginBase): qsub call """ - - def __init__(self, **kwargs): - self._template = """ + _template = """ #!/bin/bash #$ -V #$ -S /bin/bash - """ +""" + + def __init__(self, **kwargs): self._qsub_args = '' if 'plugin_args' in kwargs: plugin_args = kwargs['plugin_args'] @@ -38,15 +38,19 @@ def __init__(self, **kwargs): self._qsub_args = plugin_args['qsub_args'] super(SGEGraphPlugin, self).__init__(**kwargs) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): batch_dir, _ = os.path.split(pyfiles[0]) submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh') with open(submitjobsfile, 'wt') as fp: fp.writelines('#!/usr/bin/env bash\n') for idx, pyscript in enumerate(pyfiles): + node = nodes[idx] + template, qsub_args = self._get_args( + node, ["template", "qsub_args"]) + batch_dir, name = os.path.split(pyscript) name = '.'.join(name.split('.')[:-1]) - batchscript = '\n'.join((self._template, + batchscript = '\n'.join((template, '%s %s' % (sys.executable, pyscript))) batchscriptfile = os.path.join(batch_dir, 'batchscript_%s.sh' % name) @@ -65,22 +69,24 @@ def _submit_graph(self, pyfiles, dependencies): if 'job' in values: values = values.rstrip(',') deps = '-hold_jid%s' % values - jobname = 'job%05d' % ( idx ) + jobname = 'job%05d' % (idx) ## Do not use default output locations if they are set in self._qsub_args stderrFile = '' if self._qsub_args.count('-e ') == 0: - stderrFile='-e {errFile}'.format(errFile=batchscripterrfile) + stderrFile = '-e {errFile}'.format( + errFile=batchscripterrfile) stdoutFile = '' if self._qsub_args.count('-o ') == 0: - stdoutFile='-o {outFile}'.format(outFile=batchscriptoutfile) + stdoutFile = '-o {outFile}'.format( + outFile=batchscriptoutfile) full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript})\n'.format( - jobNm=jobname, - outFileOption=stdoutFile, - errFileOption=stderrFile, - extraQSubArgs=self._qsub_args, - dependantIndex=deps, - batchscript=batchscriptfile) - fp.writelines( full_line ) + jobNm=jobname, + outFileOption=stdoutFile, + errFileOption=stderrFile, + extraQSubArgs=qsub_args, + dependantIndex=deps, + batchscript=batchscriptfile) + fp.writelines(full_line) cmd = CommandLine('bash', environ=os.environ.data) cmd.inputs.args = '%s' % submitjobsfile diff --git a/nipype/pipeline/plugins/somaflow.py b/nipype/pipeline/plugins/somaflow.py index b9847e4910..b54262d4aa 100644 --- a/nipype/pipeline/plugins/somaflow.py +++ b/nipype/pipeline/plugins/somaflow.py @@ -23,7 +23,7 @@ def __init__(self, plugin_args=None): raise ImportError('SomaFlow could not be imported') super(SomaFlowPlugin, self).__init__(plugin_args=plugin_args) - def _submit_graph(self, pyfiles, dependencies): + def _submit_graph(self, pyfiles, dependencies, nodes): jobs = [] soma_deps = [] for idx, fname in enumerate(pyfiles):