Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Next release
============

* ENH: Allow control over terminal output for commandline interfaces

Release 0.7.0 (Dec 18, 2012)
============================
Expand Down
6 changes: 4 additions & 2 deletions nipype/interfaces/afni/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def version():
Version number as string or None if AFNI not found

"""
clout = CommandLine(command='afni_vcheck').run()
clout = CommandLine(command='afni_vcheck',
terminal_output='allatonce').run()
out = clout.runtime.stdout
return out.split('\n')[1]

Expand Down Expand Up @@ -87,7 +88,8 @@ def standard_image(img_name):
'''Grab an image from the standard location.

Could be made more fancy to allow for more relocatability'''
clout = CommandLine('which afni').run()
clout = CommandLine('which afni',
terminal_output='allatonce').run()
if clout.runtime.returncode is not 0:
return None

Expand Down
127 changes: 92 additions & 35 deletions nipype/interfaces/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ def _read(self, drain):
self._lastidx = len(self._rows)


def run_command(runtime, timeout=0.01):
def run_command(runtime, output=None, timeout=0.01):
"""
Run a command, read stdout and stderr, prefix with timestamp. The returned
runtime contains a merged stdout+stderr log with timestamps
Expand All @@ -1038,50 +1038,81 @@ def run_command(runtime, timeout=0.01):
shell=True,
cwd=runtime.cwd,
env=runtime.environ)
streams = [
Stream('stdout', proc.stdout),
Stream('stderr', proc.stderr)
]
result = {}
if output == 'stream':
streams = [
Stream('stdout', proc.stdout),
Stream('stderr', proc.stderr)
]

def _process(drain=0):
try:
res = select.select(streams, [], [], timeout)
except select.error, e:
iflogger.info(str(e))
if e[0] == errno.EINTR:
return
def _process(drain=0):
try:
res = select.select(streams, [], [], timeout)
except select.error, e:
iflogger.info(str(e))
if e[0] == errno.EINTR:
return
else:
raise
else:
raise
else:
for stream in res[0]:
stream.read(drain)

while proc.returncode is None:
proc.poll()
_process()
runtime.returncode = proc.returncode
_process(drain=1)

# collect results, merge and return
result = {}
temp = []
for stream in streams:
rows = stream._rows
temp += rows
result[stream._name] = [r[2] for r in rows]
temp.sort()
result['merged'] = [r[1] for r in temp]
for stream in res[0]:
stream.read(drain)

while proc.returncode is None:
proc.poll()
_process()
_process(drain=1)

# collect results, merge and return
result = {}
temp = []
for stream in streams:
rows = stream._rows
temp += rows
result[stream._name] = [r[2] for r in rows]
temp.sort()
result['merged'] = [r[1] for r in temp]
if output == 'allatonce':
stdout, stderr = proc.communicate()
result['stdout'] = stdout.split('\n')
result['stderr'] = stderr.split('\n')
result['merged'] = ''
if output == 'file':
errfile = os.path.join(runtime.cwd, 'stderr.nipype')
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
stderr = open(errfile, 'wt')
stdout = open(outfile, 'wt')
proc = subprocess.Popen(runtime.cmdline,
stdout=stdout,
stderr=stderr,
shell=True,
cwd=runtime.cwd,
env=runtime.environ)
ret_code = proc.wait()
stderr.flush()
stdout.flush()
result['stdout'] = [line.strip() for line in open(outfile).readlines()]
result['stderr'] = [line.strip() for line in open(errfile).readlines()]
result['merged'] = ''
if output == 'none':
proc.communicate()
result['stdout'] = []
result['stderr'] = []
result['merged'] = ''
runtime.stderr = '\n'.join(result['stderr'])
runtime.stdout = '\n'.join(result['stdout'])
runtime.merged = result['merged']
runtime.returncode = proc.returncode
return runtime


class CommandLineInputSpec(BaseInterfaceInputSpec):
args = traits.Str(argstr='%s', desc='Additional parameters to the command')
environ = traits.DictStrStr(desc='Environment variables', usedefault=True,
nohash=True)

terminal_output = traits.Enum('stream', 'allatonce', 'file', 'none',
desc='Control terminal output', nohash=True,
mandatory=True)

