Skip to content

Commit

Permalink
a new kick-ass feature for the spooler
Browse files Browse the repository at this point in the history
  • Loading branch information
roberto@debian32 committed Aug 4, 2011
1 parent 9bd3022 commit 8c1b48a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 1 deletion.
11 changes: 11 additions & 0 deletions contrib/spoolqueue/producer.py
@@ -0,0 +1,11 @@
from tasksconsumer import enqueue

def application(env, sr):

sr('200 OK', [('Content-Type','text/html')])

enqueue(queue='fast', pippo='pluto')

return "Task enqueued"


9 changes: 9 additions & 0 deletions contrib/spoolqueue/tasks.py
@@ -0,0 +1,9 @@
from tasksconsumer import *

@queueconsumer('fast', 4)
def fast_queue(arguments):
print "fast", arguments

@queueconsumer('slow')
def slow_queue(arguments):
print "foobar", arguments
44 changes: 44 additions & 0 deletions contrib/spoolqueue/tasksconsumer.py
@@ -0,0 +1,44 @@
from uwsgidecorators import *
import Queue
from threading import Thread

queues = {}

class queueconsumer(object):

def __init__(self, name, num=1, **kwargs):
self.name = name
self.num = num
self.queue = Queue.Queue()
self.threads = []
self.func = None
queues[self.name] = self


@staticmethod
def consumer(self):
while True:
req = self.queue.get()
print req
self.func(req)
self.queue.task_done()

def __call__(self, f):
self.func = f
for i in range(self.num):
t = Thread(target=self.consumer,args=(self,))
self.threads.append(t)
t.daemon = True
t.start()

@spool
def spooler_enqueuer(arguments):
if 'queue' in arguments:
queue = arguments['queue']
queues[queue].queue.put(arguments)
else:
raise Exception("You have to specify a queue name")


def enqueue(*args, **kwargs):
return spooler_enqueuer.spool(*args, **kwargs)
4 changes: 3 additions & 1 deletion plugins/python/pyutils.c
Expand Up @@ -22,7 +22,9 @@ PyObject *python_call(PyObject *callable, PyObject *args, int catch) {
if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
uwsgi_log("Memory Error detected !!!\n");
}
uwsgi.workers[uwsgi.mywid].exceptions++;
// this can be in a spooler or in the master
if (uwsgi.mywid > 0)
uwsgi.workers[uwsgi.mywid].exceptions++;
if (!catch) {
PyErr_Print();
}
Expand Down
6 changes: 6 additions & 0 deletions spooler.c
Expand Up @@ -46,6 +46,12 @@ pid_t spooler_start() {
}
}
}

for (i = 0; i < 0xFF; i++) {
if (uwsgi.p[i]->post_fork) {
uwsgi.p[i]->post_fork();
}
}
uwsgi.signal_socket = uwsgi.shared->spooler_signal_pipe[1];
for (i = 0; i < 0xFF; i++) {
if (uwsgi.p[i]->spooler_init) {
Expand Down

0 comments on commit 8c1b48a

Please sign in to comment.