Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3ff01a2
Added compatibility for same time delta nodes
pintohutch May 12, 2016
395b60a
Added smoke test for draw gantt chart
pintohutch May 17, 2016
220b9af
Should fix issue mentioned https://github.com/nipy/nipype/issues/1488…
pintohutch Jun 7, 2016
d49ed0b
Merge pull request #26 from nipy/master
pintohutch Jun 7, 2016
604fff3
Merge branch 'resource_multiproc' of https://github.com/fcp-indi/nipy…
pintohutch Jun 7, 2016
cff9393
added tolerance for num threads
Jun 17, 2016
35f2d47
Increased memory tolerance
Jun 17, 2016
f9c1092
Added num threads tolerance to both methods
Jun 17, 2016
748bce3
Merge pull request #27 from nipy/master
pintohutch Jun 17, 2016
c898148
Debug statements quantity of free resources should be reduced
pintohutch Jun 26, 2016
5c276b5
Removed more debug statements
pintohutch Jul 9, 2016
2c22093
Added flag check for pandas
pintohutch Jul 26, 2016
e1c8593
Merge branch 'master' of https://github.com/nipy/nipype into nipy-master
pintohutch Aug 1, 2016
25d6341
Merge branch 'nipy-master' into resource_multiproc
pintohutch Aug 1, 2016
ba90842
Resolved git diffs
pintohutch Aug 1, 2016
b79a0b7
Merge remote-tracking branch 'upstream/master' into fix/ResourceMulti…
oesteban Sep 16, 2016
5dd4247
fix logging messages
oesteban Sep 16, 2016
6935526
Merge remote-tracking branch 'upstream/master' into fix/ResourceMulti…
oesteban Sep 20, 2016
5b099b9
fix error in run_examples.py and add MultiProcResourceProfiler in docs.
oesteban Sep 20, 2016
18746c7
fix references in documentation #1526
oesteban Sep 20, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/users/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

pipeline_tutorial
plugins
resource_sched_profiler
config_file
debug

Expand Down
8 changes: 8 additions & 0 deletions doc/users/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ the number of used resources (to say 2 CPUs), you can call::

workflow.run(plugin='MultiProc', plugin_args={'n_procs' : 2}


MultiProc: profiling resources
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``MultiProc`` plugin can profile memory and cpu usage,
please see :ref:`resource_sched_profiler` for a comprehensive description.


IPython
-------

Expand Down
7 changes: 5 additions & 2 deletions nipype/interfaces/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1407,9 +1407,12 @@ def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False):
try:
mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc))
num_threads = max(num_threads, _get_num_threads(psutil.Process(pid)))
except psutil.AccessDenied as exc:
runtime_profile = False
iflogger.debug('Resources used cannot be profiled by psutils: %s', exc)
except Exception as exc:
iflogger.info('Could not get resources used by process. Error: %s'\
% exc)
runtime_profile = False
iflogger.info('Error getting resources used by process: %s', exc)

# Return resources
return mem_mb, num_threads
Expand Down
32 changes: 17 additions & 15 deletions nipype/interfaces/tests/test_runtime_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ def setUp(self):
# Input number of sub-threads (not including parent threads)
self.num_threads = 2
# Acceptable percent error for memory profiled against input
self.mem_err_gb = 0.3 # Increased to 30% for py2.7
self.mem_err_gb = 0.3
self.num_err_thr = 1


# ! Only used for benchmarking the profiler over a range of
# ! RAM usage and number of threads
Expand Down Expand Up @@ -403,19 +405,19 @@ def test_cmdline_profiling(self):
# Get margin of error for RAM GB
allowed_gb_err = self.mem_err_gb
runtime_gb_err = np.abs(runtime_gb-num_gb)
#
expected_runtime_threads = num_threads
# Get margin of error for number of threads
allowed_thr_err = self.num_err_thr
runtime_thr_err = np.abs(runtime_threads-num_threads)

# Error message formatting
mem_err = 'Input memory: %f is not within %.3f GB of runtime '\
'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb)
threads_err = 'Input threads: %d is not equal to runtime threads: %d' \
% (expected_runtime_threads, runtime_threads)
'memory: %f' % (num_gb, allowed_gb_err, runtime_gb)
threads_err = 'Input threads: %d is not within %d of runtime threads: %d' \
% (num_threads, allowed_thr_err, runtime_threads)

# Assert runtime stats are what was input
self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err)
self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1,
msg=threads_err)
self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err)

# Test resources were used as expected
@unittest.skipIf(run_profile == False, skip_profile_msg)
Expand Down Expand Up @@ -445,19 +447,19 @@ def test_function_profiling(self):
# Get margin of error for RAM GB
allowed_gb_err = self.mem_err_gb
runtime_gb_err = np.abs(runtime_gb-num_gb)
#
expected_runtime_threads = num_threads
# Get margin of error for number of threads
allowed_thr_err = self.num_err_thr
runtime_thr_err = np.abs(runtime_threads-num_threads)

# Error message formatting
mem_err = 'Input memory: %f is not within %.3f GB of runtime '\
'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb)
threads_err = 'Input threads: %d is not equal to runtime threads: %d' \
% (expected_runtime_threads, runtime_threads)
'memory: %f' % (num_gb, allowed_gb_err, runtime_gb)
threads_err = 'Input threads: %d is not within %d of runtime threads: %d' \
% (num_threads, allowed_thr_err, runtime_threads)

# Assert runtime stats are what was input
self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err)
self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1,
msg=threads_err)
self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err)