class CommandLine(BaseInterface):
"""Implements functionality to interact with command line programs
Expand All @@ -1107,7 +1138,7 @@ class must be instantiated with a command argument
'ls -al'

>>> cli.inputs.trait_get()
{'ignore_exception': False, 'args': '-al', 'environ': {'DISPLAY': ':1'}}
{'ignore_exception': False, 'terminal_output': 'stream', 'environ': {'DISPLAY': ':1'}, 'args': '-al'}

>>> cli.inputs.get_hashval()
({'args': '-al'}, 'a2f45e04a34630c5f33a75ea2a533cdd')
Expand All @@ -1117,6 +1148,7 @@ class must be instantiated with a command argument
input_spec = CommandLineInputSpec
_cmd = None
_version = None
_terminal_output = 'stream'

def __init__(self, command=None, **inputs):
super(CommandLine, self).__init__(**inputs)
Expand All @@ -1127,6 +1159,31 @@ def __init__(self, command=None, **inputs):
raise Exception("Missing command")
if command:
self._cmd = command
self.inputs.on_trait_change(self._terminal_output_update,
'terminal_output')
if not isdefined(self.inputs.terminal_output):
self.inputs.terminal_output = self._terminal_output
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using the usedefault in traits?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because you want to be able to control it both at the class level and node level. usedefault doesn't allow changing class level defaults.

else:
self._terminal_output_update()

def _terminal_output_update(self):
self._terminal_output = self.inputs.terminal_output

@classmethod
def set_default_terminal_output(cls, output_type):
"""Set the default output type for FSL classes.

This method is used to set the default output type for all fSL
subclasses. However, setting this will not update the output
type for any existing instances. For these, assign the
<instance>.inputs.output_type.
"""

if output_type in ['stream', 'allatonce', 'file', 'none']:
cls._terminal_output = output_type
else:
raise AttributeError('Invalid terminal output_type: %s' %
output_type)

@property
def cmd(self):
Expand Down Expand Up @@ -1207,7 +1264,7 @@ def _run_interface(self, runtime):
if not self._exists_in_path(self.cmd.split()[0]):
raise IOError("%s could not be found on host %s" % (self.cmd.split()[0],
runtime.hostname))
runtime = run_command(runtime)
runtime = run_command(runtime, output=self.inputs.terminal_output)
if runtime.returncode is None or runtime.returncode != 0:
self.raise_exception(runtime)

Expand Down
3 changes: 2 additions & 1 deletion nipype/interfaces/diffusion_toolkit/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def version():
Version number as string or None if FSL not found

"""
clout = CommandLine(command='dti_recon').run()
clout = CommandLine(command='dti_recon',
terminal_output='allatonce').run()

if clout.runtime.returncode is not 0:
return None
Expand Down
3 changes: 2 additions & 1 deletion nipype/interfaces/fsl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def version():
except KeyError:
return None
clout = CommandLine(command='cat',
args='%s/etc/fslversion' % (basedir)).run()
args='%s/etc/fslversion' % (basedir),
terminal_output='allatonce').run()
out = clout.runtime.stdout
return out.strip('\n')

Expand Down
3 changes: 2 additions & 1 deletion nipype/interfaces/fsl/tests/test_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ def test_flirt():
# Skip mandatory inputs and the trait methods
if key in ('trait_added', 'trait_modified', 'in_file', 'reference',
'environ', 'output_type', 'out_file', 'out_matrix_file',
'in_matrix_file', 'apply_xfm', 'ignore_exception'):
'in_matrix_file', 'apply_xfm', 'ignore_exception',
'terminal_output'):
continue
param = None
value = None
Expand Down
7 changes: 6 additions & 1 deletion nipype/interfaces/matlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def get_matlab_command():
matlab_cmd = 'matlab'

try:
res = CommandLine(command='which', args=matlab_cmd).run()
res = CommandLine(command='which', args=matlab_cmd,
terminal_output='allatonce').run()
matlab_path = res.runtime.stdout.strip()
except Exception, e:
return None
Expand Down Expand Up @@ -95,6 +96,9 @@ def __init__(self, matlab_cmd = None, **inputs):
not isdefined(self.inputs.uses_mcr):
if config.getboolean('execution','single_thread_matlab'):
self.inputs.single_comp_thread = True
# For matlab commands force all output to be returned since matlab
# does not have a clean way of notifying an error
self.inputs.terminal_output = 'allatonce'

@classmethod
def set_default_matlab_cmd(cls, matlab_cmd):
Expand Down Expand Up @@ -130,6 +134,7 @@ def set_default_paths(cls, paths):
cls._default_paths = paths

def _run_interface(self,runtime):
self.inputs.terminal_output = 'allatonce'
runtime = super(MatlabCommand, self)._run_interface(runtime)
try:
# Matlab can leave the terminal in a barbbled state
Expand Down
50 changes: 50 additions & 0 deletions nipype/interfaces/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,53 @@ def test_Commandline_environ():
ci3.inputs.environ = {'DISPLAY' : ':2'}
res = ci3.run()
yield assert_equal, res.runtime.environ['DISPLAY'], ':2'

