/*
 * Web-page residue captured with this file (not part of the source):
 *   Permalink
 *   Fetching contributors...
 *   Cannot retrieve contributors at this time
 *   788 lines (568 sloc) 19.9 KB
 */
/*
* Copyright (C) Yichun Zhang (agentzh)
*/
#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"
#include "ngx_http_drizzle_module.h"
#include "ngx_http_drizzle_output.h"
#include "ngx_http_drizzle_processor.h"
#include "ngx_http_drizzle_util.h"
#include "resty_dbd_stream.h"
#include <nginx.h>
/* Name of the response header that advertises this module. */
#define ngx_http_drizzle_module_header_key "X-Resty-DBD-Module"

/* Length of the header name, excluding the terminating NUL. */
#define ngx_http_drizzle_module_header_key_len \
    (sizeof(ngx_http_drizzle_module_header_key) - 1)

/* Header value: "ngx_drizzle " followed by the module version string. */
#define ngx_http_drizzle_module_header_val \
    "ngx_drizzle " \
    ngx_http_drizzle_module_version_string

/* Length of the header value, excluding the terminating NUL. */
#define ngx_http_drizzle_module_header_val_len \
    (sizeof(ngx_http_drizzle_module_header_val) - 1)
/* Forward declarations of the file-local buffer helpers defined below. */

/* Makes sure dp->out_buf has room available (dp->avail_out != 0). */
static ngx_int_t ngx_http_drizzle_get_buf(ngx_http_request_t *r,
ngx_http_upstream_drizzle_peer_data_t *dp);
/* Returns a pool-allocated scratch area of at least `len` bytes. */
static u_char *ngx_http_drizzle_get_postponed(ngx_http_request_t *r,
ngx_http_upstream_drizzle_peer_data_t *dp, size_t len);
/* Maps a libdrizzle column type to an RDS column type. */
static rds_col_type_t ngx_http_drizzle_std_col_type(
drizzle_column_type_t col_type);
/* Hands out `len` writable bytes; pair with ngx_http_drizzle_submit_mem(). */
static u_char *ngx_http_drizzle_request_mem(ngx_http_request_t *r,
ngx_http_upstream_drizzle_peer_data_t *dp, size_t len);
/* Commits `len` bytes previously obtained from ngx_http_drizzle_request_mem(). */
static ngx_int_t ngx_http_drizzle_submit_mem(ngx_http_request_t *r,
ngx_http_upstream_drizzle_peer_data_t *dp, size_t len);
/*
 * Write the RDS result-set header for `res` into the output buffers.
 *
 * Layout (native byte order, announced by the first byte):
 *   uint8   endian type (0 = little endian build, 1 = big endian build)
 *   uint32  RDS format version
 *   uint8   result type (always 0)
 *   uint16  standard error code
 *   uint16  driver-specific error code
 *   uint16  driver-specific error string length
 *   bytes   driver-specific error string
 *   uint64  affected rows
 *   uint64  insert id
 *   uint16  column count
 *
 * Multi-byte fields are stored with ngx_copy() instead of through casted
 * pointers: they sit at arbitrary (mostly odd) offsets inside the buffer,
 * so a direct uintN_t store would be a misaligned access.
 *
 * When charset support is enabled and "set names" has not completed yet,
 * `res` is the result of that query: it is freed and nothing is emitted.
 *
 * Returns NGX_DONE for the "set names" result or for a result without
 * columns, NGX_ERROR on failure, and otherwise whatever
 * ngx_http_drizzle_submit_mem() returns.
 */
