Skip to content

Commit

Permalink
allows multiple offload threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Roberto De Ioris committed Nov 9, 2012
1 parent 5c64839 commit 57b72ac
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
3 changes: 3 additions & 0 deletions core/master_utils.c
Expand Up @@ -1049,6 +1049,9 @@ struct uwsgi_stats *uwsgi_master_generate_stats() {
if (uwsgi_stats_keylong_comma(us, "routed_requests", (unsigned long long) uc->routed_requests))
goto end;

if (uwsgi_stats_keylong_comma(us, "offloaded_requests", (unsigned long long) uc->offloaded_requests))
goto end;

if (uwsgi_stats_keylong(us, "in_request", (unsigned long long) uc->in_request))
goto end;

Expand Down
20 changes: 14 additions & 6 deletions core/offload.c
Expand Up @@ -28,8 +28,16 @@ static void uwsgi_offload_setup(struct uwsgi_offload_request *uor, struct wsgi_r

}

static int uwsgi_offload_enqueue(struct uwsgi_offload_request *uor) {
if (write(uwsgi.offload_thread->pipe[0], uor, sizeof(struct uwsgi_offload_request)) != sizeof(struct uwsgi_offload_request)) {
static int uwsgi_offload_enqueue(struct wsgi_request *wsgi_req, struct uwsgi_offload_request *uor) {
struct uwsgi_core *uc = &uwsgi.workers[uwsgi.mywid].cores[wsgi_req->async_id];
uc->offloaded_requests++;
// round robin
if (uc->offload_rr >= uwsgi.offload_threads) {
uc->offload_rr = 0;
}
struct uwsgi_thread *ut = uwsgi.offload_thread[uc->offload_rr];
uc->offload_rr++;
if (write(ut->pipe[0], uor, sizeof(struct uwsgi_offload_request)) != sizeof(struct uwsgi_offload_request)) {
return -1;
}
return 0;
Expand All @@ -49,7 +57,7 @@ int uwsgi_offload_request_net_do(struct wsgi_request *wsgi_req, char *socket, st

uor.ubuf = ubuf;

if (uwsgi_offload_enqueue(&uor)) {
if (uwsgi_offload_enqueue(wsgi_req, &uor)) {
goto error2;
}

Expand Down Expand Up @@ -86,7 +94,7 @@ int uwsgi_offload_request_sendfile_do(struct wsgi_request *wsgi_req, char *filen

uor.len = len;

if (uwsgi_offload_enqueue(&uor)) {
if (uwsgi_offload_enqueue(wsgi_req, &uor)) {
goto error2;
}

Expand Down Expand Up @@ -169,9 +177,9 @@ static void uwsgi_offload_loop(struct uwsgi_thread *ut) {
int nevents = event_queue_wait_multi(ut->queue, -1, events, uwsgi.offload_threads_events);
for (i = 0; i < nevents; i++) {
int interesting_fd = event_queue_interesting_fd(events, i);
if (interesting_fd == uwsgi.offload_thread->pipe[1]) {
if (interesting_fd == ut->pipe[1]) {
struct uwsgi_offload_request *uor = uwsgi_malloc(sizeof(struct uwsgi_offload_request));
ssize_t len = read(uwsgi.offload_thread->pipe[1], uor, sizeof(struct uwsgi_offload_request));
ssize_t len = read(ut->pipe[1], uor, sizeof(struct uwsgi_offload_request));
if (len != sizeof(struct uwsgi_offload_request)) {
uwsgi_error("read()");
free(uor);
Expand Down
15 changes: 11 additions & 4 deletions core/uwsgi.c
Expand Up @@ -1044,6 +1044,9 @@ void reap_them_all(int signum) {
if (uwsgi.to_outworld == 1 || uwsgi.lazy_respawned > 0)
return;


if (!uwsgi.workers) return;

// count the number of active workers
int active_workers = 0;
for (i = 1; i <= uwsgi.numproc; i++) {
Expand Down Expand Up @@ -2686,10 +2689,14 @@ int uwsgi_start(void *v_argv) {
uwsgi.wsgi_req = &uwsgi.workers[uwsgi.mywid].cores[0].req;

if (uwsgi.offload_threads > 0) {
uwsgi.offload_thread = uwsgi_offload_thread_start();
if (!uwsgi.offload_thread) {
uwsgi_log("unable to start offload thread for worker %d !!!\n", uwsgi.mywid);
uwsgi.offload_threads = 0;
uwsgi.offload_thread = uwsgi_malloc(sizeof(struct uwsgi_thread *) * uwsgi.offload_threads);
for(i=0;i<uwsgi.offload_threads;i++) {
uwsgi.offload_thread[i] = uwsgi_offload_thread_start();
if (!uwsgi.offload_thread[i]) {
uwsgi_log("unable to start offload thread %d for worker %d !!!\n", i, uwsgi.mywid);
uwsgi.offload_threads = i;
break;
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion uwsgi.h
Expand Up @@ -1520,10 +1520,10 @@ struct uwsgi_server {

int offload_threads;
int offload_threads_events;
struct uwsgi_thread **offload_thread;

int check_static_docroot;

struct uwsgi_thread *offload_thread;
// linked list for offloaded requests
struct uwsgi_offload_request *offload_requests_head;
struct uwsgi_offload_request *offload_requests_tail;
Expand Down Expand Up @@ -2060,11 +2060,14 @@ struct uwsgi_core {
uint64_t failed_requests;
uint64_t static_requests;
uint64_t routed_requests;
uint64_t offloaded_requests;

#ifdef UWSGI_THREADING
pthread_t thread_id;
#endif

int offload_rr;

// one ts-perapp
void **ts;

Expand Down Expand Up @@ -3178,6 +3181,7 @@ struct uwsgi_stats_pusher_instance {
struct uwsgi_stats_pusher_instance *next;
};

struct uwsgi_thread;
void uwsgi_stats_pusher_loop(struct uwsgi_thread *);
void uwsgi_stats_pusher_file(struct uwsgi_stats_pusher_instance *, char *, size_t);
void uwsgi_stats_pusher_socket(struct uwsgi_stats_pusher_instance *, char *, size_t);
Expand Down

0 comments on commit 57b72ac

Please sign in to comment.