Skip to content

Commit

Permalink
update the keepalive module to 0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
yaoweibin committed Nov 16, 2011
1 parent e0c7793 commit e2b4133
Showing 1 changed file with 199 additions and 65 deletions.
264 changes: 199 additions & 65 deletions ngx_http_upstream_keepalive_module.c
Expand Up @@ -32,6 +32,11 @@ typedef struct {
ngx_event_get_peer_pt original_get_peer;
ngx_event_free_peer_pt original_free_peer;

#if (NGX_HTTP_SSL)
ngx_event_set_peer_session_pt original_set_session;
ngx_event_save_peer_session_pt original_save_session;
#endif

ngx_uint_t failed; /* unsigned:1 */

} ngx_http_upstream_keepalive_peer_data_t;
Expand All @@ -58,6 +63,15 @@ static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc,

static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev);
static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev);
static void ngx_http_upstream_keepalive_close(ngx_connection_t *c);


#if (NGX_HTTP_SSL)
static ngx_int_t ngx_http_upstream_keepalive_set_session(
ngx_peer_connection_t *pc, void *data);
static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc,
void *data);
#endif

static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf);
static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
Expand All @@ -78,9 +92,6 @@ static ngx_command_t ngx_http_upstream_keepalive_commands[] = {


static ngx_http_module_t ngx_http_upstream_keepalive_module_ctx = {
/*
* Copyright (C) Maxim Dounin
*/
NULL, /* preconfiguration */
NULL, /* postconfiguration */

Expand Down Expand Up @@ -111,7 +122,7 @@ ngx_module_t ngx_http_upstream_keepalive_module = {
};


ngx_int_t
static ngx_int_t
ngx_http_upstream_init_keepalive(ngx_conf_t *cf,
ngx_http_upstream_srv_conf_t *us)
{
Expand Down Expand Up @@ -185,6 +196,13 @@ ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer;
r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer;

#if (NGX_HTTP_SSL)
kp->original_set_session = r->upstream->peer.set_session;
kp->original_save_session = r->upstream->peer.save_session;
r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session;
r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session;
#endif

return NGX_OK;
}

Expand All @@ -209,17 +227,23 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
if (kp->conf->single && !ngx_queue_empty(&kp->conf->cache)) {

q = ngx_queue_head(&kp->conf->cache);
ngx_queue_remove(q);

item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
c = item->connection;

ngx_queue_remove(q);
ngx_queue_insert_head(&kp->conf->free, q);

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get keepalive peer: using connection %p", c);

c->idle = 0;
c->log = pc->log;
c->read->log = pc->log;
c->write->log = pc->log;
#if (NGX_UPSTREAM_KEEPALIVE_PATCHED)
c->pool->log = pc->log;
#endif

pc->connection = c;
pc->cached = 1;
Expand Down Expand Up @@ -251,10 +275,16 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
ngx_queue_remove(q);
ngx_queue_insert_head(&kp->conf->free, q);

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get keepalive peer: using connection %p", c);

c->idle = 0;
c->log = pc->log;
c->read->log = pc->log;
c->write->log = pc->log;
#if (NGX_UPSTREAM_KEEPALIVE_PATCHED)
c->pool->log = pc->log;
#endif

pc->connection = c;
pc->cached = 1;
Expand Down Expand Up @@ -295,87 +325,123 @@ ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
* make sure that u->length is valid (we use u->header_sent flag to test
* this). Memcached is the only supported protocol for now.
*
* Some notes on other possibilities (incomplete):
*
* fastcgi: u->pipe->upstream_done should be sufficient
*
* proxy buffered: u->pipe->upstream_done, 304 replies, replies to head
* requests (see RFC 2616, 4.4 Message Length)
*
* proxy unbuffered: 200 as for memcached (with u->length == 0 and
* header_sent), 304, replies to head requests
*
* subrequest_in_memory: won't work as of now
*
* TODO: move this logic to protocol modules (NGX_PEER_KEEPALIVE?)
* With experimental patches we are able to cache other connections as
* well. Connection status is signalled via u->keepalive flag.
*/

u = kp->upstream;
c = pc->connection;

if (kp->failed
|| c == NULL
|| c->read->eof
|| c->read->error
|| c->read->timedout
|| c->write->error
|| c->write->timedout)
{
goto invalid;
}

#if (NGX_AJP_MODULE)

if (u->pipe->keepalive == 0 || u->pipe->upstream_done == 0) {
goto invalid;
}

#else

#if (NGX_UPSTREAM_KEEPALIVE_PATCHED)

if (!u->keepalive) {
goto invalid;
}

#if !(NGX_AJP_MODULE)
#else
ngx_uint_t status;

status = u->headers_in.status_n;

if (!(status == NGX_HTTP_NOT_FOUND
|| (status == NGX_HTTP_OK && u->header_sent && u->length == 0)))
{
goto invalid;
}

#if (NGX_HTTP_SSL)

/*
* to cache ssl connections separate pool for peer connection is
* required, which is only available with patches
*/

if (c->ssl) {
goto invalid;
}

#endif

if (!kp->failed
&& pc->connection != NULL
#if (NGX_AJP_MODULE)
&& u->pipe->keepalive && u->pipe->upstream_done)
#else
&& (status == NGX_HTTP_NOT_FOUND
|| (status == NGX_HTTP_OK && u->header_sent && u->length == 0)))
#endif
{
c = pc->connection;

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free keepalive peer: saving connection %d", c->fd);
#endif

if (ngx_queue_empty(&kp->conf->free)) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto invalid;
}

q = ngx_queue_last(&kp->conf->cache);
ngx_queue_remove(q);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free keepalive peer: saving connection %p", c);

if (ngx_queue_empty(&kp->conf->free)) {

item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t,
queue);
q = ngx_queue_last(&kp->conf->cache);
ngx_queue_remove(q);

ngx_close_connection(item->connection);
item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);

} else {
q = ngx_queue_head(&kp->conf->free);
ngx_queue_remove(q);
ngx_http_upstream_keepalive_close(item->connection);

item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t,
queue);
}
} else {
q = ngx_queue_head(&kp->conf->free);
ngx_queue_remove(q);

item->connection = c;
ngx_queue_insert_head(&kp->conf->cache, q);
item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
}