ngx_int_t
ngx_http_drizzle_output_result_header(ngx_http_request_t *r,
    drizzle_result_st *res)
{
    u_char                                  *pos, *last;
    ngx_int_t                                rc;
    ngx_http_upstream_t                     *u = r->upstream;
    const char                              *errstr;
    size_t                                   size;
    uint16_t                                 errstr_len;
    uint16_t                                 col_count;
    uint16_t                                 errcode;
    uint16_t                                 driver_errcode;
    uint32_t                                 version;
    uint64_t                                 affected_rows;
    uint64_t                                 insert_id;
    ngx_http_upstream_drizzle_peer_data_t   *dp = u->peer.data;

    errcode = drizzle_result_error_code(res);

    if (dp->enable_charset && ! dp->has_set_names) {
        if (errcode != 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "drizzle: FATAL: failed to set names 'utf8' "
                          "(error %d)", (int) errcode);
            return NGX_ERROR;
        }

        if (dp->drizzle_con && dp->drizzle_res.con) {
            dd("before drizzle result free");
            dd("%p vs. %p", dp->drizzle_res.con, dp->drizzle_con);

            drizzle_result_free(&dp->drizzle_res);

            dd("after drizzle result free");
        }

        /* ngx_http_upstream_drizzle_done(r, u, dp, NGX_OK); */
        dd("returning DONE when set names");

        return NGX_DONE;
    }

    errstr = drizzle_result_error(res);
    errstr_len = (uint16_t) ngx_strlen(errstr);

    col_count = drizzle_result_column_count(res);

    size = sizeof(uint8_t)        /* endian type */
           + sizeof(uint32_t)     /* format version */
           + sizeof(uint8_t)      /* result type */
           + sizeof(uint16_t)     /* standard error code */
           + sizeof(uint16_t)     /* driver-specific error code */
           + sizeof(uint16_t)     /* driver-specific errstr len */
           + errstr_len           /* driver-specific errstr data */
           + sizeof(uint64_t)     /* rows affected */
           + sizeof(uint64_t)     /* insert id */
           + sizeof(uint16_t);    /* column count */

    pos = ngx_http_drizzle_request_mem(r, dp, size);
    if (pos == NULL) {
        return NGX_ERROR;
    }

    last = pos;

#if NGX_HAVE_LITTLE_ENDIAN
    *last++ = 0;
#else /* big endian */
    *last++ = 1;
#endif

    /* RDS format version */
    version = (uint32_t) resty_dbd_stream_version;
    last = ngx_copy(last, (u_char *) &version, sizeof(uint32_t));

    /* result type fixed to 0 */
    *last++ = 0;

    /* standard error code
     * FIXME: define the standard error code set and map
     * libdrizzle's to it. */
    last = ngx_copy(last, (u_char *) &errcode, sizeof(uint16_t));

    /* driver-specific error code */
    driver_errcode = drizzle_result_error_code(res);
    last = ngx_copy(last, (u_char *) &driver_errcode, sizeof(uint16_t));

    /* driver-specific errstr len */
    last = ngx_copy(last, (u_char *) &errstr_len, sizeof(uint16_t));

    /* driver-specific errstr data */
    if (errstr_len) {
        last = ngx_copy(last, (u_char *) errstr, errstr_len);
    }

    /* affected rows */
    affected_rows = drizzle_result_affected_rows(res);
    last = ngx_copy(last, (u_char *) &affected_rows, sizeof(uint64_t));

    /* insert id */
    insert_id = drizzle_result_insert_id(res);
    last = ngx_copy(last, (u_char *) &insert_id, sizeof(uint64_t));

    /* column count */
    last = ngx_copy(last, (u_char *) &col_count, sizeof(uint16_t));

    /* sanity check: bytes written must match the size requested above */
    if ((size_t) (last - pos) != size) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "drizzle: FATAL: output result header buffer error");
        return NGX_ERROR;
    }

    if (col_count == 0) {
        dd("Col count is ZERO");

        /* we suppress row terminator here when there's no columns */
        dp->seen_stream_end = 1;

        rc = ngx_http_drizzle_submit_mem(r, dp, size);
        if (rc != NGX_OK) {
            return NGX_ERROR;
        }

        dd("about to be done...");
        ngx_http_upstream_drizzle_done(r, u, dp, NGX_DONE);
        dd("i am returning DONE");

        return NGX_DONE;
    }

    return ngx_http_drizzle_submit_mem(r, dp, size);
}
/*
 * Flush the pending output chain (u->out_bufs) downstream.
 *
 * If the end of the stream has been seen, the partially filled dp->out_buf
 * is appended to the chain first.  The response header is sent once, right
 * before the first body data goes out.  Returns NGX_OK when the chain has
 * been drained, NGX_ERROR on internal failure, or the error / special
 * response code reported by the header or body filters.
 */