# Command-line run-able unittest module
Expand Down
1 change: 0 additions & 1 deletion nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ def run(self, graph, config, updatehash=False):
if toappend:
self.pending_tasks.extend(toappend)
num_jobs = len(self.pending_tasks)
logger.debug('Number of pending tasks: %d' % num_jobs)
if num_jobs < self.max_jobs:
self._send_procs_to_workers(updatehash=updatehash,
graph=graph)
Expand Down
43 changes: 24 additions & 19 deletions nipype/utils/draw_gantt_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ def create_event_dict(start_time, nodes_list):
runtime_threads = node.get('runtime_threads', 0)
runtime_memory_gb = node.get('runtime_memory_gb', 0.0)

# Init and format event-based nodes
node['estimated_threads'] = estimated_threads
node['estimated_memory_gb'] = estimated_memory_gb
node['runtime_threads'] = runtime_threads
node['runtime_memory_gb'] = runtime_memory_gb
# Init and format event-based nodes
node['estimated_threads'] = estimated_threads
node['estimated_memory_gb'] = estimated_memory_gb
node['runtime_threads'] = runtime_threads
node['runtime_memory_gb'] = runtime_memory_gb
start_node = node
finish_node = copy.deepcopy(node)
start_node['event'] = 'start'
Expand All @@ -71,11 +71,14 @@ def create_event_dict(start_time, nodes_list):
finish_delta = (node['finish'] - start_time).total_seconds()

# Populate dictionary
if events.has_key(start_delta) or events.has_key(finish_delta):
err_msg = 'Event logged twice or events started at exact same time!'
raise KeyError(err_msg)
events[start_delta] = start_node
events[finish_delta] = finish_node
if events.has_key(start_delta):
events[start_delta].append(start_node)
else:
events[start_delta] = [start_node]
if events.has_key(finish_delta):
events[finish_delta].append(finish_node)
else:
events[finish_delta] = [finish_node]

# Return events dictionary
return events
Expand Down Expand Up @@ -186,15 +189,16 @@ def calculate_resource_timeseries(events, resource):
all_res = 0.0

# Iterate through the events
for tdelta, event in sorted(events.items()):
if event['event'] == "start":
if resource in event and event[resource] != 'Unknown':
all_res += float(event[resource])
current_time = event['start'];
elif event['event'] == "finish":
if resource in event and event[resource] != 'Unknown':
all_res -= float(event[resource])
current_time = event['finish'];
for tdelta, events in sorted(events.items()):
for event in events:
if event['event'] == "start":
if resource in event and event[resource] != 'Unknown':
all_res += float(event[resource])
current_time = event['start']
elif event['event'] == "finish":
if resource in event and event[resource] != 'Unknown':
all_res -= float(event[resource])
current_time = event['finish']
res[current_time] = all_res

# Formulate the pandas timeseries
Expand Down Expand Up @@ -349,6 +353,7 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes,
# Return html string for nodes
return result


def draw_resource_bar(start_time, finish_time, time_series, space_between_minutes,
minute_scale, color, left, resource):
'''
Expand Down
36 changes: 33 additions & 3 deletions tools/run_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from __future__ import print_function
import os
import sys
from shutil import rmtree
from shutil import rmtree, copyfile
from multiprocessing import cpu_count


def run_examples(example, pipelines, data_path, plugin=None, rm_base_dir=True):
""" Run example workflows """
from nipype import config
from nipype.interfaces.base import CommandLine
from nipype.utils import draw_gantt_chart
from nipype.pipeline.plugins import log_nodes_cb

if plugin is None:
plugin = 'MultiProc'
Expand All @@ -23,7 +26,9 @@ def run_examples(example, pipelines, data_path, plugin=None, rm_base_dir=True):
plugin_args['n_procs'] = int(os.getenv('NIPYPE_NUMBER_OF_CPUS', cpu_count()))

__import__(example)

for pipeline in pipelines:
# Init and run workflow
wf = getattr(sys.modules[example], pipeline)
wf.base_dir = os.path.join(os.getcwd(), 'output', example, plugin)

Expand All @@ -40,14 +45,39 @@ def run_examples(example, pipelines, data_path, plugin=None, rm_base_dir=True):
'write_provenance': 'true',
'poll_sleep_duration': 2},
'logging': {'log_directory': log_dir, 'log_to_file': True}}


# Callback log setup
if example == 'fmri_spm_nested' and plugin == 'MultiProc' and \
pipeline == 'l2pipeline':
# Init callback log
import logging
cb_log_path = os.path.abspath('callback.log')
cb_logger = logging.getLogger('callback')
cb_logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(cb_log_path)
cb_logger.addHandler(handler)
plugin_args['status_callback'] = cb_logger

try:
wf.inputs.inputnode.in_data = os.path.abspath(data_path)
except AttributeError:
pass # the workflow does not have inputnode.in_data

wf.run(plugin=plugin, plugin_args=plugin_args)
# run twice to check if nothing is rerunning
wf.run(plugin=plugin)

# Draw gantt chart only if pandas is installed
try:
import pandas
pandas_flg = True
except ImportError as exc:
pandas_flg = False

if plugin_args.get('status_callback', False) and pandas_flg:
draw_gantt_chart.generate_gantt_chart(cb_log_path, 4)
dst_log_html = os.path.abspath('callback.log.html')
copyfile(cb_log_path + '.html', dst_log_html)


if __name__ == '__main__':
path, file = os.path.split(__file__)
Expand Down