Skip to content

Commit

Permalink
enh: finalizing new monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
oesteban committed Apr 21, 2022
1 parent fa11d76 commit c5cd640
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 24 deletions.
4 changes: 1 addition & 3 deletions mriqc/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ def main():
if config.nipype.plugin in ("MultiProc", "LegacyMultiProc"):
from contextlib import suppress
import multiprocessing as mp
import multiprocessing.forkserver

with suppress(RuntimeError):
mp.set_start_method("forkserver")
mp.forkserver.ensure_running()
mp.set_start_method("spawn")
gc.collect()

if config.execution.pdb:
Expand Down
1 change: 0 additions & 1 deletion mriqc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
except ImportError:
from importlib_metadata import version as get_version


# Ignore annoying warnings
from mriqc._warnings import logging

Expand Down
67 changes: 47 additions & 20 deletions mriqc/instrumentation/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# https://www.nipreps.org/community/licensing/
#
"""Instrumentation to profile resource utilization."""
import warnings
from time import time
from datetime import datetime
from tempfile import mkstemp
Expand Down Expand Up @@ -76,6 +77,23 @@ def sample(
return proc_info


def sample2file(pid=None, recursive=True, timestamp=None, fd=None, flush=True):
if fd is None:
return

print(
"\n".join(
[
"\t".join(parse_sample(s, timestamp=timestamp))
for s in sample(pid=pid, recursive=recursive)
]
),
file=fd,
)
if flush:
fd.flush()


def parse_sample(datapoint, timestamp=None, attrs=SAMPLE_ATTRS):
"""Convert a sample dictionary into a list of string values."""
retval = [f"{timestamp or time()}"]
Expand Down Expand Up @@ -103,6 +121,7 @@ class ResourceRecorder(threading.Thread):

def __init__(self, pid=None, freq=0.2, log_file=None):
"""Initialize a resource recorder."""
threading.Thread.__init__(self)

self._freq = max(freq, 0.01)
"""Frequency (seconds) with which the probe must sample."""
Expand All @@ -123,19 +142,39 @@ def __init__(self, pid=None, freq=0.2, log_file=None):
datetime.now().strftime(
"# MRIQC Resource recorder started (%Y/%m/%d; %H:%M:%S)"
),
"\t".join(SAMPLE_ATTRS).replace("memory_info", "mem_rss_mb\tmem_vsm_mb"),
"\t".join(("timestamp", *SAMPLE_ATTRS)).replace(
"memory_info", "mem_rss_mb\tmem_vsm_mb"
),
]
print("\n".join(_header), file=self._logfile)
sample2file(self._pid, fd=self._logfile)

# Start thread
self._event = threading.Event()
self.start()

def excepthook(self, args):
print(
"\n".join(["\t".join(parse_sample(s)) for s in sample(self._pid)]),
"\n".join(
[
datetime.now().strftime(
"# MRIQC Resource recorder stopped with error (%Y/%m/%d; %H:%M:%S)"
)
]
),
file=self._logfile,
)
self._logfile.flush()
self._logfile.close()

# Start thread
threading.Thread.__init__(self)
self._event = threading.Event()
self.start()
if not self._event.is_set():
self._event.set()

tb = "\n ".join(args.exc_traceback)
warnings.warning(f"""ResourceRecorder errored.
{args.exc_type}: {args.exc_value}
{tb}
""")

def stop(self):
"""Stop monitoring."""
Expand All @@ -144,10 +183,7 @@ def stop(self):
self.join()

# Final sample
print(
"\n".join(["\t".join(parse_sample(s)) for s in sample(self._pid)]),
file=self._logfile,
)
sample2file(self._pid, fd=self._logfile)
print(
"\n".join(
[
Expand All @@ -168,15 +204,6 @@ def run(self):
start_time = time()
wait_til = start_time
while not self._event.is_set():
print(
"\n".join(
[
"\t".join(parse_sample(s, timestamp=wait_til))
for s in sample(self._pid)
]
),
file=self._logfile,
)
self._logfile.flush()
sample2file(self._pid, fd=self._logfile, timestamp=wait_til)
wait_til += self._freq
self._event.wait(max(0, wait_til - time()))

0 comments on commit c5cd640

Please sign in to comment.