ngx_int_t
ngx_http_drizzle_output_bufs(ngx_http_request_t *r,
    ngx_http_upstream_drizzle_peer_data_t *dp)
{
    ngx_http_upstream_t             *u = r->upstream;
    ngx_int_t                        rc;
    ngx_str_t                        hdr_name, hdr_value;
    ngx_http_drizzle_loc_conf_t     *loc_conf;
    ngx_chain_t                     *cl;

    dd("enter");

    dd_dump_chain_size();

    if (dp->seen_stream_end) {
        dp->seen_stream_end = 0;

        /* queue the last, not yet full, buffer */
        if (dp->avail_out) {
            cl = ngx_alloc_chain_link(r->pool);
            if (cl == NULL) {
                return NGX_ERROR;
            }

            cl->next = NULL;
            cl->buf = dp->out_buf;

            *dp->last_out = cl;
            dp->last_out = &cl->next;

            dp->avail_out = 0;
        }
    }

    if (u->out_bufs && !u->header_sent) {
        ngx_http_clear_content_length(r);

        r->headers_out.status = NGX_HTTP_OK;

        /* the body is always an RDS stream */
        r->headers_out.content_type.len = rds_content_type_len;
        r->headers_out.content_type.data = (u_char *) rds_content_type;
        r->headers_out.content_type_len = rds_content_type_len;
        r->headers_out.content_type_lowcase = NULL;

        loc_conf = ngx_http_get_module_loc_conf(r, ngx_http_drizzle_module);

        if (loc_conf->enable_module_header) {
            /* advertise the module via X-Resty-DBD-Module */
            hdr_name.len = ngx_http_drizzle_module_header_key_len;
            hdr_name.data = (u_char *) ngx_http_drizzle_module_header_key;

            hdr_value.len = ngx_http_drizzle_module_header_val_len;
            hdr_value.data = (u_char *) ngx_http_drizzle_module_header_val;

            rc = ngx_http_drizzle_set_header(r, &hdr_name, &hdr_value);
            if (rc != NGX_OK) {
                return NGX_ERROR;
            }
        }

        rc = ngx_http_send_header(r);
        if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
            return rc;
        }

        u->header_sent = 1;
    }

    /* keep pushing until nothing is left in the pending chain */
    while (u->out_bufs != NULL) {
        rc = ngx_http_output_filter(r, u->out_bufs);
        if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
            return rc;
        }

#if defined(nginx_version) && nginx_version >= 1001004
        ngx_chain_update_chains(r->pool, &u->free_bufs, &u->busy_bufs,
                                &u->out_bufs, u->output.tag);
#else
        ngx_chain_update_chains(&u->free_bufs, &u->busy_bufs, &u->out_bufs,
                                u->output.tag);
#endif

        dp->last_out = &u->out_bufs;
    }

    return NGX_OK;
}
/*
 * Write one RDS column descriptor for `col` into the output buffers:
 *   uint16  standard (RDS) column type
 *   uint16  driver-specific (drizzle) column type
 *   uint16  column name length
 *   bytes   column name
 *
 * The 16-bit fields are stored with ngx_copy() instead of through casted
 * pointers because the write position inside the buffer is not guaranteed
 * to be 2-byte aligned.
 *
 * Returns NGX_ERROR when `col` is NULL or on buffer failure, otherwise the
 * result of ngx_http_drizzle_submit_mem().
 */
