Skip to content

Commit

Permalink
Work queues refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
igorsysoev committed Jan 27, 2017
1 parent 6886b83 commit ba03915
Show file tree
Hide file tree
Showing 35 changed files with 401 additions and 764 deletions.
7 changes: 5 additions & 2 deletions auto/sources
Expand Up @@ -105,7 +105,6 @@ NXT_LIB_SRCS=" \
src/nxt_list.c \
src/nxt_buf.c \
src/nxt_buf_pool.c \
src/nxt_buf_filter.c \
src/nxt_recvbuf.c \
src/nxt_sendbuf.c \
src/nxt_thread_time.c \
Expand All @@ -125,10 +124,14 @@ NXT_LIB_SRCS=" \
src/nxt_event_conn_job_sendfile.c \
src/nxt_event_conn_proxy.c \
src/nxt_job.c \
src/nxt_job_file.c \
src/nxt_job_resolve.c \
src/nxt_sockaddr.c \
src/nxt_listen_socket.c \
"

NXT_LIB_SRC0=" \
src/nxt_buf_filter.c \
src/nxt_job_file.c \
src/nxt_stream_source.c \
src/nxt_upstream_source.c \
src/nxt_http_source.c \
Expand Down
104 changes: 66 additions & 38 deletions src/nxt_application.c
Expand Up @@ -19,6 +19,7 @@ static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
nxt_log_t *log);
static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
Expand All @@ -29,7 +30,7 @@ static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c,
uintptr_t data);
static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c);
static void nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r);
static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);


typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t;
Expand All @@ -40,6 +41,13 @@ struct nxt_app_http_parse_state_s {
u_char *end, nxt_app_http_parse_state_t *state);
};


typedef struct {
nxt_work_t work;
nxt_buf_t buf;
} nxt_app_buf_t;


static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf,
size_t size);
static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h,
Expand Down Expand Up @@ -83,13 +91,11 @@ nxt_app_start(nxt_cycle_t *cycle)
return NXT_ERROR;
}

link = nxt_malloc(sizeof(nxt_thread_link_t));
link = nxt_zalloc(sizeof(nxt_thread_link_t));

if (nxt_fast_path(link != NULL)) {
link->start = nxt_app_thread;
link->data = cycle;
link->engine = NULL;
link->exit = NULL;

return nxt_thread_create(&handle, link);
}
Expand Down Expand Up @@ -136,8 +142,12 @@ nxt_app_thread(void *ctx)
ls = cycle->listen_sockets->elts;

for ( ;; ) {
nxt_log_debug(thr->log, "wait on accept");

s = accept(ls->socket, NULL, NULL);

nxt_thread_time_update(thr);

if (nxt_slow_path(s == -1)) {
err = nxt_socket_errno;

Expand Down Expand Up @@ -190,6 +200,8 @@ nxt_app_thread(void *ctx)

nxt_app->run(r);

nxt_log_debug(thr->log, "app request done");

if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) {
goto fail;
}
Expand Down Expand Up @@ -577,11 +589,12 @@ nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len)
nxt_int_t
nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
{
size_t free;
nxt_err_t err;
nxt_buf_t *b, *out, **next;
nxt_uint_t bufs;
nxt_event_conn_t *c;
void *start;
size_t free;
nxt_err_t err;
nxt_buf_t *b, *out, **next;
nxt_uint_t bufs;
nxt_app_buf_t *ab;

out = NULL;
next = &out;
Expand Down Expand Up @@ -619,10 +632,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
if (bufs == nxt_app_buf_max_number) {
bufs = 0;
*next = NULL;
c = r->event_conn;

nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
&c->task, c, out, &nxt_main_log);
nxt_app_buf_send(r->event_conn, out);

out = NULL;
next = &out;
Expand Down Expand Up @@ -658,11 +669,20 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
(void) nxt_thread_mutex_unlock(&nxt_app_mutex);

if (b == NULL) {
b = nxt_buf_mem_alloc(nxt_app_mem_pool, 4096, 0);
if (nxt_slow_path(b == NULL)) {
start = nxt_malloc(4096);
if (nxt_slow_path(start == NULL)) {
return NXT_ERROR;
}

ab = nxt_zalloc(sizeof(nxt_app_buf_t));
if (nxt_slow_path(ab == NULL)) {
return NXT_ERROR;
}

b = &ab->buf;

nxt_buf_mem_init(b, start, 4096);

b->completion_handler = nxt_app_buf_completion;

nxt_app_buf_current_number++;
Expand All @@ -675,10 +695,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)

if (out != NULL) {
*next = NULL;
c = r->event_conn;

nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
&c->task, c, out, &nxt_main_log);
nxt_app_buf_send(r->event_conn, out);
}

return NXT_OK;
Expand All @@ -688,8 +706,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
static nxt_int_t
nxt_app_write_finish(nxt_app_request_t *r)
{
nxt_buf_t *b, *out;
nxt_event_conn_t *c;
nxt_buf_t *b, *out;

b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(b == NULL)) {
Expand All @@ -709,15 +726,25 @@ nxt_app_write_finish(nxt_app_request_t *r)
out = b;
}

c = r->event_conn;

nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
&c->task, c, out, &nxt_main_log);
nxt_app_buf_send(r->event_conn, out);

