Skip to content

Commit

Permalink
feat(changeStream): expanding changeStream resumable errors
Browse files Browse the repository at this point in the history
Fixes NODE-1462
  • Loading branch information
daprahamian committed May 21, 2018
1 parent fa1a3c5 commit 49fbafd
Showing 1 changed file with 39 additions and 1 deletion.
40 changes: 39 additions & 1 deletion lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,50 @@ ChangeStream.prototype.stream = function(options) {
return this.cursor.stream(options);
};

const RESUMABLE_ERROR_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
50, // ExceededTimeLimit
89, // NetworkTimeout
189, // PrimarySteppedDown
216, // ElectionInProgress
234, // RetryChangeStream
9001, // SocketException
10107, // NotMaster
11602, // InterruptedDueToReplStateChange
13435, // NotMasterNoSlaveOk
13436 // NotMasterOrSecondary
]);

// TODO: will be used for check for getMore errors
// const GET_MORE_NON_RESUMABLE_CODES = new Set([
// 136, // CappedPositionLost
// 237, // CursorKilled
// 11601 // Interrupted
// ]);

function isResumableError(error) {
// TODO: Need a way to check if error is
// - from a getMore
// - is not in GET_MORE_NON_RESUMABLE_CODES
if (
error instanceof MongoNetworkError ||
RESUMABLE_ERROR_CODES.has(error.code) ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
) {
return true;
}

return false;
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
var processNewChange = function(self, err, change, callback) {
// Handle errors
if (err) {
// Handle resumable MongoNetworkErrors
if (err instanceof MongoNetworkError && !self.attemptingResume) {
if (isResumableError(err) && !self.attemptingResume) {
self.attemptingResume = true;
return self.cursor.close(function(closeErr) {
if (closeErr) {
Expand Down

0 comments on commit 49fbafd

Please sign in to comment.