ngx_int_t
ngx_http_drizzle_output_col(ngx_http_request_t *r, drizzle_column_st *col)
{
    u_char                                  *pos, *last;
    ngx_http_upstream_t                     *u = r->upstream;
    drizzle_column_type_t                    col_type = 0;
    uint16_t                                 std_col_type = 0;
    uint16_t                                 drv_col_type = 0;
    const char                              *col_name = NULL;
    uint16_t                                 col_name_len = 0;
    size_t                                   size;
    ngx_http_upstream_drizzle_peer_data_t   *dp = u->peer.data;

    if (col == NULL) {
        return NGX_ERROR;
    }

    col_type = drizzle_column_type(col);
    col_name = drizzle_column_name(col);
    col_name_len = (uint16_t) strlen(col_name);

    size = sizeof(uint16_t)       /* std col type */
           + sizeof(uint16_t)     /* driver-specific col type */
           + sizeof(uint16_t)     /* col name str len */
           + col_name_len;        /* col name str data */

    pos = ngx_http_drizzle_request_mem(r, dp, size);
    if (pos == NULL) {
        return NGX_ERROR;
    }

    last = pos;

    /* std column type */
    std_col_type = (uint16_t) ngx_http_drizzle_std_col_type(col_type);
    last = ngx_copy(last, (u_char *) &std_col_type, sizeof(uint16_t));

    /* drizzle column type */
    drv_col_type = (uint16_t) col_type;
    last = ngx_copy(last, (u_char *) &drv_col_type, sizeof(uint16_t));

    /* column name string length */
    last = ngx_copy(last, (u_char *) &col_name_len, sizeof(uint16_t));

    /* column name string data */
    last = ngx_copy(last, (u_char *) col_name, col_name_len);

    /* sanity check: bytes written must match the size requested above */
    if ((size_t) (last - pos) != size) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "drizzle: FATAL: output column buffer error");
        return NGX_ERROR;
    }

    return ngx_http_drizzle_submit_mem(r, dp, size);
}
/*
 * Write a one-byte row marker: 1 when `row` is non-zero, 0 otherwise.
 * A zero `row` also flags the end of the stream on the peer data.
 *
 * Returns NGX_ERROR when no memory could be obtained, otherwise the result
 * of ngx_http_drizzle_submit_mem().
 */
ngx_int_t
ngx_http_drizzle_output_row(ngx_http_request_t *r, uint64_t row)
{
    ngx_http_upstream_t                     *u = r->upstream;
    ngx_http_upstream_drizzle_peer_data_t   *dp = u->peer.data;
    u_char                                  *marker;

    marker = ngx_http_drizzle_request_mem(r, dp, sizeof(uint8_t));
    if (marker == NULL) {
        return NGX_ERROR;
    }

    if (row == 0) {
        *marker = 0;
        dp->seen_stream_end = 1;

    } else {
        *marker = 1;
    }

    return ngx_http_drizzle_submit_mem(r, dp, sizeof(uint8_t));
}
/*
 * Write (a chunk of) one field value into the output buffers.
 *
 * For the first chunk (offset == 0) a uint32 total length is written before
 * the data; a NULL `field` is encoded as the length (uint32_t) -1.  Later
 * chunks (offset != 0) carry data only.
 *
 * The length is stored with ngx_copy() instead of through a casted pointer
 * because the write position inside the buffer is not guaranteed to be
 * 4-byte aligned.
 *
 * Returns NGX_DONE when the first chunk is empty although the field is not
 * (nothing to emit yet), NGX_ERROR on failure, otherwise the result of
 * ngx_http_drizzle_submit_mem().
 */
