Skip to content

Commit

Permalink
fix(NODE-4475): make interrupted message more specific (#3437)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Oct 13, 2022
1 parent 26bce4a commit 5f37cb6
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 36 deletions.
12 changes: 10 additions & 2 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -897,14 +897,22 @@ class ReadableCursorStream extends Readable {
}

// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
// to be "operation interrupted", where a cursor has been closed but there is an
// to be "operation was interrupted", where a cursor has been closed but there is an
// active getMore in-flight. This used to check if the cursor was killed but once
// that changed to happen in cleanup legitimate errors would not destroy the
// stream. There are change streams test specifically test these cases.
if (err.message.match(/interrupted/)) {
if (err.message.match(/operation was interrupted/)) {
return this.push(null);
}

// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475.
return this.destroy(err);
}

Expand Down
116 changes: 82 additions & 34 deletions test/integration/change-streams/change_stream.test.ts
Expand Up @@ -1674,22 +1674,30 @@ describe('ChangeStream resumability', function () {
};

const resumableErrorCodes = [
{ error: 'HostUnreachable', code: 6 },
{ error: 'HostNotFound', code: 7 },
{ error: 'NetworkTimeout', code: 89 },
{ error: 'ShutdownInProgress', code: 91 },
{ error: 'PrimarySteppedDown', code: 189 },
{ error: 'ExceededTimeLimit', code: 262 },
{ error: 'SocketException', code: 9001 },
{ error: 'NotWritablePrimary', code: 10107 },
{ error: 'InterruptedAtShutdown', code: 11600 },
{ error: 'InterruptedDueToReplStateChange', code: 11602 },
{ error: 'NotPrimaryNoSecondaryOk', code: 13435 },
{ error: 'StaleShardVersion', code: 63 },
{ error: 'StaleEpoch', code: 150 },
{ error: 'RetryChangeStream', code: 234 },
{ error: 'FailedToSatisfyReadPreference', code: 133 },
{ error: 'CursorNotFound', code: 43 }
{ error: 'HostUnreachable', code: 6, message: 'host unreachable' },
{ error: 'HostNotFound', code: 7, message: 'hot not found' },
{ error: 'NetworkTimeout', code: 89, message: 'network timeout' },
{ error: 'ShutdownInProgress', code: 91, message: 'shutdown in progress' },
{ error: 'PrimarySteppedDown', code: 189, message: 'primary stepped down' },
{ error: 'ExceededTimeLimit', code: 262, message: 'operation exceeded time limit' },
{ error: 'SocketException', code: 9001, message: 'socket exception' },
{ error: 'NotWritablePrimary', code: 10107, message: 'not writable primary' },
{ error: 'InterruptedAtShutdown', code: 11600, message: 'interrupted at shutdown' },
{
error: 'InterruptedDueToReplStateChange',
code: 11602,
message: 'interrupted due to state change'
},
{ error: 'NotPrimaryNoSecondaryOk', code: 13435, message: 'not primary and no secondary ok' },
{ error: 'StaleShardVersion', code: 63, message: 'stale shard version' },
{ error: 'StaleEpoch', code: 150, message: 'stale epoch' },
{ error: 'RetryChangeStream', code: 234, message: 'retry change stream' },
{
error: 'FailedToSatisfyReadPreference',
code: 133,
message: 'failed to satisfy read preference'
},
{ error: 'CursorNotFound', code: 43, message: 'cursor not found' }
];

const is4_2Server = (serverVersion: string) =>
Expand Down Expand Up @@ -1731,7 +1739,7 @@ describe('ChangeStream resumability', function () {

context('iterator api', function () {
context('#next', function () {
for (const { error, code } of resumableErrorCodes) {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
Expand All @@ -1746,7 +1754,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: code
errorCode: code,
errmsg: message
}
} as FailPoint);

Expand All @@ -1759,7 +1768,7 @@ describe('ChangeStream resumability', function () {
}
);
}
for (const { error, code } of resumableErrorCodes) {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '<4.2' } },
Expand All @@ -1778,7 +1787,7 @@ describe('ChangeStream resumability', function () {
.stub(changeStream.cursor, '_getMore')
.callsFake((_batchSize, callback) => {
mock.restore();
const error = new MongoServerError({ message: 'Something went wrong' });
const error = new MongoServerError({ message: message });
error.code = code;
callback(error);
});
Expand Down Expand Up @@ -1807,7 +1816,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: resumableErrorCodes[0].code
errorCode: resumableErrorCodes[0].code,
errmsg: resumableErrorCodes[0].message
}
} as FailPoint);

Expand Down Expand Up @@ -1858,7 +1868,7 @@ describe('ChangeStream resumability', function () {
});

context('#hasNext', function () {
for (const { error, code } of resumableErrorCodes) {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
Expand All @@ -1873,7 +1883,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: code
errorCode: code,
errmsg: message
}
} as FailPoint);

Expand All @@ -1887,7 +1898,7 @@ describe('ChangeStream resumability', function () {
);
}

for (const { error, code } of resumableErrorCodes) {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '<4.2' } },
Expand All @@ -1906,7 +1917,7 @@ describe('ChangeStream resumability', function () {
.stub(changeStream.cursor, '_getMore')
.callsFake((_batchSize, callback) => {
mock.restore();
const error = new MongoServerError({ message: 'Something went wrong' });
const error = new MongoServerError({ message: message });
error.code = code;
callback(error);
});
Expand Down Expand Up @@ -1935,7 +1946,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: resumableErrorCodes[0].code
errorCode: resumableErrorCodes[0].code,
errmsg: resumableErrorCodes[0].message
}
} as FailPoint);

Expand Down Expand Up @@ -1986,7 +1998,7 @@ describe('ChangeStream resumability', function () {
});

context('#tryNext', function () {
for (const { error, code } of resumableErrorCodes) {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
Expand All @@ -2001,7 +2013,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: code
errorCode: code,
errmsg: message
}
} as FailPoint);

Expand All @@ -2022,7 +2035,7 @@ describe('ChangeStream resumability', function () {
);
}

for (const { error, code } of resumableErrorCodes) {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '<4.2' } },
Expand All @@ -2041,7 +2054,7 @@ describe('ChangeStream resumability', function () {
.stub(changeStream.cursor, '_getMore')
.callsFake((_batchSize, callback) => {
mock.restore();
const error = new MongoServerError({ message: 'Something went wrong' });
const error = new MongoServerError({ message: message });
error.code = code;
callback(error);
});
Expand Down Expand Up @@ -2077,7 +2090,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: resumableErrorCodes[0].code
errorCode: resumableErrorCodes[0].code,
errmsg: resumableErrorCodes[0].message
}
} as FailPoint);

Expand Down Expand Up @@ -2127,7 +2141,7 @@ describe('ChangeStream resumability', function () {
});

describe('event emitter based iteration', function () {
for (const { error, code } of resumableErrorCodes) {
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
Expand All @@ -2141,7 +2155,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: code
errorCode: code,
errmsg: message
}
} as FailPoint);

Expand Down Expand Up @@ -2171,7 +2186,8 @@ describe('ChangeStream resumability', function () {
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: resumableErrorCodes[0].code
errorCode: resumableErrorCodes[0].code,
errmsg: resumableErrorCodes[0].message
}
} as FailPoint);

Expand Down Expand Up @@ -2222,6 +2238,38 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error is operation was interrupted', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const unresumableErrorCode = 237;
await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode,
errmsg: 'operation was interrupted'
}
} as FailPoint);

const willBeError = once(changeStream, 'change').catch(error => error);
await once(changeStream.cursor, 'init');
await collection.insertOne({ name: 'bailey' });

const error = await willBeError;

expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
}
);
});
});

it(
Expand Down

0 comments on commit 5f37cb6

Please sign in to comment.