Skip to content

Commit

Permalink
fix(ChangeStream): should resume from errors when iterating
Browse files Browse the repository at this point in the history
Introduced `getCursor` method to safely provide a change stream cursor
for `next`/`hasNext` across recoveries from resumable errors.

NODE-2548
  • Loading branch information
emadum committed May 18, 2020
1 parent 6b510a6 commit 7a8a533
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 108 deletions.
204 changes: 133 additions & 71 deletions lib/change_stream.js
@@ -1,12 +1,15 @@
'use strict';

const Denque = require('denque');
const EventEmitter = require('events');
const { MongoError, isResumableError } = require('./error');
const { Cursor } = require('./cursor');
const { relayEvents, maxWireVersion } = require('./utils');
const maybePromise = require('./utils').maybePromise;
const AggregateOperation = require('./operations/aggregate');

const kResumeQueue = Symbol('resumeQueue');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
CHANGE_STREAM_OPTIONS
Expand Down Expand Up @@ -89,6 +92,8 @@ class ChangeStream extends EventEmitter {
this.options.readPreference = parent.s.readPreference;
}

this[kResumeQueue] = new Denque();

// Create contained Change Stream cursor
this.cursor = createChangeStreamCursor(this, options);

Expand All @@ -97,9 +102,7 @@ class ChangeStream extends EventEmitter {
// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
this.cursor.on('data', change =>
processNewChange({ changeStream: this, change, eventEmitter: true })
);
this.cursor.on('data', change => processNewChange(this, change));
}
});

Expand Down Expand Up @@ -129,7 +132,12 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
hasNext(callback) {
return maybePromise(callback, cb => this.cursor.hasNext(cb));
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err) return cb(err); // failed to resume, raise an error
cursor.hasNext(cb);
});
});
}