pc->connection = NULL;
item->connection = c;
ngx_queue_insert_head(&kp->conf->cache, q);

if (c->read->timer_set) {
ngx_del_timer(c->read);
}
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
pc->connection = NULL;

c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
c->read->handler = ngx_http_upstream_keepalive_close_handler;
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
if (c->write->timer_set) {
ngx_del_timer(c->write);
}

c->data = item;
c->idle = 1;
c->log = ngx_cycle->log;
c->read->log = ngx_cycle->log;
c->write->log = ngx_cycle->log;
c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
c->read->handler = ngx_http_upstream_keepalive_close_handler;

item->socklen = pc->socklen;
ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
c->data = item;
c->idle = 1;
c->log = ngx_cycle->log;
c->read->log = ngx_cycle->log;
c->write->log = ngx_cycle->log;
#if (NGX_UPSTREAM_KEEPALIVE_PATCHED)
c->pool->log = ngx_cycle->log;
#endif

item->socklen = pc->socklen;
ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);

if (c->read->ready) {
ngx_http_upstream_keepalive_close_handler(c->read);
}

return kp->original_free_peer(pc, kp->data, state);
invalid:

kp->original_free_peer(pc, kp->data, state);
}


Expand All @@ -393,17 +459,38 @@ ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)
ngx_http_upstream_keepalive_srv_conf_t *conf;
ngx_http_upstream_keepalive_cache_t *item;

int n;
char buf[1];
ngx_connection_t *c;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
"keepalive close handler");

c = ev->data;

if (c->close) {
goto close;
}

n = recv(c->fd, buf, 1, MSG_PEEK);

if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
/* stale event */

if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto close;
}

return;
}

close:

item = c->data;
conf = item->conf;

#if (NGX_DEBUG)
u_char buffer[64], n, i;
u_char buffer[64], i;

n = c->recv(c, buffer, 64);

Expand All @@ -416,12 +503,61 @@ ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)

#endif

ngx_http_upstream_keepalive_close(c);

ngx_queue_remove(&item->queue);
ngx_close_connection(item->connection);
ngx_queue_insert_head(&conf->free, &item->queue);
}


static void
ngx_http_upstream_keepalive_close(ngx_connection_t *c)
{

#if (NGX_HTTP_SSL)

if (c->ssl) {
c->ssl->no_wait_shutdown = 1;
c->ssl->no_send_shutdown = 1;

if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
c->ssl->handler = ngx_http_upstream_keepalive_close;
return;
}
}

#endif

#if (NGX_UPSTREAM_KEEPALIVE_PATCHED)
ngx_destroy_pool(c->pool);
#endif
ngx_close_connection(c);
}


#if (NGX_HTTP_SSL)

static ngx_int_t
ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data)
{
ngx_http_upstream_keepalive_peer_data_t *kp = data;

return kp->original_set_session(pc, kp->data);
}


static void
ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data)
{
ngx_http_upstream_keepalive_peer_data_t *kp = data;

kp->original_save_session(pc, kp->data);
return;
}

#endif


static void *
ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
{
Expand Down Expand Up @@ -485,9 +621,7 @@ ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
for (i = 2; i < cf->args->nelts; i++) {

if (ngx_strcmp(value[i].data, "single") == 0) {

kcf->single = 1;

continue;
}

Expand Down

0 comments on commit e2b4133

Please sign in to comment.