Skip to content

Commit

Permalink
Add enqueue_with_key method to Queue class
Browse files Browse the repository at this point in the history
  • Loading branch information
joowani committed Dec 13, 2016
1 parent 3275434 commit 423bb56
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 47 deletions.
25 changes: 0 additions & 25 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,31 +102,6 @@ Sit back and watch the worker process it in the background:
[INFO] Job 1b92xle0 returned: (1, 2, 3)
Enqueue a job to be processed in-order with other jobs with a particular key:

.. code-block:: python
# Import the blocking function
from my_module import my_func
# Initialize a queue
from kq import Queue
q = Queue()
# Enqueue the function call as Job
import uuid, time
from kq import Job
job = Job(
str(uuid.uuid4()),
timestamp=int(time.time()),
func=my_func,
args=(1, 2),
kwargs={'baz': 3},
key="task_category_1"
)
q.enqueue(job)
Check out the full documentation_ for more details!

.. _documentation: http://kq.readthedocs.io/en/master/
Expand Down
5 changes: 3 additions & 2 deletions docs/callback.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ Here is a trivial example:
logger.info('In topic: {}'.format(job.topic))
logger.info('Function: {}'.format(job.func))
logger.info('Arguments {}'.format(job.args))
logger.info('Keyword arguments {}'.format(job.kwargs))
logger.info('Timeout threshold {}'.format(job.timeout))
logger.info('Keyword arguments: {}'.format(job.kwargs))
logger.info('Timeout threshold: {}'.format(job.timeout))
logger.info('Job message key: {}'.format(job.key))
if status == 'success':
logger.info('The job returned: {}'.format(result))
Expand Down
3 changes: 2 additions & 1 deletion docs/job.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ KQ encapsulates jobs using namedtuples_. The definition is as follows:
'func', # Job function/callable
'args', # Job function arguments
'kwargs', # Job function keyword arguments
'timeout' # Job timeout threshold in seconds
'timeout', # Job timeout threshold in seconds
'key' # Jobs w/ the same keys end up in the same partition
]
)
Expand Down
5 changes: 2 additions & 3 deletions kq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import namedtuple


# Named tuple which encapsulates a KQ job
# Namedtuple which encapsulates a KQ job
Job = namedtuple(
typename='Job',
field_names=[
Expand All @@ -14,8 +14,7 @@
'args', # Job function arguments
'kwargs', # Job function keyword arguments
'timeout', # Job timeout threshold in seconds
'key' # Jobs of the same key end up in same partition
'key' # Jobs w/ the same keys end up in the same partition
]
)
# Make 'key' None by default to support older Jobs
Job.__new__.__defaults__ = (None,)
73 changes: 64 additions & 9 deletions kq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def timeout(self):
return self._timeout

