Skip to content

Commit

Permalink
fix: set OMP_NUM_THREADS=1 for master thread
Browse files Browse the repository at this point in the history
  • Loading branch information
oesteban committed Apr 27, 2022
1 parent 3b3e5b0 commit e4bdd4e
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 15 deletions.
8 changes: 5 additions & 3 deletions mriqc/cli/run.py
Expand Up @@ -29,9 +29,12 @@ def main():
import os
import sys
from tempfile import mktemp
import atexit
from mriqc import config, messages
from mriqc.cli.parser import parse_args

atexit.register(config.restore_env)

# Run parser
parse_args()

Expand Down Expand Up @@ -60,14 +63,13 @@ def main():
from concurrent.futures import ProcessPoolExecutor

with suppress(RuntimeError):
mp.set_start_method("forkserver")
mp.forkserver.ensure_running()
mp.set_start_method("fork")
gc.collect()

_pool = ProcessPoolExecutor(
max_workers=config.nipype.nprocs,
initializer=config._process_initializer,
initargs=(config.execution.cwd,),
initargs=(config.execution.cwd, config.nipype.omp_nthreads),
)

_resmon = None
Expand Down
22 changes: 20 additions & 2 deletions mriqc/config.py
Expand Up @@ -103,6 +103,11 @@
from mriqc._warnings import logging

__version__ = get_version("mriqc")
_pre_exec_env = dict(os.environ)

# Reduce numpy's vms by limiting OMP_NUM_THREADS
_default_omp_threads = int(os.getenv("OMP_NUM_THREADS", os.cpu_count()))
os.environ["OMP_NUM_THREADS"] = f"{min(1, _default_omp_threads)}"

# Disable NiPype etelemetry always
_disable_et = bool(
Expand Down Expand Up @@ -273,6 +278,8 @@ class environment(_Config):
"""Total memory available, in GB."""
version = __version__
"""*MRIQC*'s version."""
_pre_mriqc = _pre_exec_env
"""Environment variables before MRIQC's execution."""


class nipype(_Config):
Expand All @@ -288,7 +295,7 @@ class nipype(_Config):
"""Estimation in GB of the RAM this workflow can allocate at any given time."""
nprocs = os.cpu_count()
"""Number of processes (compute tasks) that can be run in parallel (multiprocessing only)."""
omp_nthreads = int(os.getenv("OMP_NUM_THREADS", os.cpu_count()))
omp_nthreads = _default_omp_threads
"""Number of CPUs a single process can access for multithreaded execution."""
plugin = "MultiProc"
"""NiPype's execution plugin."""
Expand Down Expand Up @@ -634,7 +641,18 @@ def to_filename(filename):
filename.write_text(dumps())


def _process_initializer(cwd):
def _process_initializer(cwd, omp_nthreads):
"""Initialize the environment of the child process."""
os.chdir(cwd)
os.environ["NIPYPE_NO_ET"] = "1"
os.environ["OMP_NUM_THREADS"] = f"{omp_nthreads}"


def restore_env():
"""Restore the original environment."""

for k in os.environ.keys():
del os.environ[k]

for k, v in environment._pre_mriqc.items():
os.environ[k] = v
10 changes: 2 additions & 8 deletions mriqc/engine/plugin.py
Expand Up @@ -66,12 +66,6 @@ def run_node(node, updatehash, taskid):
return result


def process_initializer(cwd):
"""Initialize the environment of the child process."""
os.chdir(cwd)
os.environ["NIPYPE_NO_ET"] = "1"


class PluginBase:
"""Base class for plugins."""

Expand Down Expand Up @@ -446,8 +440,8 @@ def __init__(self, pool=None, plugin_args=None):
mp_context = mp.get_context(self.plugin_args.get("mp_context"))
self.pool = pool or ProcessPoolExecutor(
max_workers=self.processors,
initializer=process_initializer,
initargs=(self._cwd,),
initializer=config._process_initializer,
initargs=(config.execution.cwd, config.nipype.omp_nthreads),
mp_context=mp_context,
)

Expand Down
4 changes: 2 additions & 2 deletions mriqc/instrumentation/resources.py
Expand Up @@ -189,9 +189,9 @@ def run(self, *args, **kwargs):
wait_til += self._freq_ns
sleep(max(0, (wait_til - time_ns()) / 1.0e9))

self._logfile.close()
_logfile.close()

def stop(self):
def stop(self, *args):
# Tear-down process
self._done.set()
with Path(self._logfile).open("a") as f:
Expand Down

0 comments on commit e4bdd4e

Please sign in to comment.