Skip to content

Commit

Permalink
fix: use server selection in change stream resume process
Browse files Browse the repository at this point in the history
This commit removes a custom function we had to wait until the topology
was reconnected in favor of performing server selection.  This is described
here: https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
  • Loading branch information
baileympearson committed Jun 15, 2022
1 parent 2dceb56 commit 94a4853
Showing 1 changed file with 2 additions and 44 deletions.
46 changes: 2 additions & 44 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Denque = require('denque');
import type { Readable } from 'stream';
import { setTimeout } from 'timers';

import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
Expand All @@ -20,17 +19,14 @@ import { InferIdType, TypedEventEmitter } from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import type { ReadPreference } from './read_preference';
import type { Topology } from './sdam/topology';
import type { ServerSessionId } from './sessions';
import {
calculateDurationInMs,
Callback,
filterOptions,
getTopology,
maxWireVersion,
maybePromise,
MongoDBNamespace,
now
MongoDBNamespace
} from './utils';

/** @internal */
Expand All @@ -57,14 +53,6 @@ const CHANGE_DOMAIN_TYPES = {
CLUSTER: Symbol('Cluster')
};

interface TopologyWaitOptions {
start?: number;
timeout?: number;
readPreference?: ReadPreference;
}

const SELECTION_TIMEOUT = 30000;

const CHANGE_STREAM_EVENTS = [RESUME_TOKEN_CHANGED, END, CLOSE];

const NO_RESUME_TOKEN_ERROR =
Expand Down Expand Up @@ -800,36 +788,6 @@ export class ChangeStream<
return changeStreamCursor;
}

/**
* This method performs a basic server selection loop, satisfying the requirements of
* ChangeStream resumability until the new SDAM layer can be used.
* @internal
*/
private _waitForTopologyConnected(
topology: Topology,
options: TopologyWaitOptions,
callback: Callback
) {
setTimeout(() => {
if (options && options.start == null) {
options.start = now();
}

const start = options.start || now();
const timeout = options.timeout || SELECTION_TIMEOUT;
if (topology.isConnected()) {
return callback();
}

if (calculateDurationInMs(start) > timeout) {
// TODO(NODE-3497): Replace with MongoNetworkTimeoutError
return callback(new MongoRuntimeError('Timed out waiting for connection'));
}

this._waitForTopologyConnected(topology, options, callback);
}, 500); // this is an arbitrary wait time to allow SDAM to transition
}

/** @internal */
private _closeWithError(error: AnyError, callback?: Callback): void {
if (!callback) {
Expand Down Expand Up @@ -916,7 +874,7 @@ export class ChangeStream<
cursor.close();

const topology = getTopology(this.parent);
this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
topology.selectServer(cursor.readPreference, {}, err => {
// if the topology can't reconnect, close the stream
if (err) return this._closeWithError(err, callback);

Expand Down

0 comments on commit 94a4853

Please sign in to comment.