Skip to content

Commit

Permalink
added .unassigned_topics property to hub
Browse files Browse the repository at this point in the history
  • Loading branch information
semiversus committed Jun 7, 2018
1 parent a537aa5 commit 98a55e5
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions broqer/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
import asyncio
from collections import defaultdict, OrderedDict
from types import MappingProxyType
from typing import Any, Callable, Optional
from typing import Any, Optional

from broqer import Publisher, Subscriber, SubscriptionDisposable

Expand Down Expand Up @@ -159,9 +159,13 @@ async def wait_for_assignment(self, timeout=None):
asyncio.shield(self._assignment_future), timeout)

@property
def meta(self):
def meta(self) -> dict:
return getattr(self, '_meta', None)

@meta.setter
def meta(self, meta_dict: dict):
self._meta = meta_dict


class Hub:
def __init__(self):
Expand All @@ -172,19 +176,25 @@ def __getitem__(self, topic: str) -> Topic:

def __contains__(self, topic: str) -> bool:
return topic in self._topics

def __iter__(self):
return sorted(self._topics.keys()).__iter__()

@property
def topics(self):
topics_sorted = OrderedDict(sorted(self._topics.items(), key=lambda t:t[0]))
return MappingProxyType(topics_sorted)
topics_sorted = sorted(self._topics.items(), key=lambda t: t[0])
return MappingProxyType(OrderedDict(topics_sorted))

@property
def unassigned_topics(self):
topics_sorted = sorted(self._topics.items(), key=lambda t: t[0])
result_topics = ((n, t) for n, t in topics_sorted if not t.assigned)
return MappingProxyType(OrderedDict(result_topics))

def assign(self, publisher: Publisher, topic_str: str,
meta: Optional[dict]=None) -> Topic:

def assign(self, publisher:Publisher, topic: str,
meta: Optional[dict]=None) -> None:

topic = self[topic]
topic = self[topic_str]

if topic._subject is not None:
raise ValueError('Topic is already assigned')
Expand All @@ -195,9 +205,9 @@ def assign(self, publisher:Publisher, topic: str,
topic._subject.subscribe(topic)

if meta:
topic._meta = meta
topic.meta = meta

if hasattr(topic, '_assignment_future'):
topic._assignment_future.set_result(None)

return topic
return topic

0 comments on commit 98a55e5

Please sign in to comment.