diff --git a/kq/worker.py b/kq/worker.py index 4743f92..f1e0a86 100644 --- a/kq/worker.py +++ b/kq/worker.py @@ -121,14 +121,20 @@ def __init__(self, auto_offset_reset='latest', ) + self._task_finished = False + def __del__(self): - """Commit the Kafka consumer offsets and close the consumer.""" + """Commit the Kafka consumer offsets and close the consumer. + ... But only when the task has finished. This is to keep + ourselves consistent with Kafka's at-least once policy. + """ if hasattr(self, '_consumer'): - try: - self._logger.info('Committing offsets ...') - self._consumer.commit() - except Exception as e: # pragma: no cover - self._logger.warning('Failed to commit offsets: {}'.format(e)) + if self._task_finished: + try: + self._logger.info('Committing offsets ...') + self._consumer.commit() + except Exception as e: # pragma: no cover + self._logger.warning('Failed to commit offsets: {}'.format(e)) try: self._logger.info('Closing consumer ...') self._consumer.close() @@ -258,7 +264,9 @@ def start(self): try: for record in self._consumer: self._consume_record(record) + self._task_finished = True self._consumer.commit() + self._task_finished = False # For next task except KeyboardInterrupt: # pragma: no cover self._logger.info('Stopping {} ...'.format(self)) self._pool.terminate() # TODO not sure if necessary