Skip to content

Commit

Permalink
Merge branch 'master' into NODE-2544/master/fix-connect-after-close
Browse files Browse the repository at this point in the history
  • Loading branch information
HanaPearlman committed Oct 15, 2020
2 parents 4fe0911 + 054838f commit 4c113dd
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 254 deletions.
183 changes: 66 additions & 117 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor } from './cursor';
import { Cursor, CursorOptions, CursorStream } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { loadCollection, loadDb, loadMongoClient } from './dynamic_loaders';
import {
Expand All @@ -13,15 +13,14 @@ import {
MongoDBNamespace,
Callback
} from './utils';
import type { CursorOptions } from './cursor/cursor';
import type { ReadPreference } from './read_preference';
import type { Timestamp, Document } from './bson';
import type { Topology } from './sdam/topology';
import type { Writable } from 'stream';
import type { StreamOptions } from './cursor/core_cursor';
import type { OperationParent } from './operations/command';
import type { CollationOptions } from './cmap/wire_protocol/write_command';
import type { CursorStreamOptions } from './cursor/core_cursor';
const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand All @@ -34,6 +33,11 @@ const CHANGE_DOMAIN_TYPES = {
CLUSTER: Symbol('Cluster')
};

const NO_RESUME_TOKEN_ERROR = new MongoError(
'A change stream document has been received that lacks a resume token (_id).'
);
const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed');

/** @public */
export interface ResumeOptions {
startAtOperationTime?: Timestamp;
Expand Down Expand Up @@ -155,6 +159,12 @@ interface UpdateDescription {
removedFields: string[];
}

export class ChangeStreamStream extends CursorStream {
constructor(cursor: ChangeStreamCursor) {
super(cursor);
}
}

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @public
Expand All @@ -168,9 +178,9 @@ export class ChangeStream extends EventEmitter {
topology: Topology;
cursor?: ChangeStreamCursor;
closed: boolean;
pipeDestinations: Writable[] = [];
streamOptions?: StreamOptions;
streamOptions?: CursorStreamOptions;
[kResumeQueue]: Denque;
[kCursorStream]?: CursorStream;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -239,20 +249,24 @@ export class ChangeStream extends EventEmitter {
this.closed = false;

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', (eventName: string) => {
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
this.cursor.on('data', change => processNewChange(this, change));
streamEvents(this, this.cursor);
}
});

// Listen for all `change` listeners being removed from ChangeStream
this.on('removeListener', (eventName: string) => {
this.on('removeListener', eventName => {
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
this.cursor.removeAllListeners('data');
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
}
});
}

/** @internal */
get cursorStream(): CursorStream | undefined {
return this[kCursorStream];
}

/** The cached resume token that is used to resume after the most recently returned change. */
get resumeToken(): ResumeToken {
return this.cursor?.resumeToken;
Expand Down Expand Up @@ -305,83 +319,24 @@ export class ChangeStream extends EventEmitter {
const cursor = this.cursor;

return cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
endStream(this);
this.cursor = undefined;

return cb(err);
});
});
}

/**
* This method pulls all the data out of a readable stream, and writes it to the supplied destination,
* automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
*
* @param destination - The destination for writing data
* @param options - {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options| NodeJS Pipe options}
* @throws MongoError if this.cursor is undefined
*/
pipe(destination: Writable, options?: PipeOptions): Writable {
if (!this.pipeDestinations) {
this.pipeDestinations = [];
}
this.pipeDestinations.push(destination);
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to pipe');
}
return this.cursor.pipe(destination, options);
}

/**
* This method will remove the hooks set up for a previous pipe() call.
*
* @param destination - The destination for writing data
* @throws MongoError if this.cursor is undefined
*/
unpipe(destination?: Writable): ChangeStreamCursor {
const destinationIndex = destination ? this.pipeDestinations.indexOf(destination) : -1;
if (this.pipeDestinations && destinationIndex > -1) {
this.pipeDestinations.splice(destinationIndex, 1);
}
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to unpipe');
}
return this.cursor.unpipe(destination);
}

/**
* Return a modified Readable stream including a possible transform method.
* @throws MongoError if this.cursor is undefined
*/
stream(options?: StreamOptions): ChangeStreamCursor {
stream(options?: CursorStreamOptions): ChangeStreamStream {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
}
return this.cursor.stream(options);
}

/**
* This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
* @throws MongoError if this.cursor is undefined
*/
pause(): ChangeStreamCursor {
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to pause');
}
return this.cursor.pause();
}

/**
* This method will cause the readable stream to resume emitting data events.
* @throws MongoError if this.cursor is undefined
*/
resume(): ChangeStreamCursor {
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to resume');
}
return this.cursor.resume();
}
}

