49 changes: 34 additions & 15 deletions nipype/pipeline/plugins/base.py
@@ -218,7 +218,7 @@ def run(self, graph, config, updatehash=False):
# setup polling - TODO: change to threaded model
notrun = []
while np.any(self.proc_done == False) | \
np.any(self.proc_pending == True):
np.any(self.proc_pending == True):
toappend = []
# trigger callbacks for any pending results
while self.pending_tasks:
@@ -297,11 +297,12 @@ def _submit_mapnode(self, jobid):
self.procs.extend(mapnodesubids)
self.depidx = ssp.vstack((self.depidx,
ssp.lil_matrix(np.zeros((numnodes,
self.depidx.shape[1])))),
self.depidx.shape[1])))),
'lil')
self.depidx = ssp.hstack((self.depidx,
ssp.lil_matrix(np.zeros((self.depidx.shape[0],
numnodes)))),
ssp.lil_matrix(
np.zeros((self.depidx.shape[0],
numnodes)))),
'lil')
self.depidx[-numnodes:, jobid] = 1
self.proc_done = np.concatenate((self.proc_done,
@@ -315,7 +316,7 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
"""
while np.any(self.proc_done == False):
# Check to see if a job is available
jobids = np.flatnonzero((self.proc_done == False) & \
jobids = np.flatnonzero((self.proc_done == False) &
(self.depidx.sum(axis=0) == 0).__array__())
if len(jobids) > 0:
# send all available jobs
@@ -336,20 +337,21 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
self.proc_done[jobid] = True
self.proc_pending[jobid] = True
# Send job to task manager and add to pending tasks
logger.info('Executing: %s ID: %d' % \
(self.procs[jobid]._id, jobid))
logger.info('Executing: %s ID: %d' %
(self.procs[jobid]._id, jobid))
if self._status_callback:
self._status_callback(self.procs[jobid], 'start')
continue_with_submission = True
if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
logger.debug('checking hash locally')
try:
hash_exists, _, _, _ = self.procs[jobid].hash_exists()
hash_exists, _, _, _ = self.procs[
jobid].hash_exists()
logger.debug('Hash exists %s' % str(hash_exists))
if (hash_exists and
(self.procs[jobid].overwrite == False or
(self.procs[jobid].overwrite == None and
not self.procs[jobid]._interface.always_run))):
(self.procs[jobid].overwrite == False or
(self.procs[jobid].overwrite == None and
not self.procs[jobid]._interface.always_run))):
continue_with_submission = False
self._task_finished_cb(jobid)
self._remove_node_dirs()
@@ -385,7 +387,7 @@ def _task_finished_cb(self, jobid):

This is called when a job is completed.
"""
logger.info('[Job finished] jobname: %s jobid: %d' % \
logger.info('[Job finished] jobname: %s jobid: %d' %
(self.procs[jobid]._id, jobid))
if self._status_callback:
self._status_callback(self.procs[jobid], 'end')
@@ -431,7 +433,7 @@ def _remove_node_dirs(self):
self.refidx[idx, idx] = -1
outdir = self.procs[idx]._output_directory()
logger.info(('[node dependencies finished] '
'removing node: %s from directory %s') % \
'removing node: %s from directory %s') %
(self.procs[idx]._id, outdir))
shutil.rmtree(outdir)

@@ -562,9 +564,26 @@ def run(self, graph, config, updatehash=False):
store_exception=False))
dependencies[idx] = [nodes.index(prevnode) for prevnode in
graph.predecessors(node)]
self._submit_graph(pyfiles, dependencies)
self._submit_graph(pyfiles, dependencies, nodes)

def _get_args(self, node, keywords):
    values = ()
    for keyword in keywords:
        value = getattr(self, "_" + keyword)
        if keyword == "template" and os.path.isfile(value):
            value = open(value).read()
        if (hasattr(node, "plugin_args") and
                isinstance(node.plugin_args, dict) and
                keyword in node.plugin_args):
            if (keyword == "template" and
                    os.path.isfile(node.plugin_args[keyword])):
                tmp_value = open(node.plugin_args[keyword]).read()
            else:
                tmp_value = node.plugin_args[keyword]
            if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']:
                value = tmp_value
            else:
                value += tmp_value
        values += (value, )
    return values

def _submit_graph(self, pyfiles, dependencies):
def _submit_graph(self, pyfiles, dependencies, nodes):
"""
pyfiles: list of files corresponding to a topological sort
dependencies: dictionary of dependencies based on the topological sort
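The new _get_args helper is the heart of this change: for each requested keyword (template, qsub_args, submit_specs, ...) it starts from the plugin-wide default, reads it from disk when the value is an existing file path, and then lets node.plugin_args either extend that default or, when plugin_args['overwrite'] is true, replace it outright. A minimal standalone sketch of that append-versus-overwrite resolution (plain Python, hypothetical values, no scheduler needed):

    # Sketch of the resolution performed per keyword by _get_args.
    # The plugin default and the node-level value below are hypothetical.
    plugin_default = '-q long.q'                     # e.g. self._qsub_args
    node_plugin_args = {'qsub_args': ' -l h_vmem=4G',
                        'overwrite': False}

    value = plugin_default
    if 'qsub_args' in node_plugin_args:
        tmp_value = node_plugin_args['qsub_args']
        if node_plugin_args.get('overwrite'):
            value = tmp_value       # node value replaces the plugin default
        else:
            value += tmp_value      # node value is appended to the default
    print(value)                    # -> "-q long.q -l h_vmem=4G"
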
34 changes: 18 additions & 16 deletions nipype/pipeline/plugins/dagman.py
@@ -49,7 +49,7 @@ def __init__(self, **kwargs):
self._dagman_args = plugin_args['dagman_args']
super(CondorDAGManPlugin, self).__init__(**kwargs)

def _submit_graph(self, pyfiles, dependencies):
def _submit_graph(self, pyfiles, dependencies, nodes):
# location of all scripts, place dagman output in here too
batch_dir, _ = os.path.split(pyfiles[0])
# DAG description filename
@@ -58,27 +58,30 @@ def _submit_graph(self, pyfiles, dependencies):
# loop over all scripts, create submit files, and define them
# as jobs in the DAG
for idx, pyscript in enumerate(pyfiles):
node = nodes[idx]
template, submit_specs = self._get_args(
node, ["template", "submit_specs"])
# XXX redundant with previous value? or could it change between
# scripts?
batch_dir, name = os.path.split(pyscript)
name = '.'.join(name.split('.')[:-1])
submitspec = '\n'.join(
(self._template,
'executable = %s' % sys.executable,
'arguments = %s' % pyscript,
'output = %s' % os.path.join(batch_dir,
'%s.out' % name),
'error = %s' % os.path.join(batch_dir,
'%s.err' % name),
'log = %s' % os.path.join(batch_dir,
'%s.log' % name),
'getenv = True',
self._submit_specs,
'queue'
))
(template,
'executable = %s' % sys.executable,
'arguments = %s' % pyscript,
'output = %s' % os.path.join(batch_dir,
'%s.out' % name),
'error = %s' % os.path.join(batch_dir,
'%s.err' % name),
'log = %s' % os.path.join(batch_dir,
'%s.log' % name),
'getenv = True',
submit_specs,
'queue'
))
# write submit spec for this job
submitfile = os.path.join(batch_dir,
'%s.submit' % name)
'%s.submit' % name)
with open(submitfile, 'wt') as submitfileprt:
submitfileprt.writelines(submitspec)
submitfileprt.close()
@@ -98,4 +101,3 @@ def _submit_graph(self, pyfiles, dependencies):
self._dagman_args)
cmd.run()
logger.info('submitted all jobs to Condor DAGMan')
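With the node list passed through, an individual job can now carry its own Condor submit settings. A hedged usage sketch — the workflow and node names, the doubling function, and the memory request are hypothetical; submit_specs and overwrite are the keys _get_args reads:

    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import Function

    def double(x):
        return 2 * x

    wf = pe.Workflow(name='demo')            # hypothetical workflow
    heavy = pe.Node(Function(input_names=['x'], output_names=['out'],
                             function=double), name='heavy')
    heavy.inputs.x = 21
    # Per-node Condor settings: appended to the plugin-level submit
    # description because 'overwrite' is omitted (treated as False).
    heavy.plugin_args = {'submit_specs': 'request_memory = 8000'}
    wf.add_nodes([heavy])
    wf.run(plugin='CondorDAGMan', plugin_args={'dagman_args': '-maxjobs 20'})
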

33 changes: 13 additions & 20 deletions nipype/pipeline/plugins/pbsgraph.py
@@ -7,9 +7,10 @@
from .base import (GraphPluginBase, logger)

from ...interfaces.base import CommandLine
from .sgegraph import SGEGraphPlugin


class PBSGraphPlugin(GraphPluginBase):
class PBSGraphPlugin(SGEGraphPlugin):
"""Execute using PBS/Torque

The plugin_args input to run can be used to control the SGE execution.
@@ -20,31 +21,23 @@ class PBSGraphPlugin(GraphPluginBase):
qsub call

"""

def __init__(self, **kwargs):
self._template = """
_template = """
#PBS -V
"""
self._qsub_args = None
if 'plugin_args' in kwargs:
plugin_args = kwargs['plugin_args']
if 'template' in plugin_args:
self._template = plugin_args['template']
if os.path.isfile(self._template):
self._template = open(self._template).read()
if 'qsub_args' in plugin_args:
self._qsub_args = plugin_args['qsub_args']
super(PBSGraphPlugin, self).__init__(**kwargs)
"""
Review note (PR author): PBSGraphPlugin now inherits from SGEGraphPlugin, so SGE's __init__ handles the plugin_args and calls super().


def _submit_graph(self, pyfiles, dependencies):
def _submit_graph(self, pyfiles, dependencies, nodes):
batch_dir, _ = os.path.split(pyfiles[0])
submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')
with open(submitjobsfile, 'wt') as fp:
fp.writelines('#!/usr/bin/env sh\n')
for idx, pyscript in enumerate(pyfiles):
node = nodes[idx]
template, qsub_args = self._get_args(
node, ["template", "qsub_args"])

batch_dir, name = os.path.split(pyscript)
name = '.'.join(name.split('.')[:-1])
batchscript = '\n'.join((self._template,
batchscript = '\n'.join((template,
'%s %s' % (sys.executable, pyscript)))
batchscriptfile = os.path.join(batch_dir,
'batchscript_%s.sh' % name)
@@ -53,14 +46,14 @@ def _submit_graph(self, pyfiles, dependencies):
batchfp.close()
deps = ''
if idx in dependencies:
values = ['$job%05d' % jobid for jobid in dependencies[idx]]
values = ['$job%05d' %
jobid for jobid in dependencies[idx]]
if len(values):
deps = '-W depend=afterok:%s' % ':'.join(values)
fp.writelines('job%05d=`qsub %s %s %s`\n' % (idx, deps,
self._qsub_args,
qsub_args,
batchscriptfile))
cmd = CommandLine('sh', environ=os.environ.data)
cmd.inputs.args = '%s' % submitjobsfile
cmd.run()
logger.info('submitted all jobs to queue')
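The same per-node mechanism applies to PBS/Torque. Assuming a workflow wf and node heavy as in the DAGMan sketch above (both hypothetical), a node can request its own resources while the plugin keeps its global qsub arguments:

    # 'overwrite': True replaces the plugin-level qsub_args for this node
    # instead of appending to them; the resource values are examples only.
    heavy.plugin_args = {'qsub_args': '-l walltime=08:00:00,mem=4gb',
                         'overwrite': True}
    wf.run(plugin='PBSGraph', plugin_args={'qsub_args': '-q batch'})
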

38 changes: 22 additions & 16 deletions nipype/pipeline/plugins/sgegraph.py
@@ -20,13 +20,13 @@ class SGEGraphPlugin(GraphPluginBase):
qsub call

"""

def __init__(self, **kwargs):
self._template = """
_template = """
#!/bin/bash
#$ -V
#$ -S /bin/bash
"""
"""

def __init__(self, **kwargs):
self._qsub_args = ''
if 'plugin_args' in kwargs:
plugin_args = kwargs['plugin_args']
@@ -38,15 +38,19 @@ def __init__(self, **kwargs):
self._qsub_args = plugin_args['qsub_args']
super(SGEGraphPlugin, self).__init__(**kwargs)

def _submit_graph(self, pyfiles, dependencies):
def _submit_graph(self, pyfiles, dependencies, nodes):
batch_dir, _ = os.path.split(pyfiles[0])
submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')
with open(submitjobsfile, 'wt') as fp:
fp.writelines('#!/usr/bin/env bash\n')
for idx, pyscript in enumerate(pyfiles):
node = nodes[idx]
template, qsub_args = self._get_args(
node, ["template", "qsub_args"])

batch_dir, name = os.path.split(pyscript)
name = '.'.join(name.split('.')[:-1])
batchscript = '\n'.join((self._template,
batchscript = '\n'.join((template,
'%s %s' % (sys.executable, pyscript)))
batchscriptfile = os.path.join(batch_dir,
'batchscript_%s.sh' % name)
@@ -65,22 +69,24 @@ def __init__(self, **kwargs):
if 'job' in values:
values = values.rstrip(',')
deps = '-hold_jid%s' % values
jobname = 'job%05d' % ( idx )
jobname = 'job%05d' % (idx)
## Do not use default output locations if they are set in self._qsub_args
stderrFile = ''
if self._qsub_args.count('-e ') == 0:
stderrFile='-e {errFile}'.format(errFile=batchscripterrfile)
stderrFile = '-e {errFile}'.format(
errFile=batchscripterrfile)
stdoutFile = ''
if self._qsub_args.count('-o ') == 0:
stdoutFile='-o {outFile}'.format(outFile=batchscriptoutfile)
stdoutFile = '-o {outFile}'.format(
outFile=batchscriptoutfile)
full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript})\n'.format(
jobNm=jobname,
outFileOption=stdoutFile,
errFileOption=stderrFile,
extraQSubArgs=self._qsub_args,
dependantIndex=deps,
batchscript=batchscriptfile)
fp.writelines( full_line )
jobNm=jobname,
outFileOption=stdoutFile,
errFileOption=stderrFile,
extraQSubArgs=qsub_args,
dependantIndex=deps,
batchscript=batchscriptfile)
fp.writelines(full_line)

cmd = CommandLine('bash', environ=os.environ.data)
cmd.inputs.args = '%s' % submitjobsfile
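For SGE the template keyword can be overridden per node as well; _get_args accepts either a literal preamble string or a path to an existing file (whose contents are read). Again assuming wf and heavy from the earlier sketch, with a hypothetical environment-module line:

    # Without 'overwrite' the extra preamble is appended to the plugin-level
    # template rather than replacing it.
    heavy.plugin_args = {'template': 'module load freesurfer\n'}
    wf.run(plugin='SGEGraph', plugin_args={'qsub_args': '-l h_vmem=2G'})
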
2 changes: 1 addition & 1 deletion nipype/pipeline/plugins/somaflow.py
@@ -23,7 +23,7 @@ def __init__(self, plugin_args=None):
raise ImportError('SomaFlow could not be imported')
super(SomaFlowPlugin, self).__init__(plugin_args=plugin_args)

def _submit_graph(self, pyfiles, dependencies):
def _submit_graph(self, pyfiles, dependencies, nodes):
jobs = []
soma_deps = []
for idx, fname in enumerate(pyfiles):