Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce AbstractCursor and its concrete subclasses #2619

Merged
merged 14 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"dependencies": {
"bl": "^2.2.1",
"bson": "^4.0.4",
"denque": "^1.4.1"
"denque": "^1.4.1",
"lodash": "^4.17.20"
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.1",
Expand All @@ -38,6 +39,7 @@
"@types/bl": "^2.1.0",
"@types/bson": "^4.0.2",
"@types/kerberos": "^1.1.0",
"@types/lodash": "^4.14.164",
"@types/node": "^14.6.4",
"@types/saslprep": "^1.0.0",
"@typescript-eslint/eslint-plugin": "^3.10.0",
Expand Down
149 changes: 87 additions & 62 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import {
relayEvents,
Expand All @@ -21,9 +20,18 @@ import type { CollationOptions } from './cmap/wire_protocol/write_command';
import { MongoClient } from './mongo_client';
import { Db } from './db';
import { Collection } from './collection';
import type { Readable } from 'stream';
import {
AbstractCursor,
AbstractCursorOptions,
CursorStreamOptions
} from './cursor/abstract_cursor';
import type { ClientSession } from './sessions';
import { executeOperation, ExecutionResult } from './operations/execute_operation';

const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
const kClosed = Symbol('closed');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand Down Expand Up @@ -162,13 +170,6 @@ interface UpdateDescription {
removedFields: string[];
}

/** @internal */
export class ChangeStreamStream extends CursorStream {
constructor(cursor: ChangeStreamCursor) {
super(cursor);
}
}

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @public
Expand All @@ -180,10 +181,10 @@ export class ChangeStream extends EventEmitter {
namespace: MongoDBNamespace;
type: symbol;
cursor?: ChangeStreamCursor;
closed: boolean;
streamOptions?: CursorStreamOptions;
[kResumeQueue]: Denque;
[kCursorStream]?: CursorStream;
[kCursorStream]?: Readable;
[kClosed]: boolean;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -241,7 +242,7 @@ export class ChangeStream extends EventEmitter {
// Create contained Change Stream cursor
this.cursor = createChangeStreamCursor(this, options);

this.closed = false;
this[kClosed] = false;

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
Expand All @@ -252,13 +253,13 @@ export class ChangeStream extends EventEmitter {

this.on('removeListener', eventName => {
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
this[kCursorStream]?.removeAllListeners('data');
}
});
}

/** @internal */
get cursorStream(): CursorStream | undefined {
get cursorStream(): Readable | undefined {
return this[kCursorStream];
}

Expand Down Expand Up @@ -296,23 +297,20 @@ export class ChangeStream extends EventEmitter {
}

/** Is the cursor closed */
isClosed(): boolean {
return this.closed || (this.cursor?.isClosed() ?? false);
get closed(): boolean {
return this[kClosed] || (this.cursor?.closed ?? false);
}

/** Close the Change Stream */
close(callback?: Callback): Promise<void> | void {
return maybePromise(callback, cb => {
if (this.closed) return cb();
this[kClosed] = true;

// flag the change stream as explicitly closed
this.closed = true;

if (!this.cursor) return cb();
return maybePromise(callback, cb => {
if (!this.cursor) {
return cb();
}

// Tidy up the existing cursor
const cursor = this.cursor;

return cursor.close(err => {
endStream(this);
this.cursor = undefined;
Expand All @@ -325,7 +323,7 @@ export class ChangeStream extends EventEmitter {
* Return a modified Readable stream including a possible transform method.
* @throws MongoError if this.cursor is undefined
*/
stream(options?: CursorStreamOptions): ChangeStreamStream {
stream(options?: CursorStreamOptions): Readable {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
Expand All @@ -335,28 +333,34 @@ export class ChangeStream extends EventEmitter {
}

/** @public */
export interface ChangeStreamCursorOptions extends CursorOptions {
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: boolean;
}

/** @internal */
export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamCursorOptions> {
export class ChangeStreamCursor extends AbstractCursor {
_resumeToken: ResumeToken;
startAtOperationTime?: OperationTime;
hasReceived?: boolean;
resumeAfter: ResumeToken;
startAfter: ResumeToken;
options: ChangeStreamCursorOptions;

postBatchResumeToken?: ResumeToken;
pipeline: Document[];

constructor(
topology: Topology,
operation: AggregateOperation,
options: ChangeStreamCursorOptions
namespace: MongoDBNamespace,
pipeline: Document[] = [],
options: ChangeStreamCursorOptions = {}
) {
super(topology, operation, options);
super(topology, namespace, options);

options = options || {};
this.pipeline = pipeline;
this.options = options;
this._resumeToken = null;
this.startAtOperationTime = options.startAtOperationTime;

Expand Down Expand Up @@ -421,18 +425,28 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
}
}

_initializeCursor(callback: Callback): void {
super._initializeCursor((err, response) => {
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(
{ s: { namespace: this.namespace } },
this.pipeline,
{
...this.cursorOptions,
...this.options,
session
}
);

executeOperation(this.topology, aggregateOperation, (err, response) => {
if (err || response == null) {
callback(err, response);
return;
return callback(err);
}

const server = aggregateOperation.server;
if (
this.startAtOperationTime == null &&
this.resumeAfter == null &&
this.startAfter == null &&
maxWireVersion(this.server) >= 7
maxWireVersion(server) >= 7
) {
this.startAtOperationTime = response.operationTime;
}
Expand All @@ -441,15 +455,16 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC

this.emit('init', response);
this.emit('response');
callback(err, response);

// TODO: NODE-2882
callback(undefined, { server, session, response });
});
}

_getMore(callback: Callback): void {
super._getMore((err, response) => {
_getMore(batchSize: number, callback: Callback): void {
super._getMore(batchSize, (err, response) => {
if (err) {
callback(err);
return;
return callback(err);
}

this._processBatch('nextBatch', response);
Expand All @@ -466,26 +481,32 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
* @internal
*/
function createChangeStreamCursor(
self: ChangeStream,
changeStream: ChangeStream,
options: ChangeStreamOptions
): ChangeStreamCursor {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
changeStream.pipeline
);

const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const changeStreamCursor = new ChangeStreamCursor(
getTopology(self.parent),
new AggregateOperation(self.parent, pipeline, options),
getTopology(changeStream.parent),
changeStream.namespace,
pipeline,
cursorOptions
);

relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
streamEvents(changeStream, changeStreamCursor);
}

if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
return changeStreamCursor;
}

Expand Down Expand Up @@ -532,24 +553,24 @@ function waitForTopologyConnected(
}

function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
if (!callback) {
changeStream.emit(ChangeStream.ERROR, error);
}

changeStream.close(() => callback && callback(error));
}

function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
stream.on('data', change => processNewChange(changeStream, change));
stream.on('error', error => processError(changeStream, error));
}

function endStream(changeStream: ChangeStream): void {
const cursorStream = changeStream[kCursorStream];
if (cursorStream) {
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
cursorStream.removeAllListeners(event)
);

['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
cursorStream.destroy();
}

Expand All @@ -561,7 +582,7 @@ function processNewChange(
change: ChangeStreamDocument,
callback?: Callback
) {
if (changeStream.closed) {
if (changeStream[kClosed]) {
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
return;
}
Expand Down Expand Up @@ -591,8 +612,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
const cursor = changeStream.cursor;

// If the change stream has been closed explicitly, do not process error.
if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
if (changeStream[kClosed]) {
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
return;
}

Expand All @@ -604,7 +625,10 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca

// otherwise, raise an error and close the change stream
function unresumableError(err: AnyError) {
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
if (!callback) {
changeStream.emit(ChangeStream.ERROR, err);
}

changeStream.close(() => processResumeQueue(changeStream, err));
}

Expand Down Expand Up @@ -648,8 +672,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
* @param changeStream - the parent ChangeStream
*/
function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCursor>) {
if (changeStream.isClosed()) {
callback(new MongoError('ChangeStream is closed.'));
if (changeStream[kClosed]) {
callback(CHANGESTREAM_CLOSED_ERROR);
return;
}

Expand All @@ -672,10 +696,11 @@ function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCu
function processResumeQueue(changeStream: ChangeStream, err?: Error) {
while (changeStream[kResumeQueue].length) {
const request = changeStream[kResumeQueue].pop();
if (changeStream.isClosed() && !err) {
request(new MongoError('Change Stream is not open.'));
if (changeStream[kClosed] && !err) {
request(CHANGESTREAM_CLOSED_ERROR);
return;
}

request(err, changeStream.cursor);
}
}
Loading