Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Partial rewrite, adding WebSockets, threads, pool.

  • Loading branch information...
commit 5b7aa50e62b5b0b8b442c9e1ba34bcacb91d2d56 1 parent dda2236
@nicolasff authored
View
4 Makefile
@@ -4,10 +4,10 @@ JANSSON_OBJ=jansson/src/dump.o jansson/src/error.o jansson/src/hashtable.o janss
FORMAT_OBJS=formats/json.o formats/raw.o formats/common.o formats/custom-type.o formats/bson.o
HTTP_PARSER_OBJS=http-parser/http_parser.o
DEPS=$(FORMAT_OBJS) $(HIREDIS_OBJ) $(JANSSON_OBJ) $(HTTP_PARSER_OBJS)
-OBJS=webdis.o conf.o cmd.o slog.o server.o libb64/cencode.o acl.o md5/md5.o http.o client.o $(DEPS)
+OBJS=webdis.o cmd.o worker.o slog.o server.o libb64/cencode.o acl.o md5/md5.o http.o client.o websocket.o pool.o conf.o $(DEPS)
CFLAGS=-O3 -Wall -Wextra -I. -Ijansson/src -Ihttp-parser
-LDFLAGS=-levent
+LDFLAGS=-levent -pthread
all: $(OUT) Makefile
View
3  README.markdown
@@ -20,11 +20,12 @@ curl -d "GET/hello" http://127.0.0.1:7379/
</pre>
# Features
+* Multi-threaded server.
* GET and POST are supported.
* JSON output by default, optional JSONP parameter (`?jsonp=myFunction`).
* Raw Redis 2.0 protocol output with `.raw` suffix
* BSON support for compact responses and MongoDB compatibility.
-* HTTP 1.1 pipelining (50,000 http requests per second on a desktop Linux machine.)
+* HTTP 1.1 pipelining (70,000 http requests per second on a desktop Linux machine.)
* Connects to Redis using a TCP or UNIX socket.
* Restricted commands by IP range (CIDR subnet + mask) or HTTP Basic Auth, returning 403 errors.
* Possible Redis authentication in the config file.
View
3  acl.c
@@ -14,7 +14,7 @@ acl_match_client(struct acl *a, struct http_client *client, in_addr_t *ip) {
/* check HTTP Basic Auth */
const char *auth;
- auth = client->input_headers.authorization.s;
+ auth = client_get_header(client, "Authorization");
if(a->http_basic_auth) {
if(auth && strncasecmp(auth, "Basic ", 6) == 0) { /* sent auth */
if(strcmp(auth + 6, a->http_basic_auth) != 0) { /* bad password */
@@ -39,6 +39,7 @@ acl_match_client(struct acl *a, struct http_client *client, in_addr_t *ip) {
int
acl_allow_command(struct cmd *cmd, struct conf *cfg, struct http_client *client) {
+ /* FIXME */
char *always_off[] = {"MULTI", "EXEC", "WATCH", "DISCARD"};
unsigned int i;
View
560 client.c
@@ -1,257 +1,104 @@
#include "client.h"
-#include "server.h"
-#include "cmd.h"
+#include "http_parser.h"
#include "http.h"
-#include "slog.h"
+#include "server.h"
+#include "worker.h"
+#include "websocket.h"
#include <stdlib.h>
-#include <stdio.h>
-#include <unistd.h>
#include <string.h>
+#include <unistd.h>
-#include "hiredis/async.h"
-
-struct http_client *
-http_client_new(int fd, struct server *s) {
-
- struct http_client *c = calloc(1, sizeof(struct http_client));
- c->fd = fd;
- c->s = s;
-
- /* initialize HTTP parser */
- c->settings.on_path = http_on_path;
- c->settings.on_body = http_on_body;
- c->settings.on_message_complete = http_on_complete;
- c->settings.on_header_field = http_on_header_name;
- c->settings.on_header_value = http_on_header_value;
- c->settings.on_query_string = http_on_query_string;
-
- http_parser_init(&c->parser, HTTP_REQUEST);
- c->parser.data = c;
-
- c->state = CLIENT_WAITING;
-
- return c;
-}
-
-
-/**
- * Called by libevent when read(2) is possible on fd without blocking.
- */
-void
-http_client_read(int fd, short event, void *ctx) {
-
- struct http_client *c = ctx;
- char buffer[64*1024];
- int ret, nparsed;
-
- (void)fd;
- (void)event;
-
- if(c->state != CLIENT_WAITING) { /* not expecting anything, fail. */
- http_client_free(c);
- return;
- }
-
- ret = read(c->fd, buffer, sizeof(buffer));
- if(ret <= 0) { /* broken connection, bye */
- http_client_free(c);
- return;
- }
-
- nparsed = http_parser_execute(&c->parser, &c->settings, buffer, ret);
- if(c->state == CLIENT_BROKEN) {
- http_client_free(c);
- return;
- }
-
- if(c->parser.upgrade) {
- /* TODO: upgrade parser (WebSockets & cie) */
- } else if(nparsed != ret) { /* invalid data */
- http_client_free(c);
- } else if(c->state == CLIENT_WAITING) {
- http_client_serve(c);
- }
-}
-
-static void
-http_client_cleanup(struct http_client *c) {
-
- if(c->sub) {
- return; /* we need to keep those. */
- }
-
- free(c->path.s);
- memset(&c->path, 0, sizeof(str_t));
-
- free(c->body.s);
- memset(&c->body, 0, sizeof(str_t));
-
- free(c->input_headers.connection.s);
- memset(&c->input_headers.connection, 0, sizeof(str_t));
-
- free(c->input_headers.if_none_match.s);
- memset(&c->input_headers.if_none_match, 0, sizeof(str_t));
-
- free(c->input_headers.authorization.s);
- memset(&c->input_headers.authorization, 0, sizeof(str_t));
-
- free(c->output_headers.content_type.s);
- memset(&c->output_headers.content_type, 0, sizeof(str_t));
-
- free(c->output_headers.etag.s);
- memset(&c->output_headers.etag, 0, sizeof(str_t));
-
- free(c->query_string.type.s);
- memset(&c->query_string.type, 0, sizeof(str_t));
+static int
+http_client_on_url(struct http_parser *p, const char *at, size_t sz) {
- free(c->query_string.jsonp.s);
- memset(&c->query_string.jsonp, 0, sizeof(str_t));
+ struct http_client *c = p->data;
- memset(&c->verb, 0, sizeof(c->verb));
+ c->path = realloc(c->path, c->path_sz + sz + 1);
+ memcpy(c->path + c->path_sz, at, sz);
+ c->path_sz += sz;
+ c->path[c->path_sz] = 0;
- c->state = CLIENT_WAITING;
- c->started_responding = 0;
+ return 0;
}
-void
-http_client_free(struct http_client *c) {
-
- event_del(&c->ev);
- if(c->fd != -1) {
- close(c->fd);
- }
-
- if(c->sub) {
- /* clean up Redis connection */
- c->sub->s->ac->onDisconnect = NULL;
- server_free(c->sub->s);
-
- /* clean up command object */
- if(c->sub->cmd) {
- cmd_free(c->sub->cmd);
- }
- free(c->sub);
- c->sub = NULL;
- }
+static int
+http_client_on_body(struct http_parser *p, const char *at, size_t sz) {
- http_client_cleanup(c);
- free(c);
+ struct http_client *c = p->data;
+ return http_client_set_body(c, at, sz);
}
int
-http_client_keep_alive(struct http_client *c) {
-
- /* check disconnection */
- int keep_alive = 0;
+http_client_set_body(struct http_client *c, const char *at, size_t sz) {
- if(c->parser.http_major == 1 && c->parser.http_minor == 1) {
- keep_alive = 1; /* HTTP 1.1: keep-alive by default */
- }
- if(c->input_headers.connection.s) {
- if(strncasecmp(c->input_headers.connection.s, "Keep-Alive", 10) == 0) {
- keep_alive = 1;
- } else if(strncasecmp(c->input_headers.connection.s, "Close", 5) == 0) {
- keep_alive = 0;
- }
- }
- return keep_alive;
-}
-
-void
-http_client_reset(struct http_client *c) {
-
- if(!http_client_keep_alive(c) && !c->sub) {
- c->state = CLIENT_BROKEN;
- return;
- }
-
- http_client_cleanup(c);
- http_parser_init(&c->parser, HTTP_REQUEST);
-}
+ c->body = realloc(c->body, c->body_sz + sz + 1);
+ memcpy(c->body + c->body_sz, at, sz);
+ c->body_sz += sz;
+ c->body[c->body_sz] = 0;
-/**
- * (Re-)add read event callback
- */
-void
-http_client_serve(struct http_client *c) {
-
- event_set(&c->ev, c->fd, EV_READ, http_client_read, c);
- event_base_set(c->s->base, &c->ev);
- event_add(&c->ev, NULL);
+ return 0;
}
-/**** Parser callbacks ****/
-
-/**
- * Called when the path has been found. This is before any `?query-string'.
- */
-int
-http_on_path(http_parser *p, const char *at, size_t length) {
+static int
+http_client_on_header_name(struct http_parser *p, const char *at, size_t sz) {
struct http_client *c = p->data;
+ size_t n = c->header_count;
- c->path.s = calloc(length+1, 1);
- memcpy(c->path.s, at, length);
- c->path.sz = length;
-
- /* save HTTP verb as well */
- c->verb = (enum http_method)p->method;
-
- return 0;
-}
+ if(c->last_cb != LAST_CB_KEY) {
+ n = ++c->header_count;
+ c->headers = realloc(c->headers, n * sizeof(struct http_header));
+ memset(&c->headers[n-1], 0, sizeof(struct http_header));
+ }
-/**
- * Called when the whole body has been read.
- */
-int
-http_on_body(http_parser *p, const char *at, size_t length) {
- struct http_client *c = p->data;
+ c->headers[n-1].key = realloc(c->headers[n-1].key,
+ c->headers[n-1].key_sz + sz + 1);
+ memcpy(c->headers[n-1].key + c->headers[n-1].key_sz, at, sz);
+ c->headers[n-1].key_sz += sz;
+ c->headers[n-1].key[c->headers[n-1].key_sz] = 0;
- c->body.s = realloc(c->body.s, c->body.sz + length);
- memcpy(c->body.s + c->body.sz, at, length);
- c->body.sz += length;
+ c->last_cb = LAST_CB_KEY;
return 0;
}
-/**
- * Called when the query string has been completely read.
- */
-int
-http_on_query_string(http_parser *parser, const char *at, size_t length) {
+static int
+http_client_on_query_string(struct http_parser *parser, const char *at, size_t sz) {
struct http_client *c = parser->data;
const char *p = at;
- while(p < at + length) {
+ while(p < at + sz) {
const char *key = p, *val;
int key_len, val_len;
- char *eq = memchr(key, '=', length - (p-at));
- if(!eq || eq > at + length) { /* last argument */
+ char *eq = memchr(key, '=', sz - (p-at));
+ if(!eq || eq > at + sz) { /* last argument */
break;
} else { /* found an '=' */
- char *and;
+ char *amp;
val = eq + 1;
key_len = eq - key;
p = eq + 1;
- and = memchr(p, '&', length - (p-at));
- if(!and || and > at + length) {
- val_len = at + length - p; /* last arg */
+ amp = memchr(p, '&', sz - (p-at));
+ if(!amp || amp > at + sz) {
+ val_len = at + sz - p; /* last arg */
} else {
- val_len = and - val; /* cur arg */
- p = and + 1;
+ val_len = amp - val; /* cur arg */
+ p = amp + 1;
}
if(key_len == 4 && strncmp(key, "type", 4) == 0) {
- http_set_header(&c->query_string.type, val, val_len);
+ c->type = calloc(1 + val_len, 1);
+ memcpy(c->type, val, val_len);
} else if(key_len == 5 && strncmp(key, "jsonp", 5) == 0) {
- http_set_header(&c->query_string.jsonp, val, val_len);
+ c->jsonp = calloc(1 + val_len, 1);
+ memcpy(c->jsonp, val, val_len);
}
- if(!and) {
+ if(!amp) {
break;
}
}
@@ -259,234 +106,187 @@ http_on_query_string(http_parser *parser, const char *at, size_t length) {
return 0;
}
-/**
- * Called when the whole request has been parsed.
- */
-int
-http_on_complete(http_parser *p) {
- struct http_client *c = p->data;
- int ret = -1;
+static int
+http_client_on_header_value(struct http_parser *p, const char *at, size_t sz) {
- /* check that the command can be executed */
- switch(c->verb) {
- case HTTP_GET:
- if(c->path.sz == 16 && memcmp(c->path.s, "/crossdomain.xml", 16) == 0) {
- return http_crossdomain(c);
- }
- slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz);
- ret = cmd_run(c->s, c, 1+c->path.s, c->path.sz-1, NULL, 0);
- break;
-
- case HTTP_POST:
- slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz);
- ret = cmd_run(c->s, c, 1+c->body.s, c->body.sz-1, NULL, 0);
- break;
+ struct http_client *c = p->data;
+ size_t n = c->header_count;
- case HTTP_PUT:
- slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz);
- ret = cmd_run(c->s, c, 1+c->path.s, c->path.sz-1,
- c->body.s, c->body.sz);
- break;
+ c->headers[n-1].val = realloc(c->headers[n-1].val,
+ c->headers[n-1].val_sz + sz + 1);
+ memcpy(c->headers[n-1].val + c->headers[n-1].val_sz, at, sz);
+ c->headers[n-1].val_sz += sz;
+ c->headers[n-1].val[c->headers[n-1].val_sz] = 0;
- case HTTP_OPTIONS:
- return http_options(c);
+ c->last_cb = LAST_CB_VAL;
- default:
- slog(c->s, WEBDIS_DEBUG, "405", 3);
- http_send_error(c, 405, "Method Not Allowed");
- return 0;
- }
- if(ret < 0) {
- c->state = CLIENT_WAITING;
- if(c->cmd) {
- cmd_free(c->cmd);
- c->cmd = NULL;
+ if(strncmp("Expect", c->headers[n-1].key, c->headers[n-1].key_sz) == 0) {
+ if(sz == 12 && strncasecmp(at, "100-continue", sz) == 0) {
+ /* support HTTP file upload */
+ char http100[] = "HTTP/1.1 100 Continue\r\n\r\n";
+ int ret = write(c->fd, http100, sizeof(http100)-1);
+ (void)ret;
+ }
+ } else if(strncasecmp("Connection", c->headers[n-1].key, c->headers[n-1].key_sz) == 0) {
+ if(sz == 10 && strncasecmp(at, "Keep-Alive", sz) == 0) {
+ c->keep_alive = 1;
}
- http_send_error(c, 403, "Forbidden");
}
- return ret;
-}
-
-/**
- * Called when a header name is read
- */
-int
-http_on_header_name(http_parser *p, const char *at, size_t length) {
-
- struct http_client *c = p->data;
-
- c->last_header_name.s = calloc(length+1, 1);
- memcpy(c->last_header_name.s, at, length);
- c->last_header_name.sz = length;
-
return 0;
}
-/**
- * Called when a header value is read
- */
-int
-http_on_header_value(http_parser *p, const char *at, size_t length) {
+static int
+http_client_on_message_complete(struct http_parser *p) {
struct http_client *c = p->data;
- if(strncmp("Connection", c->last_header_name.s, c->last_header_name.sz) == 0) {
- http_set_header(&c->input_headers.connection, at, length);
- } else if(strncmp("If-None-Match", c->last_header_name.s, c->last_header_name.sz) == 0) {
- http_set_header(&c->input_headers.if_none_match, at, length);
- } else if(strncmp("Authorization", c->last_header_name.s, c->last_header_name.sz) == 0) {
- http_set_header(&c->input_headers.authorization, at, length);
- } else if(strncmp("Expect", c->last_header_name.s, c->last_header_name.sz) == 0) {
- if(length == 12 && memcmp(at, "100-continue", length) == 0) {
- /* support HTTP file upload */
- char http100[] = "HTTP/1.1 100 Continue\r\n\r\n";
- int ret = write(c->fd, http100, sizeof(http100)-1);
- if(ret != sizeof(http100)-1) {
- c->state = CLIENT_BROKEN;
- }
- }
+ /* keep-alive detection */
+ if(c->parser.http_major == 1 && c->parser.http_minor == 1) { /* 1.1 */
+ c->keep_alive = 1;
}
- free(c->last_header_name.s);
- c->last_header_name.s = NULL;
+ if(p->upgrade) { /* WebSocket, don't execute just yet */
+ c->is_websocket = 1;
+ return 0;
+ }
+ worker_process_client(c);
+ http_client_reset(c);
+
return 0;
}
+struct http_client *
+http_client_new(struct worker *w, int fd, in_addr_t addr) {
+ struct http_client *c = calloc(1, sizeof(struct http_client));
+ c->fd = fd;
+ c->w = w;
+ c->addr = addr;
+ c->s = w->s;
-/**** HTTP Responses ****/
+ /* parser */
+ http_parser_init(&c->parser, HTTP_REQUEST);
+ c->parser.data = c;
-static void
-http_response_set_connection_header(struct http_client *c, struct http_response *r) {
+ /* callbacks */
+ c->settings.on_url = http_client_on_url;
+ c->settings.on_query_string = http_client_on_query_string;
+ c->settings.on_body = http_client_on_body;
+ c->settings.on_message_complete = http_client_on_message_complete;
+ c->settings.on_header_field = http_client_on_header_name;
+ c->settings.on_header_value = http_client_on_header_value;
- if(http_client_keep_alive(c)) {
- http_response_set_header(r, "Connection", "Keep-Alive");
- } else {
- http_response_set_header(r, "Connection", "Close");
- }
-}
-
-void
-http_send_reply(struct http_client *c, short code, const char *msg,
- const char *body, size_t body_len) {
+ c->last_cb = LAST_CB_NONE;
- struct http_response resp;
- const char *ct = c->output_headers.content_type.s;
- if(!ct) {
- ct = "text/html";
- }
+ return c;
+}
- /* respond */
- http_response_init(&resp, code, msg);
- http_response_set_connection_header(c, &resp);
- if(body_len) {
- http_response_set_header(&resp, "Content-Type", ct);
- }
+void
+http_client_reset(struct http_client *c) {
- if(c->sub) {
- http_response_set_header(&resp, "Transfer-Encoding", "chunked");
- }
+ int i;
- if(code == 200 && c->output_headers.etag.s) {
- http_response_set_header(&resp, "ETag", c->output_headers.etag.s);
+ /* headers */
+ for(i = 0; i < c->header_count; ++i) {
+ free(c->headers[i].key);
+ free(c->headers[i].val);
}
-
- http_response_set_body(&resp, body, body_len);
-
- /* flush response in the socket */
- if(http_response_write(&resp, c->fd) || !http_client_keep_alive(c)) { /* failure */
- if(c->state == CLIENT_EXECUTING) {
- http_client_free(c);
- return;
- } else if(c->state == CLIENT_WAITING) {
- c->state = CLIENT_BROKEN;
- }
- } else {
- http_client_reset(c);
+ free(c->headers);
+ c->headers = NULL;
+ c->header_count = 0;
+
+ /* other data */
+ free(c->body); c->body = NULL;
+ c->body_sz = 0;
+ free(c->path); c->path = NULL;
+ c->path_sz = 0;
+ free(c->type); c->type = NULL;
+ free(c->jsonp); c->jsonp = NULL;
+
+ /* no last known header callback */
+ c->last_cb = LAST_CB_NONE;
+
+ /* mark as broken if client doesn't support Keep-Alive. */
+ if(c->keep_alive == 0) {
+ c->broken = 1;
}
- http_client_serve(c);
}
void
-http_send_error(struct http_client *c, short code, const char *msg) {
+http_client_free(struct http_client *c) {
- http_send_reply(c, code, msg, NULL, 0);
- http_client_cleanup(c);
-}
+ /* printf("client free: %p\n", (void*)c); */
-/* Transfer-encoding: chunked */
-void
-http_send_reply_start(struct http_client *c, short code, const char *msg) {
+ http_client_reset(c);
+ free(c->buffer);
+
+ /* close(c->fd); */
- http_send_reply(c, code, msg, NULL, 0);
+ free(c);
}
-void
-http_send_reply_chunk(struct http_client *c, const char *p, size_t sz) {
+int
+http_client_read(struct http_client *c) {
- char buf[64];
- int ret, chunk_size;
+ char buffer[4096];
+ int ret;
- chunk_size = sprintf(buf, "%x\r\n", (int)sz);
- ret = write(c->fd, buf, chunk_size);
- ret = write(c->fd, p, sz);
- ret = write(c->fd, "\r\n", 2);
- if(ret != 2) {
- c->state = CLIENT_BROKEN;
+ ret = read(c->fd, buffer, sizeof(buffer));
+ if(ret <= 0) {
+ /* printf("Broken read on c=%p, fd=%d.\n", (void*)c, c->fd); */
+ close(c->fd);
+ http_client_free(c);
+ return -1;
}
-}
-/* send nil chunk to mark the end of a stream. */
-void
-http_send_reply_end(struct http_client *c) {
+ c->buffer = realloc(c->buffer, c->sz + ret);
+ memcpy(c->buffer + c->sz, buffer, ret);
+ c->sz += ret;
+
+#if 0
+ printf("read %d bytes\n", ret);
+ write(1, "\n[", 2);
+ write(1, buffer, ret);
+ write(1, "]\n", 2);
+#endif
- http_send_reply_chunk(c, "", 0);
+ return ret;
}
-/* Adobe flash cross-domain request */
int
-http_crossdomain(struct http_client *c) {
-
- struct http_response resp;
- char out[] = "<?xml version=\"1.0\"?>\n"
-"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd\">\n"
-"<cross-domain-policy>\n"
- "<allow-access-from domain=\"*\" />\n"
-"</cross-domain-policy>\n";
-
- http_response_init(&resp, 200, "OK");
- http_response_set_connection_header(c, &resp);
- http_response_set_header(&resp, "Content-Type", "application/xml");
- http_response_set_body(&resp, out, sizeof(out)-1);
+http_client_execute(struct http_client *c) {
- http_response_write(&resp, c->fd);
- http_client_reset(c);
+ int nparsed = http_parser_execute(&c->parser, &c->settings, c->buffer, c->sz);
+ /* printf("nparsed=%d\n", nparsed); */
- return 0;
+ if(!c->is_websocket) {
+ /* removed consumed data */
+ free(c->buffer);
+ c->buffer = NULL;
+ c->sz = 0;
+ }
+ return nparsed;
}
-/* reply to OPTIONS HTTP verb */
-int
-http_options(struct http_client *c) {
-
- struct http_response resp;
+const char *
+client_get_header(struct http_client *c, const char *key) {
- http_response_init(&resp, 200, "OK");
- http_response_set_connection_header(c, &resp);
+ int i;
+ size_t sz = strlen(key);
- http_response_set_header(&resp, "Content-Type", "text/html");
- http_response_set_header(&resp, "Allow", "GET,POST,PUT,OPTIONS");
- http_response_set_header(&resp, "Content-Length", "0");
+ for(i = 0; i < c->header_count; ++i) {
- /* Cross-Origin Resource Sharing, CORS. */
- http_response_set_header(&resp, "Access-Control-Allow-Origin", "*");
+ if(sz == c->headers[i].key_sz &&
+ strncasecmp(key, c->headers[i].key, sz) == 0) {
+ return c->headers[i].val;
+ }
- http_response_write(&resp, c->fd);
- http_client_reset(c);
+ }
- return 0;
+ return NULL;
}
+
View
132 client.h
@@ -3,127 +3,73 @@
#include <event.h>
#include <arpa/inet.h>
-#include "http-parser/http_parser.h"
-#include "http.h"
+#include "http_parser.h"
+struct http_header;
struct server;
-struct cmd;
typedef enum {
- CLIENT_WAITING,
- CLIENT_EXECUTING,
- CLIENT_BROKEN} client_state;
+ LAST_CB_NONE = 0,
+ LAST_CB_KEY = 1,
+ LAST_CB_VAL = 2} last_cb_t;
struct http_client {
- /* socket and server reference */
int fd;
in_addr_t addr;
struct event ev;
+
+ struct worker *w;
struct server *s;
- client_state state;
- struct cmd *cmd;
-
- /* http parser */
- http_parser_settings settings;
- http_parser parser;
-
- /* decoded http */
- enum http_method verb;
- str_t path;
- str_t body;
-
- /* input headers from client */
- struct {
- str_t connection;
- str_t if_none_match;
- str_t authorization;
- } input_headers;
-
- /* response headers */
- struct input_headers {
- str_t content_type;
- str_t etag;
- } output_headers;
-
- /* query string */
- struct {
- str_t type;
- str_t jsonp;
- } query_string;
-
- /* pub/sub */
- struct subscription *sub;
- int started_responding;
-
- struct http_response resp;
-
- /* private, used in HTTP parser */
- str_t last_header_name;
-};
-struct http_client *
-http_client_new(int fd, struct server *s);
+ /* HTTP parsing */
+ struct http_parser parser;
+ struct http_parser_settings settings;
+ char *buffer;
+ size_t sz;
+ last_cb_t last_cb;
-void
-http_client_serve(struct http_client *c);
+ /* various flags. TODO: bit map */
+ int keep_alive;
+ int broken;
+ int is_websocket;
-void
-http_client_free(struct http_client *c);
+ /* HTTP data */
+ char *path;
+ size_t path_sz;
-void
-http_client_reset(struct http_client *c);
+ /* headers */
+ struct http_header *headers;
+ int header_count;
-int
-http_on_path(http_parser*, const char *at, size_t length);
+ char *body;
+ size_t body_sz;
-int
-http_on_path(http_parser*, const char *at, size_t length);
+ char *type; /* forced output content-type */
+ char *jsonp; /* jsonp wrapper */
+};
-int
-http_on_body(http_parser*, const char *at, size_t length);
+struct http_client *
+http_client_new(struct worker *w, int fd, in_addr_t addr);
-int
-http_on_header_name(http_parser*, const char *at, size_t length);
+void
+http_client_reset(struct http_client *c);
-int
-http_on_header_value(http_parser*, const char *at, size_t length);
+void
+http_client_free(struct http_client *c);
int
-http_on_complete(http_parser*);
+http_client_read(struct http_client *c);
int
-http_on_query_string(http_parser*, const char *at, size_t length);
+http_client_execute(struct http_client *c);
int
-http_client_keep_alive(struct http_client *c);
-
-/* responses */
-
-void
-http_send_reply(struct http_client *c, short code, const char *msg,
- const char *body, size_t body_len);
-
-/* Transfer-encoding: chunked */
-void
-http_send_reply_start(struct http_client *c, short code, const char *msg);
+http_client_set_body(struct http_client *c, const char *at, size_t sz);
-void
-http_send_reply_chunk(struct http_client *c, const char *p, size_t sz);
-
-void
-http_send_reply_end(struct http_client *c);
-
-void
-http_send_error(struct http_client *c, short code, const char *msg);
-
-/* convenience functions */
-int
-http_crossdomain(struct http_client *c);
+const char *
+client_get_header(struct http_client *c, const char *key);
-/* reply to OPTIONS HTTP verb */
-int
-http_options(struct http_client *c);
#endif
View
92 cmd.c
@@ -1,8 +1,11 @@
#include "cmd.h"
-#include "server.h"
#include "conf.h"
#include "acl.h"
#include "client.h"
+#include "pool.h"
+#include "worker.h"
+#include "http.h"
+#include "server.h"
#include "formats/json.h"
#include "formats/bson.h"
@@ -12,6 +15,7 @@
#include <stdlib.h>
#include <string.h>
#include <hiredis/hiredis.h>
+#include <hiredis/async.h>
#include <ctype.h>
struct cmd *
@@ -31,13 +35,20 @@ cmd_new(int count) {
void
cmd_free(struct cmd *c) {
+ int i;
if(!c) return;
+ for(i = 0; i < c->count; ++i) {
+ free((char*)c->argv[i]);
+ }
free(c->argv);
free(c->argv_len);
+ free(c->jsonp);
+ free(c->if_none_match);
if(c->mime_free) free(c->mime);
+
free(c);
}
@@ -69,9 +80,39 @@ decode_uri(const char *uri, size_t length, size_t *out_len, int always_decode_pl
return ret;
}
+/* setup headers */
+static void
+cmd_setup(struct cmd *cmd, struct http_client *client) {
+
+ int i;
+ cmd->keep_alive = client->keep_alive;
+
+ for(i = 0; i < client->header_count; ++i) {
+ if(strcasecmp(client->headers[i].key, "If-None-Match") == 0) {
+ cmd->if_none_match = calloc(1+client->headers[i].val_sz, 1);
+ memcpy(cmd->if_none_match, client->headers[i].val,
+ client->headers[i].val_sz);
+ } else if(strcasecmp(client->headers[i].key, "Connection") == 0 &&
+ strcasecmp(client->headers[i].val, "Keep-Alive") == 0) {
+ cmd->keep_alive = 1;
+ }
+ }
+
+ if(client->type) { /* transfer pointer ownership */
+ cmd->mime = client->type;
+ cmd->mime_free = 1;
+ client->type = NULL;
+ }
+
+ if(client->jsonp) { /* transfer pointer ownership */
+ cmd->jsonp = client->jsonp;
+ client->jsonp = NULL;
+ }
+}
+
int
-cmd_run(struct server *s, struct http_client *client,
+cmd_run(struct worker *w, struct http_client *client,
const char *uri, size_t uri_len,
const char *body, size_t body_len) {
@@ -79,10 +120,10 @@ cmd_run(struct server *s, struct http_client *client,
char *slash;
const char *p;
int cmd_len;
- int param_count = 0, cur_param = 1, i;
+ int param_count = 0, cur_param = 1;
struct cmd *cmd;
-
+ redisAsyncContext *ac = NULL;
formatting_fun f_format;
/* count arguments */
@@ -100,11 +141,15 @@ cmd_run(struct server *s, struct http_client *client,
return -1;
}
- client->cmd = cmd = cmd_new(param_count);
+ cmd = cmd_new(param_count);
+ cmd->fd = client->fd;
/* get output formatting function */
uri_len = cmd_select_format(client, cmd, uri, uri_len, &f_format);
+ /* add HTTP info */
+ cmd_setup(cmd, client);
+
/* check if we only have one command or more. */
slash = memchr(uri, '/', uri_len);
if(slash) {
@@ -114,26 +159,25 @@ cmd_run(struct server *s, struct http_client *client,
}
/* there is always a first parameter, it's the command name */
- cmd->argv[0] = uri;
+ cmd->argv[0] = malloc(cmd_len);
+ memcpy(cmd->argv[0], uri, cmd_len);
cmd->argv_len[0] = cmd_len;
/* check that the client is able to run this command */
- if(!acl_allow_command(cmd, s->cfg, client)) {
+ if(!acl_allow_command(cmd, w->s->cfg, client)) {
return -1;
}
/* check if we have to split the connection */
if(cmd_is_subscribe(cmd)) {
-
- client->sub = malloc(sizeof(struct subscription));
- client->sub->s = s = server_copy(s);
- client->sub->cmd = cmd;
+ ac = (redisAsyncContext*)pool_connect(w->pool, 0);
}
/* no args (e.g. INFO command) */
if(!slash) {
- client->state = CLIENT_EXECUTING;
- redisAsyncCommandArgv(s->ac, f_format, client, 1, cmd->argv, cmd->argv_len);
+ ac = (redisAsyncContext*)pool_get_context(w->pool);
+ redisAsyncCommandArgv(ac, f_format, cmd, 1,
+ (const char **)cmd->argv, cmd->argv_len);
return 0;
}
p = slash + 1;
@@ -156,21 +200,25 @@ cmd_run(struct server *s, struct http_client *client,
}
if(body && body_len) { /* PUT request */
- cmd->argv[cur_param] = body;
+ cmd->argv[cur_param] = malloc(body_len);
+ memcpy(cmd->argv[cur_param], body, body_len);
cmd->argv_len[cur_param] = body_len;
}
- /* push command to Redis. */
- client->state = CLIENT_EXECUTING;
- redisAsyncCommandArgv(s->ac, f_format, client, cmd->count, cmd->argv, cmd->argv_len);
-
- for(i = 1; i < cur_param; ++i) {
- free((char*)cmd->argv[i]);
+ /* send it off! */
+ if(!ac) {
+ ac = (redisAsyncContext*)pool_get_context(w->pool);
}
+ cmd_send(ac, f_format, cmd);
return 0;
}
+void
+cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd) {
+ redisAsyncCommandArgv(ac, f_format, cmd, cmd->count,
+ (const char **)cmd->argv, cmd->argv_len);
+}
/**
* Select Content-Type and processing function.
@@ -235,9 +283,9 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
}
/* the user can force it with ?type=some/thing */
- if(client->query_string.type.s) {
+ if(client->type) {
*f_format = custom_type_reply;
- cmd->mime = strdup(client->query_string.type.s);
+ cmd->mime = strdup(client->type);
cmd->mime_free = 1;
}
View
19 cmd.h
@@ -10,18 +10,28 @@
struct evhttp_request;
struct http_client;
struct server;
+struct worker;
struct cmd;
typedef void (*formatting_fun)(redisAsyncContext *, void *, void *);
struct cmd {
+ int fd;
+
int count;
- const char **argv;
+ char **argv;
size_t *argv_len;
/* HTTP data */
- char *mime;
+ char *mime; /* forced output content-type */
int mime_free;
+ char *if_none_match; /* used with ETags */
+ char *jsonp; /* jsonp wrapper */
+ int keep_alive;
+
+ /* various flags */
+ int started_responding;
+ int is_websocket;
};
struct subscription {
@@ -36,7 +46,7 @@ void
cmd_free(struct cmd *c);
int
-cmd_run(struct server *s, struct http_client *client,
+cmd_run(struct worker *w, struct http_client *client,
const char *uri, size_t uri_len,
const char *body, size_t body_len);
@@ -47,4 +57,7 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
int
cmd_is_subscribe(struct cmd *cmd);
+void
+cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd);
+
#endif
View
3  conf.c
@@ -30,6 +30,7 @@ conf_read(const char *filename) {
conf->redis_port = 6379;
conf->http_host = strdup("0.0.0.0");
conf->http_port = 7379;
+ conf->http_threads = 4;
conf->user = getuid();
conf->group = getgid();
conf->logfile = "webdis.log";
@@ -58,6 +59,8 @@ conf_read(const char *filename) {
conf->http_host = strdup(json_string_value(jtmp));
} else if(strcmp(json_object_iter_key(kv), "http_port") == 0 && json_typeof(jtmp) == JSON_INTEGER) {
conf->http_port = (short)json_integer_value(jtmp);
+ } else if(strcmp(json_object_iter_key(kv), "threads") == 0 && json_typeof(jtmp) == JSON_INTEGER) {
+ conf->http_threads = (short)json_integer_value(jtmp);
} else if(strcmp(json_object_iter_key(kv), "acl") == 0 && json_typeof(jtmp) == JSON_ARRAY) {
conf->perms = conf_parse_acls(jtmp);
} else if(strcmp(json_object_iter_key(kv), "user") == 0 && json_typeof(jtmp) == JSON_STRING) {
View
1  conf.h
@@ -14,6 +14,7 @@ struct conf {
/* HTTP server interface */
char *http_host;
short http_port;
+ short http_threads;
/* daemonize process, off by default */
int daemonize;
View
12 formats/bson.c
@@ -1,7 +1,6 @@
#include "bson.h"
#include "common.h"
#include "cmd.h"
-#include "client.h"
#include "http.h"
#include <string.h>
@@ -18,30 +17,33 @@ void
bson_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
- struct http_client *client = privdata;
+ struct cmd *cmd = privdata;
bson_t *b;
char *bstr = NULL;
size_t bsz;
(void)c;
- if(client->cmd == NULL) {
+ if(cmd == NULL) {
/* broken connection */
return;
}
if (reply == NULL) {
+ /* FIXME */
+ /*
http_send_reply(client, 404, "Not Found", NULL, 0);
+ */
return;
}
/* encode redis reply as BSON */
- b = bson_wrap_redis_reply(client->cmd, reply);
+ b = bson_wrap_redis_reply(cmd, reply);
/* get BSON as string */
bstr = bson_string_output(b, &bsz);
/* send reply */
- format_send_reply(client, bstr, bsz, "application/bson");
+ format_send_reply(cmd, bstr, bsz, "application/bson");
/* cleanup */
free(bstr);
View
49 formats/common.c
@@ -2,11 +2,13 @@
#include "cmd.h"
#include "http.h"
#include "client.h"
+#include "websocket.h"
#include "md5/md5.h"
#include <string.h>
+#include <unistd.h>
-/* TODO: replace this with a faster hash function */
+/* TODO: replace this with a faster hash function? */
char *etag_new(const char *p, size_t sz) {
md5_byte_t buf[16];
@@ -30,42 +32,59 @@ char *etag_new(const char *p, size_t sz) {
}
void
-format_send_reply(struct http_client *client, const char *p, size_t sz, const char *content_type) {
+format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content_type) {
int free_cmd = 1;
+ struct http_response resp;
- struct cmd *cmd = client->cmd;
+ if(cmd->is_websocket) {
+ ws_reply(cmd, p, sz);
+ cmd_free(cmd);
+ return;
+ }
if(cmd_is_subscribe(cmd)) {
free_cmd = 0;
/* start streaming */
- if(client->started_responding == 0) {
+ if(cmd->started_responding == 0) {
const char *ct = cmd->mime?cmd->mime:content_type;
- client->started_responding = 1;
- http_set_header(&client->output_headers.content_type, ct, strlen(ct));
- http_send_reply_start(client, 200, "OK");
+ cmd->started_responding = 1;
+ http_response_init(&resp, 200, "OK");
+ http_response_set_header(&resp, "Content-Type", ct);
+ http_response_set_header(&resp, "Connection", "Keep-Alive");
+ http_response_set_header(&resp, "Transfer-Encoding", "Chunked");
+ http_response_write(&resp, cmd->fd);
}
- http_send_reply_chunk(client, p, sz);
+ http_response_write_chunk(cmd->fd, p, sz);
} else {
/* compute ETag */
char *etag = etag_new(p, sz);
- const char *if_none_match = client->input_headers.if_none_match.s;
/* check If-None-Match */
- if(if_none_match && strncmp(if_none_match, etag, client->input_headers.if_none_match.sz) == 0) {
+ if(cmd->if_none_match && strcmp(cmd->if_none_match, etag) == 0) {
/* SAME! send 304. */
- http_send_reply(client, 304, "Not Modified", NULL, 0);
+ http_response_init(&resp, 304, "Not Modified");
} else {
const char *ct = cmd->mime?cmd->mime:content_type;
- http_set_header(&client->output_headers.content_type, ct, strlen(ct));
- http_set_header(&client->output_headers.etag, etag, strlen(etag));
- http_send_reply(client, 200, "OK", p, sz);
+ http_response_init(&resp, 200, "OK");
+ http_response_set_header(&resp, "Content-Type", ct);
+ http_response_set_header(&resp, "ETag", etag);
+ http_response_set_body(&resp, p, sz);
+ }
+ if(cmd->keep_alive) {
+ http_response_set_header(&resp, "Connection", "Keep-Alive");
+ } else {
+ http_response_set_header(&resp, "Connection", "Close");
+ }
+ http_response_write(&resp, cmd->fd);
+ if(!cmd->keep_alive) {
+ close(cmd->fd);
}
-
free(etag);
}
+
/* cleanup */
if(free_cmd) {
cmd_free(cmd);
View
4 formats/common.h
@@ -3,10 +3,10 @@
#include <stdlib.h>
-struct http_client;
+struct cmd;
void
-format_send_reply(struct http_client *client,
+format_send_reply(struct cmd *cmd,
const char *p, size_t sz,
const char *content_type);
View
23 formats/custom-type.c
@@ -2,7 +2,6 @@
#include "cmd.h"
#include "common.h"
#include "http.h"
-#include "client.h"
#include <string.h>
#include <hiredis/hiredis.h>
@@ -12,35 +11,43 @@ void
custom_type_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
- struct http_client *client = privdata;
+ struct cmd *cmd = privdata;
(void)c;
char int_buffer[50];
int int_len;
+ struct http_response resp;
if(reply == NULL) {
return;
}
- if(client->cmd->mime) { /* use the given content-type, but only for strings */
+ if(cmd->mime) { /* use the given content-type, but only for strings */
switch(reply->type) {
case REDIS_REPLY_NIL: /* or nil values */
- format_send_reply(client, "", 0, client->cmd->mime);
+ format_send_reply(cmd, "", 0, cmd->mime);
return;
case REDIS_REPLY_STRING:
- format_send_reply(client, reply->str, reply->len, client->cmd->mime);
+ format_send_reply(cmd, reply->str, reply->len, cmd->mime);
return;
case REDIS_REPLY_INTEGER:
int_len = sprintf(int_buffer, "%lld", reply->integer);
- format_send_reply(client, int_buffer, int_len, client->cmd->mime);
+ format_send_reply(cmd, int_buffer, int_len, cmd->mime);
return;
}
}
/* couldn't make sense of what the client wanted. */
- http_send_reply(client, 400, "Bad request", NULL, 0);
- cmd_free(client->cmd);
+ http_response_init(&resp, 400, "Bad Request");
+ http_response_set_header(&resp, "Content-Length", "0");
+ if(cmd->keep_alive) {
+ http_response_set_header(&resp, "Connection", "Keep-Alive");
+ } else {
+ http_response_set_header(&resp, "Connection", "Close");
+ }
+ http_response_write(&resp, cmd->fd);
+ cmd_free(cmd);
}
View
98 formats/json.c
@@ -15,33 +15,34 @@ void
json_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
- struct http_client *client = privdata;
+ struct cmd *cmd = privdata;
json_t *j;
char *jstr;
(void)c;
- if(client->cmd == NULL) {
+ if(cmd == NULL) {
/* broken connection */
return;
}
if (reply == NULL) {
+/* FIXME */
+#if 0
if(client->started_responding) { /* broken, close */
http_send_reply_end(client);
}
+#endif
return;
}
/* encode redis reply as JSON */
- j = json_wrap_redis_reply(client->cmd, r);
+ j = json_wrap_redis_reply(cmd, r);
/* get JSON as string, possibly with JSONP wrapper */
- jstr = json_string_output(j,
- client->query_string.jsonp.s,
- client->query_string.jsonp.sz);
+ jstr = json_string_output(j, cmd->jsonp);
/* send reply */
- format_send_reply(client, jstr, strlen(jstr), "application/json");
+ format_send_reply(cmd, jstr, strlen(jstr), "application/json");
/* cleanup */
json_decref(j);
@@ -192,12 +193,13 @@ json_wrap_redis_reply(const struct cmd *cmd, const redisReply *r) {
char *
-json_string_output(json_t *j, const char *jsonp, size_t jsonp_len) {
+json_string_output(json_t *j, const char *jsonp) {
char *json_reply = json_dumps(j, JSON_COMPACT);
/* check for JSONP */
if(jsonp) {
+ size_t jsonp_len = strlen(jsonp);
size_t json_len = strlen(json_reply);
size_t ret_len = jsonp_len + 1 + json_len + 3;
char *ret = calloc(1 + ret_len, 1);
@@ -214,3 +216,83 @@ json_string_output(json_t *j, const char *jsonp, size_t jsonp_len) {
return json_reply;
}
+/* extract JSON from WebSocket frame and fill struct cmd. */
+struct cmd *
+json_ws_extract(struct http_client *c, const char *p, size_t sz) {
+
+ struct cmd *cmd = NULL;
+ json_t *j;
+ char *jsonz; /* null-terminated */
+
+ unsigned int i, cur;
+ int argc = 0;
+ json_error_t jerror;
+
+ (void)c;
+
+ jsonz = calloc(sz + 1, 1);
+ memcpy(jsonz, p, sz);
+ j = json_loads(jsonz, sz, &jerror);
+ free(jsonz);
+
+ if(!j) {
+ return NULL;
+ }
+ if(json_typeof(j) != JSON_ARRAY) {
+ json_decref(j);
+ return NULL; /* invalid JSON */
+ }
+
+ /* count elements */
+ for(i = 0; i < json_array_size(j); ++i) {
+ json_t *jelem = json_array_get(j, i);
+
+ switch(json_typeof(jelem)) {
+ case JSON_STRING:
+ case JSON_INTEGER:
+ argc++;
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ if(!argc) { /* not a single item could be decoded */
+ json_decref(j);
+ return NULL;
+ }
+
+ /* create command and add args */
+ cmd = cmd_new(argc);
+ for(i = 0, cur = 0; i < json_array_size(j); ++i) {
+ json_t *jelem = json_array_get(j, i);
+ char *tmp;
+
+ switch(json_typeof(jelem)) {
+ case JSON_STRING:
+ tmp = strdup(json_string_value(jelem));
+
+ cmd->argv[cur] = tmp;
+ cmd->argv_len[cur] = strlen(tmp);
+ cur++;
+ break;
+
+ case JSON_INTEGER:
+ tmp = malloc(40);
+ sprintf(tmp, "%d", (int)json_integer_value(jelem));
+
+ cmd->argv[cur] = tmp;
+ cmd->argv_len[cur] = strlen(tmp);
+ cur++;
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ json_decref(j);
+ return cmd;
+}
+
View
6 formats/json.h
@@ -6,11 +6,15 @@
#include <hiredis/async.h>
struct cmd;
+struct http_client;
void
json_reply(redisAsyncContext *c, void *r, void *privdata);
char *
-json_string_output(json_t *j, const char *jsonp, size_t jsonp_len);
+json_string_output(json_t *j, const char *jsonp);
+
+struct cmd *
+json_ws_extract(struct http_client *c, const char *p, size_t sz);
#endif
View
4 formats/raw.c
@@ -13,7 +13,7 @@ void
raw_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
- struct http_client *client = privdata;
+ struct cmd *cmd = privdata;
char *raw_out;
size_t sz;
(void)c;
@@ -25,7 +25,7 @@ raw_reply(redisAsyncContext *c, void *r, void *privdata) {
raw_out = raw_wrap(r, &sz);
/* send reply */
- format_send_reply(client, raw_out, sz, "binary/octet-stream");
+ format_send_reply(cmd, raw_out, sz, "binary/octet-stream");
/* cleanup */
free(raw_out);
View
164 http.c
@@ -1,19 +1,12 @@
#include "http.h"
#include "server.h"
-#include "slog.h"
-#include "cmd.h"
+#include "worker.h"
+#include "client.h"
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
-
-void
-http_set_header(str_t *h, const char *p, size_t sz) {
-
- h->s = calloc(1, 1+sz);
- memcpy(h->s, p, sz);
- h->sz = sz;
-}
+#include <stdio.h>
/* HTTP Response */
@@ -34,35 +27,39 @@ void
http_response_set_header(struct http_response *r, const char *k, const char *v) {
int i, pos = r->header_count;
- size_t sz;
- char *s;
-
- sz = strlen(k) + 2 + strlen(v) + 2;
- s = calloc(sz + 1, 1);
- sprintf(s, "%s: %s\r\n", k, v);
+ size_t key_sz = strlen(k);
+ size_t val_sz = strlen(v);
for(i = 0; i < r->header_count; ++i) {
- size_t klen = strlen(k);
- if(strncmp(r->headers[i].s, k, klen) == 0 && r->headers[i].s[klen] == ':') {
+ if(strncmp(r->headers[i].key, k, key_sz) == 0) {
pos = i;
/* free old value before replacing it. */
- free(r->headers[i].s);
+ free(r->headers[i].key);
+ free(r->headers[i].val);
break;
}
}
/* extend array */
if(pos == r->header_count) {
- r->headers = realloc(r->headers, sizeof(str_t)*(r->header_count + 1));
+ r->headers = realloc(r->headers,
+ sizeof(struct http_header)*(r->header_count + 1));
r->header_count++;
}
- r->headers[pos].s = s;
- r->headers[pos].sz = sz;
- if(!strcmp(k, "Transfer-Encoding") && !strcmp(v, "chunked")) {
+ /* copy key */
+ r->headers[pos].key = calloc(key_sz + 1, 1);
+ memcpy(r->headers[pos].key, k, key_sz);
+ r->headers[pos].key_sz = key_sz;
+
+ /* copy val */
+ r->headers[pos].val = calloc(val_sz + 1, 1);
+ memcpy(r->headers[pos].val, v, val_sz);
+ r->headers[pos].val_sz = val_sz;
+
+ if(!strcmp(k, "Transfer-Encoding") && !strcmp(v, "Chunked")) {
r->chunked = 1;
}
-
}
void
@@ -77,13 +74,13 @@ http_response_write(struct http_response *r, int fd) {
char *s = NULL, *p;
size_t sz = 0;
- int i, ret;
+ int i, ret, keep_alive = 0;
sz = sizeof("HTTP/1.x xxx ")-1 + strlen(r->msg) + 2;
s = calloc(sz + 1, 1);
ret = sprintf(s, "HTTP/1.1 %d %s\r\n", r->code, r->msg);
- p = s; // + ret - 3;
+ p = s;
if(r->code == 200 && r->body) {
char content_length[10];
@@ -94,12 +91,33 @@ http_response_write(struct http_response *r, int fd) {
}
for(i = 0; i < r->header_count; ++i) {
- s = realloc(s, sz + r->headers[i].sz);
+ /* "Key: Value\r\n" */
+ size_t header_sz = r->headers[i].key_sz + 2 + r->headers[i].val_sz + 2;
+ s = realloc(s, sz + header_sz);
p = s + sz;
- memcpy(p, r->headers[i].s, r->headers[i].sz);
- p += r->headers[i].sz;
- sz += r->headers[i].sz;
+ /* add key */
+ memcpy(p, r->headers[i].key, r->headers[i].key_sz);
+ p += r->headers[i].key_sz;
+
+ /* add ": " */
+ memcpy(p, ": ", 2);
+ p += 2;
+
+ /* add value */
+ memcpy(p, r->headers[i].val, r->headers[i].val_sz);
+ p += r->headers[i].val_sz;
+
+ /* add "\r\n" */
+ memcpy(p, "\r\n", 2);
+ p += 2;
+
+ sz += header_sz;
+
+ if(strncasecmp("Connection", r->headers[i].key, r->headers[i].key_sz) == 0 &&
+ strncasecmp("Keep-Alive", r->headers[i].val, r->headers[i].val_sz) == 0) {
+ keep_alive = 1;
+ }
}
/* end of headers */
@@ -114,14 +132,98 @@ http_response_write(struct http_response *r, int fd) {
}
ret = write(fd, s, sz);
+ if(!keep_alive) {
+ /* printf("response write, close fd=%d\n", fd); */
+ close(fd);
+ }
+ /*
+ write(1, "response: [", 11);
+ write(1, s, sz);
+ write(1, "]\n", 2);
+ */
free(s);
/* cleanup response object */
for(i = 0; i < r->header_count; ++i) {
- free(r->headers[i].s);
+ free(r->headers[i].key);
+ free(r->headers[i].val);
}
free(r->headers);
return ret == (int)sz ? 0 : 1;
}
+/* Adobe flash cross-domain request */
+void
+http_crossdomain(struct http_client *c) {
+
+ struct http_response resp;
+ char out[] = "<?xml version=\"1.0\"?>\n"
+"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd\">\n"
+"<cross-domain-policy>\n"
+ "<allow-access-from domain=\"*\" />\n"
+"</cross-domain-policy>\n";
+
+ http_response_init(&resp, 200, "OK");
+ http_response_set_connection_header(c, &resp);
+ http_response_set_header(&resp, "Content-Type", "application/xml");
+ http_response_set_body(&resp, out, sizeof(out)-1);
+
+ http_response_write(&resp, c->fd);
+ http_client_reset(c);
+}
+
+/* Simple error response */
+void
+http_send_error(struct http_client *c, short code, const char *msg) {
+
+ struct http_response resp;
+ http_response_init(&resp, code, msg);
+ http_response_set_connection_header(c, &resp);
+ http_response_set_body(&resp, NULL, 0);
+
+ http_response_write(&resp, c->fd);
+ http_client_reset(c);
+}
+
+void
+http_response_set_connection_header(struct http_client *c, struct http_response *r) {
+ if(c->keep_alive) {
+ http_response_set_header(r, "Connection", "Keep-Alive");
+ } else {
+ http_response_set_header(r, "Connection", "Close");
+ }
+}
+
+/* Response to HTTP OPTIONS */
+void
+http_send_options(struct http_client *c) {
+
+ struct http_response resp;
+ http_response_init(&resp, 200, "OK");
+ http_response_set_connection_header(c, &resp);
+
+ http_response_set_header(&resp, "Content-Type", "text/html");
+ http_response_set_header(&resp, "Allow", "GET,POST,PUT,OPTIONS");
+ http_response_set_header(&resp, "Content-Length", "0");
+
+ /* Cross-Origin Resource Sharing, CORS. */
+ http_response_set_header(&resp, "Access-Control-Allow-Origin", "*");
+
+ http_response_write(&resp, c->fd);
+ http_client_reset(c);
+}
+
+void
+http_response_write_chunk(int fd, const char *p, size_t sz) {
+
+ char buf[64];
+ int ret, chunk_size;
+
+ chunk_size = sprintf(buf, "%x\r\n", (int)sz);
+ ret = write(fd, buf, chunk_size);
+ ret = write(fd, p, sz);
+ ret = write(fd, "\r\n", 2);
+ (void)ret;
+}
+
View
33 http.h
@@ -3,16 +3,22 @@
#include <sys/types.h>
-typedef struct {
- char *s;
- size_t sz;
-} str_t;
+struct http_client;
+
+struct http_header {
+ char *key;
+ size_t key_sz;
+
+ char *val;
+ size_t val_sz;
+};
+
struct http_response {
short code;
const char *msg;
- str_t *headers;
+ struct http_header *headers;
int header_count;
const char *body;
@@ -21,9 +27,6 @@ struct http_response {
int chunked;
};
-void
-http_set_header(str_t *h, const char *p, size_t sz);
-
/* HTTP response */
void
@@ -38,5 +41,19 @@ http_response_set_body(struct http_response *r, const char *body, size_t body_le
int
http_response_write(struct http_response *r, int fd);
+void
+http_crossdomain(struct http_client *c);
+
+void
+http_send_error(struct http_client *c, short code, const char *msg);
+
+void
+http_send_options(struct http_client *c);
+
+void
+http_response_set_connection_header(struct http_client *c, struct http_response *r);
+
+void
+http_response_write_chunk(int fd, const char *p, size_t sz);
#endif
View
126 pool.c
@@ -0,0 +1,126 @@
+#include "pool.h"
+#include "worker.h"
+#include "conf.h"
+#include "server.h"
+
+#include <stdlib.h>
+#include <event.h>
+#include <hiredis/adapters/libevent.h>
+
+struct pool *
+pool_new(struct worker *w, int count) {
+
+ struct pool *p = calloc(1, sizeof(struct pool));
+
+ p->count = count;
+ p->ac = calloc(count, sizeof(redisAsyncContext*));
+
+ p->w = w;
+ p->cfg = w->s->cfg;
+
+ return p;
+}
+
+static void
+pool_on_connect(const redisAsyncContext *c) {
+ struct pool *p = c->data;
+ int i = 0;
+
+ printf("Connected to redis\n");
+ if(!p) {
+ return;
+ }
+
+ /* add to pool */
+ for(i = 0; i < p->count; ++i) {
+ if(p->ac[i] == NULL) {
+ p->ac[i] = c;
+ return;
+ }
+ }
+}
+
+static void
+pool_on_disconnect(const redisAsyncContext *c, int status) {
+
+ struct pool *p = c->data;
+ int i = 0;
+ if (status != REDIS_OK) {
+ fprintf(stderr, "Error: %s\n", c->errstr);
+ }
+
+ if(p == NULL) { /* no need to clean anything here. */
+ return;
+ }
+
+ /* remove from the pool */
+ for(i = 0; i < p->count; ++i) {
+ if(p->ac[i] == c) {
+ p->ac[i] = NULL;
+ break;
+ }
+ }
+
+ /* reconnect */
+ pool_connect(p, 1);
+}
+
+/**
+ * Create new connection.
+ */
+redisAsyncContext *
+pool_connect(struct pool *p, int attach) {
+
+ struct redisAsyncContext *ac;
+ if(p->cfg->redis_host[0] == '/') { /* unix socket */
+ ac = redisAsyncConnectUnix(p->cfg->redis_host);
+ } else {
+ ac = redisAsyncConnect(p->cfg->redis_host, p->cfg->redis_port);
+ }
+
+ if(attach) {
+ ac->data = p;
+ } else {
+ ac->data = NULL;
+ }
+
+ if(ac->err) {
+ /*
+ const char err[] = "Connection failed";
+ slog(s, WEBDIS_ERROR, err, sizeof(err)-1);
+ */
+ fprintf(stderr, "Error: %s\n", ac->errstr);
+ redisAsyncFree(ac);
+ return NULL;
+ }
+
+ redisLibeventAttach(ac, p->w->base);
+ redisAsyncSetConnectCallback(ac, pool_on_connect);
+ redisAsyncSetDisconnectCallback(ac, pool_on_disconnect);
+
+ if (p->cfg->redis_auth) { /* authenticate. */
+ redisAsyncCommand(ac, NULL, NULL, "AUTH %s", p->cfg->redis_auth);
+ }
+ if (p->cfg->database) { /* change database. */
+ redisAsyncCommand(ac, NULL, NULL, "SELECT %d", p->cfg->database);
+ }
+ return ac;
+}
+
+const redisAsyncContext *
+pool_get_context(struct pool *p) {
+
+ int orig = p->cur++;
+
+ do {
+ p->cur++;
+ p->cur %= p->count;
+ if(p->ac[p->cur] != NULL) {
+ return p->ac[p->cur];
+ }
+ } while(p->cur != orig);
+
+ return NULL;
+
+}
+
View
30 pool.h
@@ -0,0 +1,30 @@
+#ifndef POOL_H
+#define POOL_H
+
+#include <hiredis/async.h>
+
+struct conf;
+struct worker;
+
+struct pool {
+
+ struct worker *w;
+ struct conf *cfg;
+
+ const redisAsyncContext **ac;
+ int count;
+ int cur;
+
+};
+
+
+struct pool *
+pool_new(struct worker *w, int count);
+
+redisAsyncContext *
+pool_connect(struct pool *p, int attach);
+
+const redisAsyncContext *
+pool_get_context(struct pool *p);
+
+#endif
View
183 server.c
@@ -1,14 +1,10 @@
#include "server.h"
-#include "conf.h"
-#include "cmd.h"
-#include "slog.h"
-#include "http.h"
+#include "worker.h"
#include "client.h"
+#include "conf.h"
-#include <hiredis/hiredis.h>
-#include <hiredis/adapters/libevent.h>
-#include <jansson.h>
-
+#include <stdlib.h>
+#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
@@ -20,7 +16,7 @@
/**
* Sets up a non-blocking socket
*/
-int
+static int
socket_setup(const char *ip, short port) {
int reuse = 1;
@@ -75,167 +71,74 @@ socket_setup(const char *ip, short port) {
}
struct server *
-server_new(const char *filename) {
+server_new(const char *cfg_file) {
+
+ int i;
struct server *s = calloc(1, sizeof(struct server));
- s->cfg = conf_read(filename);
- s->base = event_base_new();
-
- return s;
-}
-
-void
-server_free(struct server *s) {
-
- /* cleanup Redis async object, _before_ the 2 struct event. */
- redisAsyncFree(s->ac);
-
- /* event_del(&s->ev); */
- event_del(&s->ev_reconnect);
- free(s);
-}
-
-static void
-connectCallback(const redisAsyncContext *c) {
- ((void)c);
-}
+ s->cfg = conf_read(cfg_file);
-static void
-disconnectCallback(const redisAsyncContext *c, int status) {
- struct server *s = c->data;
- if (status != REDIS_OK) {
- fprintf(stderr, "Error: %s\n", c->errstr);
- }
- s->ac = NULL;
-
- /* wait 100 msec and reconnect */
- s->tv_reconnect.tv_sec = 0;
- s->tv_reconnect.tv_usec = 100*1000;
- webdis_connect(s);
-}
-
-static void
-on_timer_reconnect(int fd, short event, void *ctx) {
-
- (void)fd;
- (