From 41d7f6bca832f657417dc93f0c1ab2c8a0afcc4b Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Tue, 26 Apr 2022 08:42:28 +0200 Subject: [PATCH] enh: incorporate new custom plugin --- mriqc/cli/run.py | 18 +++++++++++++----- mriqc/config.py | 8 ++++++++ mriqc/engine/plugin.py | 4 ++-- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/mriqc/cli/run.py b/mriqc/cli/run.py index 6d8cef8ed..7c46aa69c 100644 --- a/mriqc/cli/run.py +++ b/mriqc/cli/run.py @@ -52,21 +52,23 @@ def main(): # Set up participant level if "participant" in config.workflow.analysis_level: - _plugin = config.nipype.get_plugin() + _pool = None if config.nipype.plugin in ("MultiProc", "LegacyMultiProc"): from contextlib import suppress import multiprocessing as mp import multiprocessing.forkserver - from mriqc.engine.plugin import MultiProcPlugin + from concurrent.futures import ProcessPoolExecutor with suppress(RuntimeError): mp.set_start_method("forkserver") mp.forkserver.ensure_running() gc.collect() - _plugin = { - "plugin": MultiProcPlugin(plugin_args=config.nipype.plugin_args), - } + _pool = ProcessPoolExecutor( + max_workers=config.nipype.nprocs, + initializer=config._process_initializer, + initargs=(config.execution.cwd,), + ) _resmon = None if config.execution.resource_monitor: @@ -138,6 +140,12 @@ def main(): # Clean up master process before running workflow, which may create forks gc.collect() # run MRIQC + _plugin = config.nipype.get_plugin() + if _pool: + from mriqc.engine.plugin import MultiProcPlugin + _plugin = { + "plugin": MultiProcPlugin(pool=_pool, plugin_args=config.nipype.plugin_args), + } mriqc_wf.run(**_plugin) # Warn about submitting measures AFTER diff --git a/mriqc/config.py b/mriqc/config.py index 0a0d2deac..38af2ccc0 100644 --- a/mriqc/config.py +++ b/mriqc/config.py @@ -346,6 +346,8 @@ class execution(_Config): """Path to the directory containing SQLite database indices for the input BIDS dataset.""" bids_description_hash = None """Checksum (SHA256) of the ``dataset_description.json`` of the BIDS dataset.""" + cwd = os.getcwd() + """Current working directory.""" debug = False """Run in sloppy mode (meaning, suboptimal parameters that minimize run-time).""" dry_run = False @@ -630,3 +632,9 @@ def to_filename(filename): filename = Path(filename) filename.parent.mkdir(exist_ok=True, parents=True) filename.write_text(dumps()) + + +def _process_initializer(cwd): + """Initialize the environment of the child process.""" + os.chdir(cwd) + os.environ["NIPYPE_NO_ET"] = "1" diff --git a/mriqc/engine/plugin.py b/mriqc/engine/plugin.py index e6295dd36..a812554fa 100644 --- a/mriqc/engine/plugin.py +++ b/mriqc/engine/plugin.py @@ -418,7 +418,7 @@ class MultiProcPlugin(DistributedPluginBase): - mp_context: name of multiprocessing context to use """ - def __init__(self, plugin_args=None): + def __init__(self, pool=None, plugin_args=None): """Initialize the plugin.""" from mriqc import config @@ -441,7 +441,7 @@ def __init__(self, plugin_args=None): # Instantiate different thread pools for non-daemon processes mp_context = mp.get_context(self.plugin_args.get("mp_context")) - self.pool = ProcessPoolExecutor( + self.pool = pool or ProcessPoolExecutor( max_workers=self.processors, initializer=process_initializer, initargs=(self._cwd,),