Skip to content

Commit

Permalink
fix handling of subscriptions to topic
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jun 15, 2018
1 parent b3750ad commit 5286962
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions broqer/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,44 +113,40 @@
from broqer import Publisher, Subscriber, SubscriptionDisposable


class TopicDisposable(SubscriptionDisposable):
def __init__(self, topic: 'Topic', subscriber: 'Subscriber') \
-> None:
self._topic = topic
self._subscriber = subscriber

@property
def _publisher(self):
return self._topic._subject

def dispose(self) -> None:
if self._publisher:
self._publisher.unsubscribe(self._subscriber)


class Topic(Publisher, Subscriber):
def __init__(self):
Publisher.__init__(self)
self._subject = None
self._current_subscriber = None

def subscribe(self, subscriber: 'Subscriber') -> SubscriptionDisposable:
disposable = Publisher.subscribe(self, subscriber)

if self._subject is not None:
return self._subject.subscribe(subscriber)
if len(self._subscriptions) > 1:
self._subject.unsubscribe(self)
self._current_subscriber = subscriber
self._subject.subscribe(self)
self._current_subscriber = None

self._subscriptions.add(subscriber)
return TopicDisposable(self, subscriber)
return disposable

def unsubscribe(self, subscriber: 'Subscriber') -> None:
if self._subject is not None:
self._subject.unsubscribe(subscriber)
else:
self._subscriptions.remove(subscriber)
def unsubscribe(self, subscriber: Subscriber) -> None:
Publisher.unsubscribe(self, subscriber)
if not self._subscriptions and self._subject is not None:
self._subject.unsubscribe(self)

def emit(self, *args: Any, who: Optional[Publisher]=None) -> None:
if self._subject is None:
# method will be replaced by .__call__
raise TypeError('No subject is assigned to this Topic')
self._subject.emit(*args, who=who)

if self._current_subscriber and who == self._current_subscriber:
self._current_subscriber.emit(*args, who=self)
elif who == self._subject:
self._emit(*args)
else:
self._subject.emit(*args, who=self)

def __call__(self, *args, **kwargs) -> None:
raise TypeError('Topic is not callable (for use with | operator).' +
Expand Down Expand Up @@ -214,8 +210,8 @@ def assign(self, topic_str: str, publisher: Publisher,
else:
topic._subject = publisher

for subscriber in topic._subscriptions:
publisher.subscribe(subscriber)
if topic._subscriptions:
publisher.subscribe(topic)

if meta:
topic.meta = meta
Expand Down

0 comments on commit 5286962

Please sign in to comment.