def test_CommandLine_output():
tmp_infile = setup_file()
tmpd, name = os.path.split(tmp_infile)
pwd = os.getcwd()
os.chdir(tmpd)
yield assert_true, os.path.exists(tmp_infile)
ci = nib.CommandLine(command='ls -l')
ci.inputs.terminal_output = 'allatonce'
res = ci.run()
yield assert_equal, res.runtime.merged, ''
yield assert_true, name in res.runtime.stdout
ci = nib.CommandLine(command='ls -l')
ci.inputs.terminal_output = 'file'
res = ci.run()
yield assert_true, 'stdout.nipype' in res.runtime.stdout
ci = nib.CommandLine(command='ls -l')
ci.inputs.terminal_output = 'none'
res = ci.run()
yield assert_equal, res.runtime.stdout, ''
ci = nib.CommandLine(command='ls -l')
res = ci.run()
yield assert_true, 'stdout.nipype' in res.runtime.stdout
os.chdir(pwd)
teardown_file(tmpd)

def test_global_CommandLine_output():
tmp_infile = setup_file()
tmpd, name = os.path.split(tmp_infile)
pwd = os.getcwd()
os.chdir(tmpd)
ci = nib.CommandLine(command='ls -l')
res = ci.run()
yield assert_true, name in res.runtime.stdout
yield assert_true, os.path.exists(tmp_infile)
nib.CommandLine.set_default_terminal_output('allatonce')
ci = nib.CommandLine(command='ls -l')
res = ci.run()
yield assert_equal, res.runtime.merged, ''
yield assert_true, name in res.runtime.stdout
nib.CommandLine.set_default_terminal_output('file')
ci = nib.CommandLine(command='ls -l')
res = ci.run()
yield assert_true, 'stdout.nipype' in res.runtime.stdout
nib.CommandLine.set_default_terminal_output('none')
ci = nib.CommandLine(command='ls -l')
res = ci.run()
yield assert_equal, res.runtime.stdout, ''
os.chdir(pwd)
teardown_file(tmpd)
6 changes: 4 additions & 2 deletions nipype/pipeline/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def __init__(self, **kwargs):
super(CondorPlugin, self).__init__(template, **kwargs)

def _is_pending(self, taskid):
cmd = CommandLine('condor_q')
cmd = CommandLine('condor_q',
terminal_output='allatonce')
cmd.inputs.args = '%d' % taskid
# check condor cluster
oldlevel = iflogger.level
Expand All @@ -57,7 +58,8 @@ def _is_pending(self, taskid):
return False

def _submit_batchtask(self, scriptfile, node):
cmd = CommandLine('condor_qsub', environ=os.environ.data)
cmd = CommandLine('condor_qsub', environ=os.environ.data,
terminal_output='allatonce')
path = os.path.dirname(scriptfile)
qsubargs = ''
if self._qsub_args:
Expand Down
3 changes: 2 additions & 1 deletion nipype/pipeline/plugins/dagman.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
% (' '.join([str(i) for i in parents]),
child))
# hand over DAG to condor_dagman
cmd = CommandLine('condor_submit_dag', environ=os.environ.data)
cmd = CommandLine('condor_submit_dag', environ=os.environ.data,
terminal_output='allatonce')
# needs -update_submit or re-running a workflow will fail
cmd.inputs.args = '-update_submit %s %s' % (dagfilename,
self._dagman_args)
Expand Down
6 changes: 4 additions & 2 deletions nipype/pipeline/plugins/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def _is_pending(self, taskid):
and 'RUN' when it is actively being processed. But _is_pending should return True until a job has
finished and is ready to be checked for completeness. So return True if status is either 'PEND'
or 'RUN'"""
cmd = CommandLine('bjobs')
cmd = CommandLine('bjobs',
terminal_output='allatonce')
cmd.inputs.args = '%d' % taskid
# check lsf task
oldlevel = iflogger.level
Expand All @@ -57,7 +58,8 @@ def _is_pending(self, taskid):
return False

def _submit_batchtask(self, scriptfile, node):
cmd = CommandLine('bsub', environ=os.environ.data)
cmd = CommandLine('bsub', environ=os.environ.data,
terminal_output='allatonce')
path = os.path.dirname(scriptfile)
bsubargs = ''
if self._bsub_args:
Expand Down
3 changes: 2 additions & 1 deletion nipype/pipeline/plugins/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def _is_pending(self, taskid):
return errmsg not in e

def _submit_batchtask(self, scriptfile, node):
cmd = CommandLine('qsub', environ=os.environ.data)
cmd = CommandLine('qsub', environ=os.environ.data,
terminal_output='allatonce')
path = os.path.dirname(scriptfile)
qsubargs = ''
if self._qsub_args:
Expand Down
3 changes: 2 additions & 1 deletion nipype/pipeline/plugins/pbsgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
fp.writelines('job%05d=`qsub %s %s %s`\n' % (idx, deps,
qsub_args,
batchscriptfile))
cmd = CommandLine('sh', environ=os.environ.data)
cmd = CommandLine('sh', environ=os.environ.data,
terminal_output='allatonce')
cmd.inputs.args = '%s' % submitjobsfile
cmd.run()
logger.info('submitted all jobs to queue')
Loading