diff --git a/lib/responder.js b/lib/responder.js index dd6b29b9..e6a753fd 100644 --- a/lib/responder.js +++ b/lib/responder.js @@ -147,7 +147,7 @@ BodyDispatcher.prototype.promise = function dispatchBodyPromise( contentType, response ) { var operation = this.operation; - + operation.logger.debug('body promise'); var collectObject = function collectPromiseBodyObject(data) { operation.data = operation.collectBodyObject(data); @@ -166,7 +166,7 @@ BodyDispatcher.prototype.chunkedStream = function dispatchBodyChunkedStream( contentType, response ) { var operation = this.operation; - + operation.logger.debug('body chunked stream'); response.pipe(operation.outputStream); @@ -175,7 +175,7 @@ BodyDispatcher.prototype.objectStream = function dispatchBodyObjectStream( contentType, response ) { var operation = this.operation; - + operation.logger.debug('body object stream'); var outputStream = operation.outputStream; @@ -345,7 +345,7 @@ MultipartDispatcher.prototype.chunkedStream = function dispatchMultipartChunkedS boundary, response ) { var operation = this.operation; - + var errorListener = operation.errorListener; var outputStream = operation.outputStream; @@ -414,7 +414,7 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr boundary, response ) { var operation = this.operation; - + var errorListener = operation.errorListener; var rawHeaderQueue = new FifoQueue(5); @@ -426,31 +426,13 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr var hasParsed = false; var hasEnded = false; - var partTransform = function objectPartQueueTransform( - isLast, data, objectQueue - ) { + var partTransform = function objectPartQueueTransform(data) { parsedParts++; + var objectQueue = queuedReader.getItemQueue(); metadataBuffer = operation.queueDocument( (data.length === 0) ? null : data, rawHeaderQueue, metadataBuffer, objectQueue ); - - if (isLast) { - if (metadataBuffer !== null) { - operation.queueMetadata(metadataBuffer, objectQueue); - metadataBuffer = null; - } - - rawHeaderQueue = null; - queuedReader = null; - parser = null; - partHeadersListener = null; - partListener = null; - parseFinishListener = null; - responseEndListener = null; - partTransform = null; - } else if (!hasEnded && parsedParts === parsingParts) { - parser.emit('drain'); - } + doneChecker(); }; var queuedReader = new QueuedReader( @@ -470,15 +452,44 @@ MultipartDispatcher.prototype.objectStream = function dispatchMultipartObjectStr queuedReader.addReader(partReadStream); }; - var parseFinishListener = function promiseParseFinishListener() { + var parseFinishListener = function objectParseFinishListener() { hasParsed = true; - if (queuedReader !== null) { - queuedReader.queuedAll(); - } + doneChecker(); }; - var responseEndListener = function promiseResponseEndListener() { + var responseEndListener = function objectResponseEndListener() { hasEnded = true; + doneChecker(); + }; + + /** + * Check if HTTP response has ended, Dicer has finished parsing, + * and the Queue is empty. If all are true, then we can call end() + * on output stream. + */ + var doneChecker = function doneChecker() { + + if (queuedReader.isQueueEmpty() && hasParsed && hasEnded) { + if (metadataBuffer !== null) { + var objectQueue = queuedReader.getItemQueue(); + operation.queueMetadata(metadataBuffer, objectQueue); + metadataBuffer = null; + } + + rawHeaderQueue = null; + queuedReader = null; + parser = null; + partHeadersListener = null; + partListener = null; + parseFinishListener = null; + responseEndListener = null; + partTransform = null; + + operation.outputStream.end(); + } else if (!hasEnded && parsedParts === parsingParts) { + parser.emit('drain'); + } + }; var parser = new Dicer({boundary: boundary}); @@ -537,13 +548,10 @@ function QueuedReader(options, logger, itemsTransform) { self.logger.debug('concatenated item'); - var isLast = (self.queueDone && self.readerQueue.length() === 0 && - self.writerQueue.length() === 0); - var itemQueue = self.itemQueue; var beforeLength = itemQueue.length(); - self.itemsTransform(isLast, data, itemQueue); + self.itemsTransform(data); if (beforeLength < itemQueue.length()) { if (beforeLength === 0) { @@ -569,6 +577,12 @@ QueuedReader.prototype.addReader = function queuedAddReader(reader) { this.logger.debug('queued item %d', readerQueue.getTotal()); this.nextReader(); }; +QueuedReader.prototype.getItemQueue = function getItemQueue() { + return this.itemQueue; +}; +QueuedReader.prototype.isQueueEmpty = function isQueueEmpty() { + return (this.readerQueue.length() === 0 && this.writerQueue.length() === 0); +}; QueuedReader.prototype.nextReader = function queuedReaderNextReader() { if (!this.isReading) { return; @@ -635,12 +649,6 @@ QueuedReader.prototype._read = function queuedReaderRead(/*size*/) { this.nextReader(); } }; -QueuedReader.prototype.queuedAll = function queuedReaderAll() { - if (!this.queueDone) { - this.logger.debug('queued all items'); - this.queueDone = true; - } -}; function FifoQueue(min) { if (!(this instanceof FifoQueue)) { @@ -799,13 +807,13 @@ function operationResultPromise(fullfilled, rejected) { switch (operation.outputMode) { case 'none': if (operation.startedResponse === true) { - throw new Error('cannot create result promise after receiving response'); + throw new Error('cannot create result promise after receiving response'); } break; case 'promise': - throw new Error('already created result promise'); + throw new Error('already created result promise'); default: - throw new Error('cannot create result promise after creating stream'); + throw new Error('cannot create result promise after creating stream'); } operation.outputMode = 'promise'; @@ -847,7 +855,7 @@ function resolvedPromise(operation, resolve) { )+' data'); if (!hasData) { - resolve.call(operation); + resolve.call(operation); } else { resolve.call(operation, data); } @@ -932,7 +940,7 @@ function operationResultStream() { if (error != null) { var i = 0; for (; i < error.length; i++) { - outputStream.emit('error', error[i]); + outputStream.emit('error', error[i]); } operation.error = undefined; }