/**
Expand All @@ -142,11 +150,16 @@ class ChangeStream extends EventEmitter {
*/
next(callback) {
return maybePromise(callback, cb => {
if (this.isClosed()) {
return cb(new MongoError('ChangeStream is closed'));
}
this.cursor.next((error, change) => {
processNewChange({ changeStream: this, error, change, callback: cb });
getCursor(this, (err, cursor) => {
if (err) return cb(err); // failed to resume, raise an error
cursor.next((error, change) => {
if (error) {
this[kResumeQueue].push(() => this.next(cb));
processError(this, error, cb);
return;
}
processNewChange(this, change, cb);
});
});
});
}
Expand All @@ -166,7 +179,7 @@ class ChangeStream extends EventEmitter {
*
* @function ChangeStream.prototype.close
* @param {ChangeStream~resultCallback} [callback] The result callback.
* @returns {Promise} returns Promise if no callback passed
* @returns {Promise|void} returns Promise if no callback passed
*/
close(callback) {
return maybePromise(callback, cb => {
Expand All @@ -175,6 +188,8 @@ class ChangeStream extends EventEmitter {
// flag the change stream as explicitly closed
this.closed = true;

if (!this.cursor) return cb();

// Tidy up the existing cursor
const cursor = this.cursor;

Expand Down Expand Up @@ -389,7 +404,7 @@ function createChangeStreamCursor(self, options) {
*/
if (self.listenerCount('change') > 0) {
changeStreamCursor.on('data', function(change) {
processNewChange({ changeStream: self, change, eventEmitter: true });
processNewChange(self, change);
});
}

Expand Down Expand Up @@ -421,7 +436,7 @@ function createChangeStreamCursor(self, options) {
* @type {Error}
*/
changeStreamCursor.on('error', function(error) {
processNewChange({ changeStream: self, error, eventEmitter: true });
processError(self, error);
});

if (self.pipeDestinations) {
Expand Down Expand Up @@ -462,73 +477,20 @@ function waitForTopologyConnected(topology, options, callback) {
}, 500); // this is an arbitrary wait time to allow SDAM to transition
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
function processNewChange(args) {
const changeStream = args.changeStream;
const error = args.error;
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;
function processNewChange(changeStream, change, callback) {
const cursor = changeStream.cursor;

// If the cursor is null or the change stream has been closed explictly, do not process a change.
if (cursor == null || changeStream.closed) {
// We do not error in the eventEmitter case.
changeStream.closed = true;
if (eventEmitter) {
return;
}
callback(new MongoError('ChangeStream is closed'));
if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
return;
}

const topology = changeStream.topology;
const options = changeStream.cursor.options;
const wireVersion = maxWireVersion(cursor.server);

if (error) {
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;

// stop listening to all events from old cursor
['data', 'close', 'end', 'error'].forEach(event =>
changeStream.cursor.removeAllListeners(event)
);

// close internal cursor, ignore errors
changeStream.cursor.close();

waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) {
// if there's an error reconnecting, close the change stream
changeStream.closed = true;
if (eventEmitter) {
changeStream.emit('error', err);
changeStream.emit('close');
return;
}
return callback(err);
}

changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
if (eventEmitter) return;
changeStream.next(callback);
});
return;
}

if (eventEmitter) return changeStream.emit('error', error);
return callback(error);
}

changeStream.attemptingResume = false;

if (change && !change._id) {
const noResumeTokenError = new Error(
'A change stream document has been received that lacks a resume token (_id).'
);

if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
if (!callback) return changeStream.emit('error', noResumeTokenError);
return callback(noResumeTokenError);
}

Expand All @@ -540,8 +502,108 @@ function processNewChange(args) {
changeStream.options.startAtOperationTime = undefined;

// Return the change
if (eventEmitter) return changeStream.emit('change', change);
return callback(error, change);
if (!callback) return changeStream.emit('change', change);
return callback(undefined, change);
}

function processError(changeStream, error, callback) {
const topology = changeStream.topology;
const cursor = changeStream.cursor;

// If the change stream has been closed explictly, do not process error.
if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
return;
}

// if the resume succeeds, continue with the new cursor
function resumeWithCursor(newCursor) {
changeStream.cursor = newCursor;
processResumeQueue(changeStream);
}

// otherwise, raise an error and close the change stream
function unresumableError(err) {
if (!callback) {
changeStream.emit('error', err);
changeStream.emit('close');
}
processResumeQueue(changeStream, err);
changeStream.closed = true;
}

if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
changeStream.cursor = undefined;

// stop listening to all events from old cursor
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));

// close internal cursor, ignore errors
cursor.close();

waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => {
// if the topology can't reconnect, close the stream
if (err) return unresumableError(err);

// create a new cursor, preserving the old cursor's options
const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);

// attempt to continue in emitter mode
if (!callback) return resumeWithCursor(newCursor);

// attempt to continue in iterator mode
newCursor.hasNext(err => {
// if there's an error immediately after resuming, close the stream
if (err) return unresumableError(err);
resumeWithCursor(newCursor);
});
});
return;
}

if (!callback) return changeStream.emit('error', error);
return callback(error);
}

/**
* Safely provides a cursor across resume attempts
*
* @param {ChangeStream} changeStream the parent ChangeStream
* @param {function} callback gets the cursor or error
* @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor
*/
function getCursor(changeStream, callback) {
if (changeStream.isClosed()) {
callback(new MongoError('ChangeStream is closed.'));
return;
}

// if a cursor exists and it is open, return it
if (changeStream.cursor) {
callback(undefined, changeStream.cursor);
return;
}

// no cursor, queue callback until topology reconnects
changeStream[kResumeQueue].push(callback);
}

/**
* Drain the resume queue when a new has become available
*
* @param {ChangeStream} changeStream the parent ChangeStream
* @param {ChangeStreamCursor?} changeStream.cursor the new cursor
* @param {Error} [err] error getting a new cursor
*/
function processResumeQueue(changeStream, err) {
while (changeStream[kResumeQueue].length) {
const request = changeStream[kResumeQueue].pop();
if (changeStream.isClosed() && !err) {
request(new MongoError('Change Stream is not open.'));
return;
}
request(err, changeStream.cursor);
}
}

/**
Expand Down

0 comments on commit 7a8a533

Please sign in to comment.