Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nipype/interfaces/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,9 @@ def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False):
try:
mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc))
num_threads = max(num_threads, _get_num_threads(psutil.Process(pid)))
except psutil.AccessDenied as exc:
Copy link
Contributor

@oesteban oesteban Sep 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe disable the runtime_profile here?

iflogger.debug('Could not get resources used by process. Error: %s'\
% exc)
except Exception as exc:
iflogger.info('Could not get resources used by process. Error: %s'\
% exc)
Expand Down
35 changes: 18 additions & 17 deletions nipype/interfaces/tests/test_runtime_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ def setUp(self):
# Input number of sub-threads (not including parent threads)
self.num_threads = 2
# Acceptable percent error for memory profiled against input
self.mem_err_gb = 0.25
self.mem_err_gb = 0.4
self.num_err_thr = 1

# ! Only used for benchmarking the profiler over a range of
# ! RAM usage and number of threads
Expand Down Expand Up @@ -355,7 +356,7 @@ def _run_function_workflow(self, num_gb, num_threads):
return start_str, finish_str

# Test resources were used as expected in cmdline interface
@unittest.skipIf(run_profiler == False, skip_profile_msg)
@unittest.skipIf(run_profiler==False, skip_profile_msg)
def test_cmdline_profiling(self):
'''
Test runtime profiler correctly records workflow RAM/CPUs consumption
Expand All @@ -382,22 +383,22 @@ def test_cmdline_profiling(self):
# Get margin of error for RAM GB
allowed_gb_err = self.mem_err_gb
runtime_gb_err = np.abs(runtime_gb-num_gb)
#
expected_runtime_threads = num_threads
# Get margin of error for number of threads
allowed_thr_err = self.num_err_thr
runtime_thr_err = np.abs(runtime_threads-num_threads)

# Error message formatting
mem_err = 'Input memory: %f is not within %.3f GB of runtime '\
'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb)
threads_err = 'Input threads: %d is not equal to runtime threads: %d' \
% (expected_runtime_threads, runtime_threads)
'memory: %f' % (num_gb, allowed_gb_err, runtime_gb)
threads_err = 'Input threads: %d is not within $d of runtime threads: %d' \
% (num_threads, allowed_thr_err)

# Assert runtime stats are what was input
self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err)
self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1,
msg=threads_err)
self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err)

# Test resources were used as expected
@unittest.skipIf(run_profiler == False, skip_profile_msg)
@unittest.skipIf(run_profiler==False, skip_profile_msg)
def test_function_profiling(self):
'''
Test runtime profiler correctly records workflow RAM/CPUs consumption
Expand All @@ -424,19 +425,19 @@ def test_function_profiling(self):
# Get margin of error for RAM GB
allowed_gb_err = self.mem_err_gb
runtime_gb_err = np.abs(runtime_gb-num_gb)
#
expected_runtime_threads = num_threads
# Get margin of error for number of threads
allowed_thr_err = self.num_err_thr
runtime_thr_err = np.abs(runtime_threads-num_threads)

# Error message formatting
mem_err = 'Input memory: %f is not within %.3f GB of runtime '\
'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb)
threads_err = 'Input threads: %d is not equal to runtime threads: %d' \
% (expected_runtime_threads, runtime_threads)
'memory: %f' % (num_gb, allowed_gb_err, runtime_gb)
threads_err = 'Input threads: %d is not within $d of runtime threads: %d' \
% (num_threads, allowed_thr_err)

# Assert runtime stats are what was input
self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err)
self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1,
msg=threads_err)
self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err)


# Command-line run-able unittest module
Expand Down
1 change: 0 additions & 1 deletion nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ def run(self, graph, config, updatehash=False):
if toappend:
self.pending_tasks.extend(toappend)
num_jobs = len(self.pending_tasks)
logger.debug('Number of pending tasks: %d' % num_jobs)
if num_jobs < self.max_jobs:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This trace appeared too often, so it is probably OK to remove it, but I would like to ask @satra first.

self._send_procs_to_workers(updatehash=updatehash,
graph=graph)
Expand Down
8 changes: 3 additions & 5 deletions nipype/pipeline/plugins/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
jobids = sorted(jobids,
key=lambda item: (self.procs[item]._interface.estimated_memory_gb,
self.procs[item]._interface.num_threads))

logger.debug('Free memory (GB): %d, Free processors: %d',
free_memory_gb, free_processors)
if len(jobids) > 0:
Copy link
Contributor

@oesteban oesteban Sep 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @satra protected this trace printing with the runtime_profile boolean as well.

logger.debug('Free memory (GB): %d, Free processors: %d',
free_memory_gb, free_processors)

# While have enough memory and processors for first job
# Submit first job on the list
Expand Down Expand Up @@ -304,5 +304,3 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
self.pending_tasks.insert(0, (tid, jobid))
else:
break

logger.debug('No jobs waiting to execute')
43 changes: 24 additions & 19 deletions nipype/utils/draw_gantt_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ def create_event_dict(start_time, nodes_list):
runtime_threads = node.get('runtime_threads', 0)
runtime_memory_gb = node.get('runtime_memory_gb', 0.0)

