Skip to content

Commit

Permalink
nghttpx: Use Memchunk based read buffer for frontend connection
Browse files Browse the repository at this point in the history
Previously, we have dedicated read buffer for each frontend
connection.  With this commit, the buffer spaces are only used when
needed, and pooled if they are not used.  This reduces memory usage
for idle client connections.
  • Loading branch information
tatsuhiro-t committed Jan 8, 2017
1 parent e8b2508 commit 4fa150c
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 44 deletions.
96 changes: 96 additions & 0 deletions src/memchunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,102 @@ inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) {
return iovcnt;
}

// MemchunkBuffer is similar to Buffer, but it uses pooled Memchunk
// for its underlying buffer.
template <typename Memchunk> struct MemchunkBuffer {
MemchunkBuffer(Pool<Memchunk> *pool) : pool(pool), chunk(nullptr) {}
MemchunkBuffer(const MemchunkBuffer &) = delete;
MemchunkBuffer(MemchunkBuffer &&other) noexcept
: pool(other.pool), chunk(other.chunk) {
other.chunk = nullptr;
}
MemchunkBuffer &operator=(const MemchunkBuffer &) = delete;
MemchunkBuffer &operator=(MemchunkBuffer &&other) noexcept {
if (this == &other) {
return *this;
}

pool = other.pool;
chunk = other.chunk;

other.chunk = nullptr;

return *this;
}

~MemchunkBuffer() {
if (!pool || !chunk) {
return;
}
pool->recycle(chunk);
}

// Ensures that the underlying buffer is allocated.
void ensure_chunk() {
if (chunk) {
return;
}
chunk = pool->get();
}

// Releases the underlying buffer.
void release_chunk() {
if (!chunk) {
return;
}
pool->recycle(chunk);
chunk = nullptr;
}

// Returns true if the underlying buffer is allocated.
bool chunk_avail() const { return chunk != nullptr; }

// The functions below must be called after the underlying buffer is
// allocated (use ensure_chunk).

// MemchunkBuffer provides the same interface functions with Buffer.
// Since we has chunk as a member variable, pos and last are
// implemented as wrapper functions.

uint8_t *pos() const { return chunk->pos; }
uint8_t *last() const { return chunk->last; }

size_t rleft() const { return chunk->len(); }
size_t wleft() const { return chunk->left(); }
size_t write(const void *src, size_t count) {
count = std::min(count, wleft());
auto p = static_cast<const uint8_t *>(src);
chunk->last = std::copy_n(p, count, chunk->last);
return count;
}
size_t write(size_t count) {
count = std::min(count, wleft());
chunk->last += count;
return count;
}
size_t drain(size_t count) {
count = std::min(count, rleft());
chunk->pos += count;
return count;
}
size_t drain_reset(size_t count) {
count = std::min(count, rleft());
std::copy(chunk->pos + count, chunk->last, std::begin(chunk->buf));
chunk->last = std::begin(chunk->buf) + (chunk->last - (chunk->pos + count));
chunk->pos = std::begin(chunk->buf);
return count;
}
void reset() { chunk->reset(); }
uint8_t *begin() { return std::begin(chunk->buf); }
uint8_t &operator[](size_t n) { return chunk->buf[n]; }
const uint8_t &operator[](size_t n) const { return chunk->buf[n]; }

Pool<Memchunk> *pool;
Memchunk *chunk;
};

using DefaultMemchunkBuffer = MemchunkBuffer<Memchunk16K>;

} // namespace nghttp2

#endif // MEMCHUNK_H
86 changes: 49 additions & 37 deletions src/shrpx_client_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
int ClientHandler::noop() { return 0; }

