From 3ff01a2d2a3d8e377ad8f060e1df7021662bcc99 Mon Sep 17 00:00:00 2001 From: dclark87 Date: Thu, 12 May 2016 15:53:39 -0400 Subject: [PATCH 01/10] Added compatibility for same time delta nodes --- nipype/utils/draw_gantt_chart.py | 41 ++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/nipype/utils/draw_gantt_chart.py b/nipype/utils/draw_gantt_chart.py index d4fdc88830..232f7b6be8 100644 --- a/nipype/utils/draw_gantt_chart.py +++ b/nipype/utils/draw_gantt_chart.py @@ -49,10 +49,10 @@ def create_event_dict(start_time, nodes_list): runtime_memory_gb = node.get('runtime_memory_gb', 0.0) # Init and format event-based nodes - node['estimated_threads'] = estimated_threads - node['estimated_memory_gb'] = estimated_memory_gb - node['runtime_threads'] = runtime_threads - node['runtime_memory_gb'] = runtime_memory_gb + node['estimated_threads'] = estimated_threads + node['estimated_memory_gb'] = estimated_memory_gb + node['runtime_threads'] = runtime_threads + node['runtime_memory_gb'] = runtime_memory_gb start_node = node finish_node = copy.deepcopy(node) start_node['event'] = 'start' @@ -63,11 +63,14 @@ def create_event_dict(start_time, nodes_list): finish_delta = (node['finish'] - start_time).total_seconds() # Populate dictionary - if events.has_key(start_delta) or events.has_key(finish_delta): - err_msg = 'Event logged twice or events started at exact same time!' - raise KeyError(err_msg) - events[start_delta] = start_node - events[finish_delta] = finish_node + if events.has_key(start_delta): + events[start_delta].append(start_node) + else: + events[start_delta] = [start_node] + if events.has_key(finish_delta): + events[finish_delta].append(finish_node) + else: + events[finish_delta] = [finish_node] # Return events dictionary return events @@ -178,15 +181,16 @@ def calculate_resource_timeseries(events, resource): all_res = 0.0 # Iterate through the events - for tdelta, event in sorted(events.items()): - if event['event'] == "start": - if resource in event and event[resource] != 'Unknown': - all_res += float(event[resource]) - current_time = event['start']; - elif event['event'] == "finish": - if resource in event and event[resource] != 'Unknown': - all_res -= float(event[resource]) - current_time = event['finish']; + for tdelta, events in sorted(events.items()): + for event in events: + if event['event'] == "start": + if resource in event and event[resource] != 'Unknown': + all_res += float(event[resource]) + current_time = event['start'] + elif event['event'] == "finish": + if resource in event and event[resource] != 'Unknown': + all_res -= float(event[resource]) + current_time = event['finish'] res[current_time] = all_res # Formulate the pandas timeseries @@ -341,6 +345,7 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, # Return html string for nodes return result + def draw_resource_bar(start_time, finish_time, time_series, space_between_minutes, minute_scale, color, left, resource): ''' From 395b60ac3e3cd5ca67308563852bf0be9a5ff1a4 Mon Sep 17 00:00:00 2001 From: dclark87 Date: Tue, 17 May 2016 17:45:55 -0400 Subject: [PATCH 02/10] Added smoke test for draw gantt chart --- circle.yml | 6 ++++++ tools/run_examples.py | 40 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/circle.yml b/circle.yml index e9c59276a2..82089b39a3 100644 --- a/circle.yml +++ b/circle.yml @@ -69,6 +69,11 @@ test: environment: SPMMCRCMD: "$HOME/spm12/run_spm12.sh $HOME/mcr/v85/ script" FORCE_SPMMCR: 1 + - source $HOME/.profile; python ~/nipype/tools/run_examples.py fmri_spm_nested MultiProc l2pipeline: + pwd: ../examples + environment: + SPMMCRCMD: "$HOME/spm12/run_spm12.sh $HOME/mcr/v85/ script" + FORCE_SPMMCR: 1 general: artifacts: @@ -76,3 +81,4 @@ general: - "~/log.txt" - "nosetests.xml" - "coverage.xml" + - "~/callback.log.html" diff --git a/tools/run_examples.py b/tools/run_examples.py index ef8445bdb2..ddc555bc6c 100644 --- a/tools/run_examples.py +++ b/tools/run_examples.py @@ -1,19 +1,30 @@ from __future__ import print_function import os import sys -from shutil import rmtree +from shutil import rmtree, copyfile def run_examples(example, pipelines, plugin): - print('running example: %s with plugin: %s' % (example, plugin)) + ''' + Run example workflows + ''' + + # Import packages from nipype import config + from nipype.interfaces.base import CommandLine + from nipype.utils import draw_gantt_chart + from nipype.pipeline.plugins import log_nodes_cb + + print('running example: %s with plugin: %s' % (example, plugin)) + config.enable_debug_mode() config.enable_provenance() - from nipype.interfaces.base import CommandLine CommandLine.set_default_terminal_output("stream") __import__(example) + for pipeline in pipelines: + # Init and run workflow wf = getattr(sys.modules[example], pipeline) wf.base_dir = os.path.join(os.getcwd(), 'output', example, plugin) if os.path.exists(wf.base_dir): @@ -21,10 +32,31 @@ def run_examples(example, pipelines, plugin): wf.config = {'execution': {'hash_method': 'timestamp', 'stop_on_first_rerun': 'true', 'write_provenance': 'true'}} - wf.run(plugin=plugin, plugin_args={'n_procs': 4}) + + # Callback log setup + if example == 'fmri_spm_nested' and plugin == 'MultiProc' and \ + pipeline == 'l2pipeline': + # Init callback log + import logging + cb_log_path = os.path.join(wf.base_dir, 'callback.log') + cb_logger = logging.getLogger('callback') + cb_logger.setLevel(logging.DEBUG) + handler = logging.FileHandler(cb_log_path) + cb_logger.addHandler(handler) + plugin_args = {'n_procs' : 4, 'status_callback' : log_nodes_cb} + else: + plugin_args = {'n_procs' : 4} + + wf.run(plugin=plugin, plugin_args=plugin_args) # run twice to check if nothing is rerunning wf.run(plugin=plugin) + # Draw gantt chart + if plugin_args.has_key('status_callback'): + draw_gantt_chart.generate_gantt_chart(cb_log_path, 4) + dst_log_html = os.path.join(os.path.expanduser('~'), 'callback.log.html') + copyfile(cb_log_path+'.html', dst_log_html) + if __name__ == '__main__': path, file = os.path.split(__file__) sys.path.insert(0, os.path.realpath(os.path.join(path, '..', 'examples'))) From 220b9afa716ea2d65595b04a60ce71b9e4e4c178 Mon Sep 17 00:00:00 2001 From: dclark87 Date: Mon, 6 Jun 2016 22:42:58 -0400 Subject: [PATCH 03/10] Should fix issue mentioned https://github.com/nipy/nipype/issues/1488 and CircleCI test --- nipype/interfaces/base.py | 3 +++ tools/run_examples.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 16b1c95293..818d49bda9 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1333,6 +1333,9 @@ def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False): try: mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc)) num_threads = max(num_threads, _get_num_threads(psutil.Process(pid))) + except psutil.AccessDenied as exc: + iflogger.debug('Could not get resources used by process. Error: %s'\ + % exc) except Exception as exc: iflogger.info('Could not get resources used by process. Error: %s'\ % exc) diff --git a/tools/run_examples.py b/tools/run_examples.py index ddc555bc6c..01bc151c4e 100644 --- a/tools/run_examples.py +++ b/tools/run_examples.py @@ -38,7 +38,7 @@ def run_examples(example, pipelines, plugin): pipeline == 'l2pipeline': # Init callback log import logging - cb_log_path = os.path.join(wf.base_dir, 'callback.log') + cb_log_path = os.path.join(os.path.expanduser('~'), 'callback.log') cb_logger = logging.getLogger('callback') cb_logger.setLevel(logging.DEBUG) handler = logging.FileHandler(cb_log_path) From cff9393d026026de11823cb53b1a3915105a3dd6 Mon Sep 17 00:00:00 2001 From: Daniel Clark Date: Fri, 17 Jun 2016 14:52:17 -0400 Subject: [PATCH 04/10] added tolerance for num threads --- .../interfaces/tests/test_runtime_profiler.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/nipype/interfaces/tests/test_runtime_profiler.py b/nipype/interfaces/tests/test_runtime_profiler.py index b9adfe5c19..61f64064f9 100644 --- a/nipype/interfaces/tests/test_runtime_profiler.py +++ b/nipype/interfaces/tests/test_runtime_profiler.py @@ -135,6 +135,7 @@ def setUp(self): self.num_threads = 2 # Acceptable percent error for memory profiled against input self.mem_err_gb = 0.25 + self.num_err_thr = 1 # ! Only used for benchmarking the profiler over a range of # ! RAM usage and number of threads @@ -355,7 +356,7 @@ def _run_function_workflow(self, num_gb, num_threads): return start_str, finish_str # Test resources were used as expected in cmdline interface - @unittest.skipIf(run_profiler == False, skip_profile_msg) + @unittest.skipIf(run_profiler==False, skip_profile_msg) def test_cmdline_profiling(self): ''' Test runtime profiler correctly records workflow RAM/CPUs consumption @@ -382,21 +383,22 @@ def test_cmdline_profiling(self): # Get margin of error for RAM GB allowed_gb_err = self.mem_err_gb runtime_gb_err = np.abs(runtime_gb-num_gb) - # - expected_runtime_threads = num_threads + # Get margin of error for number of threads + allowed_thr_err = self.num_err_thr + runtime_thr_err = np.abs(runtime_threads-num_threads) # Error message formatting mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ - 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) - threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ - % (expected_runtime_threads, runtime_threads) + 'memory: %f' % (num_gb, allowed_gb_err, runtime_gb) + threads_err = 'Input threads: %d is not within $d of runtime threads: %d' \ + % (num_threads, allowed_thr_err) # Assert runtime stats are what was input self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) - self.assertEqual(expected_runtime_threads, runtime_threads, msg=threads_err) + self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err) # Test resources were used as expected - @unittest.skipIf(run_profiler == False, skip_profile_msg) + @unittest.skipIf(run_profiler==False, skip_profile_msg) def test_function_profiling(self): ''' Test runtime profiler correctly records workflow RAM/CPUs consumption From 35f2d477c47f7038c4e0f6b61e2e089d659c0ebe Mon Sep 17 00:00:00 2001 From: Daniel Clark Date: Fri, 17 Jun 2016 14:53:21 -0400 Subject: [PATCH 05/10] Increased memory tolerance --- nipype/interfaces/tests/test_runtime_profiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/interfaces/tests/test_runtime_profiler.py b/nipype/interfaces/tests/test_runtime_profiler.py index 61f64064f9..09c6c67831 100644 --- a/nipype/interfaces/tests/test_runtime_profiler.py +++ b/nipype/interfaces/tests/test_runtime_profiler.py @@ -134,7 +134,7 @@ def setUp(self): # Input number of sub-threads (not including parent threads) self.num_threads = 2 # Acceptable percent error for memory profiled against input - self.mem_err_gb = 0.25 + self.mem_err_gb = 0.4 self.num_err_thr = 1 # ! Only used for benchmarking the profiler over a range of From f9c10926fc1145f2aa0b0c94cb7fc58e5af20ade Mon Sep 17 00:00:00 2001 From: Daniel Clark Date: Fri, 17 Jun 2016 14:56:32 -0400 Subject: [PATCH 06/10] Added num threads tolerance to both methods --- nipype/interfaces/tests/test_runtime_profiler.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/nipype/interfaces/tests/test_runtime_profiler.py b/nipype/interfaces/tests/test_runtime_profiler.py index 09c6c67831..80a08fa16e 100644 --- a/nipype/interfaces/tests/test_runtime_profiler.py +++ b/nipype/interfaces/tests/test_runtime_profiler.py @@ -425,18 +425,19 @@ def test_function_profiling(self): # Get margin of error for RAM GB allowed_gb_err = self.mem_err_gb runtime_gb_err = np.abs(runtime_gb-num_gb) - # - expected_runtime_threads = num_threads + # Get margin of error for number of threads + allowed_thr_err = self.num_err_thr + runtime_thr_err = np.abs(runtime_threads-num_threads) # Error message formatting mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ - 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) - threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ - % (expected_runtime_threads, runtime_threads) + 'memory: %f' % (num_gb, allowed_gb_err, runtime_gb) + threads_err = 'Input threads: %d is not within $d of runtime threads: %d' \ + % (num_threads, allowed_thr_err) # Assert runtime stats are what was input self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) - self.assertEqual(expected_runtime_threads, runtime_threads, msg=threads_err) + self.assertLessEqual(runtime_thr_err, allowed_thr_err, msg=threads_err) # Command-line run-able unittest module From c898148202fb04ffe8733d7fe8147b6c2e7ad588 Mon Sep 17 00:00:00 2001 From: dclark87 Date: Sun, 26 Jun 2016 16:06:39 -0400 Subject: [PATCH 07/10] Debug statements quantity of free resources should be reduced --- nipype/pipeline/plugins/multiproc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index d2a7f7f9b9..9fb09286ca 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -219,9 +219,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): jobids = sorted(jobids, key=lambda item: (self.procs[item]._interface.estimated_memory_gb, self.procs[item]._interface.num_threads)) - - logger.debug('Free memory (GB): %d, Free processors: %d', - free_memory_gb, free_processors) + if len(jobids) > 0: + logger.debug('Free memory (GB): %d, Free processors: %d', + free_memory_gb, free_processors) # While have enough memory and processors for first job # Submit first job on the list From 5c276b52c845da163ea3f05466dbba22f7284fa8 Mon Sep 17 00:00:00 2001 From: dclark87 Date: Sat, 9 Jul 2016 10:37:43 -0400 Subject: [PATCH 08/10] Removed more debug statements --- nipype/pipeline/plugins/base.py | 1 - nipype/pipeline/plugins/multiproc.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 9e73f1b048..868de3726b 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -260,7 +260,6 @@ def run(self, graph, config, updatehash=False): if toappend: self.pending_tasks.extend(toappend) num_jobs = len(self.pending_tasks) - logger.debug('Number of pending tasks: %d' % num_jobs) if num_jobs < self.max_jobs: self._send_procs_to_workers(updatehash=updatehash, graph=graph) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 9fb09286ca..b234ce20b3 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -304,5 +304,3 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.pending_tasks.insert(0, (tid, jobid)) else: break - - logger.debug('No jobs waiting to execute') From 2c2209360ec6b670f6923b810ff4d5d8b0f8c2ce Mon Sep 17 00:00:00 2001 From: dclark87 Date: Tue, 26 Jul 2016 13:38:13 -0400 Subject: [PATCH 09/10] Added flag check for pandas --- tools/run_examples.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tools/run_examples.py b/tools/run_examples.py index 01bc151c4e..0425335773 100644 --- a/tools/run_examples.py +++ b/tools/run_examples.py @@ -51,8 +51,14 @@ def run_examples(example, pipelines, plugin): # run twice to check if nothing is rerunning wf.run(plugin=plugin) - # Draw gantt chart - if plugin_args.has_key('status_callback'): + # Draw gantt chart only if pandas is installed + try: + import pandas + pandas_flg = True + except ImportError as exc: + pandas_flg = False + + if plugin_args.has_key('status_callback') and pandas_flg: draw_gantt_chart.generate_gantt_chart(cb_log_path, 4) dst_log_html = os.path.join(os.path.expanduser('~'), 'callback.log.html') copyfile(cb_log_path+'.html', dst_log_html) From ba9084204d381ecfe5664822408cc8909f2a7e81 Mon Sep 17 00:00:00 2001 From: dclark87 Date: Mon, 1 Aug 2016 14:18:39 -0400 Subject: [PATCH 10/10] Resolved git diffs --- circle.yml | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/circle.yml b/circle.yml index ebfe64dd31..1c3bf92171 100644 --- a/circle.yml +++ b/circle.yml @@ -23,28 +23,6 @@ test: timeout: 1600 - docker run -i -v /etc/localtime:/etc/localtime:ro -v ~/scratch:/scratch -w /scratch nipype/testbench test_spm Linear /root/examples/ workflow4d : timeout: 1600 -<<<<<<< HEAD - - source $HOME/.profile; python ~/nipype/tools/run_examples.py fmri_fsl_reuse Linear level1_workflow: - pwd: ../examples - - source $HOME/.profile; python ~/nipype/tools/run_examples.py fmri_spm_nested Linear level1 l2pipeline: - pwd: ../examples - environment: - SPMMCRCMD: "$HOME/spm12/run_spm12.sh $HOME/mcr/v85/ script" - FORCE_SPMMCR: 1 - - source $HOME/.profile; python ~/nipype/tools/run_examples.py fmri_spm_nested MultiProc l2pipeline: - pwd: ../examples - environment: - SPMMCRCMD: "$HOME/spm12/run_spm12.sh $HOME/mcr/v85/ script" - FORCE_SPMMCR: 1 - -general: - artifacts: - - "doc/_build/html" - - "~/log.txt" - - "nosetests.xml" - - "coverage.xml" - - "~/callback.log.html" -======= - docker run -i -v /etc/localtime:/etc/localtime:ro -v ~/scratch:/scratch -w /scratch nipype/testbench fmri_fsl_feeds Linear /root/examples/ l1pipeline - docker run -i -v /etc/localtime:/etc/localtime:ro -v ~/scratch:/scratch -w /scratch nipype/testbench fmri_spm_dartel Linear /root/examples/ level1 : timeout: 1600 @@ -66,4 +44,3 @@ general: - "~/nosetests.xml" - "~/builddocs.log" - "~/scratch" ->>>>>>> 3ed411b4d97b3d688b6bad452500a85f1f9af54c