Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4125): change stream resumability #3289

Merged
merged 31 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2dceb56
test: add resumability tests for change streams
baileympearson Jun 3, 2022
94a4853
fix: use server selection in change stream resume process
baileympearson Jun 5, 2022
881dcf2
refactor: break apart change stream error handling for iterator / eve…
baileympearson Jun 5, 2022
1a0d82b
fix: fix change stream resumability for iterator mode
baileympearson Jun 5, 2022
45a56d2
docs: add note clarifying that ChangeStream.stream does not resume
baileympearson Jun 13, 2022
b7d2481
fix: lint error
baileympearson Jun 13, 2022
44635ba
refactor: prevent ChangeStream.cursor from ever being undefined
baileympearson Jun 14, 2022
6ee61da
fix: cache maxWireVersion on cursor for resumability check
baileympearson Jun 14, 2022
f65cc73
test: prevent tests from running on standalone servers
baileympearson Jun 15, 2022
28e1b77
test: fix tryNext tests on sharded clusters
baileympearson Jun 15, 2022
ab155d3
chore(NODE-4125): remove cursor checks
durran Jun 16, 2022
5647d0b
fix: add error when cursor is closed in stream()
baileympearson Jun 16, 2022
031043a
chore: mark new error handling functions internal
baileympearson Jun 16, 2022
b85dc73
refactor: break apart closeWithError logic into stream/iterator methods
baileympearson Jun 16, 2022
68a184c
chore: add proper overloads to the close method
baileympearson Jun 16, 2022
3239fe1
fix: consolidate change handling
baileympearson Jun 16, 2022
2fb5b7c
fix: handle promise rejections appropriately in change stream iterato…
baileympearson Jun 16, 2022
315343e
chore: misc fixes
baileympearson Jun 17, 2022
d0d0641
chore: add comment explaining why shadowing is necessary in maybeProm…
baileympearson Jun 17, 2022
51a2907
test: add better testing for maxwireVersion caching on cursor
baileympearson Jun 17, 2022
e20fe96
chore: mark new functions internal && remove unnecessary function
baileympearson Jun 17, 2022
56dbbef
chore: rename iterator mode error handling function
baileympearson Jun 17, 2022
8414b36
chore: address comments in tests
baileympearson Jun 17, 2022
14f3fc2
chore: use better name for errors in error handling functions
baileympearson Jun 17, 2022
50980b8
fix: promisify server selection and iterator mode error handling
baileympearson Jun 17, 2022
6eb5537
Revert "fix: promisify server selection and iterator mode error handl…
baileympearson Jun 17, 2022
537cadf
chore: add comment about promisifying select server
baileympearson Jun 17, 2022
8f99a98
fix: use util client in test to clear database
baileympearson Jun 21, 2022
fa3851f
chore: fix typo in test name
baileympearson Jun 21, 2022
5e18998
fix: typo in dbName when creating collection for tests
baileympearson Jun 21, 2022
f848edc
chore: update chai assertion
baileympearson Jun 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
239 changes: 82 additions & 157 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Denque = require('denque');
import type { Readable } from 'stream';
import { setTimeout } from 'timers';
import { promisify } from 'util';

import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
Expand All @@ -20,21 +19,9 @@ import { InferIdType, TypedEventEmitter } from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import type { ReadPreference } from './read_preference';
import type { Topology } from './sdam/topology';
import type { ServerSessionId } from './sessions';
import {
calculateDurationInMs,
Callback,
filterOptions,
getTopology,
maxWireVersion,
maybePromise,
MongoDBNamespace,
now
} from './utils';
import { Callback, filterOptions, getTopology, maybePromise, MongoDBNamespace } from './utils';

/** @internal */
const kResumeQueue = Symbol('resumeQueue');
/** @internal */
const kCursorStream = Symbol('cursorStream');
/** @internal */
Expand All @@ -57,14 +44,6 @@ const CHANGE_DOMAIN_TYPES = {
CLUSTER: Symbol('Cluster')
};

interface TopologyWaitOptions {
start?: number;
timeout?: number;
readPreference?: ReadPreference;
}

const SELECTION_TIMEOUT = 30000;

const CHANGE_STREAM_EVENTS = [RESUME_TOKEN_CHANGED, END, CLOSE];

const NO_RESUME_TOKEN_ERROR =
Expand Down Expand Up @@ -548,11 +527,9 @@ export class ChangeStream<
namespace: MongoDBNamespace;
type: symbol;
/** @internal */
cursor: ChangeStreamCursor<TSchema, TChange> | undefined;
cursor: ChangeStreamCursor<TSchema, TChange>;
streamOptions?: CursorStreamOptions;
/** @internal */
[kResumeQueue]: Denque<Callback<ChangeStreamCursor<TSchema, TChange>>>;
/** @internal */
[kCursorStream]?: Readable & AsyncIterable<TChange>;
/** @internal */
[kClosed]: boolean;
Expand Down Expand Up @@ -618,8 +595,6 @@ export class ChangeStream<
this.options.readPreference = parent.readPreference;
}

