diff --git a/doc/users/index.rst b/doc/users/index.rst index 13c1487ae0..911461fdad 100644 --- a/doc/users/index.rst +++ b/doc/users/index.rst @@ -21,6 +21,7 @@ pipeline_tutorial plugins + resource_sched_profiler config_file debug diff --git a/doc/users/plugins.rst b/doc/users/plugins.rst index 6c825aa8f8..cd3a4d0d32 100644 --- a/doc/users/plugins.rst +++ b/doc/users/plugins.rst @@ -83,6 +83,14 @@ the number of used resources (to say 2 CPUs), you can call:: workflow.run(plugin='MultiProc', plugin_args={'n_procs' : 2} + +MultiProc: profiling resources +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``MultiProc`` plugin can profile memory and cpu usage, +please see :ref:`resource_sched_profiler` for a comprehensive description. + + IPython ------- diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 188053c639..96497f400f 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1407,9 +1407,12 @@ def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False): try: mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc)) num_threads = max(num_threads, _get_num_threads(psutil.Process(pid))) + except psutil.AccessDenied as exc: + runtime_profile = False + iflogger.debug('Resources used cannot be profiled by psutils: %s', exc) except Exception as exc: - iflogger.info('Could not get resources used by process. 
Error: %s'\ - % exc) + runtime_profile = False + iflogger.info('Error getting resources used by process: %s', exc) # Return resources return mem_mb, num_threads diff --git a/nipype/interfaces/tests/test_runtime_profiler.py b/nipype/interfaces/tests/test_runtime_profiler.py index e127232412..3721246ca1 100644 --- a/nipype/interfaces/tests/test_runtime_profiler.py +++ b/nipype/interfaces/tests/test_runtime_profiler.py @@ -151,7 +151,9 @@ def setUp(self): # Input number of sub-threads (not including parent threads) self.num_threads = 2 # Acceptable percent error for memory profiled against input - self.mem_err_gb = 0.3 # Increased to 30% for py2.7 + self.mem_err_gb = 0.3 + self.num_err_thr = 1 + # ! Only used for benchmarking the profiler over a range of # ! RAM usage and number of threads @@ -403,19 +405,19 @@ def test_cmdline_profiling(self): # Get margin of error for RAM GB allowed_gb_err = self.mem_err_gb runtime_gb_err = np.abs(runtime_gb-num_gb) - # - expected_runtime_threads = num_threads + # Get margin of error for number of threads + allowed_thr_err = self.num_err_thr + runtime_thr_err = np.abs(runtime_threads-num_threads) # Error message formatting mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ - 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) - threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ - % (expected_runtime_threads, runtime_threads) + 'memory: %f' % (num_gb, allowed_gb_err, runtime_gb) + threads_err = 'Input threads: %d is not within %d of runtime threads: %d' \ + % (num_threads, allowed_thr_err, runtime_threads) # Assert runtime stats are what was input self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) - self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1, - msg=threads_err) + self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err) # Test resources were used as expected @unittest.skipIf(run_profile == False, skip_profile_msg) @@ -445,19 +447,19 @@ def 
test_function_profiling(self): # Get margin of error for RAM GB allowed_gb_err = self.mem_err_gb runtime_gb_err = np.abs(runtime_gb-num_gb) - # - expected_runtime_threads = num_threads + # Get margin of error for number of threads + allowed_thr_err = self.num_err_thr + runtime_thr_err = np.abs(runtime_threads-num_threads) # Error message formatting mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ - 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) - threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ - % (expected_runtime_threads, runtime_threads) + 'memory: %f' % (num_gb, allowed_gb_err, runtime_gb) + threads_err = 'Input threads: %d is not within %d of runtime threads: %d' \ + % (num_threads, allowed_thr_err, runtime_threads) # Assert runtime stats are what was input self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) - self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1, - msg=threads_err) + self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err) # Command-line run-able unittest module diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 098ae5d636..8ac570a894 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -261,7 +261,6 @@ def run(self, graph, config, updatehash=False): if toappend: self.pending_tasks.extend(toappend) num_jobs = len(self.pending_tasks) - logger.debug('Number of pending tasks: %d' % num_jobs) if num_jobs < self.max_jobs: self._send_procs_to_workers(updatehash=updatehash, graph=graph) diff --git a/nipype/utils/draw_gantt_chart.py b/nipype/utils/draw_gantt_chart.py index 75ea7dc3ea..627d7318dc 100644 --- a/nipype/utils/draw_gantt_chart.py +++ b/nipype/utils/draw_gantt_chart.py @@ -56,11 +56,11 @@ def create_event_dict(start_time, nodes_list): runtime_threads = node.get('runtime_threads', 0) runtime_memory_gb = node.get('runtime_memory_gb', 0.0) - # Init and format event-based nodes - node['estimated_threads'] = 
estimated_threads - node['estimated_memory_gb'] = estimated_memory_gb - node['runtime_threads'] = runtime_threads - node['runtime_memory_gb'] = runtime_memory_gb + # Init and format event-based nodes + node['estimated_threads'] = estimated_threads + node['estimated_memory_gb'] = estimated_memory_gb + node['runtime_threads'] = runtime_threads + node['runtime_memory_gb'] = runtime_memory_gb start_node = node finish_node = copy.deepcopy(node) start_node['event'] = 'start' @@ -71,11 +71,14 @@ def create_event_dict(start_time, nodes_list): finish_delta = (node['finish'] - start_time).total_seconds() # Populate dictionary - if events.has_key(start_delta) or events.has_key(finish_delta): - err_msg = 'Event logged twice or events started at exact same time!' - raise KeyError(err_msg) - events[start_delta] = start_node - events[finish_delta] = finish_node + if events.has_key(start_delta): + events[start_delta].append(start_node) + else: + events[start_delta] = [start_node] + if events.has_key(finish_delta): + events[finish_delta].append(finish_node) + else: + events[finish_delta] = [finish_node] # Return events dictionary return events @@ -186,15 +189,16 @@ def calculate_resource_timeseries(events, resource): all_res = 0.0 # Iterate through the events - for tdelta, event in sorted(events.items()): - if event['event'] == "start": - if resource in event and event[resource] != 'Unknown': - all_res += float(event[resource]) - current_time = event['start']; - elif event['event'] == "finish": - if resource in event and event[resource] != 'Unknown': - all_res -= float(event[resource]) - current_time = event['finish']; + for tdelta, events in sorted(events.items()): + for event in events: + if event['event'] == "start": + if resource in event and event[resource] != 'Unknown': + all_res += float(event[resource]) + current_time = event['start'] + elif event['event'] == "finish": + if resource in event and event[resource] != 'Unknown': + all_res -= float(event[resource]) + 
current_time = event['finish'] res[current_time] = all_res # Formulate the pandas timeseries @@ -349,6 +353,7 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, # Return html string for nodes return result + def draw_resource_bar(start_time, finish_time, time_series, space_between_minutes, minute_scale, color, left, resource): ''' diff --git a/tools/run_examples.py b/tools/run_examples.py index 45f4c8dc4f..26b216c0cc 100644 --- a/tools/run_examples.py +++ b/tools/run_examples.py @@ -2,13 +2,16 @@ from __future__ import print_function import os import sys -from shutil import rmtree +from shutil import rmtree, copyfile from multiprocessing import cpu_count def run_examples(example, pipelines, data_path, plugin=None, rm_base_dir=True): + """ Run example workflows """ from nipype import config from nipype.interfaces.base import CommandLine + from nipype.utils import draw_gantt_chart + from nipype.pipeline.plugins import log_nodes_cb if plugin is None: plugin = 'MultiProc' @@ -23,7 +26,9 @@ def run_examples(example, pipelines, data_path, plugin=None, rm_base_dir=True): plugin_args['n_procs'] = int(os.getenv('NIPYPE_NUMBER_OF_CPUS', cpu_count())) __import__(example) + for pipeline in pipelines: + # Init and run workflow wf = getattr(sys.modules[example], pipeline) wf.base_dir = os.path.join(os.getcwd(), 'output', example, plugin) @@ -40,14 +45,39 @@ def run_examples(example, pipelines, data_path, plugin=None, rm_base_dir=True): 'write_provenance': 'true', 'poll_sleep_duration': 2}, 'logging': {'log_directory': log_dir, 'log_to_file': True}} + + + # Callback log setup + if example == 'fmri_spm_nested' and plugin == 'MultiProc' and \ + pipeline == 'l2pipeline': + # Init callback log + import logging + cb_log_path = os.path.abspath('callback.log') + cb_logger = logging.getLogger('callback') + cb_logger.setLevel(logging.DEBUG) + handler = logging.FileHandler(cb_log_path) + cb_logger.addHandler(handler) + plugin_args['status_callback'] = 
log_nodes_cb + try: wf.inputs.inputnode.in_data = os.path.abspath(data_path) except AttributeError: pass # the workflow does not have inputnode.in_data wf.run(plugin=plugin, plugin_args=plugin_args) + + # Draw gantt chart only if pandas is installed + try: + import pandas + pandas_flg = True + except ImportError as exc: + pandas_flg = False + + if plugin_args.get('status_callback', False) and pandas_flg: + draw_gantt_chart.generate_gantt_chart(cb_log_path, 4) + dst_log_html = os.path.abspath('callback.log.html') + copyfile(cb_log_path + '.html', dst_log_html) + if __name__ == '__main__': path, file = os.path.split(__file__)