Skip to content

Commit

Permalink
Threads: writing via threads pools in event pipe.
Browse files Browse the repository at this point in the history
The "aio_write" directive is introduced, which enables use of aio
for writing.  Currently it is meaningful only with "aio threads".

Note that aio operations can be done by both event pipe and output
chain, so proper mapping between r->aio and p->aio is provided when
calling ngx_event_pipe() and in output filter.

In collaboration with Valentin Bartenev.
  • Loading branch information
mdounin committed Mar 18, 2016
1 parent 10c8c8d commit 348f705
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 21 deletions.
98 changes: 78 additions & 20 deletions src/event/ngx_event_pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
return NGX_OK;
}

#if (NGX_THREADS)
if (p->aio) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream: aio");
return NGX_AGAIN;
}
#endif

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream: %d", p->upstream->read->ready);

Expand Down Expand Up @@ -258,19 +266,6 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
break;
}

if (rc == NGX_AGAIN) {
if (ngx_event_flags & NGX_USE_LEVEL_EVENT
&& p->upstream->read->active
&& p->upstream->read->ready)
{
if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
== NGX_ERROR)
{
return NGX_ABORT;
}
}
}

if (rc != NGX_OK) {
return rc;
}
Expand Down Expand Up @@ -475,8 +470,10 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write chain");

if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
return NGX_ABORT;
rc = ngx_event_pipe_write_chain_to_temp_file(p);

if (rc != NGX_OK) {
return rc;
}
}

Expand All @@ -499,6 +496,18 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);

#if (NGX_THREADS)

if (p->writing) {
rc = ngx_event_pipe_write_chain_to_temp_file(p);

if (rc == NGX_ABORT) {
return NGX_ABORT;
}
}

#endif

flushed = 0;

for ( ;; ) {
Expand Down Expand Up @@ -532,6 +541,10 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
p->out = NULL;
}

if (p->writing) {
break;
}

if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
Expand Down Expand Up @@ -608,7 +621,7 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)

p->out = p->out->next;

} else if (!p->cacheable && p->in) {
} else if (!p->cacheable && !p->writing && p->in) {
cl = p->in;

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
Expand Down Expand Up @@ -710,12 +723,38 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
ssize_t size, bsize, n;
ngx_buf_t *b;
ngx_uint_t prev_last_shadow;
ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free;

#if (NGX_THREADS)

if (p->writing) {

if (p->aio) {
return NGX_AGAIN;
}

out = p->writing;
p->writing = NULL;

n = ngx_write_chain_to_temp_file(p->temp_file, NULL);

if (n == NGX_ERROR) {
return NGX_ABORT;
}

goto done;
}

#endif

if (p->buf_to_file) {
fl.buf = p->buf_to_file;
fl.next = p->in;
out = &fl;
out = ngx_alloc_chain_link(p->pool);
if (out == NULL) {
return NGX_ABORT;
}

out->buf = p->buf_to_file;
out->next = p->in;

} else {
out = p->in;
Expand Down Expand Up @@ -775,12 +814,31 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
p->last_in = &p->in;
}

#if (NGX_THREADS)
p->temp_file->thread_write = p->thread_handler ? 1 : 0;
p->temp_file->file.thread_task = p->thread_task;
p->temp_file->file.thread_handler = p->thread_handler;
p->temp_file->file.thread_ctx = p->thread_ctx;
#endif

n = ngx_write_chain_to_temp_file(p->temp_file, out);

if (n == NGX_ERROR) {
return NGX_ABORT;
}

#if (NGX_THREADS)

if (n == NGX_AGAIN) {
p->writing = out;
p->thread_task = p->temp_file->file.thread_task;
return NGX_AGAIN;
}

done:

#endif

if (p->buf_to_file) {
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
n -= p->buf_to_file->last - p->buf_to_file->pos;
Expand Down
10 changes: 10 additions & 0 deletions src/event/ngx_event_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ struct ngx_event_pipe_s {
ngx_chain_t *in;
ngx_chain_t **last_in;

ngx_chain_t *writing;

ngx_chain_t *out;
ngx_chain_t *free;
ngx_chain_t *busy;
Expand All @@ -45,6 +47,13 @@ struct ngx_event_pipe_s {
ngx_event_pipe_output_filter_pt output_filter;
void *output_ctx;

#if (NGX_THREADS)
ngx_int_t (*thread_handler)(ngx_thread_task_t *task,
ngx_file_t *file);
void *thread_ctx;
ngx_thread_task_t *thread_task;
#endif

unsigned read:1;
unsigned cacheable:1;
unsigned single_buf:1;
Expand All @@ -56,6 +65,7 @@ struct ngx_event_pipe_s {
unsigned downstream_done:1;
unsigned downstream_error:1;
unsigned cyclic_temp_file:1;
unsigned aio:1;

ngx_int_t allocated;
ngx_bufs_t bufs;
Expand Down
9 changes: 9 additions & 0 deletions src/http/ngx_http_core_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,13 @@ static ngx_command_t ngx_http_core_commands[] = {
0,
NULL },

{ ngx_string("aio_write"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_core_loc_conf_t, aio_write),
NULL },

{ ngx_string("read_ahead"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
Expand Down Expand Up @@ -3608,6 +3615,7 @@ ngx_http_core_create_loc_conf(ngx_conf_t *cf)
clcf->sendfile = NGX_CONF_UNSET;
clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE;
clcf->aio = NGX_CONF_UNSET;
clcf->aio_write = NGX_CONF_UNSET;
#if (NGX_THREADS)
clcf->thread_pool = NGX_CONF_UNSET_PTR;
clcf->thread_pool_value = NGX_CONF_UNSET_PTR;
Expand Down Expand Up @@ -3829,6 +3837,7 @@ ngx_http_core_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
prev->sendfile_max_chunk, 0);
#if (NGX_HAVE_FILE_AIO || NGX_THREADS)
ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF);
ngx_conf_merge_value(conf->aio_write, prev->aio_write, 0);
#endif
#if (NGX_THREADS)
ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL);
Expand Down
1 change: 1 addition & 0 deletions src/http/ngx_http_core_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ struct ngx_http_core_loc_conf_s {
ngx_flag_t internal; /* internal */
ngx_flag_t sendfile; /* sendfile */
ngx_flag_t aio; /* aio */
ngx_flag_t aio_write; /* aio_write */
ngx_flag_t tcp_nopush; /* tcp_nopush */
ngx_flag_t tcp_nodelay; /* tcp_nodelay */
ngx_flag_t reset_timedout_connection; /* reset_timedout_connection */
Expand Down
117 changes: 116 additions & 1 deletion src/http/ngx_http_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ static void
static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
ssize_t bytes);
#if (NGX_THREADS)
static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task,
ngx_file_t *file);
static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev);
#endif
static ngx_int_t ngx_http_upstream_output_filter(void *data,
ngx_chain_t *chain);
static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
static void ngx_http_upstream_process_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u);
Expand Down Expand Up @@ -2870,7 +2877,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)

