Skip to content

Commit

Permalink
Merge pull request #1238 from nipreps/enh/error-handling
Browse files Browse the repository at this point in the history
ENH: Improve error handling and logging
  • Loading branch information
oesteban committed Apr 7, 2024
2 parents 08aa455 + 407000c commit baf56bb
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 53 deletions.
1 change: 1 addition & 0 deletions .circleci/circle_T1w.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dataset_description.json
group_T1w.html
group_T1w.tsv
logs
logs/mriqc.log
sub-50137
sub-50137/anat
sub-50137/anat/sub-50137_T1w.json
Expand Down
1 change: 1 addition & 0 deletions .circleci/circle_bold.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dataset_description.json
group_bold.html
group_bold.tsv
logs
logs/mriqc.log
sub-ds205s03
sub-ds205s03/figures
sub-ds205s03/figures/sub-ds205s03_task-functionallocalizer_run-01_desc-background_bold.svg
Expand Down
88 changes: 77 additions & 11 deletions mriqc/_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,93 @@
#
"""Manipulate Python warnings."""
import logging
import warnings
import sys

from nipype import logging as nlogging

logging.addLevelName(26, 'BANNER') # Add a new level for banners
logging.addLevelName(25, 'IMPORTANT') # Add a new level between INFO and WARNING
logging.addLevelName(15, 'VERBOSE') # Add a new level between INFO and DEBUG

LOGGER_FMT = (
'%(asctime)s |{color} %(levelname)-8s {reset}|{color} %(name)-16s '
'{reset}|{color} %(message)s{reset}'
)
DATE_FMT = '%Y-%m-%d %H:%M:%S'
CONSOLE_COLORS = {
logging.DEBUG: '\x1b[38;20m',
logging.INFO: '\x1b[34;20m',
25: '\x1b[33;20m',
logging.WARNING: '\x1b[93;20m',
logging.ERROR: '\x1b[31;20m',
logging.CRITICAL: '\x1b[31;1m',
'reset': '\x1b[0m',
}


class _LogFormatter(logging.Formatter):
"""Customize the log format."""

_colored = True

def __init__(self, datefmt=None, colored=True, **kwargs):
self._colored = colored
super().__init__(
datefmt=datefmt or DATE_FMT,
fmt=LOGGER_FMT.format(
color=CONSOLE_COLORS['reset'] if colored else '',
reset=CONSOLE_COLORS['reset'] if colored else '',
),
)

def format(self, record):
reset = CONSOLE_COLORS['reset'] if self._colored else ''
self._style._fmt = (
'%(message)s' if record.levelno == 26
else LOGGER_FMT.format(
color=CONSOLE_COLORS.get(
record.levelno,
CONSOLE_COLORS['reset'],
) if self._colored else '',
reset=reset,
)
)
return super().format(record)


nlogging.getLogger('nipype')
for logger_name in logging.root.manager.loggerDict:
logging.getLogger(logger_name).handlers.clear()

_root_logger = logging.getLogger()
# _root_logger.handlers.clear()
_handler = logging.StreamHandler(stream=sys.stdout)
_handler.setFormatter(_LogFormatter())
_root_logger.addHandler(_handler)


_wlog = logging.getLogger('py.warnings')
_wlog.addHandler(logging.NullHandler())

_numexprlog = logging.getLogger('numexpr.utils')
_numexprlog.handlers.clear()
_numexprlog.addHandler(logging.NullHandler())

def _warn(message, category=None, stacklevel=1, source=None):
"""Redefine the warning function."""
if category is not None:
category = type(category).__name__
category = category.replace('type', 'WARNING')
# def _warn(message, category=None, stacklevel=1, source=None):
# """Redefine the warning function."""
# if category is not None:
# category = type(category).__name__
# category = category.replace('type', 'WARNING')

logging.getLogger('py.warnings').warning(f"{category or 'WARNING'}: {message}")
# logging.getLogger('py.warnings').debug(f"{category or 'WARNING'}: {message}")


def _showwarning(message, category, filename, lineno, file=None, line=None):
_warn(message, category=category)
# def _showwarning(message, category, filename, lineno, file=None, line=None):
# _warn(message, category=category)


warnings.warn = _warn
warnings.showwarning = _showwarning
# warnings.warn = _warn
# warnings.showwarning = _showwarning

