Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 54 additions & 46 deletions lib/responder.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ BodyDispatcher.prototype.promise = function dispatchBodyPromise(
contentType, response
) {
var operation = this.operation;

operation.logger.debug('body promise');
var collectObject = function collectPromiseBodyObject(data) {
operation.data = operation.collectBodyObject(data);
Expand All @@ -166,7 +166,7 @@ BodyDispatcher.prototype.chunkedStream = function dispatchBodyChunkedStream(
contentType, response
) {
var operation = this.operation;

operation.logger.debug('body chunked stream');

response.pipe(operation.outputStream);
Expand All @@ -175,7 +175,7 @@ BodyDispatcher.prototype.objectStream = function dispatchBodyObjectStream(
contentType, response
) {
var operation = this.operation;

operation.logger.debug('body object stream');

var outputStream = operation.outputStream;
Expand Down Expand Up @@ -345,7 +345,7 @@ MultipartDispatcher.prototype.chunkedStream = function dispatchMultipartChunkedS
boundary, response
) {
var operation = this.operation;

var errorListener = operation.errorListener;

var outputStream = operation.outputStream;
Expand Down Expand Up @@ -414,7 +414,7 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
boundary, response
) {
var operation = this.operation;

var errorListener = operation.errorListener;

var rawHeaderQueue = new FifoQueue(5);
Expand All @@ -426,31 +426,13 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
var hasParsed = false;
var hasEnded = false;

var partTransform = function objectPartQueueTransform(
isLast, data, objectQueue
) {
var partTransform = function objectPartQueueTransform(data) {
parsedParts++;
var objectQueue = queuedReader.getItemQueue();
metadataBuffer = operation.queueDocument(
(data.length === 0) ? null : data, rawHeaderQueue, metadataBuffer, objectQueue
);

if (isLast) {
if (metadataBuffer !== null) {
operation.queueMetadata(metadataBuffer, objectQueue);
metadataBuffer = null;
}

rawHeaderQueue = null;
queuedReader = null;
parser = null;
partHeadersListener = null;
partListener = null;
parseFinishListener = null;
responseEndListener = null;
partTransform = null;
} else if (!hasEnded && parsedParts === parsingParts) {
parser.emit('drain');
}
doneChecker();
};

var queuedReader = new QueuedReader(
Expand All @@ -470,15 +452,44 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr
queuedReader.addReader(partReadStream);
};

var parseFinishListener = function promiseParseFinishListener() {
var parseFinishListener = function objectParseFinishListener() {
hasParsed = true;
if (queuedReader !== null) {
queuedReader.queuedAll();
}
doneChecker();
};

var responseEndListener = function promiseResponseEndListener() {
var responseEndListener = function objectResponseEndListener() {
hasEnded = true;
doneChecker();
};

/**
* Check if HTTP response has ended, Dicer has finished parsing,
* and the Queue is empty. If all are true, then we can call end()
* on output stream.
*/
var doneChecker = function doneChecker() {

if (queuedReader.isQueueEmpty() && hasParsed && hasEnded) {
if (metadataBuffer !== null) {
var objectQueue = queuedReader.getItemQueue();
operation.queueMetadata(metadataBuffer, objectQueue);
metadataBuffer = null;
}

rawHeaderQueue = null;
queuedReader = null;
parser = null;
partHeadersListener = null;
partListener = null;
parseFinishListener = null;
responseEndListener = null;
partTransform = null;

operation.outputStream.end();
} else if (!hasEnded && parsedParts === parsingParts) {
parser.emit('drain');
}

};

var parser = new Dicer({boundary: boundary});
Expand Down Expand Up @@ -537,13 +548,10 @@ function QueuedReader(options, logger, itemsTransform) {

self.logger.debug('concatenated item');

var isLast = (self.queueDone && self.readerQueue.length() === 0 &&
self.writerQueue.length() === 0);

var itemQueue = self.itemQueue;
var beforeLength = itemQueue.length();

self.itemsTransform(isLast, data, itemQueue);
self.itemsTransform(data);

if (beforeLength < itemQueue.length()) {
if (beforeLength === 0) {
Expand All @@ -569,6 +577,12 @@ QueuedReader.prototype.addReader = function queuedAddReader(reader) {
this.logger.debug('queued item %d', readerQueue.getTotal());
this.nextReader();
};
QueuedReader.prototype.getItemQueue = function getItemQueue() {
return this.itemQueue;
};
QueuedReader.prototype.isQueueEmpty = function isQueueEmpty() {
return (this.readerQueue.length() === 0 && this.writerQueue.length() === 0);
};
QueuedReader.prototype.nextReader = function queuedReaderNextReader() {
if (!this.isReading) {
return;
Expand Down Expand Up @@ -635,12 +649,6 @@ QueuedReader.prototype._read = function queuedReaderRead(/*size*/) {
this.nextReader();
}
};
QueuedReader.prototype.queuedAll = function queuedReaderAll() {
if (!this.queueDone) {
this.logger.debug('queued all items');
this.queueDone = true;
}
};

function FifoQueue(min) {
if (!(this instanceof FifoQueue)) {
Expand Down Expand Up @@ -799,13 +807,13 @@ function operationResultPromise(fullfilled, rejected) {
switch (operation.outputMode) {
case 'none':
if (operation.startedResponse === true) {
throw new Error('cannot create result promise after receiving response');
throw new Error('cannot create result promise after receiving response');
}
break;
case 'promise':
throw new Error('already created result promise');
throw new Error('already created result promise');
default:
throw new Error('cannot create result promise after creating stream');
throw new Error('cannot create result promise after creating stream');
}
operation.outputMode = 'promise';

Expand Down Expand Up @@ -847,7 +855,7 @@ function resolvedPromise(operation, resolve) {
)+' data');

if (!hasData) {
resolve.call(operation);
resolve.call(operation);
} else {
resolve.call(operation, data);
}
Expand Down Expand Up @@ -932,7 +940,7 @@ function operationResultStream() {
if (error != null) {
var i = 0;
for (; i < error.length; i++) {
outputStream.emit('error', error[i]);
outputStream.emit('error', error[i]);
}
operation.error = undefined;
}
Expand Down