Skip to content

Commit

Permalink
enh: incorporate new custom plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
oesteban committed Apr 26, 2022
1 parent 1f6bab4 commit 41d7f6b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
18 changes: 13 additions & 5 deletions mriqc/cli/run.py
Expand Up @@ -52,21 +52,23 @@ def main():

# Set up participant level
if "participant" in config.workflow.analysis_level:
_plugin = config.nipype.get_plugin()
_pool = None
if config.nipype.plugin in ("MultiProc", "LegacyMultiProc"):
from contextlib import suppress
import multiprocessing as mp
import multiprocessing.forkserver
from mriqc.engine.plugin import MultiProcPlugin
from concurrent.futures import ProcessPoolExecutor

with suppress(RuntimeError):
mp.set_start_method("forkserver")
mp.forkserver.ensure_running()
gc.collect()

_plugin = {
"plugin": MultiProcPlugin(plugin_args=config.nipype.plugin_args),
}
_pool = ProcessPoolExecutor(
max_workers=config.nipype.nprocs,
initializer=config._process_initializer,
initargs=(config.execution.cwd,),
)

_resmon = None
if config.execution.resource_monitor:
Expand Down Expand Up @@ -138,6 +140,12 @@ def main():
# Clean up master process before running workflow, which may create forks
gc.collect()
# run MRIQC
_plugin = config.nipype.get_plugin()
if _pool:
from mriqc.engine.plugin import MultiProcPlugin
_plugin = {
"plugin": MultiProcPlugin(pool=_pool, plugin_args=config.nipype.plugin_args),
}
mriqc_wf.run(**_plugin)

# Warn about submitting measures AFTER
Expand Down
8 changes: 8 additions & 0 deletions mriqc/config.py
Expand Up @@ -346,6 +346,8 @@ class execution(_Config):
"""Path to the directory containing SQLite database indices for the input BIDS dataset."""
bids_description_hash = None
"""Checksum (SHA256) of the ``dataset_description.json`` of the BIDS dataset."""
cwd = os.getcwd()
"""Current working directory."""
debug = False
"""Run in sloppy mode (meaning, suboptimal parameters that minimize run-time)."""
dry_run = False
Expand Down Expand Up @@ -630,3 +632,9 @@ def to_filename(filename):
filename = Path(filename)
filename.parent.mkdir(exist_ok=True, parents=True)
filename.write_text(dumps())


def _process_initializer(cwd):
"""Initialize the environment of the child process."""
os.chdir(cwd)
os.environ["NIPYPE_NO_ET"] = "1"
4 changes: 2 additions & 2 deletions mriqc/engine/plugin.py
Expand Up @@ -418,7 +418,7 @@ class MultiProcPlugin(DistributedPluginBase):
- mp_context: name of multiprocessing context to use
"""

def __init__(self, plugin_args=None):
def __init__(self, pool=None, plugin_args=None):
"""Initialize the plugin."""
from mriqc import config

Expand All @@ -441,7 +441,7 @@ def __init__(self, plugin_args=None):

# Instantiate different thread pools for non-daemon processes
mp_context = mp.get_context(self.plugin_args.get("mp_context"))
self.pool = ProcessPoolExecutor(
self.pool = pool or ProcessPoolExecutor(
max_workers=self.processors,
initializer=process_initializer,
initargs=(self._cwd,),
Expand Down

0 comments on commit 41d7f6b

Please sign in to comment.