# warnings.filterwarnings("ignore", category=FutureWarning)
# warnings.filterwarnings("ignore", category=DeprecationWarning)
Expand Down
20 changes: 16 additions & 4 deletions mriqc/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,14 @@ def parse_args(args=None, namespace=None):
"""Parse args and run further checks on the command line."""
from contextlib import suppress
from json import loads
from logging import DEBUG
from logging import DEBUG, FileHandler
from pathlib import Path
from pprint import pformat

from niworkflows.utils.bids import DEFAULT_BIDS_QUERIES, collect_data

from mriqc import __version__
from mriqc._warnings import DATE_FMT, LOGGER_FMT, _LogFormatter
from mriqc.messages import PARTICIPANT_START

parser = _build_parser()
Expand All @@ -470,16 +472,26 @@ def parse_args(args=None, namespace=None):

config.loggers.init()

extra_messages = [' ' * 9 + '-' * 66]
_log_file = Path(opts.output_dir) / 'logs' / 'mriqc.log'
_log_file.parent.mkdir(exist_ok=True, parents=True)
_handler = FileHandler(_log_file)
_handler.setFormatter(_LogFormatter(
fmt=LOGGER_FMT.format(color='', reset=''),
datefmt=DATE_FMT,
colored=False,
))
config.loggers.default.addHandler(_handler)

extra_messages = []

if opts.bids_filter_file:
extra_messages.insert(
0,
f' * BIDS filters-file: {opts.bids_filter_file.absolute()}.',
f' * BIDS filters-file: {opts.bids_filter_file.absolute()}.',
)

config.loggers.cli.log(
25,
26,
PARTICIPANT_START.format(
version=__version__,
bids_dir=opts.bids_dir,
Expand Down
10 changes: 9 additions & 1 deletion mriqc/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ def main():
import gc
import os
import sys
import time
from tempfile import mkstemp

from mriqc import config, messages
from mriqc.cli.parser import parse_args

atexit.register(config.restore_env)

start_time = time.time()

# Run parser
parse_args()

Expand All @@ -53,7 +56,10 @@ def main():
config_file = mkstemp(
dir=config.execution.work_dir, prefix='.mriqc.', suffix='.toml'
)[1]

config.to_filename(config_file)
config.loggers.cli.info(
f'Saved MRIQC config file: {config_file}.')
config.file_path = config_file
exitcode = 0
# Set up participant level
Expand Down Expand Up @@ -174,7 +180,9 @@ def main():

generate_reports()

config.loggers.cli.log(25, messages.PARTICIPANT_FINISHED)
config.loggers.cli.log(26, messages.PARTICIPANT_FINISHED.format(
duration=time.strftime('%Hh %Mmin %Ss', time.gmtime(time.time() - start_time))
))

if _resmon is not None:
from mriqc.instrumentation.viz import plot
Expand Down
19 changes: 5 additions & 14 deletions mriqc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@
):
os.environ['PYTHONWARNINGS'] = 'ignore'

logging.addLevelName(25, 'IMPORTANT') # Add a new level between INFO and WARNING
logging.addLevelName(15, 'VERBOSE') # Add a new level between INFO and DEBUG

SUPPORTED_SUFFIXES = ('T1w', 'T2w', 'bold', 'dwi')

Expand Down Expand Up @@ -475,7 +473,7 @@ def init(cls):
ignore_paths[0] = re.compile(
r'^(?!/sub-('
+ '|'.join(cls.participant_label)
+ r'))'
+ '))'
)

# Recommended after PyBIDS 12.1
Expand Down Expand Up @@ -552,8 +550,7 @@ class workflow(_Config):
class loggers:
"""Keep loggers easily accessible (see :py:func:`init`)."""

_fmt = '%(asctime)s,%(msecs)d %(name)-2s ' '%(levelname)-2s:\n\t %(message)s'
_datefmt = '%y%m%d-%H:%M:%S'
_datefmt = '%y%m%d %H:%M:%S'
_init = False

default = logging.getLogger()
Expand Down Expand Up @@ -585,12 +582,9 @@ def init(cls):
cls.interface = nlogging.getLogger('nipype.interface')
cls.utils = nlogging.getLogger('nipype.utils')

if not len(cls.cli.handlers):
_handler = logging.StreamHandler(stream=sys.stdout)
_handler.setFormatter(
logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt)
)
cls.cli.addHandler(_handler)
cls.workflow.handlers.clear()
cls.interface.handlers.clear()
cls.utils.handlers.clear()

