Permalink
Find file
Fetching contributors…
Cannot retrieve contributors at this time
477 lines (335 sloc) 11 KB
/*
* Copyright (C) agentzh
*/
#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"
#include "ngx_http_rds_csv_processor.h"
#include "ngx_http_rds_csv_util.h"
#include "ngx_http_rds_csv_output.h"
#include "ngx_http_rds.h"
#include "ngx_http_rds_utils.h"
#include <ngx_core.h>
#include <ngx_http.h>
ngx_int_t
ngx_http_rds_csv_process_header(ngx_http_request_t *r, ngx_chain_t *in,
ngx_http_rds_csv_ctx_t *ctx)
{
ngx_buf_t *b;
ngx_http_rds_header_t header;
ngx_int_t rc;
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
if (!ngx_buf_in_memory(b)) {
if (!ngx_buf_special(b)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: process header: buf from "
"upstream not in memory");
goto invalid;
}
in = in->next;
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
}
rc = ngx_http_rds_parse_header(r, b, &header);
if (rc != NGX_OK) {
goto invalid;
}
dd("col count: %d", (int) header.col_count);
if (header.col_count == 0) {
/* for empty result set, just return the JSON
* representation of the RDS header */
dd("col count == 0");
if (b->pos != b->last) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: header: there's unexpected remaining data "
"in the buf");
goto invalid;
}
ctx->state = state_done;
/* now we send the postponed response header */
if (!ctx->header_sent) {
ctx->header_sent = 1;
rc = ngx_http_rds_csv_next_header_filter(r);
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
}
rc = ngx_http_rds_csv_output_header(r, ctx, &header);
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
ngx_http_rds_csv_discard_bufs(r->pool, in);
return rc;
}
ctx->cols = ngx_palloc(r->pool,
header.col_count * sizeof(ngx_http_rds_column_t));
if (ctx->cols == NULL) {
goto invalid;
}
ctx->state = state_expect_col;
ctx->cur_col = 0;
ctx->col_count = header.col_count;
/* now we send the postponed response header */
if (!ctx->header_sent) {
ctx->header_sent = 1;
rc = ngx_http_rds_csv_next_header_filter(r);
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
}
return ngx_http_rds_csv_process_col(r, b->pos == b->last ? in->next : in,
ctx);
invalid:
dd("return 500");
if (!ctx->header_sent) {
ctx->header_sent = 1;
r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;
ngx_http_send_header(r);
ngx_http_send_special(r, NGX_HTTP_LAST);
return NGX_ERROR;
}
return NGX_ERROR;
}
ngx_int_t
ngx_http_rds_csv_process_col(ngx_http_request_t *r, ngx_chain_t *in,
ngx_http_rds_csv_ctx_t *ctx)
{
ngx_buf_t *b;
ngx_int_t rc;
ngx_http_rds_csv_loc_conf_t *conf;
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
if (!ngx_buf_in_memory(b)) {
if (!ngx_buf_special(b)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: process col: buf from upstream not in "
"memory");
return NGX_ERROR;
}
in = in->next;
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
}
dd("parsing rds column");
rc = ngx_http_rds_parse_col(r, b, &ctx->cols[ctx->cur_col]);
dd("parse col returns %d (%d)", (int) rc, (int) NGX_OK);
if (rc != NGX_OK) {
return NGX_ERROR;
}
if (b->pos == b->last) {
dd("parse col buf consumed");
in = in->next;
}
ctx->cur_col++;
if (ctx->cur_col >= ctx->col_count) {
dd("end of column list");
ctx->state = state_expect_row;
ctx->row = 0;
conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
if (conf->field_name_header) {
rc = ngx_http_rds_csv_output_field_names(r, ctx);
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
}
dd("after output literal");
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
dd("process col is entering process row...");
return ngx_http_rds_csv_process_row(r, in, ctx);
}
return ngx_http_rds_csv_process_col(r, in, ctx);
}
ngx_int_t
ngx_http_rds_csv_process_row(ngx_http_request_t *r, ngx_chain_t *in,
ngx_http_rds_csv_ctx_t *ctx)
{
ngx_buf_t *b;
ngx_int_t rc;
if (in == NULL) {
return NGX_OK;
}
dd("process row");
b = in->buf;
if (!ngx_buf_in_memory(b)) {
if (!ngx_buf_special(b)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: process row: buf from "
"upstream not in memory");
return NGX_ERROR;
}
in = in->next;
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
}
if (b->last - b->pos < (ssize_t) sizeof(uint8_t)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: row flag is incomplete in the buf");
return NGX_ERROR;
}
dd("row flag: %d (offset %d)", (char) *b->pos, (int) (b->pos - b->start));
if (*b->pos++ == 0) {
/* end of row list */
ctx->state = state_done;
if (b->pos != b->last) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: row: there's unexpected remaining data "
"in the buf");
return NGX_ERROR;
}
rc = ngx_http_rds_csv_output_literal(r, ctx, (u_char *) "", 0,
1 /* last buf*/);
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return rc;
}
ctx->row++;
ctx->cur_col = 0;
ctx->state = state_expect_field;
if (b->pos == b->last) {
in = in->next;
} else {
dd("process row: buf not consumed completely");
}
return ngx_http_rds_csv_process_field(r, in, ctx);
}
ngx_int_t
ngx_http_rds_csv_process_field(ngx_http_request_t *r, ngx_chain_t *in,
ngx_http_rds_csv_ctx_t *ctx)
{
size_t total, len;
ngx_buf_t *b;
ngx_int_t rc;
for (;;) {
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
if (!ngx_buf_in_memory(b)) {
dd("buf not in memory");
if (!ngx_buf_special(b)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: process field: buf from "
"upstream not in memory");
return NGX_ERROR;
}
in = in->next;
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
}
dd("process field: buf size: %d", (int) ngx_buf_size(b));
if (b->last - b->pos < (ssize_t) sizeof(uint32_t)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: field size is incomplete in the buf: %*s "
"(len: %d)", b->last - b->pos, b->pos,
(int) (b->last - b->pos));
return NGX_ERROR;
}
total = *(uint32_t *) b->pos;
dd("total: %d", (int) total);
b->pos += sizeof(uint32_t);
if (total == (uint32_t) -1) {
/* SQL NULL found */
total = 0;
len = 0;
ctx->field_data_rest = 0;
rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
1 /* is null */);
} else {
len = (uint32_t) (b->last - b->pos);
if (len >= total) {
len = total;
}
ctx->field_data_rest = total - len;
rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
0 /* not null */);
}
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
b->pos += len;
if (b->pos == b->last) {
in = in->next;
}
if (len < total) {
dd("process field: need to read more field data");
ctx->state = state_expect_more_field_data;
return ngx_http_rds_csv_process_more_field_data(r, in, ctx);
}
ctx->cur_col++;
if (ctx->cur_col >= ctx->col_count) {
dd("reached the end of the current row");
ctx->state = state_expect_row;
return ngx_http_rds_csv_process_row(r, in, ctx);
}
/* continue to process the next field (if any) */
}
/* impossible to reach here */
return NGX_ERROR;
}
ngx_int_t
ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r,
ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx)
{
ngx_int_t rc;
ngx_buf_t *b;
size_t len;
for (;;) {
if (in == NULL) {
return NGX_OK;
}
b = in->buf;
if (!ngx_buf_in_memory(b)) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"rds_csv: buf from upstream not in memory");
return NGX_ERROR;
}
len = b->last - b->pos;
if (len >= ctx->field_data_rest) {
len = ctx->field_data_rest;
ctx->field_data_rest = 0;
} else {
ctx->field_data_rest -= len;
}
rc = ngx_http_rds_csv_output_more_field_data(r, ctx, b->pos, len);
if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
b->pos += len;
if (b->pos == b->last) {
in = in->next;
}
if (ctx->field_data_rest) {
dd("process more field data: still some data remaining");
continue;
}
dd("process more field data: reached the end of the current field");
ctx->cur_col++;
if (ctx->cur_col >= ctx->col_count) {
dd("process more field data: reached the end of the current row");
ctx->state = state_expect_row;
return ngx_http_rds_csv_process_row(r, in, ctx);
}
dd("proces more field data: read the next field");
ctx->state = state_expect_field;
return ngx_http_rds_csv_process_field(r, in, ctx);
}
/* impossible to reach here */
return NGX_ERROR;
}