Permalink
Fetching contributors…
Cannot retrieve contributors at this time
670 lines (485 sloc) 19 KB
/*
* Copyright (C) Yichun Zhang (agentzh)
*/
#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"
#include "ngx_http_drizzle_module.h"
#include "ngx_http_drizzle_handler.h"
#include "ngx_http_drizzle_processor.h"
#include "ngx_http_drizzle_util.h"
#include "ngx_http_drizzle_upstream.h"
#include "ngx_http_drizzle_keepalive.h"
#ifdef _WIN32
/* import the POLLIN and POLLOUT flags */
# ifndef WIN32_LEAN_AND_MEAN
# define WIN32_LEAN_AND_MEAN
# endif
# include <winsock2.h>
#endif
/* for read/write event handlers */
static ngx_int_t ngx_http_drizzle_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_drizzle_reinit_request(ngx_http_request_t *r);
static void ngx_http_drizzle_abort_request(ngx_http_request_t *r);
static void ngx_http_drizzle_finalize_request(ngx_http_request_t *r,
ngx_int_t rc);
static ngx_int_t ngx_http_drizzle_process_header(ngx_http_request_t *r);
static ngx_int_t ngx_http_drizzle_input_filter_init(void *data);
static ngx_int_t ngx_http_drizzle_input_filter(void *data, ssize_t bytes);
ngx_int_t
ngx_http_drizzle_handler(ngx_http_request_t *r)
{
ngx_http_upstream_t *u;
ngx_http_drizzle_loc_conf_t *dlcf;
#if defined(nginx_version) && nginx_version < 8017
ngx_http_drizzle_ctx_t *dctx;
#endif
ngx_http_core_loc_conf_t *clcf;
ngx_str_t target;
ngx_url_t url;
ngx_connection_t *c;
dd("request: %p", r);
dd("subrequest in memory: %d", (int) r->subrequest_in_memory);
dd("connection: %p", r->connection);
dd("connection log: %p", r->connection->log);
if (r->subrequest_in_memory) {
/* TODO: add support for subrequest in memory by
* emitting output into u->buffer instead */
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"ngx_http_drizzle_module does not support "
"subrequest in memory");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
dlcf = ngx_http_get_module_loc_conf(r, ngx_http_drizzle_module);
if ((dlcf->default_query == NULL) && !(dlcf->methods_set & r->method)) {
if (dlcf->methods_set != 0) {
return NGX_HTTP_NOT_ALLOWED;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"drizzle: missing \"drizzle_query\" in location \"%V\"",
&clcf->name);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
dd("XXX upstream already exists? %p", r->upstream);
#if defined(nginx_version) && \
((nginx_version >= 7063 && nginx_version < 8000) \
|| nginx_version >= 8007)
dd("creating upstream.......");
if (ngx_http_upstream_create(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
u = r->upstream;
#else /* 0.7.x < 0.7.63, 0.8.x < 0.8.7 */
dd("XXX create upstream");
u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
if (u == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
u->peer.log = r->connection->log;
u->peer.log_error = NGX_ERROR_ERR;
# if (NGX_THREADS)
u->peer.lock = &r->connection->lock;
# endif
r->upstream = u;
#endif
if (dlcf->complex_target) {
/* variables used in the drizzle_pass directive */
if (ngx_http_complex_value(r, dlcf->complex_target, &target)
!= NGX_OK)
{
dd("failed to compile");
return NGX_ERROR;
}
if (target.len == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"drizzle: handler: empty \"drizzle_pass\" target");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
url.host = target;
url.port = 0;
url.no_resolve = 1;
dlcf->upstream.upstream = ngx_http_upstream_drizzle_add(r, &url);
if (dlcf->upstream.upstream == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"drizzle: upstream \"%V\" not found", &target);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
#if defined(nginx_version) && nginx_version < 8017
dctx = ngx_pcalloc(r->pool, sizeof(ngx_http_drizzle_ctx_t));
if (dctx == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_set_ctx(r, dctx, ngx_http_drizzle_module);
#endif
u->schema.len = sizeof("drizzle://") - 1;
u->schema.data = (u_char *) "drizzle://";
u->output.tag = (ngx_buf_tag_t) &ngx_http_drizzle_module;
dd("drizzle tag: %p", (void *) u->output.tag);
u->conf = &dlcf->upstream;
u->create_request = ngx_http_drizzle_create_request;
u->reinit_request = ngx_http_drizzle_reinit_request;
u->process_header = ngx_http_drizzle_process_header;
u->abort_request = ngx_http_drizzle_abort_request;
u->finalize_request = ngx_http_drizzle_finalize_request;
/* we bypass the upstream input filter mechanism in
* ngx_http_upstream_process_headers */
u->input_filter_init = ngx_http_drizzle_input_filter_init;
u->input_filter = ngx_http_drizzle_input_filter;
u->input_filter_ctx = NULL;
#if defined(nginx_version) && nginx_version >= 8011
r->main->count++;
#endif
dd("XXX connect timeout: %d", (int) dlcf->upstream.connect_timeout);
ngx_http_upstream_dbd_init(r);
/* override the read/write event handler to our own */
u->write_event_handler = ngx_http_drizzle_wev_handler;
u->read_event_handler = ngx_http_drizzle_rev_handler;
/* a bit hack-ish way to return error response (clean-up part) */
if ((u->peer.connection) && (u->peer.connection->fd == 0)) {
c = u->peer.connection;
u->peer.connection = NULL;
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
ngx_free_connection(c);
ngx_http_upstream_drizzle_finalize_request(r, u,
#if defined(nginx_version) && (nginx_version >= 8017)
NGX_HTTP_SERVICE_UNAVAILABLE);
#else
dctx->status ? dctx->status : NGX_HTTP_INTERNAL_SERVER_ERROR);
#endif
}
return NGX_DONE;
}
void
ngx_http_drizzle_wev_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_connection_t *c;
dd("drizzle wev handler");
/* just to ensure u->reinit_request always gets called for
* upstream_next */
u->request_sent = 1;
c = u->peer.connection;
if (c->write->timedout) {
dd("drizzle connection write timeout");
ngx_http_drizzle_set_thread_id_variable(r, u);
ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
if (ngx_http_upstream_drizzle_test_connect(c) != NGX_OK) {
dd("drizzle connection is broken");
ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
ngx_http_drizzle_set_libdrizzle_ready(r);
(void) ngx_http_drizzle_process_events(r);
}
void
ngx_http_drizzle_rev_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_connection_t *c;
dd("drizzle rev handler");
/* just to ensure u->reinit_request always gets called for
* upstream_next */
u->request_sent = 1;
c = u->peer.connection;
if (c->read->timedout) {
dd("drizzle connection read timeout");
ngx_http_drizzle_set_thread_id_variable(r, u);
ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
if (ngx_http_upstream_drizzle_test_connect(c) != NGX_OK) {
dd("drizzle connection is broken");
ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
ngx_http_drizzle_set_libdrizzle_ready(r);
(void) ngx_http_drizzle_process_events(r);
}
static ngx_int_t
ngx_http_drizzle_create_request(ngx_http_request_t *r)
{
r->upstream->request_bufs = NULL;
return NGX_OK;
}
static ngx_int_t
ngx_http_drizzle_reinit_request(ngx_http_request_t *r)
{
ngx_http_upstream_t *u;
u = r->upstream;
/* override the read/write event handler to our own */
u->write_event_handler = ngx_http_drizzle_wev_handler;
u->read_event_handler = ngx_http_drizzle_rev_handler;
return NGX_OK;
}
static void
ngx_http_drizzle_abort_request(ngx_http_request_t *r)
{
}
static void
ngx_http_drizzle_finalize_request(ngx_http_request_t *r,
ngx_int_t rc)
{
}
static ngx_int_t
ngx_http_drizzle_process_header(ngx_http_request_t *r)
{
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"ngx_http_drizzle_process_header should not be called"
" by the upstream");
return NGX_ERROR;
}
static ngx_int_t
ngx_http_drizzle_input_filter_init(void *data)
{
ngx_http_request_t *r = data;
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"ngx_http_drizzle_input_filter_init should not be called"
" by the upstream");
return NGX_ERROR;
}
static ngx_int_t
ngx_http_drizzle_input_filter(void *data, ssize_t bytes)
{
ngx_http_request_t *r = data;
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"ngx_http_drizzle_input_filter should not be called"
" by the upstream");
return NGX_ERROR;
}
void
ngx_http_drizzle_set_libdrizzle_ready(ngx_http_request_t *r)
{
ngx_http_upstream_drizzle_peer_data_t *dp;
drizzle_con_st *dc;
#if 1
short revents = 0;
#endif
dp = r->upstream->peer.data;
dc = dp->drizzle_con;
#if 0
/* libdrizzle uses standard poll() event constants
* and depends on drizzle_con_wait() to set them.
* we can directly call drizzle_con_wait() here to
* set those drizzle internal event states, because
* epoll() and other underlying event mechamism used
* by the nginx core can play well enough with poll().
* */
(void) drizzle_con_wait(dc->drizzle);
#endif
#if 1
revents |= POLLOUT;
revents |= POLLIN;
/* drizzle_con_set_revents() isn't declared external in libdrizzle-0.4.0, */
/* so we have to do its job all by ourselves... */
dc->options |= DRIZZLE_CON_IO_READY;
dc->revents = revents;
dc->events &= (short) ~revents;
#endif
}
ngx_int_t
ngx_http_drizzle_status_handler(ngx_http_request_t *r)
{
ngx_http_upstream_main_conf_t *umcf;
ngx_http_upstream_srv_conf_t **uscfp;
ngx_http_upstream_srv_conf_t *uscf;
ngx_uint_t i, n;
ngx_chain_t *cl;
ngx_buf_t *b;
size_t len;
ngx_int_t rc;
ngx_queue_t *q;
ngx_http_drizzle_keepalive_cache_t *item;
ngx_http_upstream_drizzle_srv_conf_t *dscf;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
uscfp = umcf->upstreams.elts;
/* calculate the output buffer length */
len = 0;
if (ngx_process == NGX_PROCESS_WORKER) {
len += sizeof("worker process: \n\n") - 1
+ ngx_http_drizzle_get_num_size(ngx_pid);
}
n = 0;
for (i = 0; i < umcf->upstreams.nelts; i++) {
uscf = uscfp[i];
if (uscf->srv_conf == NULL) {
/* skip implicit upstream specified directly by the fastcgi_pass,
* proxy_pass, and similar directives */
continue;
}
dscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_http_drizzle_module);
if (dscf == NULL || dscf->servers == NULL) {
continue;
}
if (n != 0) {
len += sizeof("\n") - 1;
}
n++;
len += sizeof("upstream \n") - 1
+ uscf->host.len
+ sizeof(" active connections: \n") - 1
+ ngx_http_drizzle_get_num_size(dscf->active_conns)
+ sizeof(" connection pool capacity: \n") - 1
+ ngx_http_drizzle_get_num_size(dscf->max_cached);
if (dscf->max_cached) {
/* dump overflow flag for the connection pool */
switch (dscf->overflow) {
case drizzle_keepalive_overflow_ignore:
len += sizeof(" overflow: ignore\n") - 1;
break;
case drizzle_keepalive_overflow_reject:
len += sizeof(" overflow: reject\n") - 1;
break;
default:
len += sizeof(" overflow: N/A\n") - 1;
break;
}
/* dump the lengths of the "cache" and "free" queues in the pool */
len += sizeof(" cached connection queue: \n") - 1
+ ngx_http_drizzle_get_num_size(
ngx_http_drizzle_queue_size(&dscf->cache)
)
+ sizeof(" free'd connection queue: \n") - 1
+ ngx_http_drizzle_get_num_size(
ngx_http_drizzle_queue_size(&dscf->free)
)
/* dump how many times that each individual connection in the
* pool has been successfully used in the "cache" queue */
+ sizeof(" cached connection successfully used count:\n") - 1;
for (q = ngx_queue_head(&dscf->cache);
q != ngx_queue_sentinel(&dscf->cache);
q = ngx_queue_next(q))
{
item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
queue);
len += sizeof(" ") - 1
+ ngx_http_drizzle_get_num_size(item->used);
}
/* dump how many times that each individual connection in the
* pool has been successfully used in the "free" queue */
len += sizeof(" free'd connection successfully used count:\n") - 1;
for (q = ngx_queue_head(&dscf->free);
q != ngx_queue_sentinel(&dscf->free);
q = ngx_queue_next(q))
{
item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
queue);
len += sizeof(" ") - 1
+ ngx_http_drizzle_get_num_size(item->used);
}
}
len += sizeof(" servers: \n") - 1
+ ngx_http_drizzle_get_num_size(dscf->servers->nelts)
+ sizeof(" peers: \n") - 1
+ ngx_http_drizzle_get_num_size(dscf->peers->number);
}
/* allocate the output buffer */
b = ngx_create_temp_buf(r->pool, len);
if (b == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
/* fill in the output buffer with the actual data */
if (ngx_process == NGX_PROCESS_WORKER) {
b->last = ngx_sprintf(b->last, "worker process: %P\n\n", ngx_pid);
}
n = 0;
for (i = 0; i < umcf->upstreams.nelts; i++) {
uscf = uscfp[i];
if (uscf->srv_conf == NULL) {
/* skip implicit upstream specified directly by the fastcgi_pass,
* proxy_pass, and similar directives */
continue;
}
dscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_http_drizzle_module);
if (dscf == NULL || dscf->servers == NULL) {
continue;
}
if (n != 0) {
*b->last++ = '\n';
}
n++;
b->last = ngx_copy_const_str(b->last, "upstream ");
b->last = ngx_copy(b->last, uscf->host.data, uscf->host.len);
*b->last++ = '\n';
b->last = ngx_sprintf(b->last, " active connections: %uD\n",
dscf->active_conns);
b->last = ngx_sprintf(b->last, " connection pool capacity: %uD\n",
dscf->max_cached);
if (dscf->max_cached) {
/* dump overflow flag for the connection pool */
switch (dscf->overflow) {
case drizzle_keepalive_overflow_ignore:
b->last = ngx_copy_const_str(b->last,
" overflow: ignore\n");
break;
case drizzle_keepalive_overflow_reject:
b->last = ngx_copy_const_str(b->last,
" overflow: reject\n");
break;
default:
b->last = ngx_copy_const_str(b->last, " overflow: N/A\n");
break;
}
/* dump the lengths of the "cache" and "free" queues in the pool */
b->last = ngx_sprintf(b->last, " cached connection queue: %uD\n",
ngx_http_drizzle_queue_size(&dscf->cache));
b->last = ngx_sprintf(b->last, " free'd connection queue: %uD\n",
ngx_http_drizzle_queue_size(&dscf->free));
/* dump how many times that each individual connection in the
* pool has been successfully used in the "cache" queue */
b->last = ngx_copy_const_str(b->last, " cached connection "
"successfully used count:");
for (q = ngx_queue_head(&dscf->cache);
q != ngx_queue_sentinel(&dscf->cache);
q = ngx_queue_next(q))
{
item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
queue);
b->last = ngx_sprintf(b->last, " %uD", item->used);
}
*b->last++ = '\n';
/* dump how many times that each individual connection in the
* pool has been successfully used in the "free" queue */
b->last = ngx_copy_const_str(b->last, " free'd connection "
"successfully used count:");
for (q = ngx_queue_head(&dscf->free);
q != ngx_queue_sentinel(&dscf->free);
q = ngx_queue_next(q))
{
item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
queue);
b->last = ngx_sprintf(b->last, " %uD", item->used);
}
*b->last++ = '\n';
}
b->last = ngx_sprintf(b->last, " servers: %uD\n",
dscf->servers->nelts);
b->last = ngx_sprintf(b->last, " peers: %uD\n", dscf->peers->number);
}
if (b->last != b->end) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"drizzle_status output buffer error: %O != %O",
(off_t) (b->last - b->pos),
(off_t) (b->end - b->pos));
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (r == r->main) {
b->last_buf = 1;
}
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
cl->buf = b;
cl->next = NULL;
r->headers_out.status = NGX_HTTP_OK;
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return ngx_http_output_filter(r, cl);
}