ngx_int_t
ngx_http_drizzle_output_field(ngx_http_request_t *r, size_t offset,
    size_t len, size_t total, drizzle_field_t field)
{
    u_char                                  *pos, *last;
    ngx_http_upstream_t                     *u = r->upstream;
    size_t                                   size = 0;
    uint32_t                                 field_len;
    ngx_http_upstream_drizzle_peer_data_t   *dp = u->peer.data;

    if (offset == 0) {
        if (len == 0 && total != 0) {
            return NGX_DONE;
        }

        size = sizeof(uint32_t);     /* field total length */
    }

    /* (more) field data */
    size += (uint32_t) len;

    /* request memory */
    pos = ngx_http_drizzle_request_mem(r, dp, size);
    if (pos == NULL) {
        return NGX_ERROR;
    }

    last = pos;

    /* fill in the buffer */
    if (offset == 0) {
        /* field total length; all bits set marks a NULL field */
        if (field == NULL) {
            field_len = (uint32_t) -1;

        } else {
            field_len = (uint32_t) total;
        }

        last = ngx_copy(last, (u_char *) &field_len, sizeof(uint32_t));
    }

    /* field data */
    if (len && field) {
        last = ngx_copy(last, (u_char *) field, (uint32_t) len);
    }

    /* sanity check: bytes written must match the size requested above */
    if ((size_t) (last - pos) != size) {
        dd("offset %d, len %d, size %d", (int) offset,
           (int) len, (int) size);

        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "drizzle: FATAL: output field buffer error");
        return NGX_ERROR;
    }

    return ngx_http_drizzle_submit_mem(r, dp, size);
}
/*
 * Translate a libdrizzle column type into the corresponding RDS column
 * type.  Types without a mapping (including the NULL type) yield
 * rds_col_type_unknown.
 */
static rds_col_type_t
ngx_http_drizzle_std_col_type(drizzle_column_type_t col_type)
{
    dd("drizzle col type: %d", col_type);

    switch (col_type) {

    case DRIZZLE_COLUMN_TYPE_DECIMAL:
    case DRIZZLE_COLUMN_TYPE_NEWDECIMAL:
        return rds_col_type_decimal;

    case DRIZZLE_COLUMN_TYPE_TINY:
    case DRIZZLE_COLUMN_TYPE_SHORT:
    case DRIZZLE_COLUMN_TYPE_YEAR:
        return rds_col_type_smallint;

    case DRIZZLE_COLUMN_TYPE_LONG:
    case DRIZZLE_COLUMN_TYPE_INT24:
        return rds_col_type_integer;

    case DRIZZLE_COLUMN_TYPE_LONGLONG:
        return rds_col_type_bigint;

    case DRIZZLE_COLUMN_TYPE_FLOAT:
        return rds_col_type_real;

    case DRIZZLE_COLUMN_TYPE_DOUBLE:
        return rds_col_type_double;

    case DRIZZLE_COLUMN_TYPE_TIMESTAMP:
    case DRIZZLE_COLUMN_TYPE_DATE:
    case DRIZZLE_COLUMN_TYPE_DATETIME:
    case DRIZZLE_COLUMN_TYPE_NEWDATE:
        return rds_col_type_timestamp;

    case DRIZZLE_COLUMN_TYPE_TIME:
        return rds_col_type_time;

    case DRIZZLE_COLUMN_TYPE_BIT:
        return rds_col_type_bit;

    case DRIZZLE_COLUMN_TYPE_VARCHAR:
    case DRIZZLE_COLUMN_TYPE_ENUM:
    case DRIZZLE_COLUMN_TYPE_SET:
    case DRIZZLE_COLUMN_TYPE_VAR_STRING:
    case DRIZZLE_COLUMN_TYPE_STRING:
    case DRIZZLE_COLUMN_TYPE_GEOMETRY:
        return rds_col_type_varchar;

    case DRIZZLE_COLUMN_TYPE_TINY_BLOB:
    case DRIZZLE_COLUMN_TYPE_MEDIUM_BLOB:
    case DRIZZLE_COLUMN_TYPE_LONG_BLOB:
    case DRIZZLE_COLUMN_TYPE_BLOB:
        return rds_col_type_blob;

    case DRIZZLE_COLUMN_TYPE_NULL:
    default:
        return rds_col_type_unknown;
    }

    /* impossible to reach here */
}
/*
 * Hand out `len` writable bytes for the next RDS atom.
 *
 * When the current output buffer has enough room, the returned pointer is
 * the buffer's write position.  Otherwise a separate scratch area is
 * returned and recorded in dp->postponed, to be copied into the output
 * buffers by ngx_http_drizzle_submit_mem().  Returns NULL on failure.
 */
