Skip to content

Commit

Permalink
handle kafka exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Harris committed Sep 28, 2017
1 parent 837c469 commit 259861d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
Binary file added dist/eventify-0.5.1.tar.gz
Binary file not shown.
11 changes: 8 additions & 3 deletions eventify/drivers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,23 @@ async def onJoin(self):
# Used with config.json defined topics
if self.subscribed_topics is not None:
consumer = AIOKafkaConsumer(
', '.join(self.subscribed_topics),
bootstrap_servers=self.transport_host,
loop=loop,
group_id='my-group'
)
await consumer.start()

# Subscribe to all topics
for topic in self.subscribed_topics:
consumer.subscribe(topic)

try:
async for msg in consumer:
value = msg.value.decode()
await handler_instance.on_event(value)
finally:
await consumer.stop()
except Exception as error:
self.log.error("Consumer error. %s", error)
await asyncio.sleep(0)


if hasattr(handler_instance, 'worker'):
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
setup(
name='eventify',
packages=find_packages(),
version='0.5.0',
version='0.5.1',
description='Event Driven Asynchronous Framework',
author='Matthew Harris',
author_email='matt@x-qa.com',
url='https://github.com/eventifyio/eventify',
download_url='https://github.com/eventifyio/eventify/raw/master/dist/eventify-0.5.0.tar.gz',
download_url='https://github.com/eventifyio/eventify/raw/master/dist/eventify-0.5.1.tar.gz',
keywords=['event', 'event-driven', 'async',
'framework', 'producer', 'consumer'],
classifiers=[],
Expand Down

0 comments on commit 259861d

Please sign in to comment.