ENH: Add instrumentation to monitor resources #984

Merged 15 commits on Apr 27, 2022
40 changes: 40 additions & 0 deletions .circleci/config.yml
@@ -363,6 +363,14 @@ jobs:
keys:
- t1w-v3-{{ epoch }}
- t1w-v3-

- run:
name: Remove old, cached configs and resource-monitor outputs
command: |
rm -f /tmp/t1w/work/.mriqc.*.toml
rm -f /tmp/t1w/work/.resources.*.tsv
rm -f /tmp/t1w/work/.resources.*.png

- run:
name: Run participant-level on T1w images
no_output_timeout: 2h
@@ -375,8 +383,21 @@
nipreps/mriqc:latest \
/data derivatives/ participant \
-vv --verbose-reports --profile -m T1w --dsname circletests \
--resource-monitor \
--n_procs 2 --ants-nthreads 1 --ants-float \
--webapi-url http://$( hostname -I | awk '{print $1}' )/api/v1 --upload-strict

- run:
name: Move temporary but relevant artifacts
command: |
mkdir /tmp/t1w/misc
mv /tmp/t1w/work/.mriqc.*.toml /tmp/t1w/misc
mv /tmp/t1w/work/.resources.*.tsv /tmp/t1w/misc
mv /tmp/t1w/work/.resources.*.png /tmp/t1w/misc

- store_artifacts:
path: /tmp/t1w/misc

- save_cache:
key: t1w-v3-{{ epoch }}
paths:
@@ -502,6 +523,13 @@ jobs:
- bold-v2-{{ epoch }}
- bold-v2-

- run:
name: Remove old, cached configs and resource-monitor outputs
command: |
rm -f /tmp/bold/work/.mriqc.*.toml
rm -f /tmp/bold/work/.resources.*.tsv
rm -f /tmp/bold/work/.resources.*.png

- run:
name: Run participant-level on BOLD images
no_output_timeout: 2h
@@ -514,8 +542,20 @@
/data derivatives/ participant \
-vv --verbose-reports --profile -m bold --dsname circletests \
--n_procs 2 --ants-nthreads 1 --ants-float \
--resource-monitor \
--testing --ica \
--webapi-url http://$( hostname -I | awk '{print $1}' )/api/v1 --upload-strict
- run:
name: Move temporary but relevant artifacts
command: |
mkdir /tmp/bold/misc
mv /tmp/bold/work/.mriqc.*.toml /tmp/bold/misc
mv /tmp/bold/work/.resources.*.tsv /tmp/bold/misc
mv /tmp/bold/work/.resources.*.png /tmp/bold/misc

- store_artifacts:
path: /tmp/bold/misc

- save_cache:
key: bold-v2-{{ epoch }}
paths:
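Aside: the TSV files archived above are the samples written by the new resource recorder, and the PNGs are the memory plots generated at shutdown. A minimal sketch of inspecting one such artifact offline, assuming it is tab-separated and contains the mem_rss_mb column that run.py plots below; the filename and any other column names here are hypothetical:

import pandas as pd

# Hypothetical artifact name; real files match .resources.*.tsv
samples = pd.read_csv(".resources.example.tsv", sep="\t")
print(samples.columns.tolist())           # discover which fields were sampled
print(samples["mem_rss_mb"].describe())   # resident-set size summary, in MB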
12 changes: 0 additions & 12 deletions mriqc/cli/parser.py
@@ -416,7 +416,6 @@ def parse_args(args=None, namespace=None):
opts = parser.parse_args(args, namespace)
config.execution.log_level = int(max(25 - 5 * opts.verbose_count, DEBUG))
config.from_dict(vars(opts))
config.loggers.init()

# Load base plugin_settings from file if --use-plugin
if opts.use_plugin is not None:
@@ -432,17 +431,6 @@
"nprocs", config.nipype.nprocs
)

# Resource management options
# Note that we're making strong assumptions about valid plugin args
# This may need to be revisited if people try to use batch plugins
if 1 < config.nipype.nprocs < config.nipype.omp_nthreads:
config.loggers.cli.warning(
"Per-process threads (--omp-nthreads=%d) exceed total "
"threads (--nthreads/--n_cpus=%d)",
config.nipype.omp_nthreads,
config.nipype.nprocs,
)

bids_dir = config.execution.bids_dir
output_dir = config.execution.output_dir
work_dir = config.execution.work_dir
129 changes: 103 additions & 26 deletions mriqc/cli/run.py
@@ -29,28 +29,15 @@ def main():
import os
import sys
from tempfile import mktemp
import atexit
from mriqc import config, messages
from mriqc.cli.parser import parse_args

atexit.register(config.restore_env)

# Run parser
parse_args()

_plugin = config.nipype.get_plugin()
if config.nipype.plugin in ("MultiProc", "LegacyMultiProc"):
from contextlib import suppress
import multiprocessing as mp
import multiprocessing.forkserver
from mriqc.engine.plugin import MultiProcPlugin

with suppress(RuntimeError):
mp.set_start_method("forkserver")
mp.forkserver.ensure_running()

gc.collect()
_plugin = {
"plugin": MultiProcPlugin(plugin_args=config.nipype.plugin_args),
}

if config.execution.pdb:
from mriqc.utils.debug import setup_exceptionhook

@@ -68,16 +55,83 @@

# Set up participant level
if "participant" in config.workflow.analysis_level:
from mriqc.workflows.core import init_mriqc_wf

start_message = messages.PARTICIPANT_START.format(
version=config.environment.version,
bids_dir=config.execution.bids_dir,
output_dir=config.execution.output_dir,
analysis_level=config.workflow.analysis_level,
)
config.loggers.cli.log(25, start_message)
mriqc_wf = init_mriqc_wf()
_pool = None
if config.nipype.plugin in ("MultiProc", "LegacyMultiProc"):
from contextlib import suppress
import multiprocessing as mp
import multiprocessing.forkserver
from concurrent.futures import ProcessPoolExecutor

os.environ["OMP_NUM_THREADS"] = "1"

with suppress(RuntimeError):
mp.set_start_method("fork")
gc.collect()

_pool = ProcessPoolExecutor(
max_workers=config.nipype.nprocs,
initializer=config._process_initializer,
initargs=(config.execution.cwd, config.nipype.omp_nthreads),
)

_resmon = None
if config.execution.resource_monitor:
from mriqc.instrumentation.resources import ResourceRecorder

_resmon = ResourceRecorder(
pid=os.getpid(),
log_file=mktemp(
dir=config.execution.work_dir, prefix=".resources.", suffix=".tsv"
),
)
_resmon.start()

# CRITICAL Call build_workflow(config_file, retval) in a subprocess.
# Because Python on Linux never frees virtual memory (VM), running the
# workflow construction jailed within a process prevents excessive VM buildup.
from multiprocessing import Manager, Process

with Manager() as mgr:
from .workflow import build_workflow

retval = mgr.dict()
p = Process(target=build_workflow, args=(str(config_file), retval))
p.start()
p.join()

mriqc_wf = retval.get("workflow", None)
retcode = p.exitcode or retval.get("return_code", 0)

# CRITICAL Load the config from the file. This is necessary because the
# ``build_workflow`` function, executed in a separate process, may have
# changed the config (and thus the global state of MRIQC).
config.load(config_file)

retcode = retcode or (mriqc_wf is None) * os.EX_SOFTWARE
if retcode != 0:
sys.exit(retcode)

# Initialize nipype config
config.nipype.init()
# Make sure loggers are started
config.loggers.init()

if _resmon:
config.loggers.cli.info(
f"Started resource recording at {_resmon._logfile}."
)

# Resource management options
if config.nipype.plugin in ("MultiProc", "LegacyMultiProc") and (
1 < config.nipype.nprocs < config.nipype.omp_nthreads
):
config.loggers.cli.warning(
"Per-process threads (--omp-nthreads=%d) exceed total "
"threads (--nthreads/--n_cpus=%d)",
config.nipype.omp_nthreads,
config.nipype.nprocs,
)

if mriqc_wf is None:
sys.exit(os.EX_SOFTWARE)

@@ -92,13 +146,36 @@ def main():
# Clean up master process before running workflow, which may create forks
gc.collect()
# run MRIQC
_plugin = config.nipype.get_plugin()
if _pool:
from mriqc.engine.plugin import MultiProcPlugin

_plugin = {
"plugin": MultiProcPlugin(
pool=_pool, plugin_args=config.nipype.plugin_args
),
}
mriqc_wf.run(**_plugin)

# Warn about submitting measures AFTER
if not config.execution.no_sub:
config.loggers.cli.warning(config.DSA_MESSAGE)
config.loggers.cli.log(25, messages.PARTICIPANT_FINISHED)

if _resmon is not None:
from mriqc.instrumentation.viz import plot
_resmon.stop()
plot(
_resmon._logfile,
param="mem_rss_mb",
out_file=str(_resmon._logfile).replace(".tsv", ".rss.png"),
)
plot(
_resmon._logfile,
param="mem_vsm_mb",
out_file=str(_resmon._logfile).replace(".tsv", ".vsm.png"),
)

# Set up group level
if "group" in config.workflow.analysis_level:
from ..reports import group_html
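For readers skimming the diff, here is the recorder's lifecycle distilled into a standalone sketch. The ResourceRecorder and plot signatures are copied from the code above, while run_workload() is a hypothetical stand-in for mriqc_wf.run(**_plugin):

import os
from tempfile import mktemp

from mriqc.instrumentation.resources import ResourceRecorder
from mriqc.instrumentation.viz import plot

recorder = ResourceRecorder(
    pid=os.getpid(),  # sample the current process
    log_file=mktemp(dir=".", prefix=".resources.", suffix=".tsv"),
)
recorder.start()
run_workload()  # hypothetical placeholder for mriqc_wf.run(**_plugin)
recorder.stop()

# One PNG per monitored series, mirroring the two plot() calls above
for param, tag in (("mem_rss_mb", ".rss.png"), ("mem_vsm_mb", ".vsm.png")):
    plot(
        recorder._logfile,
        param=param,
        out_file=str(recorder._logfile).replace(".tsv", tag),
    )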
22 changes: 20 additions & 2 deletions mriqc/cli/workflow.py
@@ -33,10 +33,28 @@

def build_workflow(config_file, retval):
"""Create the Nipype Workflow that supports the whole execution graph."""
from .. import config
from ..workflows.core import init_mriqc_wf
import os

from mriqc import config, messages

# We do not need OMP > 1 for workflow creation
os.environ["OMP_NUM_THREADS"] = "1"

from mriqc.workflows.core import init_mriqc_wf

config.load(config_file)
# Initialize nipype config
config.nipype.init()
# Make sure loggers are started
config.loggers.init()

start_message = messages.PARTICIPANT_START.format(
version=config.environment.version,
bids_dir=config.execution.bids_dir,
output_dir=config.execution.output_dir,
analysis_level=config.workflow.analysis_level,
)
config.loggers.cli.log(25, start_message)

retval["return_code"] = 1
retval["workflow"] = None

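Taken together with run.py, this implements a build-in-a-subprocess pattern: construct the memory-hungry workflow in a child process, pass the result back through a Manager dict, and let the child's address space die with it. A self-contained sketch of the pattern under those assumptions, with build_anything as a hypothetical stand-in for build_workflow:

from multiprocessing import Manager, Process

def build_anything(settings_file, retval):
    # Pessimistic defaults, as in build_workflow above
    retval["return_code"] = 1
    retval["workflow"] = None
    # ... heavy construction happens in the child's address space ...
    retval["workflow"] = {"name": "sketch"}  # stand-in for init_mriqc_wf()
    retval["return_code"] = 0

if __name__ == "__main__":
    with Manager() as mgr:
        retval = mgr.dict()
        p = Process(target=build_anything, args=("config.toml", retval))
        p.start()
        p.join()
        workflow = retval.get("workflow", None)
        retcode = p.exitcode or retval.get("return_code", 0)
    # The child's virtual memory is reclaimed by the OS when it exits,
    # which is the motivation stated in the CRITICAL comment in run.py.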