Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

implemented bulk reply.

  • Loading branch information...
commit 9b9f7529665612d50bda4bea6b8f893885f2018f 1 parent 52d58f4
@agentzh agentzh authored
View
2  misc/serv.erl
@@ -34,6 +34,6 @@ get_request(Sock) ->
send_response(Sock).
send_response(Sock) ->
- gen_tcp:send(Sock, "+OK\r\nabc"),
+ gen_tcp:send(Sock, "$1a\r\n"),
gen_tcp:close(Sock).
View
29 src/bulk_reply.rl
@@ -0,0 +1,29 @@
+%%{
+ machine bulk_reply;
+
+ include common "common.rl";
+
+ chunk_size = (digit+ -- "0"+) >start_reading_size $read_size
+ ;
+
+ chunk_data_octet = any when test_len
+ ;
+
+ chunk_data = chunk_data_octet+
+ >start_reading_data
+ $read_data_byte
+ ;
+
+ chunk = "$0"+ CRLF CRLF
+ | "$-" (digit+ - "0"+) CRLF
+ | "$" chunk_size CRLF chunk_data CRLF
+ ;
+
+ response = chunk
+ ;
+
+ main := response @finalize
+ ;
+
+}%%
+
View
22 src/common.rl
@@ -6,5 +6,27 @@
action finalize {
done = 1;
}
+
+ action read_size {
+ ctx->chunk_size *= 10;
+ ctx->chunk_size += *p - '0';
+ }
+
+ action start_reading_size {
+ ctx->chunk_size = 0;
+ }
+
+ action start_reading_data {
+ ctx->chunk_bytes_read = 0;
+ }
+
+ action read_data_byte {
+ ctx->chunk_bytes_read++;
+ }
+
+ action test_len {
+ ctx->chunk_bytes_read < ctx->chunk_size
+ }
+
}%%
View
4 src/ngx_http_redis2_handler.c
@@ -56,7 +56,7 @@ ngx_http_redis2_handler(ngx_http_request_t *r)
u->abort_request = ngx_http_redis2_abort_request;
u->finalize_request = ngx_http_redis2_finalize_request;
- ctx = ngx_palloc(r->pool, sizeof(ngx_http_redis2_ctx_t));
+ ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_redis2_ctx_t));
if (ctx == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
@@ -152,7 +152,7 @@ ngx_http_redis2_process_header(ngx_http_request_t *r)
/* the first char is the response header */
- chr = *b->pos++;
+ chr = *b->pos;
dd("response heazder: %c (ascii %d)", chr, chr);
View
2  src/ngx_http_redis2_module.h
@@ -24,6 +24,8 @@ typedef ngx_int_t (*ngx_http_redis2_filter_handler_ptr)
struct ngx_http_redis2_ctx_s {
ngx_http_request_t *request;
int state;
+ size_t chunk_size;
+ size_t chunk_bytes_read;
ngx_http_redis2_filter_handler_ptr filter;
};
View
400 src/ngx_http_redis2_reply.c
@@ -12,7 +12,7 @@
#line 14 "src/ngx_http_redis2_reply.c"
static const int single_line_reply_start = 1;
-static const int single_line_reply_first_final = 3;
+static const int single_line_reply_first_final = 4;
static const int single_line_reply_error = 0;
static const int single_line_reply_en_main = 1;
@@ -21,6 +21,19 @@ static const int single_line_reply_en_main = 1;
#line 10 "src/ngx_http_redis2_reply.rl"
+#line 12 "src/ngx_http_redis2_reply.rl"
+
+#line 27 "src/ngx_http_redis2_reply.c"
+static const int bulk_reply_start = 1;
+static const int bulk_reply_first_final = 15;
+static const int bulk_reply_error = 0;
+
+static const int bulk_reply_en_main = 1;
+
+
+#line 13 "src/ngx_http_redis2_reply.rl"
+
+
ngx_int_t
ngx_http_redis2_process_single_line_reply(ngx_http_redis2_ctx_t *ctx,
ssize_t bytes)
@@ -34,24 +47,22 @@ ngx_http_redis2_process_single_line_reply(ngx_http_redis2_ctx_t *ctx,
int cs;
u_char *p;
u_char *pe;
- ngx_flag_t first_time = 0;
u = ctx->request->upstream;
b = &u->buffer;
if (ctx->state == NGX_ERROR) {
- first_time = 1;
dd("init the state machine");
-#line 35 "src/ngx_http_redis2_reply.rl"
+#line 36 "src/ngx_http_redis2_reply.rl"
-#line 50 "src/ngx_http_redis2_reply.c"
+#line 61 "src/ngx_http_redis2_reply.c"
{
cs = single_line_reply_start;
}
-#line 36 "src/ngx_http_redis2_reply.rl"
+#line 37 "src/ngx_http_redis2_reply.rl"
ctx->state = cs;
@@ -63,58 +74,62 @@ ngx_http_redis2_process_single_line_reply(ngx_http_redis2_ctx_t *ctx,
p = b->last;
pe = b->last + bytes;
+ dd("response body: %.*s", (int) bytes, p);
+
-#line 48 "src/ngx_http_redis2_reply.rl"
+#line 51 "src/ngx_http_redis2_reply.rl"
-#line 49 "src/ngx_http_redis2_reply.rl"
+#line 52 "src/ngx_http_redis2_reply.rl"
-#line 72 "src/ngx_http_redis2_reply.c"
+#line 85 "src/ngx_http_redis2_reply.c"
{
if ( p == pe )
goto _test_eof;
switch ( cs )
{
-st1:
- if ( ++p == pe )
- goto _test_eof1;
case 1:
- if ( (*p) == 13 )
- goto st2;
- goto st1;
+ goto st2;
st2:
if ( ++p == pe )
goto _test_eof2;
case 2:
+ if ( (*p) == 13 )
+ goto st3;
+ goto st2;
+st3:
+ if ( ++p == pe )
+ goto _test_eof3;
+case 3:
switch( (*p) ) {
case 10: goto tr2;
- case 13: goto st2;
+ case 13: goto st3;
}
- goto st1;
+ goto st2;
tr2:
#line 6 "src/common.rl"
{
done = 1;
}
- goto st3;
-st3:
+ goto st4;
+st4:
if ( ++p == pe )
- goto _test_eof3;
-case 3:
-#line 104 "src/ngx_http_redis2_reply.c"
+ goto _test_eof4;
+case 4:
+#line 119 "src/ngx_http_redis2_reply.c"
goto st0;
st0:
cs = 0;
goto _out;
}
- _test_eof1: cs = 1; goto _test_eof;
_test_eof2: cs = 2; goto _test_eof;
_test_eof3: cs = 3; goto _test_eof;
+ _test_eof4: cs = 4; goto _test_eof;
_test_eof: {}
_out: {}
}
-#line 50 "src/ngx_http_redis2_reply.rl"
+#line 53 "src/ngx_http_redis2_reply.rl"
dd("state after exec: %d, done: %d", cs, (int) done);
@@ -128,14 +143,10 @@ cs = 0;
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
"Redis server returns invalid response at %z near "
"\"%V\"",
- (ssize_t) (b->last - b->pos + 1),
+ (ssize_t) (p - b->pos),
&buf);
- return NGX_ERROR;
- }
-
- if (first_time) {
- b->last--;
+ return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
rc = ngx_http_redis2_output_buf(ctx, b->last, p - b->last);
@@ -157,7 +168,334 @@ ngx_int_t
ngx_http_redis2_process_bulk_reply(ngx_http_redis2_ctx_t *ctx,
ssize_t bytes)
{
- /* TODO */
+ ngx_buf_t *b;
+ ngx_http_upstream_t *u;
+ ngx_str_t buf;
+ ngx_int_t rc;
+ ngx_flag_t done = 0;
+
+ int cs;
+ u_char *p;
+ u_char *pe;
+
+ u = ctx->request->upstream;
+ b = &u->buffer;
+
+ if (ctx->state == NGX_ERROR) {
+ dd("init the state machine");
+
+
+#line 108 "src/ngx_http_redis2_reply.rl"
+
+#line 191 "src/ngx_http_redis2_reply.c"
+ {
+ cs = single_line_reply_start;
+ }
+
+#line 109 "src/ngx_http_redis2_reply.rl"
+
+ ctx->state = cs;
+
+ } else {
+ cs = ctx->state;
+ dd("resumed the old state %d", cs);
+ }
+
+ p = b->last;
+ pe = b->last + bytes;
+
+
+#line 121 "src/ngx_http_redis2_reply.rl"
+
+#line 122 "src/ngx_http_redis2_reply.rl"
+
+#line 213 "src/ngx_http_redis2_reply.c"
+ {
+ short _widec;
+ if ( p == pe )
+ goto _test_eof;
+ switch ( cs )
+ {
+case 1:
+ if ( (*p) == 36 )
+ goto st2;
+ goto st0;
+st0:
+cs = 0;
+ goto _out;
+st2:
+ if ( ++p == pe )
+ goto _test_eof2;
+case 2:
+ switch( (*p) ) {
+ case 45: goto st3;
+ case 48: goto st6;
+ }
+ if ( 49 <= (*p) && (*p) <= 57 )
+ goto tr4;
+ goto st0;
+st3:
+ if ( ++p == pe )
+ goto _test_eof3;
+case 3:
+ if ( (*p) == 48 )
+ goto st3;
+ if ( 49 <= (*p) && (*p) <= 57 )
+ goto st4;
+ goto st0;
+st4:
+ if ( ++p == pe )
+ goto _test_eof4;
+case 4:
+ if ( (*p) == 13 )
+ goto st5;
+ if ( 48 <= (*p) && (*p) <= 57 )
+ goto st4;
+ goto st0;
+st5:
+ if ( ++p == pe )
+ goto _test_eof5;
+case 5:
+ if ( (*p) == 10 )
+ goto tr7;
+ goto st0;
+tr7:
+#line 6 "src/common.rl"
+ {
+ done = 1;
+ }
+ goto st15;
+st15:
+ if ( ++p == pe )
+ goto _test_eof15;
+case 15:
+#line 273 "src/ngx_http_redis2_reply.c"
+ goto st0;
+st6:
+ if ( ++p == pe )
+ goto _test_eof6;
+case 6:
+ switch( (*p) ) {
+ case 13: goto st7;
+ case 36: goto st9;
+ }
+ goto st0;
+st7:
+ if ( ++p == pe )
+ goto _test_eof7;
+case 7:
+ if ( (*p) == 10 )
+ goto st8;
+ goto st0;
+st8:
+ if ( ++p == pe )
+ goto _test_eof8;
+case 8:
+ if ( (*p) == 13 )
+ goto st5;
+ goto st0;
+st9:
+ if ( ++p == pe )
+ goto _test_eof9;
+case 9:
+ if ( (*p) == 48 )
+ goto st6;
+ goto st0;
+tr4:
+#line 15 "src/common.rl"
+ {
+ ctx->chunk_size = 0;
+ }
+#line 10 "src/common.rl"
+ {
+ ctx->chunk_size *= 10;
+ ctx->chunk_size += *p - '0';
+ }
+ goto st10;
+tr12:
+#line 10 "src/common.rl"
+ {
+ ctx->chunk_size *= 10;
+ ctx->chunk_size += *p - '0';
+ }
+ goto st10;
+st10:
+ if ( ++p == pe )
+ goto _test_eof10;
+case 10:
+#line 327 "src/ngx_http_redis2_reply.c"
+ if ( (*p) == 13 )
+ goto st11;
+ if ( 49 <= (*p) && (*p) <= 57 )
+ goto tr12;
+ goto st0;
+st11:
+ if ( ++p == pe )
+ goto _test_eof11;
+case 11:
+ if ( (*p) == 10 )
+ goto st12;
+ goto st0;
+st12:
+ if ( ++p == pe )
+ goto _test_eof12;
+case 12:
+ _widec = (*p);
+ _widec = (short)(128 + ((*p) - -128));
+ if (
+#line 27 "src/common.rl"
+
+ ctx->chunk_bytes_read < ctx->chunk_size
+ ) _widec += 256;
+ if ( 384 <= _widec && _widec <= 639 )
+ goto tr14;
+ goto st0;
+tr14:
+#line 19 "src/common.rl"
+ {
+ ctx->chunk_bytes_read = 0;
+ }
+#line 23 "src/common.rl"
+ {
+ ctx->chunk_bytes_read++;
+ }
+ goto st13;
+tr15:
+#line 23 "src/common.rl"
+ {
+ ctx->chunk_bytes_read++;
+ }
+ goto st13;
+st13:
+ if ( ++p == pe )
+ goto _test_eof13;
+case 13:
+#line 374 "src/ngx_http_redis2_reply.c"
+ _widec = (*p);
+ _widec = (short)(128 + ((*p) - -128));
+ if (
+#line 27 "src/common.rl"
+
+ ctx->chunk_bytes_read < ctx->chunk_size
+ ) _widec += 256;
+ switch( _widec ) {
+ case 269: goto st5;
+ case 525: goto tr16;
+ }
+ if ( 384 <= _widec && _widec <= 639 )
+ goto tr15;
+ goto st0;
+tr16:
+#line 23 "src/common.rl"
+ {
+ ctx->chunk_bytes_read++;
+ }
+ goto st14;
+st14:
+ if ( ++p == pe )
+ goto _test_eof14;
+case 14:
+#line 399 "src/ngx_http_redis2_reply.c"
+ _widec = (*p);
+ _widec = (short)(128 + ((*p) - -128));
+ if (
+#line 27 "src/common.rl"
+
+ ctx->chunk_bytes_read < ctx->chunk_size
+ ) _widec += 256;
+ switch( _widec ) {
+ case 266: goto tr7;
+ case 269: goto st5;
+ case 522: goto tr17;
+ case 525: goto tr16;
+ }
+ if ( 384 <= _widec && _widec <= 639 )
+ goto tr15;
+ goto st0;
+tr17:
+#line 23 "src/common.rl"
+ {
+ ctx->chunk_bytes_read++;
+ }
+#line 6 "src/common.rl"
+ {
+ done = 1;
+ }
+ goto st16;
+st16:
+ if ( ++p == pe )
+ goto _test_eof16;
+case 16:
+#line 430 "src/ngx_http_redis2_reply.c"
+ _widec = (*p);
+ _widec = (short)(128 + ((*p) - -128));
+ if (
+#line 27 "src/common.rl"
+
+ ctx->chunk_bytes_read < ctx->chunk_size
+ ) _widec += 256;
+ switch( _widec ) {
+ case 269: goto st5;
+ case 525: goto tr16;
+ }
+ if ( 384 <= _widec && _widec <= 639 )
+ goto tr15;
+ goto st0;
+ }
+ _test_eof2: cs = 2; goto _test_eof;
+ _test_eof3: cs = 3; goto _test_eof;
+ _test_eof4: cs = 4; goto _test_eof;
+ _test_eof5: cs = 5; goto _test_eof;
+ _test_eof15: cs = 15; goto _test_eof;
+ _test_eof6: cs = 6; goto _test_eof;
+ _test_eof7: cs = 7; goto _test_eof;
+ _test_eof8: cs = 8; goto _test_eof;
+ _test_eof9: cs = 9; goto _test_eof;
+ _test_eof10: cs = 10; goto _test_eof;
+ _test_eof11: cs = 11; goto _test_eof;
+ _test_eof12: cs = 12; goto _test_eof;
+ _test_eof13: cs = 13; goto _test_eof;
+ _test_eof14: cs = 14; goto _test_eof;
+ _test_eof16: cs = 16; goto _test_eof;
+
+ _test_eof: {}
+ _out: {}
+ }
+
+#line 123 "src/ngx_http_redis2_reply.rl"
+
+ dd("state after exec: %d, done: %d", cs, (int) done);
+
+ ctx->state = cs;
+
+ if (cs == bulk_reply_error) {
+
+ buf.data = b->last - 1;
+ buf.len = bytes + 1;
+
+ ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
+ "Redis server returns invalid response at %z near "
+ "\"%V\"",
+ (ssize_t) (p - b->pos),
+ &buf);
+
+ return NGX_ERROR;
+ }
+
+ dd("read body bytes: %d", (int) (p - b->last));
+ dd("output: [%.*s]", (int) (p - b->last), b->last);
+
+ rc = ngx_http_redis2_output_buf(ctx, b->last, p - b->last);
+ if (rc != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ b->last = p;
+
+ if (done) {
+ u->length = 0;
+ }
+
return NGX_OK;
}
View
81 src/ngx_http_redis2_reply.rl
@@ -8,6 +8,9 @@
%% machine single_line_reply;
%% write data;
+%% machine bulk_reply;
+%% write data;
+
ngx_int_t
ngx_http_redis2_process_single_line_reply(ngx_http_redis2_ctx_t *ctx,
@@ -22,13 +25,11 @@ ngx_http_redis2_process_single_line_reply(ngx_http_redis2_ctx_t *ctx,
int cs;
u_char *p;
u_char *pe;
- ngx_flag_t first_time = 0;
u = ctx->request->upstream;
b = &u->buffer;
if (ctx->state == NGX_ERROR) {
- first_time = 1;
dd("init the state machine");
%% machine single_line_reply;
@@ -44,6 +45,8 @@ ngx_http_redis2_process_single_line_reply(ngx_http_redis2_ctx_t *ctx,
p = b->last;
pe = b->last + bytes;
+ dd("response body: %.*s", (int) bytes, p);
+
%% machine single_line_reply;
%% include "single_line_reply.rl";
%% write exec;
@@ -60,14 +63,10 @@ ngx_http_redis2_process_single_line_reply(ngx_http_redis2_ctx_t *ctx,
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
"Redis server returns invalid response at %z near "
"\"%V\"",
- (ssize_t) (b->last - b->pos + 1),
+ (ssize_t) (p - b->pos),
&buf);
- return NGX_ERROR;
- }
-
- if (first_time) {
- b->last--;
+ return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
rc = ngx_http_redis2_output_buf(ctx, b->last, p - b->last);
@@ -89,7 +88,71 @@ ngx_int_t
ngx_http_redis2_process_bulk_reply(ngx_http_redis2_ctx_t *ctx,
ssize_t bytes)
{
- /* TODO */
+ ngx_buf_t *b;
+ ngx_http_upstream_t *u;
+ ngx_str_t buf;
+ ngx_int_t rc;
+ ngx_flag_t done = 0;
+
+ int cs;
+ u_char *p;
+ u_char *pe;
+
+ u = ctx->request->upstream;
+ b = &u->buffer;
+
+ if (ctx->state == NGX_ERROR) {
+ dd("init the state machine");
+
+ %% machine single_line_reply;
+ %% write init;
+
+ ctx->state = cs;
+
+ } else {
+ cs = ctx->state;
+ dd("resumed the old state %d", cs);
+ }
+
+ p = b->last;
+ pe = b->last + bytes;
+
+ %% machine bulk_reply;
+ %% include "bulk_reply.rl";
+ %% write exec;
+
+ dd("state after exec: %d, done: %d", cs, (int) done);
+
+ ctx->state = cs;
+
+ if (cs == bulk_reply_error) {
+
+ buf.data = b->last - 1;
+ buf.len = bytes + 1;
+
+ ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
+ "Redis server returns invalid response at %z near "
+ "\"%V\"",
+ (ssize_t) (p - b->pos),
+ &buf);
+
+ return NGX_ERROR;
+ }
+
+ dd("read body bytes: %d", (int) (p - b->last));
+ dd("output: [%.*s]", (int) (p - b->last), b->last);
+
+ rc = ngx_http_redis2_output_buf(ctx, b->last, p - b->last);
+ if (rc != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ b->last = p;
+
+ if (done) {
+ u->length = 0;
+ }
+
return NGX_OK;
}
View
2  src/single_line_reply.rl
@@ -3,7 +3,7 @@
include common "common.rl";
- response = (any* -- CRLF) CRLF
+ response = any (any* -- CRLF) CRLF
;
main := response @finalize
View
105 t/sanity.t
@@ -88,3 +88,108 @@ __DATA__
GET /foo
--- response_body_like: ^:\d+\r\n$
+
+
+=== TEST 7: bulk reply
+--- config
+ location /foo {
+ redis2_literal_raw_query 'get not_exist_yet\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+--- request
+ GET /foo
+--- response_body eval
+"\$-1\r\n"
+
+
+
+=== TEST 8: bulk reply
+--- config
+ location /set {
+ redis2_literal_raw_query 'set one 5\r\nfirst\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /get {
+ redis2_literal_raw_query 'get one\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /main {
+ echo_location /set;
+ echo_location /get;
+ }
+--- request
+ GET /main
+--- response_body eval
+"+OK\r\n\$5\r\nfirst\r\n"
+
+
+
+=== TEST 9: bulk reply
+--- config
+ location /set {
+ redis2_literal_raw_query 'set one 5\r\nfirst\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /get {
+ redis2_literal_raw_query 'get one\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /main {
+ echo_location /set;
+ echo_location /get;
+ }
+--- request
+ GET /main
+--- response_body eval
+"+OK\r\n\$5\r\nfirst\r\n"
+
+
+
+=== TEST 10: bulk reply (empty)
+--- config
+ location /set {
+ redis2_literal_raw_query 'set one 0\r\n\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /get {
+ redis2_literal_raw_query 'get one\r\n\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /main {
+ echo_location /set;
+ echo_location /get;
+ }
+--- request
+ GET /main
+--- response_body eval
+"+OK\r\n\$0\r\n\r\n"
+
+
+
+=== TEST 11: multi bulk reply
+--- config
+ location /set {
+ redis2_literal_raw_query 'set one 0\r\n\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /get {
+ redis2_literal_raw_query 'get one\r\n\r\n';
+ redis2_pass 127.0.0.1:$TEST_NGINX_REDIS2_PORT;
+ }
+
+ location /main {
+ echo_location /set;
+ echo_location /get;
+ }
+--- request
+ GET /main
+--- response_body eval
+"+OK\r\n\$0\r\n\r\n"
+--- SKIP
Please sign in to comment.
Something went wrong with that request. Please try again.