From 38a79cdc42b952ac0291d249241ca6499fc269aa Mon Sep 17 00:00:00 2001 From: Hugh Waite Date: Wed, 23 Sep 2015 21:59:09 +0100 Subject: [PATCH] websocket: Add support for SIP message fragmentation - websocket connections are created with a BUF_SIZE (64K) buffer used for concatenating frames - continuation frames (fragments) are supported for the SIP sub-protocol --- modules/websocket/ws_conn.c | 5 +-- modules/websocket/ws_conn.h | 2 ++ modules/websocket/ws_frame.c | 59 +++++++++++++++++++++++++++++------- 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/modules/websocket/ws_conn.c b/modules/websocket/ws_conn.c index 87d593d966e..b8ede8b4fcc 100644 --- a/modules/websocket/ws_conn.c +++ b/modules/websocket/ws_conn.c @@ -201,19 +201,20 @@ int wsconn_add(struct receive_info rcv, unsigned int sub_protocol) LM_DBG("wsconn_add id [%d]\n", id); /* Allocate and fill in new WebSocket connection */ - wsc = shm_malloc(sizeof(ws_connection_t)); + wsc = shm_malloc(sizeof(ws_connection_t) + BUF_SIZE); if (wsc == NULL) { LM_ERR("allocating shared memory\n"); return -1; } - memset(wsc, 0, sizeof(ws_connection_t)); + memset(wsc, 0, sizeof(ws_connection_t) + BUF_SIZE); wsc->id = id; wsc->id_hash = id_hash; wsc->state = WS_S_OPEN; wsc->rcv = rcv; wsc->sub_protocol = sub_protocol; wsc->run_event = 0; + wsc->frag_buf.s = ((char*)wsc) + sizeof(ws_connection_t); atomic_set(&wsc->refcnt, 0); LM_DBG("wsconn_add new wsc => [%p], ref => [%d]\n", wsc, atomic_get(&wsc->refcnt)); diff --git a/modules/websocket/ws_conn.h b/modules/websocket/ws_conn.h index 029d570c871..df6400a6b7d 100644 --- a/modules/websocket/ws_conn.h +++ b/modules/websocket/ws_conn.h @@ -60,6 +60,8 @@ typedef struct ws_connection atomic_t refcnt; int run_event; + + str frag_buf; } ws_connection_t; typedef struct diff --git a/modules/websocket/ws_frame.c b/modules/websocket/ws_frame.c index e8907702bf3..16f79015838 100644 --- a/modules/websocket/ws_frame.c +++ b/modules/websocket/ws_frame.c @@ -431,15 +431,6 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame, frame->opcode = (buf[0] & 0xff) & BYTE0_MASK_OPCODE; frame->mask = (buf[1] & 0xff) & BYTE1_MASK_MASK; - if (!frame->fin) - { - LM_WARN("WebSocket fragmentation not supported in the sip " - "sub-protocol\n"); - *err_code = 1002; - *err_text = str_status_protocol_error; - return -1; - } - if (frame->rsv1 || frame->rsv2 || frame->rsv3) { LM_WARN("WebSocket reserved fields with non-zero values\n"); @@ -450,6 +441,10 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame, switch(frame->opcode) { + case OPCODE_CONTINUATION: + LM_DBG("supported continuation frame: 0x%x\n", + (unsigned char) frame->opcode); + break; case OPCODE_TEXT_FRAME: case OPCODE_BINARY_FRAME: LM_DBG("supported non-control frame: 0x%x\n", @@ -651,6 +646,36 @@ int ws_frame_receive(void *data) switch(opcode) { + case OPCODE_CONTINUATION: + if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP)) + { + if (frame.wsc->frag_buf.len + frame.payload_len >= BUF_SIZE) + { + LM_ERR("Buffer overflow assembling websocket fragments %d + %d = %d\n", frame.wsc->frag_buf.len, frame.payload_len, frame.wsc->frag_buf.len + frame.payload_len); + wsconn_put(frame.wsc); + return -1; + } + memcpy(frame.wsc->frag_buf.s + frame.wsc->frag_buf.len, frame.payload_data, frame.payload_len); + frame.wsc->frag_buf.len += frame.payload_len; + frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0'; + + if (frame.fin) + { + ret = receive_msg(frame.wsc->frag_buf.s, + frame.wsc->frag_buf.len, + tcpinfo->rcv); + wsconn_put(frame.wsc); + return ret; + } + wsconn_put(frame.wsc); + return 0; + } + else + { + LM_ERR("Unsupported fragmented sub-protocol"); + wsconn_put(frame.wsc); + return -1; + } case OPCODE_TEXT_FRAME: case OPCODE_BINARY_FRAME: if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP)) @@ -659,11 +684,23 @@ int ws_frame_receive(void *data) frame.payload_data); update_stat(ws_sip_received_frames, 1); - wsconn_put(frame.wsc); + if (frame.fin) + { - return receive_msg(frame.payload_data, + wsconn_put(frame.wsc); + + return receive_msg(frame.payload_data, frame.payload_len, tcpinfo->rcv); + } + else + { + memcpy(frame.wsc->frag_buf.s, frame.payload_data, frame.payload_len); + frame.wsc->frag_buf.len = frame.payload_len; + frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0'; + wsconn_put(frame.wsc); + return 0; + } } else if (frame.wsc->sub_protocol == SUB_PROTOCOL_MSRP) {