p = u->pipe;

p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
p->output_filter = ngx_http_upstream_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs;
Expand Down Expand Up @@ -2913,6 +2920,13 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
p->max_temp_file_size = u->conf->max_temp_file_size;
p->temp_file_write_size = u->conf->temp_file_write_size;

#if (NGX_THREADS)
if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
p->thread_handler = ngx_http_upstream_thread_handler;
p->thread_ctx = r;
}
#endif

p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
Expand Down Expand Up @@ -3487,6 +3501,97 @@ ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
}


#if (NGX_THREADS)

static ngx_int_t
ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
ngx_str_t name;
ngx_event_pipe_t *p;
ngx_thread_pool_t *tp;
ngx_http_request_t *r;
ngx_http_core_loc_conf_t *clcf;

r = file->thread_ctx;
p = r->upstream->pipe;

clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
tp = clcf->thread_pool;

if (tp == NULL) {
if (ngx_http_complex_value(r, clcf->thread_pool_value, &name)
!= NGX_OK)
{
return NGX_ERROR;
}

tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name);

if (tp == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"thread pool \"%V\" not found", &name);
return NGX_ERROR;
}
}

task->event.data = r;
task->event.handler = ngx_http_upstream_thread_event_handler;

if (ngx_thread_task_post(tp, task) != NGX_OK) {
return NGX_ERROR;
}

r->main->blocked++;
r->aio = 1;
p->aio = 1;

return NGX_OK;
}


static void
ngx_http_upstream_thread_event_handler(ngx_event_t *ev)
{
ngx_connection_t *c;
ngx_http_request_t *r;

r = ev->data;
c = r->connection;

ngx_http_set_log_request(c->log, r);

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream thread: \"%V?%V\"", &r->uri, &r->args);

r->main->blocked--;
r->aio = 0;

r->write_event_handler(r);

ngx_http_run_posted_requests(c);
}

#endif


static ngx_int_t
ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
{
ngx_int_t rc;
ngx_event_pipe_t *p;
ngx_http_request_t *r;

r = data;
p = r->upstream->pipe;

rc = ngx_http_output_filter(r, chain);

p->aio = r->aio;

return rc;
}


static void
ngx_http_upstream_process_downstream(ngx_http_request_t *r)
{
Expand All @@ -3505,6 +3610,10 @@ ngx_http_upstream_process_downstream(ngx_http_request_t *r)

c->log->action = "sending to client";

#if (NGX_THREADS)
p->aio = r->aio;
#endif

if (wev->timedout) {

if (wev->delayed) {
Expand Down Expand Up @@ -3634,6 +3743,12 @@ ngx_http_upstream_process_request(ngx_http_request_t *r,

p = u->pipe;

#if (NGX_THREADS)
if (p->writing) {
return;
}
#endif

if (u->peer.connection) {

if (u->store) {
Expand Down

0 comments on commit 348f705

Please sign in to comment.