static u_char *
ngx_http_drizzle_request_mem(ngx_http_request_t *r,
    ngx_http_upstream_drizzle_peer_data_t *dp, size_t len)
{
    u_char      *scratch;

    if (ngx_http_drizzle_get_buf(r, dp) != NGX_OK) {
        return NULL;
    }

    /* fast path: the atom fits into the current buffer */
    if (dp->avail_out >= len) {
        return dp->out_buf->last;
    }

    scratch = ngx_http_drizzle_get_postponed(r, dp, len);
    if (scratch == NULL) {
        return NULL;
    }

    dp->postponed.pos = scratch;
    dp->postponed.last = scratch + len;

    return scratch;
}
/*
 * Make sure dp->out_buf has space left.
 *
 * Does nothing while dp->avail_out is non-zero.  Otherwise a buffer is
 * taken from the head of u->free_bufs, or a new temporary buffer of
 * conf->buf_size bytes is created, and dp->avail_out is reset to
 * conf->buf_size.  Returns NGX_OK, or NGX_ERROR when buffer creation fails.
 */
static ngx_int_t
ngx_http_drizzle_get_buf(ngx_http_request_t *r,
    ngx_http_upstream_drizzle_peer_data_t *dp)
{
    ngx_http_upstream_t             *u = dp->upstream;
    ngx_http_drizzle_loc_conf_t     *conf = dp->loc_conf;

    dd("MEM enter");

    if (dp->avail_out) {
        return NGX_OK;
    }

    if (u->free_bufs == NULL) {
        dd("MEM creating temp buf with size: %d", (int) conf->buf_size);

        dp->out_buf = ngx_create_temp_buf(r->pool, conf->buf_size);
        if (dp->out_buf == NULL) {
            return NGX_ERROR;
        }

        dp->out_buf->recycled = 1;
        dp->out_buf->tag = (ngx_buf_tag_t) &ngx_http_drizzle_module;

    } else {
        dd("MEM reusing temp buf from free_bufs");

        dp->out_buf = u->free_bufs->buf;
        u->free_bufs = u->free_bufs->next;
    }

    dp->avail_out = conf->buf_size;

    return NGX_OK;
}
/*
 * Return a scratch area of at least `len` bytes allocated from r->pool.
 *
 * The area is remembered in dp->cached and reused by later calls as long
 * as it is big enough; otherwise it is released and a new one of exactly
 * `len` bytes is allocated.  Returns NULL when the allocation fails.
 */
static u_char *
ngx_http_drizzle_get_postponed(ngx_http_request_t *r,
    ngx_http_upstream_drizzle_peer_data_t *dp, size_t len)
{
    u_char      *p;

    dd("MEM enter");

    if (dp->cached.start != NULL) {
        if ((size_t) (dp->cached.end - dp->cached.start) >= len) {
            return dp->cached.start;
        }

        /* too small: release it and forget the stale pointers right away,
         * so a failed allocation below cannot leave them dangling */
        ngx_pfree(r->pool, dp->cached.start);

        dp->cached.start = NULL;
        dp->cached.end = NULL;
    }

    p = ngx_palloc(r->pool, len);
    if (p == NULL) {
        return NULL;
    }

    dp->cached.start = p;
    dp->cached.end = p + len;

    return p;
}
/*
 * Commit `len` bytes previously obtained from ngx_http_drizzle_request_mem().
 *
 * If a postponed scratch region is pending (dp->postponed.pos != NULL), its
 * content is copied into the output buffers; otherwise the bytes were written
 * straight into dp->out_buf and only its write position is advanced.
 * A buffer is appended to the chain at *dp->last_out once it is full.
 *
 * Returns NGX_OK on success and NGX_ERROR when an allocation fails.
 */
