Skip to content

Commit

Permalink
Add RequestHandler unit tests regarding handling of slow clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
FooBarWidget committed Mar 7, 2012
1 parent f4256d8 commit 9cfa160
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 18 deletions.
4 changes: 4 additions & 0 deletions ext/common/EventedBufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ class EventedBufferedInput: public enable_shared_from_this< EventedBufferedInput
return !paused;
}

bool endReached() const {
return state == END_OF_STREAM;
}

const FileDescriptor &getFd() const {
return fd;
}
Expand Down
40 changes: 24 additions & 16 deletions ext/common/agents/HelperAgent/RequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class Client: public enable_shared_from_this<Client> {
clientOutputPipe->reset(getSafeLibev());
clientOutputPipe->start();
clientOutputWatcher.set(getLoop());
clientOutputWatcher.set(_fd, ev::WRITE);

appOutputWatcher.set(getLoop());

Expand Down Expand Up @@ -459,21 +460,24 @@ class Client: public enable_shared_from_this<Client> {
void inspect(Stream &stream) const {
const char *indent = " ";

stream << indent << "state = " << getStateName() << "\n";
stream << indent << "state = " << getStateName() << "\n";
if (session == NULL) {
stream << indent << "session = NULL\n";
stream << indent << "session = NULL\n";
} else {
stream << indent << "session pid = " << session->getPid() << "\n";
stream << indent << "session gupid = " << session->getGupid() << "\n";
stream << indent << "session initiated = " << boolStr(session->initiated()) << "\n";
stream << indent << "session pid = " << session->getPid() << "\n";
stream << indent << "session gupid = " << session->getGupid() << "\n";
stream << indent << "session initiated = " << boolStr(session->initiated()) << "\n";
}
stream
<< indent << "requestBodyIsBuffered = " << boolStr(requestBodyIsBuffered) << "\n"
<< indent << "clientInput started = " << boolStr(clientInput->isStarted()) << "\n"
<< indent << "clientOutputPipe started = " << boolStr(clientOutputPipe->isStarted()) << "\n"
<< indent << "appInput started = " << boolStr(appInput->isStarted()) << "\n"
<< indent << "responseHeaderSeen = " << boolStr(responseHeaderSeen) << "\n"
<< indent << "useUnionStation = " << boolStr(useUnionStation()) << "\n"
<< indent << "requestBodyIsBuffered = " << boolStr(requestBodyIsBuffered) << "\n"
<< indent << "clientInput started = " << boolStr(clientInput->isStarted()) << "\n"
<< indent << "clientOutputPipe started = " << boolStr(clientOutputPipe->isStarted()) << "\n"
<< indent << "clientOutputPipe ended = " << boolStr(clientOutputPipe->reachedEnd()) << "\n"
<< indent << "clientOutputWatcher active = " << boolStr(clientOutputWatcher.is_active()) << "\n"
<< indent << "appInput started = " << boolStr(appInput->isStarted()) << "\n"
<< indent << "appInput ended = " << boolStr(appInput->endReached()) << "\n"
<< indent << "responseHeaderSeen = " << boolStr(responseHeaderSeen) << "\n"
<< indent << "useUnionStation = " << boolStr(useUnionStation()) << "\n"
;
}
};
Expand Down Expand Up @@ -843,7 +847,6 @@ class RequestHandler {
RH_DEBUG(client, "Application sent EOF");
client->endScopeLog(&client->scopeLogs.requestProxying);
client->clientOutputPipe->end();
client->appInput->stop();
}

void onAppInputError(const ClientPtr &client, const char *message, int errorCode) {
Expand Down Expand Up @@ -889,22 +892,26 @@ class RequestHandler {
return;
}

RH_TRACE(client, 2, "Forwarding " << size << " bytes of application data to client.");
ssize_t ret = syscalls::write(client->fd, data, size);
int e = errno;
if (ret == -1) {
if (errno == EAGAIN) {
// Wait until the client socket is writable before resuming writing data.
RH_TRACE(client, 2, "Could not write to socket: " << strerror(e) << " (errno=" << e << ")");
if (e == EAGAIN) {
RH_TRACE(client, 2, "Waiting until the client socket is writable again.");
client->clientOutputWatcher.start();
consumed(0, true);
} else if (errno == EPIPE) {
} else if (e == EPIPE) {
// If the client closed the connection then disconnect quietly.
if (client->useUnionStation()) {
client->logMessage("Disconnecting: client stopped reading prematurely");
}
disconnect(client);
} else {
disconnectWithClientSocketWriteError(client, errno);
disconnectWithClientSocketWriteError(client, e);
}
} else {
RH_TRACE(client, 2, "Forwarded " << ret << " bytes.");
consumed(ret, false);
}
}
Expand Down Expand Up @@ -937,6 +944,7 @@ class RequestHandler {
}

// Continue forwarding output data to the client.
RH_TRACE(client, 2, "Client socket became writable again.");
client->clientOutputWatcher.stop();
assert(!client->clientOutputPipe->isStarted());
client->clientOutputPipe->start();
Expand Down
39 changes: 37 additions & 2 deletions test/cxx/RequestHandlerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <string>
#include <vector>
#include <map>
#include <sstream>
#include <cstdarg>
#include <sys/socket.h>

Expand Down Expand Up @@ -113,6 +114,18 @@ namespace tut {
return result;
}
}

string inspect() {
string result;
bg.safe->runSync(boost::bind(&RequestHandlerTest::real_inspect, this, &result));
return result;
}

void real_inspect(string *result) {
stringstream stream;
handler->inspect(stream);
*result = stream.str();
}
};

