Skip to content

Commit

Permalink
Catch MessageEmptyException and return counter for Consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Bibob7 committed Jul 22, 2019
1 parent a3dfbb9 commit 1bd1c77
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
20 changes: 7 additions & 13 deletions esque/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from esque.avromessage import AvroFileReader, AvroFileWriter
from esque.config import Config
from esque.errors import raise_for_kafka_error, raise_for_message
from esque.errors import raise_for_kafka_error, raise_for_message, MessageEmptyException
from esque.helpers import delivery_callback, delta_t
from esque.message import KafkaMessage, PlainTextFileReader, PlainTextFileWriter, FileReader, FileWriter
from esque.schemaregistry import SchemaRegistryClient
Expand Down Expand Up @@ -50,16 +50,9 @@ def consume(self, amount: int) -> int:
pass

def _consume_single_message(self, timeout=10) -> Optional[Message]:
poll_limit = 10
counter = 0
while counter < poll_limit:
message = self._consumer.poll(timeout=timeout)
if message is None:
counter += 1
continue
if message.error() is not None:
raise_for_message(message)
return message
message = self._consumer.poll(timeout=timeout)
raise_for_message(message)
return message


class PingConsumer(AbstractConsumer):
Expand Down Expand Up @@ -88,8 +81,9 @@ def consume(self, amount: int) -> int:
file_writers = {}
with ExitStack() as stack:
while counter < amount:
message = self._consume_single_message()
if message is None:
try:
message = self._consume_single_message()
except MessageEmptyException:
return counter

if message.partition() not in file_writers:
Expand Down
5 changes: 3 additions & 2 deletions esque/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def raise_for_kafka_error(err: KafkaError):

def raise_for_message(message: Message):
if message is None:
raise MessageEmptyException(-1, message)
raise MessageEmptyException
elif message.error() is not None:
raise_for_kafka_error(message.error())

Expand All @@ -49,7 +49,8 @@ class ContextNotDefinedException(Exception):


class MessageEmptyException(KafkaException):
pass
def __init__(self):
super().__init__(-185, None)


class TopicAlreadyExistsException(KafkaException):
Expand Down

0 comments on commit 1bd1c77

Please sign in to comment.