Permalink
Browse files

Fix pub/sub. Valgrind is happy.

  • Loading branch information...
nicolasff committed Jan 22, 2011
1 parent e914952 commit 6026811c02a9f47c90944869aeb7c921a2e934dd
Showing with 88 additions and 52 deletions.
  1. +3 −4 README.markdown
  2. +11 −28 cmd.c
  3. +1 −3 cmd.h
  4. +4 −9 formats/common.c
  5. +0 −1 formats/custom-type.c
  6. +0 −1 formats/json.c
  7. +0 −1 formats/raw.c
  8. +56 −4 http.c
  9. +13 −1 http.h
View
@@ -31,18 +31,17 @@ curl -d "GET/hello" http://127.0.0.1:7379/
* Custom Content-Type using a pre-defined file extension, or with `?type=some/thing`.
* URL-encoded parameters for binary data or slashes. For instance, `%2f` is decoded as `/` but not used as a command separator.
* Logs, with a configurable verbosity.
-* Cross-origin XHR, if compiled with libevent2 (for `OPTIONS` support).
-* File upload with PUT, if compiled with libevent2 (for `PUT` support).
+* Cross-origin requests, usable with XMLHttpRequest2 (Cross-Origin Resource Sharing - CORS).
+* File upload with PUT.
# Ideas, TODO...
* Add better support for PUT, DELETE, HEAD, OPTIONS? How? For which commands?
-* Switch from evhttp to raw libevent + the http-parser library from node.js for clean disconnection support on SUBSCRIBE commands.
* MULTI/EXEC/DISCARD/WATCH are disabled at the moment; find a way to use them.
* Support POST of raw Redis protocol data, and execute the whole thing. This could be useful for MULTI/EXEC transactions.
* Enrich config file:
* Provide timeout (maybe for some commands only?). What should the response be? 504 Gateway Timeout? 503 Service Unavailable?
* Multi-server support, using consistent hashing.
-* Add WebSocket support (with which protocol?). This will only be possible after the switch from evhttp to http-parser.
+* Add WebSocket support (with which protocol?).
* Send your ideas using the github tracker, on twitter [@yowgi](http://twitter.com/yowgi) or by mail to n.favrefelix@gmail.com.
# HTTP error codes
View
39 cmd.c
@@ -39,24 +39,6 @@ cmd_free(struct cmd *c) {
free(c);
}
-/**
- * Detect disconnection of a pub/sub client. We need to clean up the command.
- */
-void on_http_disconnect(struct evhttp_connection *evcon, void *ctx) {
- struct pubsub_client *ps = ctx;
-
- (void)evcon;
-
- /* clean up redis object */
- redisAsyncFree(ps->s->ac);
-
- /* clean up command object */
- if(ps->cmd) {
- cmd_free(ps->cmd);
- }
- free(ps);
-}
-
/* taken from libevent */
static char *
decode_uri(const char *uri, size_t length, size_t *out_len, int always_decode_plus) {
@@ -136,18 +118,13 @@ cmd_run(struct server *s, struct http_client *client,
return -1;
}
- /* FIXME:check if we have to split the connection */
- /*
+ /* check if we have to split the connection */
if(cmd_is_subscribe(cmd)) {
- struct pubsub_client *ps;
- ps = calloc(1, sizeof(struct pubsub_client));
- ps->s = s = server_copy(s);
- ps->cmd = cmd;
- ps->rq = rq;
- evhttp_connection_set_closecb(rq->evcon, on_http_disconnect, ps);
+ client->sub = malloc(sizeof(struct subscription));
+ client->sub->s = s = server_copy(s);
+ client->sub->cmd = cmd;
}
- */
/* no args (e.g. INFO command) */
if(!slash) {
@@ -247,7 +224,7 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
}
}
- /* FIXME:the user can force it with ?type=some/thing */
+ /* the user can force it with ?type=some/thing */
if(client->qs_type.s) {
*f_format = custom_type_reply;
cmd->mime = strdup(client->qs_type.s);
@@ -260,6 +237,12 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
int
cmd_is_subscribe(struct cmd *cmd) {
+ /*
+ if(cmd->started_responding) {
+ return 1;
+ }
+ */
+
if(cmd->count >= 1 &&
(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0 ||
strncasecmp(cmd->argv[0], "PSUBSCRIBE", cmd->argv_len[0]) == 0)) {
View
4 cmd.h
@@ -15,7 +15,6 @@ struct cmd;
typedef void (*formatting_fun)(redisAsyncContext *, void *, void *);
struct cmd {
-
int count;
const char **argv;
size_t *argv_len;
@@ -28,10 +27,9 @@ struct cmd {
int mime_free;
};
-struct pubsub_client {
+struct subscription {
struct server *s;
struct cmd *cmd;
- struct evhttp_request *rq;
};
struct cmd *
View
@@ -33,7 +33,6 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
int free_cmd = 1;
-
if(cmd_is_subscribe(cmd)) {
free_cmd = 0;
@@ -42,18 +41,15 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
const char *ct = cmd->mime?cmd->mime:content_type;
cmd->started_responding = 1;
http_set_header(&cmd->client->out_content_type, ct, strlen(ct));
- /*FIXME:
- evhttp_send_reply_start(cmd->rq, 200, "OK");
- */
+ http_send_reply_start(cmd->client, 200, "OK");
}
- /*FIXME: evhttp_send_reply_chunk(cmd->rq, body); */
+ http_send_reply_chunk(cmd->client, p, sz);
} else {
/* compute ETag */
char *etag = etag_new(p, sz);
const char *if_none_match = cmd->client->header_if_none_match.s;
- /* FIXME */
-#if 1
+
/* check If-None-Match */
if(if_none_match && strncmp(if_none_match, etag, cmd->client->header_if_none_match.sz) == 0) {
/* SAME! send 304. */
@@ -64,12 +60,11 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
http_set_header(&cmd->client->out_etag, etag, strlen(etag));
http_send_reply(cmd->client, 200, "OK", p, sz);
}
-#endif
+
free(etag);
}
/* cleanup */
if(free_cmd) {
- /*FIXME: evhttp_clear_headers(&cmd->uri_params); */
cmd_free(cmd);
}
}
View
@@ -17,7 +17,6 @@ custom_type_reply(redisAsyncContext *c, void *r, void *privdata) {
int int_len;
if(reply == NULL) {
- http_send_reply(cmd->client, 404, "Not Found", NULL, 0);
return;
}
View
@@ -25,7 +25,6 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) {
}
if (reply == NULL) {
- http_send_reply(cmd->client, 404, "Not Found", NULL, 0);
return;
}
View
@@ -20,7 +20,6 @@ raw_reply(redisAsyncContext *c, void *r, void *privdata) {
(void)c;
if (reply == NULL) {
- http_send_reply(cmd->client, 404, "Not Found", NULL, 0);
return;
}
View
60 http.c
@@ -61,6 +61,10 @@ http_client_read(int fd, short event, void *ctx) {
static void
http_client_cleanup(struct http_client *c) {
+ if(c->sub) {
+ return; /* we need to keep those. */
+ }
+
free(c->path.s);
memset(&c->path, 0, sizeof(str_t));
@@ -100,6 +104,18 @@ http_client_free(struct http_client *c) {
event_del(&c->ev);
close(c->fd);
+ if(c->sub) {
+ /* clean up redis object */
+ redisAsyncFree(c->sub->s->ac);
+
+ /* clean up command object */
+ if(c->sub->cmd) {
+ cmd_free(c->sub->cmd);
+ }
+ free(c->sub);
+ c->sub = NULL;
+ }
+
http_client_cleanup(c);
free(c);
}
@@ -128,7 +144,7 @@ http_client_keep_alive(struct http_client *c) {
void
http_client_reset(struct http_client *c) {
- if(!http_client_keep_alive(c)) {
+ if(!http_client_keep_alive(c) && !c->sub) {
http_client_free(c);
return;
}
@@ -285,29 +301,65 @@ http_send_reply(struct http_client *c, short code, const char *msg,
http_response_init(c, &resp, code, msg);
sprintf(content_length, "%zd", body_len);
- http_response_set_header(&resp, "Content-Length", content_length);
+ if(!c->sub) {
+ http_response_set_header(&resp, "Content-Length", content_length);
+ }
if(body_len) {
http_response_set_header(&resp, "Content-Type", ct);
}
+ if(c->sub) {
+ http_response_set_header(&resp, "Transfer-Encoding", "chunked");
+ }
+
if(code == 200 && c->out_etag.s) {
http_response_set_header(&resp, "ETag", c->out_etag.s);
}
http_response_set_body(&resp, body, body_len);
+ /* flush response in the socket */
if(http_response_send(&resp, c->fd)) {
http_client_free(c);
} else {
- if(code == 200 && http_client_keep_alive(c)) {
+ if(c->sub) { /* don't free the client, but monitor fd. */
+ http_client_serve(c);
+ return;
+ } else if(code == 200 && http_client_keep_alive(c)) { /* reset client */
http_client_reset(c);
http_client_serve(c);
} else {
- http_client_free(c);
+ http_client_free(c); /* error or HTTP < 1.1: close */
}
}
}
+/* Transfer-encoding: chunked */
+void
+http_send_reply_start(struct http_client *c, short code, const char *msg) {
+
+ http_send_reply(c, code, msg, NULL, 0);
+}
+
+void
+http_send_reply_chunk(struct http_client *c, const char *p, size_t sz) {
+
+ char buf[64];
+ int ret;
+
+ ret = sprintf(buf, "%x\r\n", (int)sz);
+ write(c->fd, buf, ret);
+ write(c->fd, p, sz);
+ write(c->fd, "\r\n", 2);
+}
+
+void
+http_send_reply_end(struct http_client *c) {
+
+ http_send_reply_chunk(c, "", 0);
+ http_client_free(c);
+}
+
void
http_set_header(str_t *h, const char *p, size_t sz) {
View
14 http.h
@@ -35,9 +35,12 @@ struct http_client {
str_t out_content_type;
str_t out_etag;
+ /* query string */
str_t qs_type;
str_t qs_jsonp;
+ struct subscription *sub;
+
/* private, used in HTTP parser */
str_t last_header_name;
};
@@ -91,6 +94,16 @@ void
http_send_reply(struct http_client *c, short code, const char *msg,
const char *body, size_t body_len);
+/* Transfer-encoding: chunked */
+void
+http_send_reply_start(struct http_client *c, short code, const char *msg);
+
+void
+http_send_reply_chunk(struct http_client *c, const char *p, size_t sz);
+
+void
+http_send_reply_end(struct http_client *c);
+
void
http_send_error(struct http_client *c, short code, const char *msg);
@@ -107,5 +120,4 @@ http_response_set_body(struct http_response *r, const char *body, size_t body_le
int
http_response_send(struct http_response *r, int fd);
-
#endif

0 comments on commit 6026811

Please sign in to comment.