DEFINE_TEST_GROUP(RequestHandlerTest);
Expand Down Expand Up @@ -476,7 +489,7 @@ namespace tut {
"PATH_INFO", "/custom_status",
"HTTP_X_CUSTOM_STATUS", "201",
NULL);
string response = readAll(connection);
string response = readAll(connection);
ensure(containsSubstring(response, "HTTP/1.1 201 Created\r\n"));
ensure(containsSubstring(response, "Status: 201 Created\r\n"));
}
Expand All @@ -491,7 +504,7 @@ namespace tut {
"PATH_INFO", "/custom_status",
"HTTP_X_CUSTOM_STATUS", "201 Bunnies Jump",
NULL);
string response = readAll(connection);
string response = readAll(connection);
ensure(containsSubstring(response, "HTTP/1.1 201 Bunnies Jump\r\n"));
ensure(containsSubstring(response, "Status: 201 Bunnies Jump\r\n"));
}
Expand All @@ -500,4 +513,26 @@ namespace tut {
// If the application doesn't output a status line then it rejects the application response.
// TODO
}

TEST_METHOD(39) {
// Test handling of slow clients that can't receive response data fast enough.
init();
connect();
sendHeaders(defaultHeaders,
"PASSENGER_APP_ROOT", wsgiAppPath.c_str(),
"PATH_INFO", "/blob",
"HTTP_X_SIZE", "10485760",
NULL);
EVENTUALLY(10,
result = containsSubstring(inspect(), "appInput ended = true");
);
string result = stripHeaders(readAll(connection));
ensure_equals(result.size(), 10485760u);
const char *data = result.data();
const char *end = result.data() + result.size();
while (data < end) {
ensure_equals(*data, 'x');
data++;
}
}
}
10 changes: 10 additions & 0 deletions test/stub/wsgi/passenger_wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ def body():
i += 1
start_response(status, [('Content-Type', 'text/html'), ('Transfer-Encoding', 'chunked')])
return body()
elif path == '/blob':
size = int(env.get('HTTP_X_SIZE', 1024 * 1024 * 10))
def body():
written = 0
while written < size:
data = 'x' * min(1024 * 8, size - written)
yield(data)
written += len(data)
start_response(status, [('Content-Type', 'text/plain')])
return body()
else:
body = 'hello <b>world</b>'

Expand Down

0 comments on commit 9cfa160

Please sign in to comment.