def enqueue(self, obj, *args, **kwargs):
"""Serialize the function call and place it in the Kafka topic.
"""Place the function call (or the job) in the Kafka topic.
For example:
Expand All @@ -198,26 +198,81 @@ def enqueue(self, obj, *args, **kwargs):
:param kwargs: Keyword arguments for the function. Ignored if a KQ
job instance is given as the first argument instead.
:type kwargs: dict
:param key: Queue the job with a key. Jobs queued with a specific key
are processed in the order they were queued. Setting it to None (default)
disables this behaviour.
:type key: str | unicode
:return: The job that was enqueued
:rtype: kq.job.Job
"""
key = None
if isinstance(obj, Job):
func = obj.func
args = obj.args
kwargs = obj.kwargs
key = obj.key
else:
func = obj
key = None

if not callable(func):
raise ValueError(
'{} is not a callable'.format(func)
)
raise ValueError('{} is not a callable'.format(func))

job = Job(
id=str(uuid.uuid4()),
timestamp=int(time.time()),
topic=self._topic,
func=func,
args=args,
kwargs=kwargs,
timeout=self._timeout,
key=key
)
self._producer.send(self._topic, dill.dumps(job), key=key)
self._logger.info('Enqueued: {}'.format(job))
return job

def enqueue_with_key(self, key, obj, *args, **kwargs):
"""Place the function call (or the job) in the Kafka topic with key.
For example:
.. code-block:: python
import requests
from kq import Queue
q = Queue()
url = 'https://www.google.com'
# You can queue the function call with its arguments
job = q.enqueue_with_key('my_key', requests.get, url)
# Or you can queue a kq.job.Job instance directly
q.enqueue_with_key('my_key', job)
:param key: The key for the Kafka message. Jobs with the same key are
guaranteed to be placed in the same Kafka partition and processed
sequentially. If a job object is enqueued, its key is overwritten.
:type key: str
:param obj: Function or the job object to enqueue. If a function is
given, the function *must* be pickle-able.
:type obj: callable | kq.job.Job
:param args: Arguments for the function. Ignored if a KQ job object
is given for the first argument instead.
:type args: list
:param kwargs: Keyword arguments for the function. Ignored if a KQ
job instance is given as the first argument instead.
:type kwargs: dict
:return: The job that was enqueued
:rtype: kq.job.Job
"""
if isinstance(obj, Job):
func = obj.func
args = obj.args
kwargs = obj.kwargs
else:
func = obj

if not callable(func):
raise ValueError('{} is not a callable'.format(func))

job = Job(
id=str(uuid.uuid4()),
timestamp=int(time.time()),
Expand Down
5 changes: 0 additions & 5 deletions kq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ def __init__(self,
def __del__(self):
"""Commit the Kafka consumer offsets and close the consumer."""
if hasattr(self, '_consumer'):
try:
self._logger.info('Committing offsets ...')
self._consumer.commit()
except Exception as e: # pragma: no cover
self._logger.warning('Failed to commit offsets: {}'.format(e))
try:
self._logger.info('Closing consumer ...')
self._consumer.close()
Expand Down
3 changes: 2 additions & 1 deletion tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_version():


def test_job():
job = Job(1, 2, 3, 4, 5, 6, 7)
job = Job(1, 2, 3, 4, 5, 6, 7, 8)

assert job.id == 1
assert job.timestamp == 2
Expand All @@ -20,6 +20,7 @@ def test_job():
assert job.args == 5
assert job.kwargs == 6
assert job.timeout == 7
assert job.key == 8


def test_func_repr():
Expand Down
74 changes: 73 additions & 1 deletion tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ def test_enqueue_call(producer, logger):
logger.info.assert_called_once_with('Enqueued: {}'.format(job))


def test_enqueue_call_with_key(producer, logger):
    """Enqueuing a function call with a key stamps the key onto the job."""
    _, mock_producer = producer

    q = Queue(hosts='host:7000', topic='foo', timeout=300)
    enqueued = q.enqueue_with_key('bar', success_func, 1, 2, c=[3, 4, 5])

    # The returned object is a fully populated Job namedtuple.
    assert isinstance(enqueued, Job)
    assert isinstance(enqueued.id, str)
    assert isinstance(enqueued.timestamp, int)
    assert (enqueued.topic, enqueued.func) == ('foo', success_func)
    assert enqueued.args == (1, 2)
    assert enqueued.kwargs == {'c': [3, 4, 5]}
    assert (enqueued.timeout, enqueued.key) == (300, 'bar')

    # The serialized job goes out with the supplied message key attached.
    mock_producer.send.assert_called_with('foo', dill.dumps(enqueued), key='bar')
    logger.info.assert_called_once_with('Enqueued: {}'.format(enqueued))


def test_invalid_call(producer, logger):
producer_cls, producer_inst = producer

Expand All @@ -101,6 +121,20 @@ def test_invalid_call(producer, logger):
assert not logger.info.called


def test_invalid_call_with_key(producer, logger):
    """Non-callable objects are rejected; nothing is sent or logged."""
    _, mock_producer = producer

    q = Queue(hosts='host:7000', topic='foo', timeout=300)

    for non_callable in (None, 1, {1, 2}, [1, 2, 3]):
        with pytest.raises(ValueError) as exc_info:
            q.enqueue_with_key('foo', non_callable, 1, 2, a=3)
        assert str(exc_info.value) == '{} is not a callable'.format(non_callable)

    # The rejection must happen before any message is produced or logged.
    assert not mock_producer.send.called
    assert not logger.info.called


def test_enqueue_job(producer, logger):
producer_cls, producer_inst = producer

Expand All @@ -127,8 +161,46 @@ def test_enqueue_job(producer, logger):
assert new_job.args == [1, 2]
assert new_job.kwargs == {'a': 3}
assert new_job.timeout == 300
assert new_job.key == None

producer_inst.send.assert_called_with(
'foo', dill.dumps(new_job), key=None
)
logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))


def test_enqueue_job_with_key(producer, logger):
    """Enqueuing an existing job with a key overwrites the job's key.

    The queue must regenerate the job id and timestamp, retarget the job
    at the queue's own topic/timeout, and send the serialized job to
    Kafka using the explicitly supplied key ('baz'), not the key stored
    on the old job ('bar').
    """
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)

    old_job = Job(
        id='2938401',
        timestamp=int(time.time()),
        topic='bar',
        func=failure_func,
        args=[1, 2],
        kwargs={'a': 3},
        timeout=100,
        key='bar',
    )
    new_job = queue.enqueue_with_key('baz', old_job)

    assert isinstance(new_job, Job)
    assert isinstance(new_job.id, str)
    assert isinstance(new_job.timestamp, int)
    assert old_job.id != new_job.id
    assert old_job.timestamp <= new_job.timestamp
    assert new_job.topic == 'foo'
    assert new_job.func == failure_func
    assert new_job.args == [1, 2]
    assert new_job.kwargs == {'a': 3}
    assert new_job.timeout == 300
    assert new_job.key == 'baz'

    # Fix: the original also asserted send(..., key=None) immediately before
    # this, which contradicts the key='baz' call actually made —
    # assert_called_with checks the most recent call, so that stray
    # (copy-pasted) assertion would always fail. Only the key='baz'
    # assertion is correct here.
    producer_inst.send.assert_called_with(
        'foo', dill.dumps(new_job), key='baz'
    )
    logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))


Expand Down

0 comments on commit 423bb56

Please sign in to comment.