this[kResumeQueue] = new Denque();

// Create contained Change Stream cursor
this.cursor = this._createChangeStreamCursor(options);

Expand Down Expand Up @@ -655,11 +630,23 @@ export class ChangeStream<
hasNext(callback: Callback<boolean>): void;
hasNext(callback?: Callback): Promise<boolean> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
this._getCursor((err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
cursor.hasNext(cb);
});
return maybePromise(callback, callback => {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
(async () => {
if (!this.cursor) return callback(new MongoChangeStreamError(NO_CURSOR_ERROR));
durran marked this conversation as resolved.
Show resolved Hide resolved
try {
const hasNext = await this.cursor.hasNext();
return callback(undefined, hasNext);
} catch (error) {
const errorOnResume = await this._processErrorAsync(error).catch(err => err);
dariakp marked this conversation as resolved.
Show resolved Hide resolved
if (errorOnResume) return callback(errorOnResume);
try {
const hasNext = await this.cursor.hasNext();
dariakp marked this conversation as resolved.
Show resolved Hide resolved
return callback(undefined, hasNext);
} catch (error) {
this._closeWithError(error, callback);
}
}
})();
});
}

Expand All @@ -668,18 +655,23 @@ export class ChangeStream<
next(callback: Callback<TChange>): void;
next(callback?: Callback<TChange>): Promise<TChange> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
this._getCursor((err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
cursor.next((error, change) => {
if (error) {
this[kResumeQueue].push(() => this.next(cb));
this._processError(error, cb);
return;
return maybePromise(callback, callback => {
(async () => {
if (!this.cursor) return callback(new MongoChangeStreamError(NO_CURSOR_ERROR));
durran marked this conversation as resolved.
Show resolved Hide resolved
try {
const change = await this.cursor.next();
this._processNewChange(change ?? null, callback);
} catch (error) {
const errorOnResume = await this._processErrorAsync(error).catch(err => err);
if (errorOnResume) return callback(errorOnResume);
try {
const change = await this.cursor.next();
this._processNewChange(change ?? null, callback);
} catch (error) {
this._closeWithError(error, callback);
}
this._processNewChange(change ?? null, cb);
});
});
}
})();
});
}

Expand All @@ -690,11 +682,23 @@ export class ChangeStream<
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
this._getCursor((err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
return maybePromise(callback, callback => {
(async () => {
if (!this.cursor) return callback(new MongoChangeStreamError(NO_CURSOR_ERROR));
durran marked this conversation as resolved.
Show resolved Hide resolved
try {
const change = await this.cursor.tryNext();
callback(undefined, change ?? null);
} catch (error) {
const errorOnResume = await this._processErrorAsync(error).catch(err => err);
if (errorOnResume) return callback(errorOnResume);
try {
const change = await this.cursor.tryNext();
callback(undefined, change ?? null);
} catch (error) {
this._closeWithError(error, callback);
}
}
})();
});
}

Expand All @@ -715,14 +719,17 @@ export class ChangeStream<
const cursor = this.cursor;
return cursor.close(err => {
this._endStream();
this.cursor = undefined;
return cb(err);
});
});
}

/**
* Return a modified Readable stream including a possible transform method.
*
* NOTE: When using a Stream to process change stream events, the stream will
* NOT automatically resume if a resumable error is encountered.
*
* @throws MongoDriverError if this.cursor is undefined
*/
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TChange> {
Expand Down Expand Up @@ -800,36 +807,6 @@ export class ChangeStream<
return changeStreamCursor;
}

/**
* This method performs a basic server selection loop, satisfying the requirements of
* ChangeStream resumability until the new SDAM layer can be used.
* @internal
*/
private _waitForTopologyConnected(
topology: Topology,
options: TopologyWaitOptions,
callback: Callback
) {
setTimeout(() => {
if (options && options.start == null) {
options.start = now();
}

const start = options.start || now();
const timeout = options.timeout || SELECTION_TIMEOUT;
if (topology.isConnected()) {
return callback();
}

if (calculateDurationInMs(start) > timeout) {
// TODO(NODE-3497): Replace with MongoNetworkTimeoutError
return callback(new MongoRuntimeError('Timed out waiting for connection'));
}

this._waitForTopologyConnected(topology, options, callback);
}, 500); // this is an arbitrary wait time to allow SDAM to transition
}

