Commit 6d9caa7: Adds data queues, visibility timeouts, etc.

josiahcarlson committed Jul 18, 2019
1 parent 62bc337 commit 6d9caa7
Showing 42 changed files with 27,497 additions and 141 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,3 +3,5 @@
MANIFEST
build/*
dist/*
_build/*
docs/_sources/*
7 changes: 5 additions & 2 deletions Makefile
@@ -18,6 +18,8 @@ ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .

.PHONY: clean docs test

clean:
-rm -f *.pyc rpqueue/*.pyc README.html MANIFEST
-rm -rf build dist
@@ -36,11 +38,12 @@ test:
upload:
git tag `cat VERSION`
git push origin --tags
python3.6 setup.py sdist upload
python3.6 setup.py sdist
python3.6 -m twine upload --verbose dist/rpqueue-`cat VERSION`.tar.gz

docs:
python3.6 -c "import rpqueue; open('VERSION', 'w').write(rpqueue.VERSION);"
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
cd _build/html/ && zip -r9 ../../rpqueue_docs.zip * && cd ../../
cp -r $(BUILDDIR)/html/. docs
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
132 changes: 127 additions & 5 deletions README.rst
@@ -6,10 +6,16 @@ This package intends to offer a priority-based remote task queue solution
using Redis as the transport and persistence layer, and JSON for a common
interchange format.

Semantically, this module implements a 0/1 queue with optional retries. That
is, it attempts to execute every task once. If the task raises an exception,
it will not automatically retry, but you can manually retry the task and
specify the maximum attempts. See the `Retries`_ section below.
Semantically, this module implements a 0/1 or 1+ queue with optional retries.
That is, by default it attempts to execute every task once; tasks can also be
retried manually, or retried automatically via 'visibility timeouts'.

If a 'manual' retry task raises an exception, it will not automatically retry,
but you can manually retry the task and specify the maximum number of attempts.
Similarly, for tasks with visibility timeouts, if the task raises an exception
or doesn't complete in time, it will be retried up to the provided limit of
attempts.

See the `Retries`_ section below.

Full documentation is available: `https://pythonhosted.org/rpqueue/
<https://pythonhosted.org/rpqueue/>`_
@@ -49,7 +55,7 @@ Say that you have a module ``usertasks1`` with a task to be executed called

@task
def echo_to_stdout(message):
print message
print(message)

To call the above task, you would use::

@@ -89,6 +95,41 @@ attempts=2, and fails. The third pass has value=1, attempts=1, fails, and the
retry returns without retrying. The ``attempts`` value is the total number of
attempts, including the first, and all retries.
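
A minimal sketch of that manual-retry pattern (the task body and names are
illustrative, and it assumes the decorated task's ``retry`` method re-enqueues
the call, consuming one of the remaining attempts)::

from rpqueue import task

@task(attempts=3)
def count_down(value, **kwargs):
    # Retry until value reaches zero or the 3 total attempts run out.
    if value > 0:
        count_down.retry(value - 1, **kwargs)
    else:
        print("done counting")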

Automatic retries with vis_timeout
----------------------------------

Included with rpqueue 0.30.0 or later, you can give tasks (and now data queues)
a visibility timeout, which (per Amazon SQS-style semantics) is how long the
task has to complete successfully before being automatically re-entered into
the queue.::

import random
import time

@task(attempts=20, vis_timeout=5, use_dead=False)
def usually_eventually_succeed(**kwargs):
    # (4/5)**20 is ~ 0.0115, so the call chain fails about 1% of the time
    if not random.randrange(5):
        return "done!"

    time.sleep(6)  # sleep longer than vis_timeout, so the attempt fails silently

Deadletter task queue
---------------------

If you would like to know which tasks failed, failed calls can be automatically
entered into a deadletter queue.::

@rpqueue.task(attempts=5, vis_timeout=5, use_dead=True)
def fails_to_dead(**kwargs):
    # (4/5)**5 is 0.32768, so the call chain fails about 33% of the time
    if not random.randrange(5):
        return "done!"

    time.sleep(6)  # sleep longer than vis_timeout, so the attempt fails silently

task_deadletter = rpqueue.Data(rpqueue.DEADLETTER_QUEUE, is_tasks=True)
dead_tasks = task_deadletter.get_data(items=5)
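
A sketch of a periodic sweep that drains and logs dead tasks, using only the
calls shown above (the batch size and schedule are illustrative)::

@rpqueue.periodic_task(300)
def sweep_dead_tasks():
    # Read up to 100 deadletter entries; reads without a vis_timeout act
    # as 0/1 removals, so no done_data() call is needed afterward.
    for uuid, entry in task_deadletter.get_data(items=100).items():
        print("dead task", uuid, entry)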


See ``help(rpqueue.Data)`` for more.

Waiting for task execution
==========================
@@ -133,10 +174,91 @@ stored in Redis, in seconds. All results must be json-encodable.
Additional features
===================

Crontab
-------

Support for cron_tasks using a crontab-like syntax requires the Python crontab
module: http://pypi.python.org/pypi/crontab/, allowing for::

@cron_task('0 5 * * tue')
def function2():
    # Will be executed every Tuesday at 5AM; the five fields are minute,
    # hour, day-of-month, month, and day-of-week.
    pass
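
Assuming the same standard five-field crontab syntax, step values should also
work; another sketch (the schedule chosen is illustrative)::

@cron_task('*/15 * * * *')
def function3():
    # Will be executed every 15 minutes.
    pass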

Data queues
-----------

Put data in queues, not tasks. This should probably have been here from the
start, but it's here now.

Convenient features:

* 1-1000 data items per read, at your discretion
* ``vis_timeout``
* ``attempts``
* ``use_dead``
* refresh data if you want to keep working on it (we don't identify the
  reader, so you should use an explicit lock if you want guaranteed
  exclusivity; see the lock sketch below)

A few examples::

# 0/1 queue
dq = rpqueue.Data('best_effort')
dq.put_data([item1, item2, item3, ...])
items = dq.get_data(2) # {<uuid>: <item>, ...}

# Up to 5 deliveries, with 5 second delay before re-insertion
dq5 = rpqueue.Data('retry_processing', attempts=5, vis_timeout=5)
dq5.put_data([item1, item2, item3, ...])
items = dq5.get_data(2) # {<uuid>: <item>, ...}
items2 = dq5.get_data(2, vis_timeout=20) # override timeout on read
refreshed = set(dq5.refresh_data(items, vis_timeout=7)) # refresh our lock
items = {k: v for k, v in items.items() if k in refreshed}
dq5.done_data(items)
dq5.done_data(items2)

# Up to 1 attempt, with a 5 second delay before insertion into the deadletter queue
dqd = rpqueue.Data('dead_example', attempts=1, vis_timeout=5, use_dead=True)
dqd.put_data([item1, item2, item3, ...])
items = dqd.get_data(2) # {<uuid>: <item>, ...}
items2 = dqd.get_data(2, vis_timeout=20) # override timeout on read
refreshed = set(dqd.refresh_data(items, vis_timeout=7)) # refresh our lock
items = {k: v for k, v in items.items() if k in refreshed}
dqd.done_data(items)
time.sleep(20)
# items2 are now "dead"
dead = rpqueue.Data(rpqueue.DEADLETTER_QUEUE)
dead_items = dead.get_data(2) # these have a different format, see docs!
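
Because readers are not identified, refreshing only extends a visibility
timeout; it does not guarantee exclusivity. A sketch of pairing reads with an
explicit lock, here redis-py's built-in lock (the names, timeouts, and
processing step are illustrative)::

import redis

conn = redis.Redis()

def process_batch_exclusively(queue):
    # Hold a Redis lock for the whole batch so no other consumer
    # works on these items concurrently.
    with conn.lock('data-queue-worker', timeout=60):
        items = queue.get_data(100, vis_timeout=30)
        for uuid, item in items.items():
            pass  # ... do something with item ...
        queue.done_data(items)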

A longer example closer to what would be seen in practice::

aggregate_queue = rpqueue.Data("aggregate_stats", vis_timeout=30, use_dead=False)

@rpqueue.periodic_task(60)
def aggregate():
    # If vis_timeout is not provided, the queue default is used.
    # If vis_timeout is <= 0, this acts as a 0/1 queue, and calling
    # "done data" afterward is unnecessary.
    data = aggregate_queue.get_data(items=100, vis_timeout=5)
    # data is a dictionary: {<uuid>: <item>, <uuid>: <item>, ...};
    # do something with it
    done_with = []
    for id, value in data.items():
        # do something with value
        done_with.append(id)

    aggregate_queue.refresh_data(data)  # still working!

    # You can pass any iterable that naturally iterates over the uuids you
    # want to be "done" with.
    aggregate_queue.done_data(done_with)
    # also okay:
    # aggregate_queue.done_data(data)
    # aggregate_queue.done_data(tuple(data))
    # aggregate_queue.done_data(list(data))
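
For completeness, the producing side of the queue above only needs
``put_data``; a sketch (the stat payload shape is made up)::

def record_stat(name, value):
    # put_data accepts a list of json-encodable items
    aggregate_queue.put_data([{"name": name, "value": value}])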

Sponsors
========

Don't like LGPL? Sponsor the project and get almost any license you want.

This project has been partly sponsored by structd.com and hCaptcha.com, both of
whom received licenses that match their needs appropriately.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.27.1
0.30.0
4 changes: 4 additions & 0 deletions _build/html/.buildinfo
@@ -0,0 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
config: 707de8ee0a6a8a50aec5d2be3a870f13
tags: 645f666f9bcd5a90fca523b33c5a78b7
19 changes: 19 additions & 0 deletions changelog.txt
@@ -1,3 +1,22 @@
#---------------------------------- 0.30.0 -----------------------------------
[changed] upload switched to using twine
[added] data queues separate from task queues, see help(rpqueue.Data) for details
[changed] docs now live on GitHub, via GitHub Pages (and will automatically
update)
[added] visibility timeout to tasks (tasks can auto-retry, and self-spew into a
shared deadletter queue)
[added] docs and examples of using data queues, deadletter queues, and
visibility timeouts
[changed] docs are now alphabetically sorted
[fixed] you can now use any version of the Redis client; we auto-detect and
use the right syntax for commands (like setex, zadd, etc.)
[changed] new queue interfaces (data queues and visibility timeouts) use Redis
internal time to keep consistent
[changed] Redis with Lua support is a hard requirement now and going forward.
Data queues and visibility timeouts rely exclusively on Lua scripting, and are
unusable without it. Stick with 0.27.1 if you don't want to use Lua.
[sponsored] the above changes to rpqueue were sponsored by hCaptcha.com, and
they received a custom license for their sponsorship.
#---------------------------------- 0.27.1 -----------------------------------
[added] thank you to Github user https://github.com/jonathanhle for a patch to
expose ssl options to the connection object.
2 changes: 2 additions & 0 deletions conf.py
@@ -42,6 +42,8 @@
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'

autodoc_member_order = 'alphabetical'

# The encoding of source files.
#source_encoding = 'utf-8-sig'

4 changes: 4 additions & 0 deletions docs/.buildinfo
@@ -0,0 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
config: 707de8ee0a6a8a50aec5d2be3a870f13
tags: 645f666f9bcd5a90fca523b33c5a78b7