Skip to content

Commit

Permalink
additions in hub.py
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jun 7, 2018
1 parent 0e85643 commit 2cf99c1
Showing 1 changed file with 34 additions and 28 deletions.
62 changes: 34 additions & 28 deletions broqer/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
Assign a publisher to a hub topic:
>>> _ = op.Just(1) | hub.publish('value1')
>>> _ = hub.assign(op.Just(1), 'value1')
Output: 1
>>> hub['value1'].assigned
True
Expand All @@ -46,13 +46,13 @@
>>> _ = op.Just(1) | hub['value2']
Traceback (most recent call last):
...
TypeError: Topic is not callable (for use as operator). ...
TypeError: Topic is not callable (for use with | operator). ...
>>> _d1.dispose()
Also assigning publisher first and then subscribing is possible:
>>> _ = Value(2) | hub.publish('value2')
>>> _ = hub.assign(Value(2), 'value2')
>>> _d2 = hub['value2'] | op.sink(print, 'Output:')
Output: 2
Expand All @@ -63,7 +63,7 @@
It's not possible to assign a second publisher to a hub topic:
>>> _ = Value(0) | hub.publish('value2')
>>> _ = hub.assign(Value(0), 'value2')
Traceback (most recent call last):
...
ValueError: Topic is already assigned
Expand All @@ -73,7 +73,7 @@
Another feature is defining meta data as dictionary to a hub topic:
>>> _ = Value(0) | hub.publish('value3', meta={'maximum':10})
>>> _ = hub.assign(Value(0), 'value3', meta={'maximum':10})
>>> hub['value3'].meta
{'maximum': 10}
Expand All @@ -93,7 +93,7 @@
>>> _f2.done()
False
>>> _ = Value(0) | hub.publish('value4')
>>> _ = hub.assign(Value(0), 'value4')
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.01))
>>> _f2.done()
True
Expand All @@ -107,6 +107,7 @@
"""
import asyncio
from collections import defaultdict
from types import MappingProxyType
from typing import Any, Callable, Optional

from broqer import Publisher, Subscriber, SubscriptionDisposable
Expand Down Expand Up @@ -139,8 +140,8 @@ def emit(self, *args: Any, who: Optional[Publisher]=None) -> None:
self._subject.emit(*args)

def __call__(self, *args, **kwargs) -> None:
raise TypeError('Topic is not callable (for use as operator).' +
' Use "| hub.publish(topic, meta)" instead.')
raise TypeError('Topic is not callable (for use with | operator).' +
' Use "hub.assign(publisher, topic, [meta])" instead.')

@property
def assigned(self):
Expand All @@ -164,33 +165,38 @@ def meta(self):

class Hub:
def __init__(self):
self._proxies = defaultdict(Topic)
self._topics = defaultdict(Topic)

def __getitem__(self, topic: str) -> Topic:
return self._proxies[topic]
return self._topics[topic]

def __contains__(self, topic: str) -> bool:
return topic in self._proxies
return topic in self._topics

def __iter__(self):
return sorted(self._topics.keys()).__iter__()

@property
def topics(self):
return MappingProxyType(self._topics)

def publish(self, topic: str, meta: Optional[dict]=None) \
-> Callable[[Publisher], Publisher]:
proxy = self[topic]
def assign(self, publisher:Publisher, topic: str,
meta: Optional[dict]=None) -> None:

topic = self[topic]

def _build(publisher):
# used for pipeline style assignment
if proxy._subject is not None:
raise ValueError('Topic is already assigned')
else:
proxy._subject = publisher
if topic._subject is not None:
raise ValueError('Topic is already assigned')
else:
topic._subject = publisher

if len(proxy._subscriptions):
proxy._subject.subscribe(proxy)
if len(topic._subscriptions):
topic._subject.subscribe(topic)

if meta:
proxy._meta = meta
if meta:
topic._meta = meta

if hasattr(proxy, '_assignment_future'):
proxy._assignment_future.set_result(None)
if hasattr(topic, '_assignment_future'):
topic._assignment_future.set_result(None)

return proxy
return _build
return topic

0 comments on commit 2cf99c1

Please sign in to comment.