ncfg.update_config(
{
Expand All @@ -614,9 +608,6 @@ def getLogger(cls, name):
retval = getattr(cls, name)
if retval is None:
setattr(cls, name, logging.getLogger(name))
_handler = logging.StreamHandler(stream=sys.stdout)
_handler.setFormatter(logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt))
retval.addHandler(_handler)
retval.setLevel(execution.log_level)
return retval

Expand Down
60 changes: 44 additions & 16 deletions mriqc/engine/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@

# Import packages
import gc
import getpass
import multiprocessing as mp
import os
import sys
import uuid
from concurrent.futures import ProcessPoolExecutor
from copy import deepcopy
from time import sleep, time
from time import sleep, strftime, time
from traceback import format_exception

from nipype.utils.filemanip import crash2txt, savepkl


# Run node
def run_node(node, updatehash, taskid):
Expand All @@ -58,9 +62,20 @@ def run_node(node, updatehash, taskid):
# Try and execute the node via node.run()
try:
result['result'] = node.run(updatehash=updatehash)
except (KeyboardInterrupt, SystemError, SystemExit):
raise
except: # noqa: E722, intendedly catch all here
result['traceback'] = format_exception(*sys.exc_info())
from mriqc import config

tb = format_exception(*sys.exc_info())
node._traceback = tb
result['result'] = node.result
result['traceback'] = tb

crashfile = report_crash(node, traceback=tb)
config.loggers.workflow.error(
f'Node {node._id} (taskid={taskid}) crashed: {crashfile}'
)

# Return the result dictionary
return result
Expand Down Expand Up @@ -227,16 +242,6 @@ def _get_result(self, taskid):
def _submit_job(self, node, updatehash=False):
raise NotImplementedError

def _report_crash(self, node, result=None):
from nipype.pipeline.plugins.tools import report_crash

tb = None
if result is not None:
node._result = result['result']
tb = result['traceback']
node._traceback = tb
return report_crash(node, traceback=tb)

def _clear_task(self, taskid):
raise NotImplementedError

Expand All @@ -251,7 +256,6 @@ def _clean_queue(self, jobid, graph, result=None):
'traceback': '\n'.join(format_exception(*sys.exc_info())),
}

crashfile = self._report_crash(self.procs[jobid], result=result)
if config.nipype.stop_on_first_crash:
raise RuntimeError(''.join(result['traceback']))
if jobid in self.mapnodesubids:
Expand All @@ -263,7 +267,7 @@ def _clean_queue(self, jobid, graph, result=None):
self.proc_pending[jobid] = False
self.proc_done[jobid] = True
# remove dependencies from queue
return self._remove_node_deps(jobid, crashfile, graph)
return self._remove_node_deps(jobid, graph)

def _send_procs_to_workers(self, updatehash=False, graph=None):
"""Submit tasks to workers when system resources are available."""
Expand Down Expand Up @@ -352,7 +356,7 @@ def _generate_dependency_list(self, graph):
self.proc_done = np.zeros(len(self.procs), dtype=bool)
self.proc_pending = np.zeros(len(self.procs), dtype=bool)

def _remove_node_deps(self, jobid, crashfile, graph):
def _remove_node_deps(self, jobid, graph):
import networkx as nx

try:
Expand All @@ -367,7 +371,6 @@ def _remove_node_deps(self, jobid, crashfile, graph):
return {
'node': self.procs[jobid],
'dependents': subnodes,
'crashfile': crashfile,
}

def _remove_node_dirs(self):
Expand Down Expand Up @@ -613,3 +616,28 @@ def _sort_jobs(self, jobids, scheduler='tsort'):
key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs),
)
return jobids


def report_crash(node, traceback=None, hostname=None):
"""Writes crash related information to a file"""
name = node._id
traceback = traceback or format_exception(*sys.exc_info())
timeofcrash = strftime('%Y%m%d-%H%M%S')
try:
login_name = getpass.getuser()
except KeyError:
login_name = f'UID{os.getuid():d}'

crashfile = f'crash-{timeofcrash}-{login_name}-{name}-{str(uuid.uuid4())}'
crashdir = node.config['execution'].get('crashdump_dir', os.getcwd())

os.makedirs(crashdir, exist_ok=True)
crashfile = os.path.join(crashdir, crashfile)

if node.config['execution']['crashfile_format'].lower() in ('text', 'txt', '.txt'):
crashfile = f'{crashfile}.txt'
crash2txt(crashfile, {'node': node, 'traceback': traceback})
else:
crashfile = f'{crashfile}.pklz'
savepkl(crashfile, {'node': node, 'traceback': traceback}, versioning=True)
return crashfile

0 comments on commit baf56bb

Please sign in to comment.