int ClientHandler::read_clear() {
rb_.ensure_chunk();
for (;;) {
if (rb_.rleft() && on_read() != 0) {
return -1;
Expand All @@ -132,9 +133,12 @@ int ClientHandler::read_clear() {
return 0;
}

auto nread = conn_.read_clear(rb_.last, rb_.wleft());
auto nread = conn_.read_clear(rb_.last(), rb_.wleft());

if (nread == 0) {
if (rb_.rleft() == 0) {
rb_.release_chunk();
}
return 0;
}

Expand Down Expand Up @@ -209,6 +213,8 @@ int ClientHandler::tls_handshake() {
int ClientHandler::read_tls() {
ERR_clear_error();

rb_.ensure_chunk();

for (;;) {
// we should process buffered data first before we read EOF.
if (rb_.rleft() && on_read() != 0) {
Expand All @@ -225,9 +231,12 @@ int ClientHandler::read_tls() {
return 0;
}

auto nread = conn_.read_tls(rb_.last, rb_.wleft());
auto nread = conn_.read_tls(rb_.last(), rb_.wleft());

if (nread == 0) {
if (rb_.rleft() == 0) {
rb_.release_chunk();
}
return 0;
}

Expand Down Expand Up @@ -303,7 +312,7 @@ int ClientHandler::upstream_write() {
int ClientHandler::upstream_http2_connhd_read() {
auto nread = std::min(left_connhd_len_, rb_.rleft());
if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_,
rb_.pos, nread) != 0) {
rb_.pos(), nread) != 0) {
// There is no downgrade path here. Just drop the connection.
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "invalid client connection header";
Expand Down Expand Up @@ -332,7 +341,7 @@ int ClientHandler::upstream_http2_connhd_read() {
int ClientHandler::upstream_http1_connhd_read() {
auto nread = std::min(left_connhd_len_, rb_.rleft());
if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_,
rb_.pos, nread) != 0) {
rb_.pos(), nread) != 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "This is HTTP/1.1 connection, "
<< "but may be upgraded to HTTP/2 later.";
Expand Down Expand Up @@ -386,6 +395,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
// so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
// 32 + 8 + 8 * 8 = 328.
balloc_(512, 512),
rb_(worker->get_mcpool()),
conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
get_config()->conn.upstream.timeout.write,
get_config()->conn.upstream.timeout.read,
Expand Down Expand Up @@ -647,9 +657,11 @@ int ClientHandler::do_read() { return read_(*this); }
int ClientHandler::do_write() { return write_(*this); }

int ClientHandler::on_read() {
auto rv = on_read_(*this);
if (rv != 0) {
return rv;
if (rb_.chunk_avail()) {
auto rv = on_read_(*this);
if (rv != 0) {
return rv;
}
}
conn_.handle_tls_pending_read();
return 0;
Expand Down Expand Up @@ -1325,7 +1337,7 @@ ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {

int ClientHandler::on_proxy_protocol_finish() {
if (conn_.tls.ssl) {
conn_.tls.rbuf.append(rb_.pos, rb_.rleft());
conn_.tls.rbuf.append(rb_.pos(), rb_.rleft());
rb_.reset();
}

Expand All @@ -1346,7 +1358,7 @@ int ClientHandler::proxy_protocol_read() {
CLOG(INFO, this) << "PROXY-protocol: Started";
}

auto first = rb_.pos;
auto first = rb_.pos();

// NULL character really destroys functions which expects NULL
// terminated string. We won't expect it in PROXY protocol line, so
Expand All @@ -1355,12 +1367,12 @@ int ClientHandler::proxy_protocol_read() {

constexpr size_t MAX_PROXY_LINELEN = 107;

auto bufend = rb_.pos + std::min(MAX_PROXY_LINELEN, rb_.rleft());
auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());

auto end =
std::find_first_of(rb_.pos, bufend, std::begin(chrs), std::end(chrs));
std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));

if (end == bufend || *end == '\0' || end == rb_.pos || *(end - 1) != '\r') {
if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
}
Expand All @@ -1371,14 +1383,14 @@ int ClientHandler::proxy_protocol_read() {

constexpr auto HEADER = StringRef::from_lit("PROXY ");

if (static_cast<size_t>(end - rb_.pos) < HEADER.size()) {
if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
}
return -1;
}

if (!util::streq(HEADER, StringRef{rb_.pos, HEADER.size()})) {
if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
}
Expand All @@ -1389,22 +1401,22 @@ int ClientHandler::proxy_protocol_read() {

int family;

if (rb_.pos[0] == 'T') {
if (end - rb_.pos < 5) {
if (rb_.pos()[0] == 'T') {
if (end - rb_.pos() < 5) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
}
return -1;
}

if (rb_.pos[1] != 'C' || rb_.pos[2] != 'P') {
if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
}
return -1;
}

switch (rb_.pos[3]) {
switch (rb_.pos()[3]) {
case '4':
family = AF_INET;
break;
Expand All @@ -1420,26 +1432,26 @@ int ClientHandler::proxy_protocol_read() {

rb_.drain(5);
} else {
if (end - rb_.pos < 7) {
if (end - rb_.pos() < 7) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
}
return -1;
}
if (!util::streq_l("UNKNOWN", rb_.pos, 7)) {
if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
}
return -1;
}

rb_.drain(end + 2 - rb_.pos);
rb_.drain(end + 2 - rb_.pos());

return on_proxy_protocol_finish();
}

// source address
auto token_end = std::find(rb_.pos, end, ' ');
auto token_end = std::find(rb_.pos(), end, ' ');
if (token_end == end) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
Expand All @@ -1448,20 +1460,20 @@ int ClientHandler::proxy_protocol_read() {
}

*token_end = '\0';
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos), family)) {
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
}
return -1;
}

auto src_addr = rb_.pos;
auto src_addrlen = token_end - rb_.pos;
auto src_addr = rb_.pos();
auto src_addrlen = token_end - rb_.pos();

rb_.drain(token_end - rb_.pos + 1);
rb_.drain(token_end - rb_.pos() + 1);

// destination address
token_end = std::find(rb_.pos, end, ' ');
token_end = std::find(rb_.pos(), end, ' ');
if (token_end == end) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
Expand All @@ -1470,7 +1482,7 @@ int ClientHandler::proxy_protocol_read() {
}

*token_end = '\0';
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos), family)) {
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
}
Expand All @@ -1479,26 +1491,26 @@ int ClientHandler::proxy_protocol_read() {

// Currently we don't use destination address

rb_.drain(token_end - rb_.pos + 1);
rb_.drain(token_end - rb_.pos() + 1);

// source port
auto n = parse_proxy_line_port(rb_.pos, end);
if (n <= 0 || *(rb_.pos + n) != ' ') {
auto n = parse_proxy_line_port(rb_.pos(), end);
if (n <= 0 || *(rb_.pos() + n) != ' ') {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
}
return -1;
}

rb_.pos[n] = '\0';
auto src_port = rb_.pos;
rb_.pos()[n] = '\0';
auto src_port = rb_.pos();
auto src_portlen = n;

rb_.drain(n + 1);

// destination port
n = parse_proxy_line_port(rb_.pos, end);
if (n <= 0 || rb_.pos + n != end) {
n = parse_proxy_line_port(rb_.pos(), end);
if (n <= 0 || rb_.pos() + n != end) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
}
Expand All @@ -1507,14 +1519,14 @@ int ClientHandler::proxy_protocol_read() {

// Currently we don't use destination port

rb_.drain(end + 2 - rb_.pos);
rb_.drain(end + 2 - rb_.pos());

ipaddr_ =
make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});

if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos - first)
CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
<< " bytes read";
}

Expand Down
Loading

0 comments on commit 4fa150c

Please sign in to comment.