Permalink
Browse files

fix #255: stress issue with large upload and download

  • Loading branch information...
1 parent cedc7c8 commit 8b6fbbf8ee409cfe23887fc9b352322a0d3ec475 @tjanczuk committed Mar 5, 2013
@@ -2,6 +2,19 @@
extern RtlNtStatusToDosError pRtlNtStatusToDosError;
+void ASYNC_CONTEXT::RunSynchronousContinuations()
+{
+ while (this->continueSynchronously)
+ {
+ // The continueSynchronously is used to unwind the call stack
+ // to avoid stack overflow in case of a synchronous IO completions
+ this->continueSynchronously = FALSE;
+ DWORD bytesCompleteted = this->bytesCompleteted;
+ this->bytesCompleteted = 0;
+ this->completionProcessor(S_OK, bytesCompleteted, (LPOVERLAPPED)this);
+ }
+}
+
CAsyncManager::CAsyncManager()
: threads(NULL), threadCount(0), completionPort(NULL), timerThread(NULL), timerSignal(NULL)
{
@@ -170,6 +183,7 @@ unsigned int WINAPI CAsyncManager::Worker(void* arg)
(0 == entry->dwNumberOfBytesTransferred && ERROR_SUCCESS == error) ? ERROR_NO_DATA : error,
entry->dwNumberOfBytesTransferred,
(LPOVERLAPPED)ctx);
+ ctx->RunSynchronousContinuations();
}
else if (-1L == entry->lpCompletionKey) // shutdown initiated from Terminate
{
@@ -181,6 +195,7 @@ unsigned int WINAPI CAsyncManager::Worker(void* arg)
{
ctx = (ASYNC_CONTEXT*)entry->lpOverlapped;
ctx->completionProcessor(S_OK, 0, (LPOVERLAPPED)ctx);
+ ctx->RunSynchronousContinuations();
}
else if (-3L == entry->lpCompletionKey) // continuation initiated form PostContinuation
{
@@ -4,10 +4,14 @@
typedef struct {
OVERLAPPED overlapped; // this member must be first in the struct
LPOVERLAPPED_COMPLETION_ROUTINE completionProcessor;
+ BOOL continueSynchronously;
void* data;
HANDLE timer;
LARGE_INTEGER dueTime;
HANDLE completionPort;
+ DWORD bytesCompleteted;
+
+ void RunSynchronousContinuations();
} ASYNC_CONTEXT;
typedef void (*ContinuationCallback)(void* data);
@@ -36,13 +36,9 @@ REQUEST_NOTIFICATION_STATUS CNodeHttpModule::OnExecuteRequestHandler(
this->applicationManager->GetEventProvider()->Log(L"iisnode received a new http request", WINEVENT_LEVEL_INFO);
- // TODO: reject websocket connections on IIS < 8
- // http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17#page-17
-
- //PCSTR upgrade = pHttpContext->GetRequest()->GetHeader(HttpHeaderUpgrade, NULL);
- //ErrorIf(upgrade && 0 == strcmp("websocket", upgrade), ERROR_NOT_SUPPORTED);
-
CheckError(this->applicationManager->Dispatch(pHttpContext, pProvider, &ctx));
+ ASYNC_CONTEXT* async = ctx->GetAsyncContext();
+ async->RunSynchronousContinuations();
if (0 == ctx->DecreasePendingAsyncOperationCount()) // decreases ref count set to 1 in the ctor of CNodeHttpStoredContext
{
@@ -175,7 +171,14 @@ REQUEST_NOTIFICATION_STATUS CNodeHttpModule::OnAsyncCompletion(
ASYNC_CONTEXT* async = ctx->GetAsyncContext();
if (NULL != async->completionProcessor)
{
- async->completionProcessor(pCompletionInfo->GetCompletionStatus(), pCompletionInfo->GetCompletionBytes(), ctx->GetOverlapped());
+ DWORD bytesCompleted = pCompletionInfo->GetCompletionBytes();
+ if (async->completionProcessor == CProtocolBridge::SendResponseBodyCompleted)
+ {
+ bytesCompleted = async->bytesCompleteted;
+ async->bytesCompleteted = 0;
+ }
+ async->completionProcessor(pCompletionInfo->GetCompletionStatus(), bytesCompleted, ctx->GetOverlapped());
+ async->RunSynchronousContinuations();
}
if (0 == ctx->DecreasePendingAsyncOperationCount()) // decreases ref count increased on entering OnAsyncCompletion
@@ -84,6 +84,7 @@ CNodeApplication* CNodeHttpStoredContext::GetNodeApplication()
void CNodeHttpStoredContext::SetNextProcessor(LPOVERLAPPED_COMPLETION_ROUTINE processor)
{
this->asyncContext.completionProcessor = processor;
+ this->SetContinueSynchronously(FALSE);
}
LPOVERLAPPED CNodeHttpStoredContext::GetOverlapped()
@@ -441,3 +442,18 @@ FILETIME* CNodeHttpStoredContext::GetStartTime()
{
return &this->startTime;
}
+
+DWORD CNodeHttpStoredContext::GetBytesCompleted()
+{
+ return this->asyncContext.bytesCompleteted;
+}
+
+void CNodeHttpStoredContext::SetBytesCompleted(DWORD bytesCompleted)
+{
+ this->asyncContext.bytesCompleteted = bytesCompleted;
+}
+
+void CNodeHttpStoredContext::SetContinueSynchronously(BOOL continueSynchronously)
+{
+ this->asyncContext.continueSynchronously = continueSynchronously;
+}
@@ -75,6 +75,7 @@ class CNodeHttpStoredContext : public IHttpStoredContext
void SetTargetUrl(PCSTR targetUrl, DWORD targetUrlLength);
void SetChildContext(IHttpContext* context);
IHttpContext* GetChildContext();
+ DWORD GetBytesCompleted();
void SetNextProcessor(LPOVERLAPPED_COMPLETION_ROUTINE processor);
void SetNodeProcess(CNodeProcess* process);
@@ -107,6 +108,8 @@ class CNodeHttpStoredContext : public IHttpStoredContext
BOOL GetRequestPumpStarted();
FILETIME* GetStartTime();
HRESULT EnsureResponseChunk(DWORD size, HTTP_DATA_CHUNK** chunk);
+ void SetBytesCompleted(DWORD bytesCompleted);
+ void SetContinueSynchronously(BOOL continueSynchronously);
static CNodeHttpStoredContext* Get(LPOVERLAPPED overlapped);
@@ -818,6 +818,7 @@ void CProtocolBridge::ReadRequestBody(CNodeHttpStoredContext* context)
HRESULT hr;
DWORD bytesReceived = 0;
BOOL completionPending = FALSE;
+ BOOL continueSynchronouslyNow = TRUE;
if (0 < context->GetHttpContext()->GetRequest()->GetRemainingEntityBytes() || context->GetIsUpgrade())
{
@@ -831,14 +832,25 @@ void CProtocolBridge::ReadRequestBody(CNodeHttpStoredContext* context)
{
CheckError(context->GetHttpContext()->GetRequest()->ReadEntityBody(context->GetBuffer(), context->GetBufferSize(), TRUE, &bytesReceived, &completionPending));
}
+
+ if (!completionPending)
+ {
+ context->SetContinueSynchronously(TRUE);
+ continueSynchronouslyNow = FALSE;
+ context->SetBytesCompleted(bytesReceived);
+ }
}
if (!completionPending)
{
context->GetNodeApplication()->GetApplicationManager()->GetEventProvider()->Log(
L"iisnode initiated reading http request body chunk and completed synchronously", WINEVENT_LEVEL_VERBOSE, context->GetActivityId());
- CProtocolBridge::ReadRequestBodyCompleted(S_OK, bytesReceived, context->GetOverlapped());
+ context->SetBytesCompleted(bytesReceived);
+ if (continueSynchronouslyNow)
+ {
+ CProtocolBridge::ReadRequestBodyCompleted(S_OK, 0, context->GetOverlapped());
+ }
}
else
{
@@ -1507,6 +1519,8 @@ void CProtocolBridge::EnsureRequestPumpStarted(CNodeHttpStoredContext* context)
context->SetRequestPumpStarted();
CProtocolBridge::ReadRequestBody(context->GetUpgradeContext());
+ ASYNC_CONTEXT* async = context->GetUpgradeContext()->GetAsyncContext();
+ async->RunSynchronousContinuations();
}
}
@@ -1552,6 +1566,7 @@ void WINAPI CProtocolBridge::ProcessResponseBody(DWORD error, DWORD bytesTransfe
}
ctx->SetNextProcessor(CProtocolBridge::SendResponseBodyCompleted);
+ ctx->SetBytesCompleted(bytesToSend);
CheckError(ctx->GetHttpContext()->GetResponse()->WriteEntityChunks(
chunk,
@@ -1566,7 +1581,7 @@ void WINAPI CProtocolBridge::ProcessResponseBody(DWORD error, DWORD bytesTransfe
if (!completionExpected)
{
- CProtocolBridge::SendResponseBodyCompleted(S_OK, chunk->FromMemory.BufferLength, ctx->GetOverlapped());
+ ctx->SetContinueSynchronously(TRUE);
}
}
else if (ctx->GetIsChunked())
@@ -1646,8 +1661,13 @@ void WINAPI CProtocolBridge::SendResponseBodyCompleted(DWORD error, DWORD bytesT
CheckError(error);
if (!ctx->GetIsUpgrade())
- {
+ {
ctx->SetChunkTransmitted(ctx->GetChunkTransmitted() + bytesTransfered);
+ if (ctx->GetChunkLength() == ctx->GetChunkTransmitted())
+ {
+ ctx->SetChunkTransmitted(0);
+ ctx->SetChunkLength(0);
+ }
}
if (ctx->GetIsLastChunk() && ctx->GetChunkLength() == ctx->GetChunkTransmitted())
@@ -35,8 +35,7 @@ class CProtocolBridge
static void WINAPI ProcessResponseHeaders(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
static void WINAPI ProcessChunkHeader(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
- static void WINAPI ProcessResponseBody(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
- static void WINAPI SendResponseBodyCompleted(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
+ static void WINAPI ProcessResponseBody(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
static void WINAPI ProcessUpgradeResponse(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
static void WINAPI ContinueProcessResponseBodyAfterPartialFlush(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
@@ -48,6 +47,8 @@ class CProtocolBridge
public:
+ static void WINAPI SendResponseBodyCompleted(DWORD error, DWORD bytesTransfered, LPOVERLAPPED overlapped);
+
static HRESULT InitiateRequest(CNodeHttpStoredContext* context);
static BOOL SendIisnodeError(IHttpContext* httpCtx, HRESULT hr);
static BOOL SendIisnodeError(CNodeHttpStoredContext* ctx, HRESULT hr);
@@ -0,0 +1,28 @@
+/*
+A 30MB download
+*/
+
+var assert = require('assert');
+
+var timeout = setTimeout(function () {
+ console.error('Timeout occurred');
+ assert.ok(false, 'request timed out');
+}, 10000);
+
+var host = process.env.IISNODETEST_HOST || 'localhost';
+var port = process.env.IISNODETEST_PORT || 31415;
+
+require('http').get('http://' + host + ':' + port + '/140_large_download/hello.js', function (res) {
+ assert.equal(res.statusCode, 200);
+ res.setEncoding('binary');
+ var contentLength = 0;
+ res.on('data', function (data) {
+ contentLength += data.length;
+ });
+ res.on('end', function () {
+ clearTimeout(timeout);
+ assert.equal(contentLength, 30 * 1024 * 1024);
+ });
+}).on('error', function (e) {
+ assert.ifError(e);
+});
@@ -0,0 +1,42 @@
+/*
+A 31MB upload
+*/
+
+var assert = require('assert');
+
+var timeout = setTimeout(function () {
+ console.error('Timeout occurred');
+ assert.ok(false, 'request timed out');
+}, 10000);
+
+var host = process.env.IISNODETEST_HOST || 'localhost';
+var port = process.env.IISNODETEST_PORT || 31415;
+
+var options = {
+ hostname: host,
+ port: port,
+ path: '/141_large_upload/hello.js',
+ method: 'POST'
+};
+
+var req = require('http').request(options, function (res) {
+ assert.equal(res.statusCode, 200);
+ res.setEncoding('utf8');
+ var body = ''
+ res.on('data', function (chunk) {
+ body += chunk;
+ });
+ res.on('end', function () {
+ clearTimeout(timeout);
+ assert.equal(body, 31 * 1024 * 1024);
+ });
+});
+
+req.on('error', function (e) {
+ assert.ifError(e);
+});
+
+var buffer = new Buffer(1024);
+for (var i = 0; i < (31 * 1024) ; i++)
+ req.write(buffer);
+req.end();
@@ -0,0 +1,42 @@
+/*
+A 31MB request with 30MB response
+*/
+
+var assert = require('assert');
+
+var timeout = setTimeout(function () {
+ console.error('Timeout occurred');
+ assert.ok(false, 'request timed out');
+}, 10000);
+
+var host = process.env.IISNODETEST_HOST || 'localhost';
+var port = process.env.IISNODETEST_PORT || 31415;
+
+var options = {
+ hostname: host,
+ port: port,
+ path: '/140_large_download/hello.js',
+ method: 'POST'
+};
+
+var req = require('http').request(options, function (res) {
+ assert.equal(res.statusCode, 200);
+ res.setEncoding('binary');
+ var contentLength = 0;
+ res.on('data', function (data) {
+ contentLength += data.length;
+ });
+ res.on('end', function () {
+ clearTimeout(timeout);
+ assert.equal(contentLength, 30 * 1024 * 1024);
+ });
+});
+
+req.on('error', function (e) {
+ assert.ifError(e);
+});
+
+var buffer = new Buffer(1024);
+for (var i = 0; i < (31 * 1024) ; i++)
+ req.write(buffer);
+req.end();
@@ -0,0 +1,12 @@
+var buffer = new Buffer(1024);
+var numKB = 1024 * 30; // 30MB downlaod
+
+require('http').createServer(function (req, res) {
+ req.on('data', function (data) {} );
+ req.on('end', function () {
+ res.writeHead(200, {'Content-Type': 'application/binary', 'Cache-Control': 'no-cache'});
+ for (var i = 0; i < numKB; i++)
+ res.write(buffer);
+ res.end();
+ });
+}).listen(process.env.PORT || 3000);
@@ -0,0 +1,7 @@
+<configuration>
+ <system.webServer>
+ <handlers>
+ <add name="iisnode" path="hello.js" verb="*" modules="iisnode" />
+ </handlers>
+ </system.webServer>
+</configuration>
@@ -0,0 +1,11 @@
+require('http').createServer(function (req, res) {
+ req.setEncoding('binary');
+ var contentLength = 0;
+ req.on('data', function (data) {
+ contentLength += data.length;
+ });
+ req.on('end', function () {
+ res.writeHead(200, {'Content-Type': 'text/plain', 'Cache-Control': 'no-cache'});
+ res.end('' + contentLength);
+ });
+}).listen(process.env.PORT || 3000);
@@ -0,0 +1,7 @@
+<configuration>
+ <system.webServer>
+ <handlers>
+ <add name="iisnode" path="hello.js" verb="*" modules="iisnode" />
+ </handlers>
+ </system.webServer>
+</configuration>

0 comments on commit 8b6fbbf

Please sign in to comment.