Skip to content

Commit

Permalink
WebSocket now works
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Colomiets committed Oct 28, 2010
1 parent a21ccb6 commit 4d6aec9
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 26 deletions.
12 changes: 12 additions & 0 deletions include/website.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ typedef int bool;
#define ws_MESSAGE_CB(targ, fun) \
(targ)->wsock_callbacks[WS_WEBSOCKET_CB_MESSAGE] = (ws_websocket_cb)fun
#define ws_SET_TIMEOUT(targ, value) (targ)->network_timeout = (value)
#define ws_MESSAGE_INCREF(val) (++(val)->refcnt)
#define ws_MESSAGE_DECREF(val) if(!--(val)->refcnt) ws_message_free(val)

typedef enum {
WS_REQ_CB_HEADERS, // got headers
Expand Down Expand Up @@ -75,6 +77,7 @@ struct ws_request_s;
struct ws_connection_s;

typedef struct ws_message_s {
size_t refcnt;
size_t length;
char *data;
void (*free_cb)(void *);
Expand Down Expand Up @@ -137,6 +140,7 @@ typedef struct ws_connection_s {
int max_header_size;
int _message_size;
int max_message_size;
int max_message_queue;
struct ws_server_s *serv;
struct ws_connection_s *next;
struct ws_connection_s *prev;
Expand All @@ -150,6 +154,11 @@ typedef struct ws_connection_s {
char *websocket_buf;
size_t websocket_buf_size;
size_t websocket_buf_offset;
ws_message_t **websocket_queue;
size_t websocket_queue_size;
size_t websocket_qstart;
size_t websocket_qlen;
size_t websocket_queue_offset; // offset INSIDE the current message
} ws_connection_t;

typedef struct ws_server_s {
Expand All @@ -160,6 +169,7 @@ typedef struct ws_server_s {
int _req_size;
int _message_size;
int max_message_size;
int max_message_queue;
struct ws_listener_s *listeners;
size_t listeners_num;
struct ws_connection_s *first_conn;
Expand All @@ -184,6 +194,8 @@ int ws_add_fd(ws_server_t *serv, int fd);
int ws_index_header(ws_server_t *serv, const char *name);
int ws_server_start(ws_server_t *serv);

int ws_message_send(ws_connection_t *conn, ws_message_t *msg);

void ws_quickstart(ws_server_t *serv, const char *hostname,
int port, ws_request_cb cb);

Expand Down
172 changes: 148 additions & 24 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,50 @@ static int check_websocket(ws_request_t *req) {
return 0;
}

ws_message_t *ws_message_copy_data(ws_connection_t *conn,
void *data, size_t len) {
ws_message_t *res = (ws_message_t *)malloc(conn->_message_size+len+1);
memcpy((char *)res + conn->_message_size, data, len);
res->refcnt = 1;
res->data = (char *)res + conn->_message_size;
res->data[len] = 0; // for easier dealing with that as string
res->length = len;
res->free_cb = NULL;
return res;
}

void ws_message_free(ws_message_t *msg) {
if(msg->free_cb) {
msg->free_cb(msg->data);
}
free(msg);
}

int ws_message_send(ws_connection_t *conn, ws_message_t *msg) {
if(conn->websocket_qlen >= conn->websocket_queue_size) {
errno = EXFULL;
return -1;
}
ws_MESSAGE_INCREF(msg);
int end = conn->websocket_qstart + conn->websocket_qlen;
if(end >= conn->websocket_queue_size) {
end -= conn->websocket_queue_size;
}
conn->websocket_queue[end] = msg;
if(!conn->reply_watch.active) {
ev_io_start(conn->loop, &conn->reply_watch);
}
conn->websocket_qlen += 1;
return 0;
}

static void read_websocket(struct ev_loop *loop, struct ev_io *watch,
int revents) {
ws_connection_t *conn = (ws_connection_t *)((char *)watch
- offsetof(ws_connection_t, watch));
if(revents & EV_READ) {
printf("READ\n");
int r = read(conn->watch.fd, conn->websocket_buf,
int r = read(conn->watch.fd,
conn->websocket_buf+conn->websocket_buf_offset,
conn->websocket_buf_size - conn->websocket_buf_offset);
if(r < 0) {
switch(errno) {
Expand All @@ -246,24 +283,102 @@ static void read_websocket(struct ev_loop *loop, struct ev_io *watch,
ws_connection_close(conn);
return;
}
char *end = conn->websocket_buf + conn->websocket_buf_offset;
conn->websocket_buf_offset += r;
if(conn->websocket_buf[0] != '\x00') {
ws_connection_close(conn);
return
int len = conn->websocket_buf_offset + r;
char *start = conn->websocket_buf;
while(1) {
if(len && start[0] != '\x00') {
ws_connection_close(conn);
return;
}
char *end = (char *)memchr(start, '\xFF', len);
if(end) {
ws_message_t *msg = ws_message_copy_data(conn,
start+1, end - start - 1);
ws_websocket_cb cb = conn->wsock_callbacks[WS_WEBSOCKET_CB_MESSAGE];
int res = -1;
if(cb) {
res = cb(conn, msg);
}
ws_MESSAGE_DECREF(msg);
if(res < 0) {
ws_connection_close(conn);
return;
}
} else {
if(start != conn->websocket_buf) {
memmove(conn->websocket_buf, start, len);
}
conn->websocket_buf_offset = len;
break;
}
len -= end - start + 1;
start = end+1;
}
}
assert(!(revents & EV_ERROR));
}
static void write_websocket(struct ev_loop *loop, struct ev_io *watch,
int revents) {
ws_connection_t *conn = (ws_connection_t *)((char *)watch
- offsetof(ws_connection_t, reply_watch));
if(revents & EV_WRITE) {
char se[2] = "\x00\xff";
ws_message_t *msg = conn->websocket_queue[conn->websocket_qstart];
struct iovec iov[3] = { //TODO: Merge several messages
{ iov_base: &se[0],
iov_len: 1 },
{ iov_base: msg->data,
iov_len: msg->length },
{ iov_base: &se[1],
iov_len: 1 },
};
struct iovec *riov = iov;
int iovcnt = 3;
if(conn->websocket_queue_offset) {
riov += 1;
if(conn->websocket_queue_offset >= msg->length+1) {
riov += 1;
} else {
iov[1].iov_base += conn->websocket_queue_offset-1;
iov[1].iov_len -= conn->websocket_queue_offset-1;
}
}
int res = writev(watch->fd, iov, iovcnt);
if(res <= 0) {
switch(errno) {
case EAGAIN:
case EINTR:
return;
default:
ws_connection_close(conn);
return;
}
}
if(memchr(conn->websocket_buf, '\xFF', conn->websocket_buf_offset)) {
// TODO: got message
conn->websocket_queue_offset += res;
if(conn->websocket_queue_offset >= msg->length+2) {
conn->websocket_qstart += 1;
if(!--conn->websocket_qlen) {
ev_io_stop(conn->loop, &conn->reply_watch);
}
}
}
assert(!(revents & EV_ERROR));
}

static int ws_enable_websocket(ws_request_t *req) {
ev_io_stop(req->conn->loop, &req->conn->watch);
assert(req->conn->last_req == req);

req->conn->websocket_buf_size = req->conn->max_message_size;
req->conn->websocket_buf = malloc(req->conn->websocket_buf_size);
ws_connection_t *conn = req->conn;
ev_io_stop(conn->loop, &conn->watch);
assert(conn->last_req == req);

conn->websocket_buf_size = conn->max_message_size;
conn->websocket_buf = malloc(conn->websocket_buf_size
+ conn->max_message_queue*sizeof(ws_message_t*));
conn->websocket_queue = (ws_message_t **)(conn->websocket_buf
+ conn->websocket_buf_size);
conn->websocket_queue_size = conn->max_message_queue;
conn->websocket_qstart = 0;
conn->websocket_qlen = 0;
conn->websocket_queue_offset = 0;
if(!req->conn->websocket_buf) {
return -1;
}
Expand Down Expand Up @@ -310,8 +425,8 @@ static int ws_enable_websocket(ws_request_t *req) {
ws_finish_headers(req);
ws_reply_data(req, md5, 16);

ev_set_cb(&req->conn->watch, read_websocket);
ev_io_start(req->conn->loop, &req->conn->watch);
ev_set_cb(&conn->watch, read_websocket);
ev_io_start(conn->loop, &conn->watch);
return 0;
}

Expand Down Expand Up @@ -543,10 +658,12 @@ static void ws_connection_init(int fd, ws_server_t *serv,
memcpy(&conn->addr, addr, sizeof(struct sockaddr_in));
conn->network_timeout = serv->network_timeout;
conn->_req_size = serv->_req_size;
conn->_message_size = serv->_message_size;
conn->serv = serv;
conn->loop = serv->loop;
conn->max_header_size = serv->max_header_size;
conn->max_message_size = serv->max_message_size;
conn->max_message_queue = serv->max_message_queue;
conn->close_on_finish = FALSE;
conn->last_req = NULL;
conn->first_req = NULL;
Expand Down Expand Up @@ -619,6 +736,7 @@ int ws_server_init(ws_server_t *serv, struct ev_loop *loop) {
serv->network_timeout = 10.0;
serv->max_header_size = 16384;
serv->max_message_size = 16384;
serv->max_message_queue = 1024;
serv->header_parser.index = ws_match_new();
serv->header_parser.count = WS_STD_HEADERS;
int hindex;
Expand Down Expand Up @@ -732,7 +850,7 @@ static void ws_send_reply(struct ev_loop *loop,
switch(errno) {
case EAGAIN:
case EINTR:
break;
return;
default:
ws_connection_close(req->conn);
return;
Expand All @@ -745,13 +863,19 @@ static void ws_send_reply(struct ev_loop *loop,
if(req->reply_pos >= req->reply_head_size + req->reply_body_size) {
req->reply_state = WS_R_SENT;
ev_io_stop(loop, watch);
}
ws_connection_t *conn = req->conn;
ws_request_finish(req);
if(conn->first_req) {
ws_start_reply(conn->first_req);
} else if(conn->close_on_finish) {
ws_connection_close(conn);

ws_connection_t *conn = req->conn;
ws_request_finish(req);
if(conn->first_req) {
ws_start_reply(conn->first_req);
} else if(conn->close_on_finish) {
ws_connection_close(conn);
} else if(conn->websocket_buf) {
ev_set_cb(&conn->reply_watch, write_websocket);
if(conn->websocket_qlen) {
ev_io_start(loop, watch);
}
}
}
}
assert(!(revents & EV_ERROR));
Expand Down
32 changes: 32 additions & 0 deletions test/httptest.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,37 @@ def testHandshake(self):
resp = sock.recv(4096)
self.assertEquals(resp, websock_response)

def testEcho(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 8080))
sock.send(websock_request)
resp = sock.recv(4096)
self.assertEquals(resp, websock_response)
sock.send(b'\x00hello\xff')
resp = sock.recv(4096)
self.assertEquals(resp, b'\x00hello\xff')
sock.send(b'\x00hello\xff\x00world\xff')
time.sleep(0.1) # sorry, will fix that tomorrow :)
resp = sock.recv(4096)
self.assertEquals(resp, b'\x00hello\xff\x00world\xff')

def testParts(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 8080))
sock.send(websock_request)
resp = sock.recv(4096)
self.assertEquals(resp, websock_response)
sock.send(b'\x00hell')
time.sleep(0.01)
sock.send(b'o\xff')
resp = sock.recv(4096)
self.assertEquals(resp, b'\x00hello\xff')
sock.send(b'\x00hello\xff\x00wor')
resp = sock.recv(4096)
self.assertEquals(resp, b'\x00hello\xff')
sock.send(b'ld\xff')
resp = sock.recv(4096)
self.assertEquals(resp, b'\x00world\xff')

if __name__ == '__main__':
unittest.main()
5 changes: 3 additions & 2 deletions test/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ int websocket(ws_request_t *req) {
return 0;
}

int message(const char *message, int len) {
printf("MESSAGE ``%s''\n", message);
int message(ws_connection_t *conn, ws_message_t *msg) {
printf("MESSAGE [%d] ``%s''\n", msg->length, msg->data);
ws_message_send(conn, msg);
return 0;
}

Expand Down

0 comments on commit 4d6aec9

Please sign in to comment.