Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'logging-redirect-to-tqdm' into devel
- Loading branch information
Showing
4 changed files
with
361 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
# pylint: disable=missing-module-docstring, missing-class-docstring | ||
# pylint: disable=missing-function-docstring, no-self-use | ||
from __future__ import absolute_import | ||
|
||
import logging | ||
import logging.handlers | ||
import sys | ||
from io import StringIO | ||
|
||
import pytest | ||
|
||
from tqdm import tqdm | ||
from tqdm.contrib.logging import _get_first_found_console_logging_formatter | ||
from tqdm.contrib.logging import _TqdmLoggingHandler as TqdmLoggingHandler | ||
from tqdm.contrib.logging import logging_redirect_tqdm, tqdm_logging_redirect | ||
|
||
from .tests_tqdm import importorskip | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
TEST_LOGGING_FORMATTER = logging.Formatter() | ||
|
||
|
||
class CustomTqdm(tqdm): | ||
messages = [] | ||
|
||
@classmethod | ||
def write(cls, s, **__): # pylint: disable=arguments-differ | ||
CustomTqdm.messages.append(s) | ||
|
||
|
||
class ErrorRaisingTqdm(tqdm): | ||
exception_class = RuntimeError | ||
|
||
@classmethod | ||
def write(cls, s, **__): # pylint: disable=arguments-differ | ||
raise ErrorRaisingTqdm.exception_class('fail fast') | ||
|
||
|
||
class TestTqdmLoggingHandler: | ||
def test_should_call_tqdm_write(self): | ||
CustomTqdm.messages = [] | ||
logger = logging.Logger('test') | ||
logger.handlers = [TqdmLoggingHandler(CustomTqdm)] | ||
logger.info('test') | ||
assert CustomTqdm.messages == ['test'] | ||
|
||
def test_should_call_handle_error_if_exception_was_thrown(self): | ||
patch = importorskip('unittest.mock').patch | ||
logger = logging.Logger('test') | ||
ErrorRaisingTqdm.exception_class = RuntimeError | ||
handler = TqdmLoggingHandler(ErrorRaisingTqdm) | ||
logger.handlers = [handler] | ||
with patch.object(handler, 'handleError') as mock: | ||
logger.info('test') | ||
assert mock.called | ||
|
||
@pytest.mark.parametrize('exception_class', [ | ||
KeyboardInterrupt, | ||
SystemExit | ||
]) | ||
def test_should_not_swallow_certain_exceptions(self, exception_class): | ||
logger = logging.Logger('test') | ||
ErrorRaisingTqdm.exception_class = exception_class | ||
handler = TqdmLoggingHandler(ErrorRaisingTqdm) | ||
logger.handlers = [handler] | ||
with pytest.raises(exception_class): | ||
logger.info('test') | ||
|
||
|
||
class TestGetFirstFoundConsoleLoggingFormatter: | ||
def test_should_return_none_for_no_handlers(self): | ||
assert _get_first_found_console_logging_formatter([]) is None | ||
|
||
def test_should_return_none_without_stream_handler(self): | ||
handler = logging.handlers.MemoryHandler(capacity=1) | ||
handler.formatter = TEST_LOGGING_FORMATTER | ||
assert _get_first_found_console_logging_formatter([handler]) is None | ||
|
||
def test_should_return_none_for_stream_handler_not_stdout_or_stderr(self): | ||
handler = logging.StreamHandler(StringIO()) | ||
handler.formatter = TEST_LOGGING_FORMATTER | ||
assert _get_first_found_console_logging_formatter([handler]) is None | ||
|
||
def test_should_return_stream_handler_formatter_if_stream_is_stdout(self): | ||
handler = logging.StreamHandler(sys.stdout) | ||
handler.formatter = TEST_LOGGING_FORMATTER | ||
assert _get_first_found_console_logging_formatter( | ||
[handler] | ||
) == TEST_LOGGING_FORMATTER | ||
|
||
def test_should_return_stream_handler_formatter_if_stream_is_stderr(self): | ||
handler = logging.StreamHandler(sys.stderr) | ||
handler.formatter = TEST_LOGGING_FORMATTER | ||
assert _get_first_found_console_logging_formatter( | ||
[handler] | ||
) == TEST_LOGGING_FORMATTER | ||
|
||
|
||
class TestRedirectLoggingToTqdm: | ||
def test_should_add_and_remove_tqdm_handler(self): | ||
logger = logging.Logger('test') | ||
with logging_redirect_tqdm(loggers=[logger]): | ||
assert len(logger.handlers) == 1 | ||
assert isinstance(logger.handlers[0], TqdmLoggingHandler) | ||
assert not logger.handlers | ||
|
||
def test_should_remove_and_restore_console_handlers(self): | ||
logger = logging.Logger('test') | ||
stderr_console_handler = logging.StreamHandler(sys.stderr) | ||
stdout_console_handler = logging.StreamHandler(sys.stderr) | ||
logger.handlers = [stderr_console_handler, stdout_console_handler] | ||
with logging_redirect_tqdm(loggers=[logger]): | ||
assert len(logger.handlers) == 1 | ||
assert isinstance(logger.handlers[0], TqdmLoggingHandler) | ||
assert logger.handlers == [stderr_console_handler, stdout_console_handler] | ||
|
||
def test_should_inherit_console_logger_formatter(self): | ||
logger = logging.Logger('test') | ||
formatter = logging.Formatter('custom: %(message)s') | ||
console_handler = logging.StreamHandler(sys.stderr) | ||
console_handler.setFormatter(formatter) | ||
logger.handlers = [console_handler] | ||
with logging_redirect_tqdm(loggers=[logger]): | ||
assert logger.handlers[0].formatter == formatter | ||
|
||
def test_should_not_remove_stream_handlers_not_fot_stdout_or_stderr(self): | ||
logger = logging.Logger('test') | ||
stream_handler = logging.StreamHandler(StringIO()) | ||
logger.addHandler(stream_handler) | ||
with logging_redirect_tqdm(loggers=[logger]): | ||
assert len(logger.handlers) == 2 | ||
assert logger.handlers[0] == stream_handler | ||
assert isinstance(logger.handlers[1], TqdmLoggingHandler) | ||
assert logger.handlers == [stream_handler] | ||
|
||
|
||
class TestTqdmWithLoggingRedirect: | ||
def test_should_add_and_remove_handler_from_root_logger_by_default(self): | ||
original_handlers = list(logging.root.handlers) | ||
with tqdm_logging_redirect(total=1) as pbar: | ||
assert isinstance(logging.root.handlers[-1], TqdmLoggingHandler) | ||
LOGGER.info('test') | ||
pbar.update(1) | ||
assert logging.root.handlers == original_handlers | ||
|
||
def test_should_add_and_remove_handler_from_custom_logger(self): | ||
logger = logging.Logger('test') | ||
with tqdm_logging_redirect(total=1, loggers=[logger]) as pbar: | ||
assert len(logger.handlers) == 1 | ||
assert isinstance(logger.handlers[0], TqdmLoggingHandler) | ||
logger.info('test') | ||
pbar.update(1) | ||
assert not logger.handlers | ||
|
||
def test_should_not_fail_with_logger_without_console_handler(self): | ||
logger = logging.Logger('test') | ||
logger.handlers = [] | ||
with tqdm_logging_redirect(total=1, loggers=[logger]): | ||
logger.info('test') | ||
assert not logger.handlers | ||
|
||
def test_should_format_message(self): | ||
logger = logging.Logger('test') | ||
console_handler = logging.StreamHandler(sys.stdout) | ||
console_handler.setFormatter(logging.Formatter( | ||
r'prefix:%(message)s' | ||
)) | ||
logger.handlers = [console_handler] | ||
CustomTqdm.messages = [] | ||
with tqdm_logging_redirect(loggers=[logger], tqdm_class=CustomTqdm): | ||
logger.info('test') | ||
assert CustomTqdm.messages == ['prefix:test'] | ||
|
||
def test_use_root_logger_by_default_and_write_to_custom_tqdm(self): | ||
logger = logging.root | ||
CustomTqdm.messages = [] | ||
with tqdm_logging_redirect(total=1, tqdm_class=CustomTqdm) as pbar: | ||
assert isinstance(pbar, CustomTqdm) | ||
logger.info('test') | ||
assert CustomTqdm.messages == ['test'] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
""" | ||
Helper functionality for interoperability with stdlib `logging`. | ||
""" | ||
from __future__ import absolute_import | ||
|
||
import logging | ||
import sys | ||
from contextlib import contextmanager | ||
|
||
try: | ||
from typing import Iterator, List, Optional, Type # pylint: disable=unused-import | ||
except ImportError: | ||
pass | ||
|
||
from ..std import tqdm as std_tqdm | ||
|
||
|
||
class _TqdmLoggingHandler(logging.StreamHandler): | ||
def __init__( | ||
self, | ||
tqdm_class=std_tqdm # type: Type[std_tqdm] | ||
): | ||
super(_TqdmLoggingHandler, self).__init__() | ||
self.tqdm_class = tqdm_class | ||
|
||
def emit(self, record): | ||
try: | ||
msg = self.format(record) | ||
self.tqdm_class.write(msg) | ||
self.flush() | ||
except (KeyboardInterrupt, SystemExit): | ||
raise | ||
except: # noqa pylint: disable=bare-except | ||
self.handleError(record) | ||
|
||
|
||
def _is_console_logging_handler(handler): | ||
return (isinstance(handler, logging.StreamHandler) | ||
and handler.stream in {sys.stdout, sys.stderr}) | ||
|
||
|
||
def _get_first_found_console_logging_formatter(handlers): | ||
for handler in handlers: | ||
if _is_console_logging_handler(handler): | ||
return handler.formatter | ||
|
||
|
||
@contextmanager | ||
def logging_redirect_tqdm( | ||
loggers=None, # type: Optional[List[logging.Logger]], | ||
tqdm_class=std_tqdm # type: Type[std_tqdm] | ||
): | ||
# type: (...) -> Iterator[None] | ||
""" | ||
Context manager redirecting console logging to `tqdm.write()`, leaving | ||
other logging handlers (e.g. log files) unaffected. | ||
Parameters | ||
---------- | ||
loggers : list, optional | ||
Which handlers to redirect (default: [logging.root]). | ||
tqdm_class : optional | ||
Example | ||
------- | ||
```python | ||
import logging | ||
from tqdm import trange | ||
from tqdm.contrib.logging import logging_redirect_tqdm | ||
LOG = logging.getLogger(__name__) | ||
if __name__ == '__main__': | ||
logging.basicConfig(level=logging.INFO) | ||
with logging_redirect_tqdm(): | ||
for i in trange(9): | ||
if i == 4: | ||
LOG.info("console logging redirected to `tqdm.write()`") | ||
# logging restored | ||
``` | ||
""" | ||
if loggers is None: | ||
loggers = [logging.root] | ||
original_handlers_list = [logger.handlers for logger in loggers] | ||
try: | ||
for logger in loggers: | ||
tqdm_handler = _TqdmLoggingHandler(tqdm_class) | ||
tqdm_handler.setFormatter( | ||
_get_first_found_console_logging_formatter(logger.handlers)) | ||
logger.handlers = [ | ||
handler for handler in logger.handlers | ||
if not _is_console_logging_handler(handler)] + [tqdm_handler] | ||
yield | ||
finally: | ||
for logger, original_handlers in zip(loggers, original_handlers_list): | ||
logger.handlers = original_handlers | ||
|
||
|
||
@contextmanager | ||
def tqdm_logging_redirect( | ||
*args, | ||
# loggers=None, # type: Optional[List[logging.Logger]] | ||
# tqdm=None, # type: Optional[Type[tqdm.tqdm]] | ||
**kwargs | ||
): | ||
# type: (...) -> Iterator[None] | ||
""" | ||
Convenience shortcut for: | ||
```python | ||
with tqdm_class(*args, **tqdm_kwargs) as pbar: | ||
with logging_redirect_tqdm(loggers=loggers, tqdm_class=tqdm_class): | ||
yield pbar | ||
``` | ||
Parameters | ||
---------- | ||
tqdm_class : optional, (default: tqdm.std.tqdm). | ||
loggers : optional, list. | ||
**tqdm_kwargs : passed to `tqdm_class`. | ||
""" | ||
tqdm_kwargs = kwargs.copy() | ||
loggers = tqdm_kwargs.pop('loggers', None) | ||
tqdm_class = tqdm_kwargs.pop('tqdm_class', std_tqdm) | ||
with tqdm_class(*args, **tqdm_kwargs) as pbar: | ||
with logging_redirect_tqdm(loggers=loggers, tqdm_class=tqdm_class): | ||
yield pbar |