Permalink
Browse files

Replaced BaseHTTPServer with web.py's CherryPy threaded web server.

User's tasks are being discovered correctly.
Added proper POST/GET support for 3 views.
  • Loading branch information...
1 parent edcc048 commit 133f672f025d4a9a0657ce55bb0aff95ead9c62d @mallipeddi committed May 14, 2008
Showing with 86 additions and 46 deletions.
  1. +15 −1 base.py
  2. +8 −0 exceptions.py
  3. +2 −2 http/__init__.py
  4. +26 −26 http/server.py
  5. +6 −4 http/views.py
  6. +6 −8 management/commands/invoketaskforce.py
  7. +23 −5 service.py
View
@@ -41,8 +41,22 @@ def _set_progress(self, progress):
self._lock.release()
progress = property(_get_progress, _set_progress)
+ def _get_results(self):
+ if self._lock.acquire():
+ try:
+ return self._results
+ finally:
+ self._lock.release()
+ def _set_results(self, results):
+ if self._lock.acquire():
+ try:
+ self._results = results
+ finally:
+ self._lock.release()
+ results = property(_get_results, _set_results)
+
def _start(self):
- self.run()
+ self._results = self.run()
def run(self):
"Implement this method in your sub-classes. This is where the meat of a Task goes."
View
@@ -5,3 +5,11 @@ class TaskNotComplete(Exception):
class TaskNotFound(Exception):
"The requested tasks was not found."
pass
+
+class TaskTypeNotFound(Exception):
+ "The requested type of task is not recognized."
+ pass
+
+class TaskFailed(Exception):
+ "The requested task failed."
+ pass
View
@@ -1,3 +1,3 @@
-from taskforce.http.server import TaskForceHTTPServer
+from taskforce.http.server import runserver
-__all__ = ('TaskForceHTTPServer', )
+__all__ = ('runserver', )
View
@@ -1,32 +1,32 @@
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
-from SocketServer import ThreadingMixIn
-from taskforce.service import TaskForce
+import web
from django.core.urlresolvers import get_resolver
-_resolver = get_resolver('taskforce.http.urls')
+from taskforce.service import TaskForce
+from taskforce.http.views import *
+
force = None
-class TaskForceHTTPServer(ThreadingMixIn, HTTPServer):
- @staticmethod
- def start(address, port):
- global force
- force = TaskForce()
- TaskForceHTTPServer((address, port), TaskForceHTTPRequestHandler).serve_forever()
-
-class TaskForceHTTPRequestHandler(BaseHTTPRequestHandler):
- def __init__(self, *args, **kwargs):
- BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
-
- def do_GET(self):
- view_func, view_args, view_kwargs = _resolver.resolve(self.path)
- try:
- resp = view_func(force, *view_args, **view_kwargs)
- self.send_response(200, 'OK')
- self.end_headers()
- self.wfile.write(resp)
- except Exception, e:
- self.send_response(500)
- self.end_headers()
- self.wfile.write(e)
+urls = (
+ r'^/task/new/$', 'handle_task_new',
+ r'^/task/(?P<id>\w+)/$', 'handle_task_status',
+ r'^/task/(?P<id>\w+)/results/$', 'handle_task_results',
+)
+
+class handle_task_new:
+ def POST(self):
+ i = web.input()
+ print task_new(force, task_name = i.task_name, task_id = i.task_id)
+
+class handle_task_status:
+ def GET(self, id):
+ print task_status(force, id)
+
+class handle_task_results:
+ def POST(self, id):
+ print task_results(force, id)
+def runserver(available_tasks, address):
+ global force
+ force = TaskForce(available_tasks = available_tasks)
+ web.runsimple(web.webapi.wsgifunc(web.webpyfunc(urls, globals())), address)
View
@@ -13,14 +13,16 @@ def new_fn(*args, **kwargs):
})
return new_fn
-def task_new(force):
- t = BaseTask()
- t.id = 'task1'
+@spit_errors
+def task_new(force, task_name, task_id):
+ t = force.create_task(task_name)
+ t.id = task_id
force.add_task(t)
return simplejson.dumps({
- 'id':'task1',
+ 'id':t.id,
})
+@spit_errors
def task_status(force, id):
status = force.get_status(task_id = id)
progress = force.get_progress(task_id = id)
@@ -1,10 +1,11 @@
import sys
+from optparse import make_option
from django.core.management.base import BaseCommand
from django.conf import settings
-from optparse import make_option
-from taskforce.http import TaskForceHTTPServer
+import taskforce
+from taskforce.http import runserver
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
@@ -34,16 +35,13 @@ def handle(self, *args, **options):
# self._log("ERROR - Takes in exactly 1 arg (start|stop).", error=True)
# sys.exit(1)
- import taskforce
+ available_tasks = []
for app_name in settings.INSTALLED_APPS:
- print "Importing %s" % app_name
app_mod = __import__(app_name, {}, {}, ['tasks'])
- #print app_mod
- #print hasattr(app_mod, 'tasks')
if hasattr(app_mod, 'tasks'):
for k in app_mod.tasks.__dict__.values():
if isinstance(k, type) and issubclass(k, taskforce.BaseTask):
- print k
+ available_tasks.append(k)
print "Starting HTTP server..."
- TaskForceHTTPServer.start('127.0.0.1', 9000)
+ runserver(available_tasks, ('127.0.0.1', 9000))
View
@@ -2,7 +2,7 @@
import Queue
import taskforce
-from taskforce.exceptions import TaskNotFound, TaskNotComplete
+from taskforce.exceptions import TaskNotFound, TaskNotComplete, TaskTypeNotFound
class Slave(threading.Thread):
def __init__(self, todo, poll_timeout=5): # TODO - think about the right poll_timeout
@@ -25,6 +25,7 @@ def run(self):
except Queue.Empty:
continue
except Exception:
+ # TODO - figure out a way to display this exception in error msg
task.status = taskforce.TASK_STATUS.FAILED
def dismiss(self):
@@ -57,18 +58,31 @@ def del_task(self, id):
self._lock.release()
class TaskForce(object):
- def __init__(self, num_slaves=5):
+ def __init__(self, available_tasks=[], num_slaves=5):
self._todo_queue = Queue.Queue()
self._state = State()
self._slaves = []
self._init_slaves(num_slaves)
+ self._init_tasks(available_tasks)
+
+ def _init_tasks(self, available_tasks):
+ if not hasattr(self, '_available_tasks'):
+ self._available_tasks = dict()
+ for t in available_tasks:
+ self._available_tasks[t.__name__] = t
def _init_slaves(self, num_slaves):
for i in range(num_slaves):
self._slaves.append(
Slave(todo = self._todo_queue)
)
+ def create_task(self, taskname):
+ if self._available_tasks.has_key(taskname):
+ return self._available_tasks[taskname].__call__()
+ else:
+ raise TaskTypeNotFound("Task of type %s not recognised." % taskname)
+
def add_task(self, task):
# add to todo queue
self._todo_queue.put(task, block=True, timeout=5) # TODO - handle queue full exception properly
@@ -95,6 +109,10 @@ def get_results(self, task_id):
except KeyError:
raise TaskNotFound("Task not found.")
t_status = t.status
- print t_status
- if t_status != taskforce.TASK_STATUS.FINISHED and t_status != taskforce.TASK_STATUS.FAILED:
- raise TaskNotComplete("Cannot return results since tasks is not complete yet.")
+ if t_status == taskforce.TASK_STATUS.FINISHED:
+ self._state.del_task(task_id)
+ return t.results
+ elif t_status == taskforce.TASK_STATUS.FAILED:
+ raise TaskFailed("Cannot return results because task failed.")
+ else:
+ raise TaskNotComplete("Cannot return results because task is not complete yet.")

0 comments on commit 133f672

Please sign in to comment.