From 3063f008c7e0af9e3f40c89d8c87d3bb76c2b510 Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Wed, 6 Jun 2018 13:49:47 -0400 Subject: [PATCH] fix(change-stream): fix change stream resuming with promises If a change stream resume occurred, and the user was using the Promise-based version of .next, the promise would resolve before waiting for the change stream to resume, resulting in bad behavior. Fixes NODE-1493 --- lib/change_stream.js | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 2c01aab345..ec9a13d475 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -129,6 +129,13 @@ var createChangeStreamCursor = function(self) { self.emit('error', error); }); + if (self.pipeDestinations) { + const cursorStream = changeStreamCursor.stream(self.streamOptions); + for (let pipeDestination in self.pipeDestinations) { + cursorStream.pipe(pipeDestination); + } + } + return changeStreamCursor; }; @@ -303,27 +310,22 @@ var processNewChange = function(self, err, change, callback) { // Handle resumable MongoNetworkErrors if (isResumableError(err) && !self.attemptingResume) { self.attemptingResume = true; - return self.cursor.close(function(closeErr) { - if (closeErr) { - if (callback) return callback(err, null); - return self.promiseLibrary.reject(err); - } - - // Establish a new cursor - self.cursor = createChangeStreamCursor(self); - - // Attempt to reconfigure piping - if (self.pipeDestinations) { - var cursorStream = self.cursor.stream(self.streamOptions); - for (var pipeDestination in self.pipeDestinations) { - cursorStream.pipe(pipeDestination); + if (callback) { + return self.cursor.close(function(closeErr) { + if (closeErr) { + return callback(err, null); } - } - // Attempt the next() operation again - if (callback) return self.next(callback); - return self.next(); - }); + self.cursor = createChangeStreamCursor(self); + + return self.next(callback); + }); + } + + return self.cursor + .close() + .then(() => (self.cursor = createChangeStreamCursor(self))) + .then(() => self.next()); } if (typeof callback === 'function') return callback(err, null);