/** @public */
Expand Down Expand Up @@ -524,7 +479,6 @@ function createChangeStreamCursor(

const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor(
self.topology,
new AggregateOperation(self.parent, pipeline, options),
Expand All @@ -533,23 +487,7 @@ function createChangeStreamCursor(

relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);

if (self.listenerCount(ChangeStream.CHANGE) > 0) {
changeStreamCursor.on(ChangeStreamCursor.DATA, function (change) {
processNewChange(self, change);
});
}

changeStreamCursor.on(ChangeStream.ERROR, function (error) {
processError(self, error);
});

if (self.pipeDestinations) {
const cursorStream = changeStreamCursor.stream(self.streamOptions);
for (const pipeDestination of self.pipeDestinations) {
cursorStream.pipe(pipeDestination);
}
}

if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
return changeStreamCursor;
}

Expand Down Expand Up @@ -595,28 +533,48 @@ function waitForTopologyConnected(
}, 500); // this is an arbitrary wait time to allow SDAM to transition
}

function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
changeStream.close(() => callback && callback(error));
}

function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
}

function endStream(changeStream: ChangeStream): void {
const cursorStream = changeStream[kCursorStream];
if (cursorStream) {
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
cursorStream.removeAllListeners(event)
);

cursorStream.destroy();
}

changeStream[kCursorStream] = undefined;
}

function processNewChange(
changeStream: ChangeStream,
change: ChangeStreamDocument,
callback?: Callback
) {
// a null change means the cursor has been notified, implicitly closing the change stream
if (change == null) {
changeStream.closed = true;
}

if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
return;
}

if (change && !change._id) {
const noResumeTokenError = new Error(
'A change stream document has been received that lacks a resume token (_id).'
);
// a null change means the cursor has been notified, implicitly closing the change stream
if (change == null) {
return closeWithError(changeStream, CHANGESTREAM_CLOSED_ERROR, callback);
}

if (!callback) return changeStream.emit(ChangeStream.ERROR, noResumeTokenError);
return callback(noResumeTokenError);
if (change && !change._id) {
return closeWithError(changeStream, NO_RESUME_TOKEN_ERROR, callback);
}

// cache the resume token
Expand All @@ -631,7 +589,7 @@ function processNewChange(
return callback(undefined, change);
}

function processError(changeStream: ChangeStream, error?: AnyError, callback?: Callback) {
function processError(changeStream: ChangeStream, error: AnyError, callback?: Callback) {
const topology = changeStream.topology;
const cursor = changeStream.cursor;

Expand All @@ -649,24 +607,15 @@ function processError(changeStream: ChangeStream, error?: AnyError, callback?: C

// otherwise, raise an error and close the change stream
function unresumableError(err: AnyError) {
if (!callback) {
changeStream.emit(ChangeStream.ERROR, err);
changeStream.emit(ChangeStream.CLOSE);
}
processResumeQueue(changeStream, err);
changeStream.closed = true;
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
changeStream.close(() => processResumeQueue(changeStream, err));
}

if (cursor && isResumableError(error as MongoError, maxWireVersion(cursor.server))) {
changeStream.cursor = undefined;

// stop listening to all events from old cursor
[
ChangeStreamCursor.DATA,
ChangeStreamCursor.CLOSE,
ChangeStreamCursor.END,
ChangeStreamCursor.ERROR
].forEach(event => cursor.removeAllListeners(event));
endStream(changeStream);

// close internal cursor, ignore errors
cursor.close();
Expand All @@ -691,8 +640,8 @@ function processError(changeStream: ChangeStream, error?: AnyError, callback?: C
return;
}

if (!callback) return changeStream.emit(ChangeStream.ERROR, error);
return callback(error);
// if initial error wasn't resumable, raise an error and close the change stream
return closeWithError(changeStream, error, callback);
}

/**
Expand Down
5 changes: 2 additions & 3 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { MongoError } from '../error';
import { Cursor, CursorOptions } from './cursor';
import { CursorState } from './core_cursor';
import { deprecate } from 'util';
import type { AggregateOperation, AggregateOptions } from '../operations/aggregate';
import type { Document } from '../bson';
import type { Sort } from '../operations/find';
Expand Down Expand Up @@ -110,8 +109,8 @@ export class AggregationCursor extends Cursor<AggregateOperation, AggregationCur

// deprecated methods
/** @deprecated Add a geoNear stage to the aggregation pipeline */
geoNear = deprecate(($geoNear: Document) => {
geoNear($geoNear: Document): this {
this.operation.addToPipeline({ $geoNear });
return this;
}, 'The `$geoNear` stage is deprecated in MongoDB 4.0, and removed in version 4.2.');
}
}
Loading

0 comments on commit 4c113dd

Please sign in to comment.