Skip to content

Commit

Permalink
kafka client update cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
jadbin committed Nov 17, 2016
1 parent c3a0ecc commit ba22bee
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 2 deletions.
3 changes: 3 additions & 0 deletions tests/test_unikafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ def __init__(self, *args, **kw):
def topics(self):
return self._topic_dict

def update_cluster(self):
pass


@pytest.fixture(scope="function")
def topic_consumer(request, monkeypatch):
Expand Down
1 change: 1 addition & 0 deletions xpaw/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def push_request(self, topic, req):
with self._lock:
if topic not in self._set:
self._set.add(topic)
self._kafka_client.update_cluster()
self._producers[topic] = self._kafka_client.topics[topic.encode("utf-8")].get_producer()
self._producers[topic].produce(r)

Expand Down
3 changes: 2 additions & 1 deletion xpaw/unikafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ async def _poll_forever(self):
except ConsumerStoppedException:
log.warn("Consumer of topic '{0}' stopped".format(t))
self._remove_consumer(t)
self._kafka_client.update_cluster()
self._create_consumer(t)
log.info("Reset consumer of topic '{0}'".format(t))
msg = self._consumers[t].consume(block=True)
Expand All @@ -157,6 +156,7 @@ async def _poll_forever(self):
def _create_consumer(self, topic):
q = deque(maxlen=self._queue_size)
self._mq[topic] = q
self._kafka_client.update_cluster()
self._consumers[topic] = self._kafka_client.topics[topic.encode("utf-8")].get_balanced_consumer(
consumer_group=self._group.encode("utf-8"),
auto_commit_enable=True,
Expand All @@ -168,6 +168,7 @@ def _remove_consumer(self, topic):
if len(q) > 0:
producer = None
try:
self._kafka_client.update_cluster()
producer = self._kafka_client.topics[topic.encode("utf-8")].get_producer(linger_ms=100)
while len(q) > 0:
b = q.popleft()
Expand Down
2 changes: 1 addition & 1 deletion xpaw/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# coding=utf-8

__version__ = "0.5.5a7"
__version__ = "0.5.5a8"

0 comments on commit ba22bee

Please sign in to comment.