return NXT_OK;
}


static void
nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
{
nxt_app_buf_t *ab;

ab = nxt_container_of(out, nxt_app_buf_t, buf);

nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out);

nxt_event_engine_post(nxt_app_engine, &ab->work);
}


static void
nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data)
{
Expand Down Expand Up @@ -762,8 +789,8 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)

if (c->socket.timedout || c->socket.error != 0) {
nxt_buf_chain_add(&nxt_app_buf_done, b);
nxt_thread_work_queue_add(task->thread, c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion,
task, c, NULL);
return;
}

Expand Down Expand Up @@ -799,20 +826,17 @@ nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)

nxt_debug(task, "app delivery ready");

nxt_thread_work_queue_add(task->thread, c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
nxt_work_queue_add(c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
}


static void
nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b, *bn, *free;
nxt_thread_t *thread;
nxt_app_request_t *r;

thread = task->thread;

nxt_debug(task, "app delivery completion");

free = NULL;
Expand All @@ -832,7 +856,9 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)

if (nxt_buf_is_last(b)) {
r = (nxt_app_request_t *) b->parent;
nxt_app_close_request(task, r);

nxt_work_queue_add(&task->thread->engine->final_work_queue,
nxt_app_close_request, task, r, NULL);
}
}

Expand All @@ -850,7 +876,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)

(void) nxt_thread_mutex_unlock(&nxt_app_mutex);

nxt_thread_time_update(thread);
nxt_thread_time_update(task->thread);

(void) nxt_thread_cond_signal(&nxt_app_cond);
}
Expand Down Expand Up @@ -903,20 +929,22 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c)

c->write = NULL;

nxt_thread_work_queue_add(task->thread, c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
nxt_work_queue_add(c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
}


static void
nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r)
nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;

nxt_debug(task, "app close connection");
nxt_event_conn_t *c;
nxt_app_request_t *r;

r = obj;
c = r->event_conn;

nxt_debug(task, "app close connection");

nxt_event_conn_close(task, c);

nxt_mem_pool_destroy(c->mem_pool);
Expand Down
12 changes: 12 additions & 0 deletions src/nxt_buf.c
Expand Up @@ -10,6 +10,18 @@
static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data);


void
nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size)
{
b->size = NXT_BUF_MEM_SIZE;

b->mem.start = start;
b->mem.pos = start;
b->mem.free = start;
b->mem.end = start + size;
}


nxt_buf_t *
nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size, nxt_uint_t flags)
{
Expand Down
1 change: 1 addition & 0 deletions src/nxt_buf.h
Expand Up @@ -226,6 +226,7 @@ nxt_buf_used_size(b) \
nxt_buf_mem_used_size(&(b)->mem))


NXT_EXPORT void nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size);
NXT_EXPORT nxt_buf_t *nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size,
nxt_uint_t flags);
NXT_EXPORT nxt_buf_t *nxt_buf_file_alloc(nxt_mem_pool_t *mp, size_t size,
Expand Down
14 changes: 6 additions & 8 deletions src/nxt_chan.c
Expand Up @@ -141,7 +141,7 @@ nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan)

chan->socket.task = &chan->task;

chan->socket.write_work_queue = &task->thread->work_queue.main;
chan->socket.write_work_queue = &task->thread->engine->fast_work_queue;
chan->socket.write_handler = nxt_chan_write_handler;
chan->socket.error_handler = nxt_chan_error_handler;
}
Expand Down Expand Up @@ -290,9 +290,8 @@ nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data)

fail:

nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
nxt_chan_error_handler, task, &chan->socket,
NULL);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_chan_error_handler, task, &chan->socket, NULL);
}


Expand All @@ -308,7 +307,7 @@ nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan)

chan->socket.task = &chan->task;

chan->socket.read_work_queue = &task->thread->work_queue.main;
chan->socket.read_work_queue = &task->thread->engine->fast_work_queue;
chan->socket.read_handler = nxt_chan_read_handler;
chan->socket.error_handler = nxt_chan_error_handler;

Expand Down Expand Up @@ -378,9 +377,8 @@ nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data)

/* n == 0 || n == NXT_ERROR */

nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
nxt_chan_error_handler, task,
&chan->socket, NULL);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_chan_error_handler, task, &chan->socket, NULL);
return;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/nxt_cycle.c
Expand Up @@ -153,8 +153,8 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous,

nxt_log_debug(thr->log, "new cycle: %p", cycle);

nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_start,
task, cycle, NULL);
nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_start,
task, cycle, NULL);

return NXT_OK;

Expand Down Expand Up @@ -583,8 +583,8 @@ nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle)
nxt_cycle_close_idle_connections(thr, task);

if (done) {
nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_exit,
task, cycle, NULL);
nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_exit,
task, cycle, NULL);
}
}

Expand Down

0 comments on commit ba03915

Please sign in to comment.