Permalink
Browse files

inspector: simplify buffer management

This change simplifies buffer management to address a number of issues
that original implementation had.

Original implementation was trying to reduce the number of allocations
by providing regions of the internal buffer to libuv IO code. This
introduced some potential use after free issues if the buffer grows
(or shrinks) while there's a pending read. It also had some confusing
math that resulted in issues on Windows version of the libuv.

PR-URL: #8257
Fixes: #8155
Reviewed-By: bnoordhuis - Ben Noordhuis <info@bnoordhuis.nl>
  • Loading branch information...
Eugene Ostroukhov authored and Fishrock123 committed Aug 24, 2016
1 parent 40e8421 commit 250a38023115417981e09e0d3d07a4a6a7df8a0a
Showing with 62 additions and 68 deletions.
  1. +1 −1 src/inspector_agent.cc
  2. +41 −58 src/inspector_socket.cc
  3. +16 −3 src/inspector_socket.h
  4. +4 −6 test/cctest/test_inspector_socket.cc
View
@@ -241,7 +241,7 @@ void InterruptCallback(v8::Isolate*, void* agent) {
}
void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) {
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
inspector_socket_t* socket = inspector_from_stream(stream);
static_cast<AgentImpl*>(socket->data)->OnRemoteDataIO(socket, read, buf);
}
View
@@ -48,16 +48,17 @@ static void dump_hex(const char* buf, size_t len) {
}
#endif
static void remove_from_beginning(std::vector<char>* buffer, size_t count) {
buffer->erase(buffer->begin(), buffer->begin() + count);
}
static void dispose_inspector(uv_handle_t* handle) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(handle->data);
inspector_socket_t* inspector = inspector_from_stream(handle);
inspector_cb close =
inspector->ws_mode ? inspector->ws_state->close_cb : nullptr;
inspector->buffer.clear();
delete inspector->ws_state;
inspector->ws_state = nullptr;
inspector->data_len = 0;
inspector->last_read_end = 0;
if (close) {
close(inspector, 0);
}
@@ -159,21 +160,19 @@ static std::vector<char> encode_frame_hybi17(const char* message,
return frame;
}
static ws_decode_result decode_frame_hybi17(const char* buffer_begin,
size_t data_length,
static ws_decode_result decode_frame_hybi17(const std::vector<char>& buffer,
bool client_frame,
int* bytes_consumed,
std::vector<char>* output,
bool* compressed) {
*bytes_consumed = 0;
if (data_length < 2)
if (buffer.size() < 2)
return FRAME_INCOMPLETE;
const char* p = buffer_begin;
const char* buffer_end = p + data_length;
auto it = buffer.begin();
unsigned char first_byte = *p++;
unsigned char second_byte = *p++;
unsigned char first_byte = *it++;
unsigned char second_byte = *it++;
bool final = (first_byte & kFinalBit) != 0;
bool reserved1 = (first_byte & kReserved1Bit) != 0;
@@ -215,12 +214,12 @@ static ws_decode_result decode_frame_hybi17(const char* buffer_begin,
} else {
return FRAME_ERROR;
}
if (buffer_end - p < extended_payload_length_size)
if ((buffer.end() - it) < extended_payload_length_size)
return FRAME_INCOMPLETE;
payload_length64 = 0;
for (int i = 0; i < extended_payload_length_size; ++i) {
payload_length64 <<= 8;
payload_length64 |= static_cast<unsigned char>(*p++);
payload_length64 |= static_cast<unsigned char>(*it++);
}
}
@@ -233,16 +232,16 @@ static ws_decode_result decode_frame_hybi17(const char* buffer_begin,
}
size_t payload_length = static_cast<size_t>(payload_length64);
if (data_length - kMaskingKeyWidthInBytes < payload_length)
if (buffer.size() - kMaskingKeyWidthInBytes < payload_length)
return FRAME_INCOMPLETE;
const char* masking_key = p;
const char* payload = p + kMaskingKeyWidthInBytes;
std::vector<char>::const_iterator masking_key = it;
std::vector<char>::const_iterator payload = it + kMaskingKeyWidthInBytes;
for (size_t i = 0; i < payload_length; ++i) // Unmask the payload.
output->insert(output->end(),
payload[i] ^ masking_key[i % kMaskingKeyWidthInBytes]);
size_t pos = p + kMaskingKeyWidthInBytes + payload_length - buffer_begin;
size_t pos = it + kMaskingKeyWidthInBytes + payload_length - buffer.begin();
*bytes_consumed = pos;
return closed ? FRAME_CLOSE : FRAME_OK;
}
@@ -280,13 +279,13 @@ static void close_frame_received(inspector_socket_t* inspector) {
}
}
static int parse_ws_frames(inspector_socket_t* inspector, size_t len) {
static int parse_ws_frames(inspector_socket_t* inspector) {
int bytes_consumed = 0;
std::vector<char> output;
bool compressed = false;
ws_decode_result r = decode_frame_hybi17(&inspector->buffer[0],
len, true /* client_frame */,
ws_decode_result r = decode_frame_hybi17(inspector->buffer,
true /* client_frame */,
&bytes_consumed, &output,
&compressed);
// Compressed frame means client is ignoring the headers and misbehaves
@@ -312,24 +311,22 @@ static int parse_ws_frames(inspector_socket_t* inspector, size_t len) {
}
static void prepare_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(stream->data);
*buf = uv_buf_init(new char[len], len);
}
if (len > (inspector->buffer.size() - inspector->data_len)) {
int new_size = (inspector->data_len + len + BUFFER_GROWTH_CHUNK_SIZE - 1) /
BUFFER_GROWTH_CHUNK_SIZE *
BUFFER_GROWTH_CHUNK_SIZE;
inspector->buffer.resize(new_size);
static void reclaim_uv_buf(inspector_socket_t* inspector, const uv_buf_t* buf,
ssize_t read) {
if (read > 0) {
std::vector<char>& buffer = inspector->buffer;
buffer.insert(buffer.end(), buf->base, buf->base + read);
}
buf->base = &inspector->buffer[inspector->data_len];
buf->len = len;
inspector->data_len += len;
delete[] buf->base;
}
static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(stream->data);
inspector_socket_t* inspector = inspector_from_stream(stream);
reclaim_uv_buf(inspector, buf, nread);
if (nread < 0 || nread == UV_EOF) {
inspector->connection_eof = true;
if (!inspector->shutting_down && inspector->ws_state->read_cb) {
@@ -339,29 +336,19 @@ static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
#if DUMP_READS
printf("%s read %ld bytes\n", __FUNCTION__, nread);
if (nread > 0) {
dump_hex(buf->base, nread);
dump_hex(inspector->buffer.data() + inspector->buffer.size() - nread,
nread);
}
#endif
// 1. Move read bytes to continue the buffer
// Should be same as this is supposedly last buffer
ASSERT_EQ(buf->base + buf->len, &inspector->buffer[inspector->data_len]);
// Should be noop...
memmove(&inspector->buffer[inspector->last_read_end], buf->base, nread);
inspector->last_read_end += nread;
// 2. Parse.
int processed = 0;
do {
processed = parse_ws_frames(inspector, inspector->last_read_end);
processed = parse_ws_frames(inspector);
// 3. Fix the buffer size & length
if (processed > 0) {
memmove(&inspector->buffer[0], &inspector->buffer[processed],
inspector->last_read_end - processed);
inspector->last_read_end -= processed;
inspector->data_len = inspector->last_read_end;
remove_from_beginning(&inspector->buffer, processed);
}
} while (processed > 0 && inspector->data_len > 0);
} while (processed > 0 && !inspector->buffer.empty());
}
}
@@ -435,7 +422,6 @@ static void handshake_complete(inspector_socket_t* inspector) {
uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->client));
handshake_cb callback = inspector->http_parsing_state->callback;
inspector->ws_state = new ws_state_s();
inspector->last_read_end = 0;
inspector->ws_mode = true;
callback(inspector, kInspectorHandshakeUpgraded,
inspector->http_parsing_state->path);
@@ -448,8 +434,7 @@ static void cleanup_http_parsing_state(inspector_socket_t* inspector) {
static void report_handshake_failure_cb(uv_handle_t* handle) {
dispose_inspector(handle);
inspector_socket_t* inspector =
static_cast<inspector_socket_t*>(handle->data);
inspector_socket_t* inspector = inspector_from_stream(handle);
handshake_cb cb = inspector->http_parsing_state->callback;
cleanup_http_parsing_state(inspector);
cb(inspector, kInspectorHandshakeFailed, std::string());
@@ -481,8 +466,7 @@ static void init_handshake(inspector_socket_t* inspector);
static int message_complete_cb(http_parser* parser) {
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(parser->data);
struct http_parsing_state_s* state =
(struct http_parsing_state_s*) inspector->http_parsing_state;
struct http_parsing_state_s* state = inspector->http_parsing_state;
if (parser->method != HTTP_GET) {
handshake_failed(inspector);
} else if (!parser->upgrade) {
@@ -527,22 +511,22 @@ static void data_received_cb(uv_stream_s* client, ssize_t nread,
printf("[%s:%d] %s\n", __FUNCTION__, __LINE__, uv_err_name(nread));
}
#endif
inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>((client->data));
inspector_socket_t* inspector = inspector_from_stream(client);
reclaim_uv_buf(inspector, buf, nread);
if (nread < 0 || nread == UV_EOF) {
close_and_report_handshake_failure(inspector);
} else {
http_parsing_state_s* state = inspector->http_parsing_state;
http_parser* parser = &state->parser;
http_parser_execute(parser, &state->parser_settings, &inspector->buffer[0],
nread);
http_parser_execute(parser, &state->parser_settings,
inspector->buffer.data(), nread);
remove_from_beginning(&inspector->buffer, nread);
if (parser->http_errno != HPE_OK) {
handshake_failed(inspector);
}
if (inspector->http_parsing_state->done) {
cleanup_http_parsing_state(inspector);
}
inspector->data_len = 0;
}
}
@@ -576,7 +560,6 @@ int inspector_accept(uv_stream_t* server, inspector_socket_t* inspector,
err = uv_accept(server, client);
}
if (err == 0) {
client->data = inspector;
init_handshake(inspector);
inspector->http_parsing_state->callback = callback;
err = uv_read_start(client, prepare_buffer,
View
@@ -2,6 +2,8 @@
#define SRC_INSPECTOR_SOCKET_H_
#include "http_parser.h"
#include "util.h"
#include "util-inl.h"
#include "uv.h"
#include <string>
@@ -48,8 +50,6 @@ struct inspector_socket_s {
struct http_parsing_state_s* http_parsing_state;
struct ws_state_s* ws_state;
std::vector<char> buffer;
size_t data_len;
size_t last_read_end;
uv_tcp_t client;
bool ws_mode;
bool shutting_down;
@@ -64,12 +64,25 @@ int inspector_accept(uv_stream_t* server, struct inspector_socket_s* inspector,
void inspector_close(struct inspector_socket_s* inspector,
inspector_cb callback);
// Callbacks will receive handles that has inspector in data field...
// Callbacks will receive stream handles. Use inspector_from_stream to get
// inspector_socket_t* from the stream handle.
int inspector_read_start(struct inspector_socket_s* inspector, uv_alloc_cb,
uv_read_cb);
void inspector_read_stop(struct inspector_socket_s* inspector);
void inspector_write(struct inspector_socket_s* inspector,
const char* data, size_t len);
bool inspector_is_active(const struct inspector_socket_s* inspector);
inline inspector_socket_t* inspector_from_stream(uv_tcp_t* stream) {
return node::ContainerOf(&inspector_socket_t::client, stream);
}
inline inspector_socket_t* inspector_from_stream(uv_stream_t* stream) {
return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream));
}
inline inspector_socket_t* inspector_from_stream(uv_handle_t* stream) {
return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream));
}
#endif // SRC_INSPECTOR_SOCKET_H_
@@ -178,7 +178,7 @@ struct expectations {
static void grow_expects_buffer(uv_handle_t* stream, size_t size, uv_buf_t* b) {
expectations* expects = static_cast<expectations*>(
(static_cast<inspector_socket_t*>(stream->data))->data);
inspector_from_stream(stream)->data);
size_t end = expects->actual_end;
// Grow the buffer in chunks of 64k.
size_t new_length = (end + size + 65535) & ~((size_t) 0xFFFF);
@@ -213,7 +213,7 @@ static void grow_expects_buffer(uv_handle_t* stream, size_t size, uv_buf_t* b) {
static void save_read_data(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
expectations* expects =static_cast<expectations*>(
(static_cast<inspector_socket_t*>(stream->data))->data);
inspector_from_stream(stream)->data);
expects->err_code = nread < 0 ? nread : 0;
if (nread > 0) {
expects->actual_end += nread;
@@ -254,8 +254,7 @@ static void expect_on_server(const char* data, size_t len) {
static void inspector_record_error_code(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
inspector_socket_t *inspector =
reinterpret_cast<inspector_socket_t*>(stream->data);
inspector_socket_t *inspector = inspector_from_stream(stream);
// Increment instead of assign is to ensure the function is only called once
*(static_cast<int *>(inspector->data)) += nread;
}
@@ -760,8 +759,7 @@ static void CleanupSocketAfterEOF_close_cb(inspector_socket_t* inspector,
static void CleanupSocketAfterEOF_read_cb(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf) {
EXPECT_EQ(UV_EOF, nread);
inspector_socket_t* insp =
reinterpret_cast<inspector_socket_t*>(stream->data);
inspector_socket_t* insp = inspector_from_stream(stream);
inspector_close(insp, CleanupSocketAfterEOF_close_cb);
}

0 comments on commit 250a380

Please sign in to comment.