Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added context based filtering

  • Loading branch information...
commit b48bfc2a3a05da345789e6a5e03a837be466ca5f 1 parent 9dc1955
@mitsuhiko authored
View
1  CHANGES
@@ -17,3 +17,4 @@ Release date to be announced.
- implemented maximum buffer size for the
:class:`logbook.more.FingersCrossedHandler` as well as a lock
for thread safety
+- added ability to filter for context.
View
8 docs/designdefense.rst
@@ -188,7 +188,13 @@ can override the :meth:`Logger.process_record` method::
def process_record(self, record):
record.extra['kind'] = 'input'
-A handler can then use this information to filter out input.
+A handler can then use this information to filter out input::
+
+ def no_input(record, handler):
+ return record.extra.get('kind') != 'input'
+
+ with MyHandler().threadbound(filter=no_input):
+ ...
Injecting Context-Sensitive Information
---------------------------------------
View
4 logbook/base.py
@@ -490,12 +490,14 @@ def call_handlers(self, record):
# after that, context specific handlers run (this includes the
# global handlers)
- for handler, processor, bubble in iter_context_handlers():
+ for handler, processor, filter, bubble in iter_context_handlers():
if record.level >= handler.level:
# TODO: cloning? expensive? document that on bubbling
# the record is modified for outer processors too?
if processor is not None:
processor(record, handler)
+ if filter is not None and not filter(record, handler):
+ continue
if handler.handle(record) and not bubble:
break
View
34 logbook/handlers.py
@@ -108,28 +108,28 @@ class NestedHandlerSetup(object):
def __init__(self):
self.handlers = []
- def add(self, handler, processor=None, bubble=True):
+ def add(self, handler, processor=None, filter=None, bubble=True):
"""Registers a new handler on the :class:`NestedHandlerSetup`"""
- self.handlers.append((handler, processor, bubble))
+ self.handlers.append((handler, processor, filter, bubble))
def push_application(self):
"""Pushes all handlers to the global stack."""
- for handler, processor, bubble in self.handlers:
- handler.push_application(processor, bubble)
+ for handler, processor, filter, bubble in self.handlers:
+ handler.push_application(processor, filter, bubble)
def pop_application(self):
"""Pops all handlers from the global stack."""
- for handler, _, _ in reversed(self.handlers):
+ for handler, _, _, _ in reversed(self.handlers):
handler.pop_application()
def push_thread(self):
"""Pushes all handlers to the thread stack."""
- for handler, processor, bubble in self.handlers:
- handler.push_thread(processor, bubble)
+ for handler, processor, filter, bubble in self.handlers:
+ handler.push_thread(processor, filter, bubble)
def pop_thread(self):
"""Pops all handlers from the thread stack."""
- for handler, _, _ in reversed(self.handlers):
+ for handler, _, _, _ in reversed(self.handlers):
handler.pop_thread()
@contextmanager
@@ -281,11 +281,11 @@ def emit(self, record):
def close(self):
"""Tidy up any resources used by the handler."""
- def push_thread(self, processor=None, bubble=True):
+ def push_thread(self, processor=None, filter=None, bubble=True):
"""Push the handler for the current context."""
with _context_handler_lock:
_handler_cache.pop(current_thread(), None)
- item = self, processor, bubble
+ item = self, processor, filter, bubble
stack = getattr(_context_handlers, 'stack', None)
if stack is None:
_context_handlers.stack = [item]
@@ -301,9 +301,9 @@ def pop_thread(self):
popped = stack.pop()[0]
assert popped is self, 'popped unexpected handler'
- def push_application(self, processor=None, bubble=True):
+ def push_application(self, processor=None, filter=None, bubble=True):
"""Push the handler to the global stack."""
- _global_handlers.append((self, processor, bubble))
+ _global_handlers.append((self, processor, filter, bubble))
_handler_cache.clear()
def pop_application(self):
@@ -314,25 +314,25 @@ def pop_application(self):
assert popped is self, 'popped unexpected handler'
@contextmanager
- def threadbound(self, processor=None, bubble=True):
+ def threadbound(self, processor=None, filter=None, bubble=True):
"""Binds the handler temporarily to a thread."""
- self.push_thread(processor, bubble)
+ self.push_thread(processor, filter, bubble)
try:
yield self
finally:
self.pop_thread()
@contextmanager
- def applicationbound(self, processor=None, bubble=True):
+ def applicationbound(self, processor=None, filter=None, bubble=True):
"""Binds the handler temporarily to the whole process."""
- self.push_application(processor, bubble)
+ self.push_application(processor, filter, bubble)
try:
yield self
finally:
self.pop_application()
def __enter__(self):
- self.push_thread(None, False)
+ self.push_thread(None, None, False)
return self
def __exit__(self, exc_type, exc_value, tb):
View
19 test_logbook.py
@@ -370,6 +370,25 @@ def test_channel(self):
logger.warn('Logbook is too awesome for stdlib')
self.assertEqual(test_handler.records[0].channel, logger)
+ def test_filtering(self):
+ logger1 = logbook.Logger('Logger1')
+ logger2 = logbook.Logger('Logger2')
+ handler = logbook.TestHandler()
+ outer_handler = logbook.TestHandler()
+
+ def only_1(record, handler):
+ return record.channel is logger1
+
+ with outer_handler:
+ with handler.threadbound(filter=only_1, bubble=False):
+ logger1.warn('foo')
+ logger2.warn('bar')
+
+ self.assert_(handler.has_warning('foo', logger_name='Logger1'))
+ self.assert_(not handler.has_warning('bar', logger_name='Logger2'))
+ self.assert_(not outer_handler.has_warning('foo', logger_name='Logger1'))
+ self.assert_(outer_handler.has_warning('bar', logger_name='Logger2'))
+
class AttributeTestCase(LogbookTestCase):
Please sign in to comment.
Something went wrong with that request. Please try again.