Permalink
Browse files

Initial version of tcelery package

  • Loading branch information...
0 parents commit 981d0f0e6de5c7434b9d854826d310aa52615dec @mher committed May 11, 2012
Showing with 249 additions and 0 deletions.
  1. 0 tcelery/__init__.py
  2. +34 −0 tcelery/__main__.py
  3. +17 −0 tcelery/app.py
  4. +145 −0 tcelery/handlers.py
  5. +28 −0 tcelery/result.py
  6. +25 −0 tcelery/utils.py
No changes.
@@ -0,0 +1,34 @@
+from __future__ import absolute_import
+
+import logging
+
+from tornado import ioloop
+from tornado import httpserver
+from tornado.options import options, define, parse_command_line
+
+from .app import Application
+from .result import PeriodicResultChecker
+
+
+define("port", default=8888, type=bool, help="run on the given port")
+define("blocking", default=False, type=bool, help="enable blocking mode")
+
+
+def main():
+ parse_command_line()
+
+ if options.blocking:
+ result_checker = PeriodicResultChecker()
+ result_checker.start()
+
+ logging.info("Starting http server on port %s..." % options.port)
+ http_server = httpserver.HTTPServer(Application())
+ http_server.listen(options.port)
+ try:
+ ioloop.IOLoop.instance().start()
+ except KeyboardInterrupt:
+ pass
+
+
+if __name__ == "__main__":
+ main()
@@ -0,0 +1,17 @@
+from __future__ import absolute_import
+
+from tornado import web
+from tornado.options import define, options
+
+from . import handlers as _
+from .utils import route
+
+
+define("debug", type=bool, default=False, help="run in debug mode")
+
+
+class Application(web.Application):
+ def __init__(self):
+ handlers = route.get_routes()
+ settings = dict(debug=options.debug)
+ super(Application, self).__init__(handlers, **settings)
@@ -0,0 +1,145 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+from datetime import timedelta
+from functools import partial
+
+from tornado import web
+from tornado import ioloop
+from tornado.options import options as tornado_options
+
+from celery.result import AsyncResult
+from celery.task.control import inspect
+from celery.task.control import revoke
+from celery.execute import send_task
+
+from .utils import route, to_json
+
+
+class ApplyHandlerBase(web.RequestHandler):
+ registered_tasks = []
+
+ def get_task_args(self):
+ options = dict(self.request.arguments)
+ args = options.pop('args', [])
+ kwargs = options.pop('kwargs', {})
+ for o, v in options.iteritems():
+ options[o] = v[0]
+ try:
+ options[o] = float(v[0])
+ options[o] = int(v[0])
+ except:
+ pass
+ return args, kwargs, options
+
+ @classmethod
+ def get_registered_tasks(cls):
+ if not cls.registered_tasks:
+ i = inspect()
+ tasks = []
+ for rt in i.registered().itervalues():
+ tasks.extend(rt)
+ cls.registered_tasks = set(tasks)
+
+ return cls.registered_tasks
+
+
+@route('/async-apply/(.*)/')
+class AsyncApplyHandler(ApplyHandlerBase):
+ def post(self, taskname):
+ if taskname not in self.get_registered_tasks():
+ raise web.HTTPError(404)
+
+ args, kwargs, options = self.get_task_args()
+ result = send_task(taskname, args=args, kwargs=kwargs)
+ self.write(to_json({'task-id': result.task_id, 'state': result.state}))
+
+
+@route('/tasks/result/(.*)/')
+class TaskResultHandler(web.RequestHandler):
+ def get(self, task_id):
+ result = AsyncResult(task_id)
+ response = {'task-id': task_id, 'state': result.state}
+ if result.ready():
+ response.update({'result': result.result})
+ self.write(to_json(response))
+
+
+@route('/tasks/revoke/(.*)/')
+class TaskRevokeHandler(web.RequestHandler):
+ def delete(self, task_id):
+ revoke(task_id)
+ self.write(to_json({'task-id': task_id}))
+
+
+@route('/apply/(.*)/')
+class ApplyHandler(ApplyHandlerBase):
+ tasks = {}
+
+ @web.asynchronous
+ def post(self, taskname):
+ if not tornado_options.blocking:
+ raise web.HTTPError(503)
+ if taskname not in self.get_registered_tasks():
+ raise web.HTTPError(404)
+
+ args, kwargs, options = self.get_task_args()
+ timeout = options.pop('timeout', None)
+ result = send_task(taskname, args=args, kwargs=kwargs, **options)
+
+ htimeout = None
+ if timeout:
+ htimeout = ioloop.IOLoop.instance().add_timeout(
+ timedelta(seconds=timeout),
+ partial(ApplyHandler.on_time, result.task_id))
+
+ self.tasks[result.task_id] = (result, self, htimeout)
+
+ @classmethod
+ def on_complete(cls, task_id):
+ result, handler, htimeout = cls.tasks.pop(task_id)
+ response = {'task-id': task_id, 'state': result.state}
+ if result.successful():
+ response.update({'result': result.result})
+ handler.write(to_json(response))
+ if htimeout:
+ ioloop.IOLoop.instance().remove_timeout(htimeout)
+ handler.finish()
+
+ @classmethod
+ def on_time(cls, task_id):
+ result, handler, _ = cls.tasks.pop(task_id)
+ revoke(task_id)
+ handler.write(to_json({'task-id': task_id, 'state': result.state}))
+ handler.finish()
+
+
+@route('/tasks/registered/(.*)')
+class RegisteredTaskHandler(web.RequestHandler):
+ def get(self, host):
+ host = [host] if host else None
+ i = inspect(host)
+ self.write(to_json(i.registered()))
+
+
+@route('/tasks/active/(.*)')
+class ActiveTaskHandler(web.RequestHandler):
+ def get(self, host):
+ host = [host] if host else None
+ i = inspect(host)
+ self.write(to_json(i.active()))
+
+
+@route('/tasks/scheduled/(.*)')
+class ScheduledTaskHandler(web.RequestHandler):
+ def get(self, host):
+ host = [host] if host else None
+ i = inspect(host)
+ self.write(to_json(i.active()))
+
+
+@route('/')
+class MainHandler(web.RequestHandler):
+ def get(self):
+ self.write("Tasks: ")
+ self.write(unicode(ApplyHandler.tasks))
@@ -0,0 +1,28 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+from functools import partial
+
+from tornado import ioloop
+
+from .handlers import ApplyHandler
+
+
+class PeriodicResultChecker(object):
+ def __init__(self, io_loop=None, interval=100):
+ self.io_loop = io_loop or ioloop.IOLoop.instance()
+ self.periodic_callback = ioloop.PeriodicCallback(
+ partial(self.on_time, self), interval, self.io_loop)
+
+ def task_complete(self, event):
+ self.io_loop.add_callback(partial(ApplyHandler.on_complete, event))
+
+ def on_time(self, *args):
+ tasks = ApplyHandler.tasks
+ completed = filter(lambda task_id:tasks[task_id][0].ready(), tasks)
+ for task_id in completed:
+ ApplyHandler.on_complete(task_id)
+
+ def start(self):
+ self.periodic_callback.start()
+
@@ -0,0 +1,25 @@
+import json
+import tornado.web
+
+
+to_json = lambda x: json.dumps(x)
+from_json = lambda x: json.loads(x)
+
+
+class route(object):
+ """route decorator from https://github.com/peterbe/tornado-utils"""
+ _routes = []
+
+ def __init__(self, uri, name=None):
+ self._uri = uri
+ self._name = name
+
+ def __call__(self, handler):
+ """gets called when we class decorate"""
+ name = self._name or handler.__name__
+ self._routes.append(tornado.web.url(self._uri, handler, name=name))
+ return handler
+
+ @classmethod
+ def get_routes(cls):
+ return cls._routes

0 comments on commit 981d0f0

Please sign in to comment.