Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

added ``async`` option to processor and ``./bin/torque-run`` to run t…

…he default queue in a single process
  • Loading branch information...
commit 341b40ba0bd7cc1e57dbd75b54e454c3fa26838e 1 parent 8805925
@thruflo authored
View
8 README.rst
@@ -16,13 +16,19 @@ webhooks_. This approach allows you to handle tasks within your normal web
application environment by writing request handlers, just as you would to handle
a user initiated request.
-To use it, you need to run three processes:
+To use it, you need to run:
#. a redis_ database
#. ``./bin/torque-serve``, which exposes a Tornado_ application (by default on
``http://localhost:8889``)
#. one ``./bin/torque-process`` per queue
+Or, if you're just using the default queue, you can replace ``./bin/torque-serve`` and ``./bin/torque-process`` with:
+
+#. ``./bin/torque-run``
+
+Which runs the web application and the process loop in seperate threads.
+
You can process queues ad infinitum, or until they are empty. See
``torque.process.QueueProcessor.__doc__`` for the details.
View
5 setup.py
@@ -2,7 +2,7 @@
setup(
name = 'torque',
- version = '0.4.2',
+ version = '0.4.3',
description = 'A web hook task queue based on tornado and redis',
long_description = open('README.rst').read(),
author = 'James Arthur',
@@ -35,7 +35,8 @@
],
'console_scripts': [
'torque-serve = torque.webapp:main',
- 'torque-process = torque.processor:main'
+ 'torque-process = torque.processor:main',
+ 'torque-run = torque.run:main'
]
}
)
View
56 src/torque/processor.py
@@ -11,6 +11,8 @@
import time
import urllib2
+from threading import Thread
+
from tornado import options as tornado_options
from tornado.options import define, options
from tornado.escape import json_decode, json_encode
@@ -28,7 +30,7 @@
# are we processing a queue ad infinitum, or just once until empty?
define(
'finish_on_empty', default=False, type=bool,
- help='should ``QueueProcessor.process`` finish once the queue is empty?'
+ help='should ``QueueProcessor._run`` finish once the queue is empty?'
)
# config specifying how to deal with erroring tasks (these defaults
@@ -69,7 +71,7 @@ class QueueProcessor(object):
"""Takes a range if config and processes a queue.
You can use it in two ways. You can process a queue ad infinitum
- using ``QueueProcessor().process(finish_on_empty=False)``. Or you
+ using ``QueueProcessor(finish_on_empty=False).start()``. Or you
can process a queue until it's empty::
>>> from client import add_task, clear_queue, count_tasks
@@ -80,8 +82,8 @@ class QueueProcessor(object):
...
>>> count_tasks()
26
- >>> qp = QueueProcessor()
- >>> qp.process(finish_on_empty=True)
+ >>> qp = QueueProcessor(finish_on_empty=True)
+ >>> qp.start()
True
>>> count_tasks()
0
@@ -98,8 +100,8 @@ class QueueProcessor(object):
>>> t = add_task(non_existant_url) # will error when executed
>>> count_tasks()
1
- >>> qp = QueueProcessor()
- >>> qp.process(finish_on_empty=True)
+ >>> qp = QueueProcessor(finish_on_empty=True)
+ >>> qp.start()
True
>>> count_tasks()
0
@@ -114,7 +116,8 @@ def __init__(
self, server_address=None, queue_name=None, limit=None,
max_task_errors=None, max_task_delay=None, min_delay=None,
error_multiplier=None, empty_multiplier=None,
- max_empty_delay=None, max_error_delay=None
+ max_empty_delay=None, max_error_delay=None,
+ finish_on_empty=None
):
self.server_address = server_address and server_address or options.server_address
self.queue_name = queue_name and queue_name or options.queue_name
@@ -131,6 +134,11 @@ def __init__(
or options.max_empty_delay
self.max_error_delay = max_error_delay and max_error_delay \
or options.max_error_delay
+ if finish_on_empty is not None:
+ self.finish_on_empty = finish_on_empty
+ else:
+ self.finish_on_empty = options.finish_on_empty
+ self.running = True
@@ -162,17 +170,16 @@ def _dispatch(self, url, params={}, decode=True):
return response, status
- def process(self, finish_on_empty=False):
- finish_on_empty = finish_on_empty and finish_on_empty or options.finish_on_empty
+ def _run(self):
backoff = self.min_delay
url = u'%s/concurrent_executer' % self.server_address
params = {
'queue_name': self.queue_name,
'limit': self.limit,
- 'check_pending': finish_on_empty
+ 'check_pending': self.finish_on_empty
}
- while True:
- logging.info('.')
+ while self.running:
+ logging.debug('.')
response, status = self._dispatch(url=url, params=params)
# first process the tasks
# then deal with the backoff
@@ -198,13 +205,13 @@ def process(self, finish_on_empty=False):
if backoff < self.min_delay:
backoff = self.min_delay
elif status == 204 or status == 205:
- if status == 205 and finish_on_empty:
+ if status == 205 and self.finish_on_empty:
return True
backoff = backoff * self.empty_multiplier
if backoff > self.max_empty_delay:
backoff = self.max_empty_delay
else: # there was an unexpected error
- if finish_on_empty:
+ if self.finish_on_empty:
return False
backoff = backoff * self.error_multiplier
if backoff > self.max_error_delay:
@@ -213,6 +220,25 @@ def process(self, finish_on_empty=False):
+ def start(self, async=False):
+ self.running = True
+ if async:
+ self.thread = Thread(target=self._run)
+ self.thread.start()
+ else:
+ return self._run()
+
+
+ def stop(self):
+ self.running = False
+ while True:
+ self.thread.join(2.0)
+ if not self.thread.isAlive():
+ break
+
+
+
+
def main():
@@ -221,7 +247,7 @@ def main():
# parse the command line options
tornado_options.parse_command_line()
# process the queue
- success = QueueProcessor().process()
+ QueueProcessor().start()
# if there is one, report the result
logging.info(success and 'processed successfully' or 'processing failed')
View
30 src/torque/run.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""Runs the webapp and the process in seperate threads of
+ a single process.
+"""
+
+import logging
+
+from tornado import options as tornado_options
+
+from processor import QueueProcessor
+from webapp import serve
+from utils import do_nothing
+
+def main():
+ # hack around an OSX error
+ tornado_options.enable_pretty_logging = do_nothing
+ # parse the command line options
+ tornado_options.parse_command_line()
+ # start the queue processor
+ qp = QueueProcessor()
+ qp.start(async=True)
+ try: # serve the webapp
+ serve()
+ except KeyboardInterrupt, err:
+ qp.stop()
+ raise err
+
+
View
21 src/torque/webapp.py
@@ -99,7 +99,10 @@ def post(self):
len_tasks = len(tasks)
if len_tasks == 0:
self.set_status(204)
- if self.get_argument('check_pending', False):
+ check_pending = self.get_argument('check_pending')
+ if not isinstance(check_pending, bool):
+ check_pending = eval(check_pending)
+ if check_pending:
kwargs = queue_name and {'queue_name': queue_name} or {}
if count_tasks(**kwargs) < 1:
self.set_status(205)
@@ -150,11 +153,7 @@ def _handle_response(self, response, task_id):
)
]
-def main():
- # hack around an OSX error
- tornado_options.enable_pretty_logging = do_nothing
- # parse the command line options
- tornado_options.parse_command_line()
+def serve():
# create the web application
application = web.Application(mapping, debug=options.debug)
# start the http server, forking one process per cpu
@@ -163,6 +162,16 @@ def main():
http_server.start()
# start the async ioloop
ioloop.IOLoop.instance().start()
+
+
+
+def main():
+ # hack around an OSX error
+ tornado_options.enable_pretty_logging = do_nothing
+ # parse the command line options
+ tornado_options.parse_command_line()
+ # serve the webapp
+ serve()
if __name__ == "__main__":
Please sign in to comment.
Something went wrong with that request. Please try again.