Skip to content

Commit

Permalink
Commit the consumer offset after reading each record
Browse files Browse the repository at this point in the history
  • Loading branch information
joowani committed Dec 4, 2016
1 parent 03817ee commit 5f89dd3
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 15 deletions.
10 changes: 5 additions & 5 deletions README.rst
Expand Up @@ -31,9 +31,9 @@ KQ: Kafka-based Job Queue for Python

|
KQ (Kafka Queue) is a light-weight Python library which provides a simple API
to queue and process jobs asynchronously in the background. It is backed by
`Apache Kafka`_ and designed primarily for ease of use.
KQ (Kafka Queue) is a lightweight Python library which provides a simple API
to process jobs asynchronously in the background. It uses `Apache Kafka`_ and
is designed primarily for ease of use.

.. _Apache Kafka: https://kafka.apache.org

Expand Down Expand Up @@ -73,7 +73,7 @@ Start a KQ worker:
.. code-block:: bash
~$ kq worker --verbose
[INFO] Starting Worker(topic=foobar) ...
[INFO] Starting Worker(topic=default) ...
Enqueue the function call as a job:
Expand All @@ -91,7 +91,7 @@ Enqueue the function call as a job:
q.enqueue(my_func, 1, 2, baz=3)
Sit back and watch the worker process the job in the background:
Sit back and watch the worker process it in the background:

.. code-block:: bash
Expand Down
4 changes: 2 additions & 2 deletions docs/overview.rst
Expand Up @@ -26,7 +26,7 @@ Start a KQ worker:
.. code-block:: bash
~$ kq worker --verbose
[INFO] Starting Worker(topic=foobar) ...
[INFO] Starting Worker(topic=default) ...
Enqueue the function call as a job:
Expand All @@ -44,7 +44,7 @@ Enqueue the function call as a job:
q.enqueue(my_func, 1, 2, baz=3)
Sit back and watch the worker process the job in the background:
Sit back and watch the worker process it in the background:

.. code-block:: bash
Expand Down
2 changes: 1 addition & 1 deletion kq/manager.py
Expand Up @@ -7,7 +7,7 @@
class Manager(object):
"""KQ manager.
Here is an example of initializing a manager:
Here is an example of initializing and using a manager:
.. code-block:: python
Expand Down
4 changes: 2 additions & 2 deletions kq/queue.py
Expand Up @@ -15,8 +15,8 @@ class Queue(object):
"""KQ queue.
A queue serializes incoming function calls and places them into a Kafka
topic as *jobs*. Workers fetch these jobs and process them asynchronously
in the background. Here is an example of using a queue:
topic as *jobs*. Workers fetch these jobs and execute them asynchronously
in the background. Here is an example of initializing and using a queue:
.. code-block:: python
Expand Down
2 changes: 1 addition & 1 deletion kq/version.py
@@ -1 +1 @@
VERSION = '1.0'
VERSION = '1.0.1'
7 changes: 4 additions & 3 deletions kq/worker.py
Expand Up @@ -15,8 +15,8 @@
class Worker(object):
"""KQ worker.
A worker fetches jobs from a Kafka broker, de-serializes them, and
processes them asynchronously in the background. Here is an example
A worker fetches jobs from a Kafka broker, de-serializes them and
executes them asynchronously in the background. Here is an example
of initializing and starting a worker:
.. code-block:: python
Expand Down Expand Up @@ -117,7 +117,7 @@ def __init__(self,
ssl_keyfile=keyfile,
ssl_crlfile=crlfile,
consumer_timeout_ms=-1,
enable_auto_commit=True,
enable_auto_commit=False,
auto_offset_reset='latest',
)

Expand Down Expand Up @@ -258,6 +258,7 @@ def start(self):
try:
for record in self._consumer:
self._consume_record(record)
self._consumer.commit()
except KeyboardInterrupt: # pragma: no cover
self._logger.info('Stopping {} ...'.format(self))
self._pool.terminate() # TODO not sure if necessary
2 changes: 1 addition & 1 deletion tests/test_worker.py
Expand Up @@ -115,7 +115,7 @@ def test_init(logger, callback):
ssl_keyfile='/test/files/keyfile',
ssl_crlfile='/test/files/crlfile',
consumer_timeout_ms=-1,
enable_auto_commit=True,
enable_auto_commit=False,
auto_offset_reset='latest',
)
assert repr(worker) == 'Worker(topic=foo)'
Expand Down

0 comments on commit 5f89dd3

Please sign in to comment.