# Init and format event-based nodes
node['estimated_threads'] = estimated_threads
node['estimated_memory_gb'] = estimated_memory_gb
node['runtime_threads'] = runtime_threads
node['runtime_memory_gb'] = runtime_memory_gb
# Init and format event-based nodes
node['estimated_threads'] = estimated_threads
node['estimated_memory_gb'] = estimated_memory_gb
node['runtime_threads'] = runtime_threads
node['runtime_memory_gb'] = runtime_memory_gb
start_node = node
finish_node = copy.deepcopy(node)
start_node['event'] = 'start'
Expand All @@ -63,11 +63,14 @@ def create_event_dict(start_time, nodes_list):
finish_delta = (node['finish'] - start_time).total_seconds()

# Populate dictionary
if events.has_key(start_delta) or events.has_key(finish_delta):
err_msg = 'Event logged twice or events started at exact same time!'
raise KeyError(err_msg)
events[start_delta] = start_node
events[finish_delta] = finish_node
if events.has_key(start_delta):
events[start_delta].append(start_node)
else:
events[start_delta] = [start_node]
if events.has_key(finish_delta):
events[finish_delta].append(finish_node)
else:
events[finish_delta] = [finish_node]

# Return events dictionary
return events
Expand Down Expand Up @@ -178,15 +181,16 @@ def calculate_resource_timeseries(events, resource):
all_res = 0.0

# Iterate through the events
for tdelta, event in sorted(events.items()):
if event['event'] == "start":
if resource in event and event[resource] != 'Unknown':
all_res += float(event[resource])
current_time = event['start'];
elif event['event'] == "finish":
if resource in event and event[resource] != 'Unknown':
all_res -= float(event[resource])
current_time = event['finish'];
for tdelta, events in sorted(events.items()):
for event in events:
if event['event'] == "start":
if resource in event and event[resource] != 'Unknown':
all_res += float(event[resource])
current_time = event['start']
elif event['event'] == "finish":
if resource in event and event[resource] != 'Unknown':
all_res -= float(event[resource])
current_time = event['finish']
res[current_time] = all_res

# Formulate the pandas timeseries
Expand Down Expand Up @@ -341,6 +345,7 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes,
# Return html string for nodes
return result


def draw_resource_bar(start_time, finish_time, time_series, space_between_minutes,
minute_scale, color, left, resource):
'''
Expand Down
42 changes: 37 additions & 5 deletions tools/run_examples.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
from __future__ import print_function
import os
import sys
from shutil import rmtree
from shutil import rmtree, copyfile
from multiprocessing import cpu_count


def run_examples(example, pipelines, data_path, plugin=None):
'''
Run example workflows
'''

# Import packages
from nipype import config
from nipype.interfaces.base import CommandLine
from nipype.utils import draw_gantt_chart
from nipype.pipeline.plugins import log_nodes_cb

if plugin is None:
plugin = 'MultiProc'
Expand All @@ -22,7 +29,9 @@ def run_examples(example, pipelines, data_path, plugin=None):
plugin_args['n_procs'] = cpu_count()

__import__(example)

for pipeline in pipelines:
# Init and run workflow
wf = getattr(sys.modules[example], pipeline)
wf.base_dir = os.path.join(os.getcwd(), 'output', example, plugin)
if os.path.exists(wf.base_dir):
Expand All @@ -35,16 +44,39 @@ def run_examples(example, pipelines, data_path, plugin=None):
os.makedirs(log_dir)
wf.config = {'execution': {'hash_method': 'timestamp',
'stop_on_first_rerun': 'true',
'write_provenance': 'true'},
'logging': {'log_directory': log_dir, 'log_to_file': True}}
'write_provenance': 'true'}}

# Callback log setup
if example == 'fmri_spm_nested' and plugin == 'MultiProc' and \
pipeline == 'l2pipeline':
# Init callback log
import logging
cb_log_path = os.path.join(os.path.expanduser('~'), 'callback.log')
cb_logger = logging.getLogger('callback')
cb_logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(cb_log_path)
cb_logger.addHandler(handler)
plugin_args = {'n_procs' : 4, 'status_callback' : log_nodes_cb}
else:
plugin_args = {'n_procs' : 4}
try:
wf.inputs.inputnode.in_data = os.path.abspath(data_path)
except AttributeError:
pass # the workflow does not have inputnode.in_data

wf.run(plugin=plugin, plugin_args=plugin_args)
# run twice to check if nothing is rerunning
wf.run(plugin=plugin)

# Draw gantt chart only if pandas is installed
try:
import pandas
pandas_flg = True
except ImportError as exc:
pandas_flg = False

if plugin_args.has_key('status_callback') and pandas_flg:
draw_gantt_chart.generate_gantt_chart(cb_log_path, 4)
dst_log_html = os.path.join(os.path.expanduser('~'), 'callback.log.html')
copyfile(cb_log_path+'.html', dst_log_html)

if __name__ == '__main__':
path, file = os.path.split(__file__)
Expand Down