Permalink
Browse files

Support of HTTP pipeline

- The HTTP server now handle pipelining
- ape_array are now cleaned
  • Loading branch information...
1 parent 87417fd commit 46f2d425cab6b35ae6138869711330be53eb8527 @paraboul committed May 16, 2012
Showing with 160 additions and 28 deletions.
  1. +24 −1 src/core/ape_array.c
  2. +10 −4 src/core/ape_pool.c
  3. +7 −2 src/core/ape_pool.h
  4. +115 −19 src/core/ape_server.c
  5. +4 −2 src/core/ape_server.h
View
@@ -2,6 +2,8 @@
#include "ape_array.h"
#include <string.h>
+static void ape_array_clean_cb(ape_pool_t *item);
+
ape_array_t *ape_array_new(size_t n)
{
ape_array_t *array;
@@ -154,7 +156,28 @@ void ape_array_add(ape_array_t *array, const char *key, const char *value)
void ape_array_destroy(ape_array_t *array)
{
- ape_destroy_pool_list_ordered((ape_pool_list_t *)array);
+ ape_destroy_pool_list_ordered((ape_pool_list_t *)array, ape_array_clean_cb);
+}
+
+static void ape_array_clean_cb(ape_pool_t *item)
+{
+ ape_array_item_t *array = (ape_array_item_t *)item;
+
+ if (!(array->pool.flags & APE_ARRAY_USED_SLOT)) {
+ return;
+ }
+ array->pool.flags &= ~APE_ARRAY_USED_SLOT;
+
+ buffer_destroy(array->key);
+
+ switch(array->pool.flags & ~APE_POOL_ALL_FLAGS) {
+ case APE_ARRAY_VAL_BUF:
+ buffer_destroy(array->pool.ptr.buf);
+ break;
+ case APE_ARRAY_VAL_INT:
+ default:
+ break;
+ }
}
// vim: ts=4 sts=4 sw=4 et
View
@@ -79,7 +79,7 @@ ape_pool_t *ape_grow_pool(ape_pool_list_t *list, size_t size, size_t n)
return pool;
}
-void ape_destroy_pool_ordered(ape_pool_t *pool)
+void ape_destroy_pool_ordered(ape_pool_t *pool, ape_pool_clean_callback cleaner)
{
ape_pool_t *tPool = NULL;
@@ -88,14 +88,19 @@ void ape_destroy_pool_ordered(ape_pool_t *pool)
if (pool->flags & APE_POOL_ALLOC) {
if (tPool != NULL) {
-
+ if (cleaner != NULL) {
+ cleaner(tPool);
+ }
free(tPool);
}
tPool = pool;
}
pool = pool->next;
}
if (tPool != NULL) {
+ if (cleaner != NULL) {
+ cleaner(tPool);
+ }
free(tPool);
}
}
@@ -138,9 +143,10 @@ void ape_destroy_pool_list(ape_pool_list_t *list)
free(list);
}
-void ape_destroy_pool_list_ordered(ape_pool_list_t *list)
+void ape_destroy_pool_list_ordered(ape_pool_list_t *list,
+ ape_pool_clean_callback cleaner)
{
- ape_destroy_pool_ordered(list->head);
+ ape_destroy_pool_ordered(list->head, cleaner);
free(list);
}
View
@@ -22,6 +22,8 @@ typedef struct _ape_pool_list {
ape_pool_t *current;
} ape_pool_list_t;
+typedef void (*ape_pool_clean_callback)(ape_pool_t *);
+
ape_pool_t *ape_new_pool(size_t size, size_t n);
ape_pool_list_t *ape_new_pool_list(size_t size, size_t n);
ape_pool_t *ape_grow_pool(ape_pool_list_t *list, size_t size, size_t n);
@@ -30,9 +32,12 @@ ape_pool_t *ape_pool_head_to_current(ape_pool_list_t *list);
void ape_init_pool_list(ape_pool_list_t *list, size_t size, size_t n);
void ape_destroy_pool(ape_pool_t *pool);
-void ape_destroy_pool_ordered(ape_pool_t *pool);
+void ape_destroy_pool_ordered(ape_pool_t *pool,
+ ape_pool_clean_callback cleaner);
void ape_destroy_pool_list(ape_pool_list_t *list);
-void ape_destroy_pool_list_ordered(ape_pool_list_t *list);
+void ape_destroy_pool_list_ordered(ape_pool_list_t *list,
+ ape_pool_clean_callback cleaner);
+
#endif
View
@@ -6,9 +6,14 @@
#include "ape_sha1.h"
#include "ape_websocket.h"
#include "ape_ssl.h"
+#include "ape_common_pattern.h"
#include <string.h>
+#ifdef _HAVE_MSGPACK
+ #include <msgpack.h>
+#endif
+
int _co_event = 0, _disco_event = 0;
static int ape_server_http_ready(ape_client *client, ape_global *ape);
@@ -72,6 +77,7 @@ static int ape_http_callback(void **ctx, callback_type type,
client->http.method = HTTP_POST;
break;
}
+
client->http.path = buffer_new(256);
client->http.headers.list = ape_array_new(16);
client->http.headers.tkey = buffer_new(16);
@@ -130,16 +136,20 @@ static int ape_http_callback(void **ctx, callback_type type,
case HTTP_HEADER_KEY:
break;
case HTTP_HEADER_VAL:
+
ape_array_add_b(client->http.headers.list,
- client->http.headers.tkey, client->http.headers.tval);
+ client->http.headers.tkey, client->http.headers.tval);
client->http.headers.tkey = buffer_new(16);
client->http.headers.tval = buffer_new(64);
+
break;
case HTTP_CL_VAL:
break;
case HTTP_HEADER_END:
break;
case HTTP_READY:
+ buffer_destroy(client->http.headers.tkey);
+ buffer_destroy(client->http.headers.tval);
ape_server_http_ready(client, ape);
break;
default:
@@ -148,11 +158,47 @@ static int ape_http_callback(void **ctx, callback_type type,
return 1;
}
+/* TODO: If a close has already been sent : doesnt process (check the RFC) */
static void ape_server_on_ws_frame(ape_client *client, const unsigned char *data, ssize_t length, ape_global *ape)
{
- ape_ws_write(client->socket, CONST_STR_LEN("[\"OK\"]"), APE_DATA_STATIC);
+#ifdef _HAVE_MSGPACK
+ if (client->serial_method == APE_CLIENT_SERIAL_MSGPACK) {
+ int success;
+ msgpack_unpacked msg;
+
+ printf("We have %d data sized\n", length);
+
+ msgpack_unpacked_init(&msg);
+ success = msgpack_unpack_next(&msg, data, length, NULL);
+
+ msgpack_object obj = msg.data;
+ msgpack_object_print(stdout, obj);
+ printf("\n");
+
+ } else if (client->serial_method == APE_CLIENT_SERIAL_JSON) {
+#endif
+ JSON_config config;
+
+ if (client->json.parser == NULL) {
+ init_JSON_config(&config);
+ config.depth = 15;
+ config.callback = NULL;
+ config.callback_ctx = NULL;
+ config.allow_comments = 0;
+ config.handle_floats_manually = 0;
- ape_ws_close(client->socket);
+ if ((client->json.parser = new_JSON_parser(&config)) == NULL) {
+ ape_ws_write(client->socket, (char *)CONST_STR_LEN(PATTERN_ERR_INTERNAL),
+ APE_DATA_GLOBAL_STATIC);
+ return;
+ }
+ }
+#ifdef _HAVE_MSGPACK
+ }
+#endif
+ ape_ws_write(client->socket, (char *)CONST_STR_LEN(PATTERN_ERR_BAD_JSON), APE_DATA_GLOBAL_STATIC);
+
+ //ape_ws_close(client->socket);
APE_EVENT(wsframe, client, data, length, ape);
}
@@ -163,19 +209,58 @@ static int ape_server_http_ready(ape_client *client, ape_global *ape)
const buffer *host = REQUEST_HEADER("host");
const buffer *upgrade = REQUEST_HEADER("upgrade");
-
+
+ client->serial_method = APE_CLIENT_SERIAL_JSON;
+
+
if (host != NULL) {
/* /!\ the buffer is non null terminated */
}
- if (upgrade && strncmp(upgrade->data, " websocket", sizeof(" websocket") - 1) == 0) {
+ if (upgrade &&
+ upgrade->used == (sizeof(" websocket") - 1) &&
+ strncmp(upgrade->data, " websocket", sizeof(" websocket") - 1) == 0) {
+
char *ws_computed_key;
const buffer *ws_key = REQUEST_HEADER("Sec-WebSocket-Key");
- printf("Key : %s\n", &ws_key->data[1]);
+
if (ws_key) {
+ buffer *ws_proto = REQUEST_HEADER("Sec-WebSocket-Protocol");
+
+ if (ws_proto != NULL) {
+ /* strtok needs a NULL-terminated string */
+ buffer_append_char(ws_proto, '\0');
+
+ char *toksave, *token, *tproto = &ws_proto->data[1];
+
+ while(1) {
+ token = strtok_r(tproto, ", ", &toksave);
+
+ if (token == NULL) break;
+#ifdef _HAVE_MSGPACK
+ if (strcasecmp(token, "msgpack.ape") == 0) {
+ client->serial_method = APE_CLIENT_SERIAL_MSGPACK;
+ }
+#endif
+ tproto = NULL;
+ }
+ }
+
ws_computed_key = ape_ws_compute_key(&ws_key->data[1], ws_key->used-1);
+
APE_socket_write(client->socket, CONST_STR_LEN(WEBSOCKET_HARDCODED_HEADERS), APE_DATA_STATIC);
+
+ switch(client->serial_method) {
+ case APE_CLIENT_SERIAL_JSON:
+ APE_socket_write(client->socket, CONST_STR_LEN("Sec-WebSocket-Protocol: json.ape\r\n"), APE_DATA_STATIC);
+ break;
+ case APE_CLIENT_SERIAL_MSGPACK:
+ APE_socket_write(client->socket, CONST_STR_LEN("Sec-WebSocket-Protocol: msgpack.ape\r\n"), APE_DATA_STATIC);
+ break;
+ }
+
APE_socket_write(client->socket, CONST_STR_LEN("Sec-WebSocket-Accept: "), APE_DATA_STATIC);
APE_socket_write(client->socket, ws_computed_key, strlen(ws_computed_key), APE_DATA_STATIC);
+ /* TODO: check the origin */
APE_socket_write(client->socket, CONST_STR_LEN("\r\nSec-WebSocket-Origin: 127.0.0.1\r\n\r\n"), APE_DATA_STATIC);
client->socket->callbacks.on_read = ape_ws_process_frame;
client->ws_state = malloc(sizeof(websocket_state));
@@ -202,24 +287,30 @@ static int ape_server_http_ready(ape_client *client, ape_global *ape)
case APE_TRANSPORT_NU:
case APE_TRANSPORT_FT:
{
- char fullpath[4096];
- char fill[20480];
- char fillb[20480];
-
- memset(fill, 'a', 20480);
- memset(fillb, 'b', 20480);
-
- APE_EVENT(request, client, ape);
-
- APE_socket_write(client->socket, CONST_STR_LEN("HTTP/1.1 200 OK\n\n"), APE_DATA_STATIC);
- APE_socket_write(client->socket, CONST_STR_LEN("<h1>Ho heil :)</h1>\n\n"), APE_DATA_STATIC);
+ APE_socket_write(client->socket, CONST_STR_LEN("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"), APE_DATA_STATIC);
+
+ //APE_EVENT(request, client, ape);
+
+ HTTP_PARSER_RESET(&client->http.parser);
+ client->http.parser.callback = ape_http_callback;
+ client->http.parser.ctx[0] = client;
+ client->http.parser.ctx[1] = ape;
+ client->http.method = HTTP_GET;
+ client->http.transport = APE_TRANSPORT_NU;
+
+ ape_array_destroy(client->http.headers.list);
+ buffer_destroy(client->http.path);
+ client->http.path = NULL;
+
+ //APE_socket_write(client->socket, CONST_STR_LEN("HTTP/1.1 200 OK\n\n"), APE_DATA_STATIC);
+ //APE_socket_write(client->socket, CONST_STR_LEN("<h1>Ho heil :)</h1>\n\n"), APE_DATA_STATIC);
/*APE_socket_write(client->socket, fill, 20480, APE_DATA_STATIC);
APE_socket_write(client->socket, fill, 20480, APE_DATA_STATIC);
APE_socket_write(client->socket, fillb, 20480, APE_DATA_STATIC);
*/
//#endif
- APE_socket_shutdown(client->socket);
+ //APE_socket_shutdown(client->socket);
//printf("Requested : %s\n", client->http.path->data);
//APE_socket_write(client->socket, CONST_STR_LEN("HTTP/1.1 418 I'm a teapot\n\n"));
@@ -246,17 +337,21 @@ static int ape_server_http_ready(ape_client *client, ape_global *ape)
static void ape_server_on_read(ape_socket *socket_client, ape_global *ape)
{
int i;
+ //printf("data used : %d\n", socket_client->data_in.used);
/* TODO : implement duff device here (speedup !)*/
for (i = 0; i < socket_client->data_in.used; i++) {
if (!parse_http_char(&APE_CLIENT(socket_client)->http.parser,
socket_client->data_in.data[i])) {
- printf("Failed %c\n", socket_client->data_in.data[i]);
+ printf("Failed at %d %c\n", i, socket_client->data_in.data[i]);
+ printf("next %c\n", socket_client->data_in.data[i+1]);
// TODO : graceful shutdown
shutdown(socket_client->s.fd, 2);
break;
}
+ //printf("%c", socket_client->data_in.data[i]);
}
+ //printf("\n");
}
static void ape_server_on_connect(ape_socket *socket_server, ape_socket *socket_client, ape_global *ape)
@@ -290,6 +385,7 @@ static void ape_server_on_connect(ape_socket *socket_server, ape_socket *socket_
static void ape_server_on_disconnect(ape_socket *socket_client, ape_global *ape)
{
+ printf("Socket disconnected\n");
if (APE_CLIENT(socket_client)->http.path != NULL) {
buffer_destroy(APE_CLIENT(socket_client)->http.path);
}
View
@@ -17,7 +17,9 @@ typedef struct _ape_client {
ape_socket *socket;
ape_socket *server;
struct _websocket_state *ws_state;
-
+
+ ape_client_serial_method_e serial_method;
+
struct {
http_parser parser;
http_method_t method;
@@ -30,7 +32,7 @@ typedef struct _ape_client {
buffer *tval;
} headers;
} http;
-
+
struct {
struct JSON_parser_struct* parser;
} json;

0 comments on commit 46f2d42

Please sign in to comment.