Skip to content

Commit

Permalink
made op.Trace actually usefull
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Sep 30, 2018
1 parent cb9aea9 commit 59409b1
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 6 deletions.
4 changes: 2 additions & 2 deletions broqer/default_error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ def _default_error_callback(exc_type, exc_value, exc_traceback):


class DefaultErrorHandler:
""" DefaultErrorHandler object is a callable which is calling a registred
""" DefaultErrorHandler object is a callable which is calling a registered
callback and is used for handling exceptions when asynchronous operators
receiving an exception during .emit(). The callback can be registred via
receiving an exception during .emit(). The callback can be registered via
the .set(callback) method. The default callback is _default_error_callback
which is dumping the traceback of the exception.
"""
Expand Down
3 changes: 3 additions & 0 deletions broqer/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def hub(self) -> 'Hub':
""" Reference to hub """
return self._hub

def __repr__(self):
return 'Topic(%r)' % self._path


class MetaTopic(Topic):
""" MetaTopic is adding a meta dictionary to each topic """
Expand Down
33 changes: 32 additions & 1 deletion broqer/op/subscribers/trace.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
""" Implements Trace subscriber """
from time import time
from typing import Any, Callable, Optional

from broqer import Publisher, SubscriptionDisposable
from broqer.op import Sink

Expand All @@ -9,8 +12,36 @@ class Trace(Sink):
publisher of interest is emitting.
:param callback: optional function to call
:param \\*args: arguments used additionally when calling callback
:param \\*\\*kwargs: keyword arguments used when calling callback
:param unpack: value from emits will be unpacked (\\*value)
:param label: string to be used on output
:param \\*\\*kwargs: keyword arguments used when calling callback
"""
def __init__(self, # pylint: disable=keyword-arg-before-vararg
function: Optional[Callable[..., None]] = None,
*args, unpack=False, label=None, **kwargs) -> None:
Sink.__init__(self, function, *args, unpack=unpack, **kwargs)
self._label = label

def emit(self, value: Any, who: Publisher):
self._trace_handler(who, value, label=self._label)
Sink.emit(self, value, who=who)

def __ror__(self, publisher: Publisher) -> SubscriptionDisposable:
return publisher.subscribe(self, prepend=True)

@classmethod
def set_handler(cls, handler):
""" Setting the handler for tracing information """
cls._trace_handler = handler

_timestamp_start = time()

@staticmethod
def _trace_handler(publisher, value, label=None):
""" Default trace handler is printing the timestamp, the publisher name
and the emitted value
"""
line = '--- %8.3f: ' % (time() - Trace._timestamp_start)
line += repr(publisher) if label is None else label
line += ' %r' % (value,)
print(line)
7 changes: 4 additions & 3 deletions broqer/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class SubscriptionError(ValueError):


class Publisher():
""" In broqer a subscriber can subscribe to a Publisher. Subscribers
are notified about emitted values from the publisher. In other frameworks
publisher/subscriber are referenced as observable/observer.
""" In broqer a subscriber can subscribe to a publisher. After subscription
the subscriber is notified about emitted values from the publisher. In
other frameworks *publisher*/*subscriber* are referenced as
*observable*/*observer*.
As information receiver use following method to interact with Publisher
Expand Down
43 changes: 43 additions & 0 deletions test/test_op_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from unittest import mock
import pytest

from broqer import Publisher, Hub, Subject
from broqer.op import Cache, Trace
from broqer.subject import Subject

@pytest.mark.parametrize('label', [None, 'foo'])
def test_trace(label, capsys):
p = Publisher()
if label is None:
p | Trace()
else:
p | Trace(label=label)

captured = capsys.readouterr()
assert captured.out == ''

p.notify(3)
captured = capsys.readouterr()
assert '--- ' in captured.out
assert ' 3'
assert len(captured.out.splitlines()) == 1

handler = mock.Mock()
old_handler = Trace._trace_handler
Trace.set_handler(handler)

p.notify(4)

handler.assert_called_once_with(p, 4, label=label)
Trace._trace_handler = staticmethod(old_handler) # restore handler

def test_trace_topic(capsys):
hub = Hub()
hub['foo'].assign(Subject())

mock_handler = mock.Mock()
hub['foo'] | Trace(mock_handler)

hub['foo'].emit('test')
captured = capsys.readouterr()
assert "Topic('foo') 'test'" in captured.out

0 comments on commit 59409b1

Please sign in to comment.