static ngx_int_t
ngx_http_drizzle_submit_mem(ngx_http_request_t *r,
ngx_http_upstream_drizzle_peer_data_t *dp, size_t len)
{
ngx_chain_t *cl;
ngx_int_t rc;
size_t postponed_len;
ngx_http_drizzle_loc_conf_t *conf = dp->loc_conf;
if (dp->postponed.pos != NULL) {
dd("MEM copy postponed data over to u->out_bufs for len %d", (int) len);
postponed_len = dp->postponed.last - dp->postponed.pos;
if (postponed_len > dp->avail_out) {
/* we should ensure that rds atoms do not get
* split into multiple bufs. */
if (dp->out_buf && dp->out_buf->pos != dp->out_buf->last) {
/* save the current dp->out_buf (it holds data already) */
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = dp->out_buf;
cl->next = NULL;
*dp->last_out = cl;
dp->last_out = &cl->next;
}
/* create a buf for the postponed buf; `len` is reused here as the
* size of the new buffer: the larger of the postponed data and the
* configured buffer size */
len = postponed_len > conf->buf_size ?
postponed_len : conf->buf_size;
dp->out_buf = ngx_create_temp_buf(r->pool, len);
if (dp->out_buf == NULL) {
return NGX_ERROR;
}
dp->out_buf->tag = (ngx_buf_tag_t) &ngx_http_drizzle_module;
dp->out_buf->recycled = 1;
dp->out_buf->last = ngx_copy(dp->out_buf->last, dp->postponed.pos,
postponed_len);
dp->avail_out = len - postponed_len;
/* the postponed region has been consumed */
dp->postponed.pos = NULL;
if (dp->avail_out == 0) {
/* save the new big buf */
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = dp->out_buf;
cl->next = NULL;
*dp->last_out = cl;
dp->last_out = &cl->next;
}
return NGX_OK;
}
/* the postponed data fits into the space left: copy it over, moving on
* to a fresh buffer whenever the current one fills up */
for ( ;; ) {
len = dp->postponed.last - dp->postponed.pos;
if (len > dp->avail_out) {
len = dp->avail_out;
}
dp->out_buf->last = ngx_copy(dp->out_buf->last,
dp->postponed.pos, len);
dp->avail_out -= len;
dp->postponed.pos += len;
if (dp->postponed.pos == dp->postponed.last) {
/* everything has been copied */
dp->postponed.pos = NULL;
}
if (dp->avail_out > 0) {
/* buffer not full yet: keep it open for the next atom */
break;
}
dd("MEM save dp->out_buf");
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = dp->out_buf;
cl->next = NULL;
*dp->last_out = cl;
dp->last_out = &cl->next;
if (dp->postponed.pos == NULL) {
break;
}
rc = ngx_http_drizzle_get_buf(r, dp);
if (rc != NGX_OK) {
return NGX_ERROR;
}
}
return NGX_OK;
}
/* direct path: the caller wrote into dp->out_buf itself */
dd("MEM consuming out_buf for %d", (int) len);
dp->out_buf->last += len;
dp->avail_out -= len;
if (dp->avail_out == 0) {
/* the buffer is full: append it to the output chain */
dd("MEM save dp->out_buf");
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = dp->out_buf;
cl->next = NULL;
*dp->last_out = cl;
dp->last_out = &cl->next;
}
return NGX_OK;
}