/** @internal */
private _closeWithError(error: AnyError, callback?: Callback): void {
if (!callback) {
Expand All @@ -845,7 +822,7 @@ export class ChangeStream<
const stream = this[kCursorStream] ?? cursor.stream();
this[kCursorStream] = stream;
stream.on('data', change => this._processNewChange(change));
stream.on('error', error => this._processError(error));
stream.on('error', error => this._processErrorStreamMode(error));
}

/** @internal */
Expand Down Expand Up @@ -889,98 +866,46 @@ export class ChangeStream<
return callback(undefined, change);
}

/** @internal */
private _processError(error: AnyError, callback?: Callback) {
const cursor = this.cursor;

private _processErrorStreamMode(error: AnyError) {
durran marked this conversation as resolved.
Show resolved Hide resolved
// If the change stream has been closed explicitly, do not process error.
if (this[kClosed]) {
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
if (callback) callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
return;
}

// if the resume succeeds, continue with the new cursor
const resumeWithCursor = (newCursor: ChangeStreamCursor<TSchema, TChange>) => {
this.cursor = newCursor;
this._processResumeQueue();
};
if (this[kClosed]) return;

if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
this.cursor = undefined;

// stop listening to all events from old cursor
if (this.cursor && isResumableError(error, this.cursor.maxWireVersion)) {
this._endStream();

// close internal cursor, ignore errors
cursor.close();
this.cursor.close();

const topology = getTopology(this.parent);
this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
// if the topology can't reconnect, close the stream
if (err) return this._closeWithError(err, callback);

// create a new cursor, preserving the old cursor's options
const newCursor = this._createChangeStreamCursor(cursor.resumeOptions);

// attempt to continue in emitter mode
if (!callback) return resumeWithCursor(newCursor);

// attempt to continue in iterator mode
newCursor.hasNext(err => {
// if there's an error immediately after resuming, close the stream
if (err) return this._closeWithError(err);
resumeWithCursor(newCursor);
});
topology.selectServer(this.cursor.readPreference, {}, err => {
if (err) return this._closeWithError(err);
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
});
return;
} else {
this._closeWithError(error);
}

// if initial error wasn't resumable, raise an error and close the change stream
return this._closeWithError(error, callback);
}

private _processErrorAsync = promisify(this._processErrorIteratorMode);

/** @internal */
private _getCursor(callback: Callback<ChangeStreamCursor<TSchema, TChange>>) {
private _processErrorIteratorMode(error: AnyError, callback: Callback) {
if (this[kClosed]) {
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
return;
return callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
}

// if a cursor exists and it is open, return it
if (this.cursor) {
callback(undefined, this.cursor);
return;
}
if (this.cursor && isResumableError(error, this.cursor.maxWireVersion)) {
this.cursor.close();

// no cursor, queue callback until topology reconnects
this[kResumeQueue].push(callback);
}
const topology = getTopology(this.parent);
topology.selectServer(this.cursor.readPreference, {}, err => {
// if the topology can't reconnect, close the stream
if (err) return this._closeWithError(err, callback);

/**
* Drain the resume queue when a new cursor has become available
* @internal
*
* @param error - error getting a new cursor
*/
private _processResumeQueue(error?: Error) {
while (this[kResumeQueue].length) {
const request = this[kResumeQueue].pop();
if (!request) break; // Should never occur but TS can't use the length check in the while condition

if (!error) {
if (this[kClosed]) {
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
request(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
return;
}
if (!this.cursor) {
request(new MongoChangeStreamError(NO_CURSOR_ERROR));
return;
}
}
request(error, this.cursor ?? undefined);
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
callback();
});
} else {
this._closeWithError(error, callback);
}
}
}
12 changes: 11 additions & 1 deletion src/cursor/change_stream_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ export class ChangeStreamCursor<
postBatchResumeToken?: ResumeToken;
pipeline: Document[];

/**
* @internal
*
* used to determine change stream resumability
*/
maxWireVersion: number | undefined;

constructor(
client: MongoClient,
namespace: MongoDBNamespace,
Expand Down Expand Up @@ -148,11 +155,13 @@ export class ChangeStreamCursor<
}

const server = aggregateOperation.server;
this.maxWireVersion = maxWireVersion(server);

if (
this.startAtOperationTime == null &&
this.resumeAfter == null &&
this.startAfter == null &&
maxWireVersion(server) >= 7
this.maxWireVersion >= 7
) {
this.startAtOperationTime = response.operationTime;
}
Expand All @@ -174,6 +183,7 @@ export class ChangeStreamCursor<
return callback(err);
}

this.maxWireVersion = maxWireVersion(this.server);
this._processBatch(response as TODO_NODE_3286 as ChangeStreamAggregateRawResult<TChange>);

this.emit(ChangeStream.MORE, response);
Expand Down