diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 00482e9e6b..609b6fccc7 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1333,6 +1333,9 @@ def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False): try: mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc)) num_threads = max(num_threads, _get_num_threads(psutil.Process(pid))) + except psutil.AccessDenied as exc: + iflogger.debug('Could not get resources used by process. Error: %s'\ + % exc) except Exception as exc: iflogger.info('Could not get resources used by process. Error: %s'\ % exc) diff --git a/nipype/interfaces/tests/test_runtime_profiler.py b/nipype/interfaces/tests/test_runtime_profiler.py index 8c911449b0..80a08fa16e 100644 --- a/nipype/interfaces/tests/test_runtime_profiler.py +++ b/nipype/interfaces/tests/test_runtime_profiler.py @@ -134,7 +134,8 @@ def setUp(self): # Input number of sub-threads (not including parent threads) self.num_threads = 2 # Acceptable percent error for memory profiled against input - self.mem_err_gb = 0.25 + self.mem_err_gb = 0.4 + self.num_err_thr = 1 # ! Only used for benchmarking the profiler over a range of # ! 
RAM usage and number of threads @@ -355,7 +356,7 @@ def _run_function_workflow(self, num_gb, num_threads): return start_str, finish_str # Test resources were used as expected in cmdline interface - @unittest.skipIf(run_profiler == False, skip_profile_msg) + @unittest.skipIf(run_profiler==False, skip_profile_msg) def test_cmdline_profiling(self): ''' Test runtime profiler correctly records workflow RAM/CPUs consumption @@ -382,22 +383,22 @@ def test_cmdline_profiling(self): # Get margin of error for RAM GB allowed_gb_err = self.mem_err_gb runtime_gb_err = np.abs(runtime_gb-num_gb) - # - expected_runtime_threads = num_threads + # Get margin of error for number of threads + allowed_thr_err = self.num_err_thr + runtime_thr_err = np.abs(runtime_threads-num_threads) # Error message formatting mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ - 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) - threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ - % (expected_runtime_threads, runtime_threads) + 'memory: %f' % (num_gb, allowed_gb_err, runtime_gb) + threads_err = 'Input threads: %d is not within %d of runtime threads: %d' \ + % (num_threads, allowed_thr_err, runtime_threads) # Assert runtime stats are what was input self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) - self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1, - msg=threads_err) + self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err) # Test resources were used as expected - @unittest.skipIf(run_profiler == False, skip_profile_msg) + @unittest.skipIf(run_profiler==False, skip_profile_msg) def test_function_profiling(self): ''' Test runtime profiler correctly records workflow RAM/CPUs consumption @@ -424,19 +425,19 @@ def test_function_profiling(self): # Get margin of error for RAM GB allowed_gb_err = self.mem_err_gb runtime_gb_err = np.abs(runtime_gb-num_gb) - # - expected_runtime_threads = num_threads + # Get margin of error for number of 
threads + allowed_thr_err = self.num_err_thr + runtime_thr_err = np.abs(runtime_threads-num_threads) # Error message formatting mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ - 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) - threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ - % (expected_runtime_threads, runtime_threads) + 'memory: %f' % (num_gb, allowed_gb_err, runtime_gb) + threads_err = 'Input threads: %d is not within %d of runtime threads: %d' \ + % (num_threads, allowed_thr_err, runtime_threads) # Assert runtime stats are what was input self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) - self.assertTrue(abs(expected_runtime_threads - runtime_threads) <= 1, - msg=threads_err) + self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err) # Command-line run-able unittest module diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 865023e021..a4293e9f87 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -262,7 +262,6 @@ def run(self, graph, config, updatehash=False): if toappend: self.pending_tasks.extend(toappend) num_jobs = len(self.pending_tasks) - logger.debug('Number of pending tasks: %d' % num_jobs) if num_jobs < self.max_jobs: self._send_procs_to_workers(updatehash=updatehash, graph=graph) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index bc77296f0b..1c70108006 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -219,9 +219,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): jobids = sorted(jobids, key=lambda item: (self.procs[item]._interface.estimated_memory_gb, self.procs[item]._interface.num_threads)) - - logger.debug('Free memory (GB): %d, Free processors: %d', - free_memory_gb, free_processors) + if len(jobids) > 0: + logger.debug('Free memory (GB): %d, Free processors: %d', + free_memory_gb, free_processors) # While have 
enough memory and processors for first job # Submit first job on the list @@ -304,5 +304,3 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.pending_tasks.insert(0, (tid, jobid)) else: break - - logger.debug('No jobs waiting to execute') diff --git a/nipype/utils/draw_gantt_chart.py b/nipype/utils/draw_gantt_chart.py index 9d1ac3b0fd..680ce1f853 100644 --- a/nipype/utils/draw_gantt_chart.py +++ b/nipype/utils/draw_gantt_chart.py @@ -48,11 +48,11 @@ def create_event_dict(start_time, nodes_list): runtime_threads = node.get('runtime_threads', 0) runtime_memory_gb = node.get('runtime_memory_gb', 0.0) - # Init and format event-based nodes - node['estimated_threads'] = estimated_threads - node['estimated_memory_gb'] = estimated_memory_gb - node['runtime_threads'] = runtime_threads - node['runtime_memory_gb'] = runtime_memory_gb + # Init and format event-based nodes + node['estimated_threads'] = estimated_threads + node['estimated_memory_gb'] = estimated_memory_gb + node['runtime_threads'] = runtime_threads + node['runtime_memory_gb'] = runtime_memory_gb start_node = node finish_node = copy.deepcopy(node) start_node['event'] = 'start' @@ -63,11 +63,14 @@ def create_event_dict(start_time, nodes_list): finish_delta = (node['finish'] - start_time).total_seconds() # Populate dictionary - if events.has_key(start_delta) or events.has_key(finish_delta): - err_msg = 'Event logged twice or events started at exact same time!' 
- raise KeyError(err_msg) - events[start_delta] = start_node - events[finish_delta] = finish_node + if start_delta in events: + events[start_delta].append(start_node) + else: + events[start_delta] = [start_node] + if finish_delta in events: + events[finish_delta].append(finish_node) + else: + events[finish_delta] = [finish_node] # Return events dictionary return events @@ -178,15 +181,16 @@ def calculate_resource_timeseries(events, resource): all_res = 0.0 # Iterate through the events - for tdelta, event in sorted(events.items()): - if event['event'] == "start": - if resource in event and event[resource] != 'Unknown': - all_res += float(event[resource]) - current_time = event['start']; - elif event['event'] == "finish": - if resource in event and event[resource] != 'Unknown': - all_res -= float(event[resource]) - current_time = event['finish']; + for tdelta, event_list in sorted(events.items()): + for event in event_list: + if event['event'] == "start": + if resource in event and event[resource] != 'Unknown': + all_res += float(event[resource]) + current_time = event['start'] + elif event['event'] == "finish": + if resource in event and event[resource] != 'Unknown': + all_res -= float(event[resource]) + current_time = event['finish'] res[current_time] = all_res # Formulate the pandas timeseries @@ -341,6 +345,7 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, # Return html string for nodes return result + def draw_resource_bar(start_time, finish_time, time_series, space_between_minutes, minute_scale, color, left, resource): ''' diff --git a/tools/run_examples.py b/tools/run_examples.py index 2308951426..5941ea58ce 100644 --- a/tools/run_examples.py +++ b/tools/run_examples.py @@ -1,13 +1,20 @@ from __future__ import print_function import os import sys -from shutil import rmtree +from shutil import rmtree, copyfile from multiprocessing import cpu_count def run_examples(example, pipelines, data_path, plugin=None): + ''' + Run 
example workflows + ''' + + # Import packages from nipype import config from nipype.interfaces.base import CommandLine + from nipype.utils import draw_gantt_chart + from nipype.pipeline.plugins import log_nodes_cb if plugin is None: plugin = 'MultiProc' @@ -22,7 +29,9 @@ def run_examples(example, pipelines, data_path, plugin=None): plugin_args['n_procs'] = cpu_count() __import__(example) + for pipeline in pipelines: + # Init and run workflow wf = getattr(sys.modules[example], pipeline) wf.base_dir = os.path.join(os.getcwd(), 'output', example, plugin) if os.path.exists(wf.base_dir): @@ -35,16 +44,39 @@ def run_examples(example, pipelines, data_path, plugin=None): os.makedirs(log_dir) wf.config = {'execution': {'hash_method': 'timestamp', 'stop_on_first_rerun': 'true', - 'write_provenance': 'true'}, - 'logging': {'log_directory': log_dir, 'log_to_file': True}} + 'write_provenance': 'true'}} + + # Callback log setup + if example == 'fmri_spm_nested' and plugin == 'MultiProc' and \ + pipeline == 'l2pipeline': + # Init callback log + import logging + cb_log_path = os.path.join(os.path.expanduser('~'), 'callback.log') + cb_logger = logging.getLogger('callback') + cb_logger.setLevel(logging.DEBUG) + handler = logging.FileHandler(cb_log_path) + cb_logger.addHandler(handler) + plugin_args = {'n_procs' : 4, 'status_callback' : log_nodes_cb} + else: + plugin_args = {'n_procs' : 4} try: wf.inputs.inputnode.in_data = os.path.abspath(data_path) except AttributeError: pass # the workflow does not have inputnode.in_data wf.run(plugin=plugin, plugin_args=plugin_args) + + # Draw gantt chart only if pandas is installed + try: + import pandas + pandas_flg = True + except ImportError: + pandas_flg = False + + if 'status_callback' in plugin_args and pandas_flg: + draw_gantt_chart.generate_gantt_chart(cb_log_path, 4) + dst_log_html = os.path.join(os.path.expanduser('~'), 'callback.log.html') + 
copyfile(cb_log_path+'.html', dst_log_html) if __name__ == '__main__': path, file = os.path.split(__file__)