Skip to content

Commit

Permalink
Dont commit on shutdown if task hasnt finished
Browse files Browse the repository at this point in the history
  • Loading branch information
Shashank Mehra committed Dec 7, 2016
1 parent 5f89dd3 commit 4829853
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions kq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,20 @@ def __init__(self,
auto_offset_reset='latest',
)

self._task_finished = False

def __del__(self):
"""Commit the Kafka consumer offsets and close the consumer."""
"""Commit the Kafka consumer offsets and close the consumer.
... But only when the task has finished. This is to keep
ourselves consistent with Kafka's at-least once policy.
"""
if hasattr(self, '_consumer'):
try:
self._logger.info('Committing offsets ...')
self._consumer.commit()
except Exception as e: # pragma: no cover
self._logger.warning('Failed to commit offsets: {}'.format(e))
if self._task_finished:
try:
self._logger.info('Committing offsets ...')
self._consumer.commit()
except Exception as e: # pragma: no cover
self._logger.warning('Failed to commit offsets: {}'.format(e))
try:
self._logger.info('Closing consumer ...')
self._consumer.close()
Expand Down Expand Up @@ -258,7 +264,9 @@ def start(self):
try:
for record in self._consumer:
self._consume_record(record)
self._task_finished = True
self._consumer.commit()
self._task_finished = False # For next task
except KeyboardInterrupt: # pragma: no cover
self._logger.info('Stopping {} ...'.format(self))
self._pool.terminate() # TODO not sure if necessary

0 comments on commit 4829853

Please sign in to comment.