Skip to content

Commit

Permalink
MB-2689 - add timeout to moxi SASL auth & bucket select requests
Browse files Browse the repository at this point in the history
In this change, moxi SASl auth and bucket select are still
synchronous, but have additional configurable timeouts (via the
downstream_timeout configuration parameter).

This is done by adding a select() with timeout mcs_io_read(),
which shouldn't be too horrible as SASL auth should be very
infrequent due to long-lived, reused connections.

The alternative solution of rewriting SASL auth and bucket-select, in
comparison, to be asynchronous is a lot of effort.

Change-Id: I61f31fb3a4c4a994ff79d6a9f909ca578ae02236
Reviewed-on: http://review.membase.org/3604
Reviewed-by: Sean Lynch <seanl@literati.org>
Tested-by: Sean Lynch <seanl@literati.org>
  • Loading branch information
steveyen authored and Sean Lynch committed Nov 10, 2010
1 parent 8d7fd65 commit b9242d3
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 6 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Expand Up @@ -114,6 +114,7 @@ test: memcached-debug sizes testapp
$(srcdir)/testapp
./t/moxi_all.pl
./t/issue-MB-2660.sh
./t/issue-MB-2689.sh

test_gcov: test
@if test `basename $(PROFILER)` = "gcov"; then \
Expand Down
22 changes: 18 additions & 4 deletions cproxy.c
@@ -1,4 +1,5 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

#include "config.h"
#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -2608,8 +2609,14 @@ bool cproxy_auth_downstream(mcs_server_st *server,

protocol_binary_response_header res = { .bytes = {0} };

struct timeval *timeout = NULL;
if (behavior->downstream_timeout.tv_sec != 0 &&
behavior->downstream_timeout.tv_usec != 0) {
timeout = &behavior->downstream_timeout;
}

if (mcs_io_read(fd, &res.bytes,
sizeof(res.bytes)) == MCS_SUCCESS &&
sizeof(res.bytes), timeout) == MCS_SUCCESS &&
res.response.magic == PROTOCOL_BINARY_RES) {
res.response.status = ntohs(res.response.status);
res.response.keylen = ntohs(res.response.keylen);
Expand All @@ -2620,7 +2627,7 @@ bool cproxy_auth_downstream(mcs_server_st *server,
int len = res.response.bodylen;
while (len > 0) {
int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
if (mcs_io_read(fd, buf, amt) != MCS_SUCCESS) {
if (mcs_io_read(fd, buf, amt, timeout) != MCS_SUCCESS) {
if (settings.verbose > 1) {
moxi_log_write("auth could not read response body (%d)\n",
usr, amt);
Expand Down Expand Up @@ -2698,8 +2705,15 @@ bool cproxy_bucket_downstream(mcs_server_st *server,
}

protocol_binary_response_header res = { .bytes = {0} };

struct timeval *timeout = NULL;
if (behavior->downstream_timeout.tv_sec != 0 &&
behavior->downstream_timeout.tv_usec != 0) {
timeout = &behavior->downstream_timeout;
}

if (mcs_io_read(fd, &res.bytes,
sizeof(res.bytes)) == MCS_SUCCESS &&
sizeof(res.bytes), timeout) == MCS_SUCCESS &&
res.response.magic == PROTOCOL_BINARY_RES) {
res.response.status = ntohs(res.response.status);
res.response.keylen = ntohs(res.response.keylen);
Expand All @@ -2712,7 +2726,7 @@ bool cproxy_bucket_downstream(mcs_server_st *server,
int len = res.response.bodylen;
while (len > 0) {
int amt = (len > (int) sizeof(buf) ? (int) sizeof(buf) : len);
if (mcs_io_read(fd, buf, amt) != MCS_SUCCESS) {
if (mcs_io_read(fd, buf, amt, timeout) != MCS_SUCCESS) {
return false;
}
len -= amt;
Expand Down
14 changes: 13 additions & 1 deletion mcs.c
Expand Up @@ -577,7 +577,7 @@ ssize_t mcs_io_write(int fd, const void *buffer, size_t length) {
return write(fd, buffer, length);
}

mcs_return mcs_io_read(int fd, void *dta, size_t size) {
mcs_return mcs_io_read(int fd, void *dta, size_t size, struct timeval *timeout) {
// We use a blocking read, but reset back to non-blocking
// or the original state when we're done.
//
Expand All @@ -591,6 +591,18 @@ mcs_return mcs_io_read(int fd, void *dta, size_t size) {
size_t done = 0;

while (done < size) {
if (timeout != NULL) {
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(fd, &readfds);

if (select(1, &readfds, NULL, NULL, timeout) != 1) {
fcntl(fd, F_SETFL, flags | O_NONBLOCK);

return MCS_FAILURE;
}
}

ssize_t n = read(fd, data + done, size - done);
if (n == -1) {
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
Expand Down
2 changes: 1 addition & 1 deletion mcs.h
Expand Up @@ -73,7 +73,7 @@ mcs_return mcs_server_st_connect(mcs_server_st *ptr,
bool blocking);

ssize_t mcs_io_write(int fd, const void *buffer, size_t length);
mcs_return mcs_io_read(int fd, void *dta, size_t size);
mcs_return mcs_io_read(int fd, void *dta, size_t size, struct timeval *timeout);
void mcs_io_reset(int fd);

const char *mcs_server_st_hostname(mcs_server_st *ptr);
Expand Down
33 changes: 33 additions & 0 deletions t/issue-MB-2689.sh
@@ -0,0 +1,33 @@
#!/bin/sh

echo starting moxi...

# Note, if you removed or zero'ed the timeouts, moxi would also
# (correctly, expectedly) hang at client request 2.

./moxi -d -P /tmp/moxi-2689-test-moxi.pid \
-z 11266=127.0.0.1:11277 -t 1 \
-Z downstream_conn_max=1,downstream_max=0,downstream_timeout=100,wait_queue_timeout=100

echo starting memcached simulant...

./moxi -d -P /tmp/moxi-2689-test-memcached.pid -p 11277

echo client request 1 - expect NOT_FOUND
time (echo incr a 1 | nc 127.0.0.1 11266)

echo stopping memcached simulant...
kill -STOP `cat /tmp/moxi-2689-test-memcached.pid`

echo client request 2 - hangs in bug MB-2689, instead expect SERVER_ERROR
time (echo incr a 1 | nc 127.0.0.1 11266)

echo client request 3 - expect SERVER_ERROR
time (echo incr a 1 | nc 127.0.0.1 11266)

echo client request 4 - expect SERVER_ERROR
time (echo incr a 1 | nc 127.0.0.1 11266)

echo OK - no more hanging
killall moxi

0 comments on commit b9242d3

Please sign in to comment.