Skip to content

Commit

Permalink
feat: reintroduce clone and rewind for cursors (#2647)
Browse files Browse the repository at this point in the history
* feat: implement rewind support for AbstractCursor subclasses

This reintroduces support for rewinding a cursor to its
uninitialized state.

NODE-2811

* feat: reimplement clone for find and aggregate cursors

NODE-2811
  • Loading branch information
mbroadst committed Dec 2, 2020
1 parent 3e5ff57 commit a5154fb
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 9 deletions.
6 changes: 6 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ export class ChangeStreamCursor extends AbstractCursor {
}
}

clone(): ChangeStreamCursor {
return new ChangeStreamCursor(this.topology, this.namespace, this.pipeline, {
...this.cursorOptions
});
}

_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(
{ s: { namespace: this.namespace } },
Expand Down
34 changes: 33 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,38 @@ export abstract class AbstractCursor extends EventEmitter {
return this;
}

/**
* Rewind this cursor to its uninitialized state. Any options that are present on the cursor will
* remain in effect. Iterating this cursor will cause new queries to be sent to the server, even
* if the resultant data has already been retrieved by this cursor.
*/
rewind(): void {
if (!this[kInitialized]) {
return;
}

this[kId] = undefined;
this[kDocuments] = [];
this[kClosed] = false;
this[kKilled] = false;
this[kInitialized] = false;

const session = this[kSession];
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
if (session.explicit === false && !session.hasEnded) {
session.endSession();
}

this[kSession] = undefined;
}
}

/**
* Returns a new uninitialized copy of this cursor, with options matching those that have been set on the current instance
*/
abstract clone(): AbstractCursor;

/* @internal */
abstract _initialize(
session: ClientSession | undefined,
Expand Down Expand Up @@ -579,7 +611,7 @@ function next(
if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
if (cursor[kSession] == null && cursor[kTopology].hasSessionSupport()) {
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: true });
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false });
}

cursor._initialize(cursor[kSession], (err, state) => {
Expand Down
7 changes: 7 additions & 0 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ export class AggregationCursor extends AbstractCursor {
return this[kPipeline];
}

clone(): AggregationCursor {
return new AggregationCursor(this[kParent], this.topology, this.namespace, this[kPipeline], {
...this[kOptions],
...this.cursorOptions
});
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(this[kParent], this[kPipeline], {
Expand Down
7 changes: 7 additions & 0 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ export class FindCursor extends AbstractCursor {
}
}

clone(): FindCursor {
return new FindCursor(this.topology, this.namespace, this[kFilter], {
...this[kBuiltOptions],
...this.cursorOptions
});
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const findOperation = new FindOperation(undefined, this.namespace, this[kFilter], {
Expand Down
7 changes: 7 additions & 0 deletions src/operations/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,13 @@ export class ListIndexesCursor extends AbstractCursor {
this.options = options;
}

clone(): ListIndexesCursor {
return new ListIndexesCursor(this.parent, {
...this.options,
...this.cursorOptions
});
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const operation = new ListIndexesOperation(this.parent, {
Expand Down
7 changes: 7 additions & 0 deletions src/operations/list_collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ export class ListCollectionsCursor extends AbstractCursor {
this.options = options;
}

clone(): ListCollectionsCursor {
return new ListCollectionsCursor(this.parent, this.filter, {
...this.options,
...this.cursorOptions
});
}

_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const operation = new ListCollectionsOperation(this.parent, this.filter, {
...this.cursorOptions,
Expand Down
131 changes: 125 additions & 6 deletions test/functional/abstract_cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ describe('AbstractCursor', function () {
before(
withClientV2((client, done) => {
const docs = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }, { a: 6 }];
const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const tryNextColl = client.db().collection('try_next');
coll.drop(() => tryNextColl.drop(() => coll.insertMany(docs, done)));
})
Expand All @@ -19,7 +19,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['getMore'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
this.defer(() => cursor.close());

Expand All @@ -40,7 +40,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.next(err => {
expect(err).to.not.exist;
Expand All @@ -59,7 +59,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.toArray(err => {
expect(err).to.not.exist;
Expand All @@ -79,7 +79,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.close(err => {
expect(err).to.not.exist;
Expand All @@ -94,7 +94,7 @@ describe('AbstractCursor', function () {
it(
'should iterate each document in a cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });

const bag = [];
Expand Down Expand Up @@ -143,4 +143,123 @@ describe('AbstractCursor', function () {
})
);
});

context('#clone', function () {
it(
'should clone a find cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({});
this.defer(() => cursor.close());

cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(cursor).property('closed').to.be.true;

const clonedCursor = cursor.clone();
this.defer(() => clonedCursor.close());

clonedCursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(clonedCursor).property('closed').to.be.true;
done();
});
});
})
);

it(
'should clone an aggregate cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.aggregate([{ $match: {} }]);
this.defer(() => cursor.close());

cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(cursor).property('closed').to.be.true;

const clonedCursor = cursor.clone();
this.defer(() => clonedCursor.close());

clonedCursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(clonedCursor).property('closed').to.be.true;
done();
});
});
})
);
});

context('#rewind', function () {
it(
'should rewind a cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({});
this.defer(() => cursor.close());

cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);

cursor.rewind();
cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);

done();
});
});
})
);

it('should end an implicit session on rewind', {
metadata: { requires: { mongodb: '>=3.6' } },
test: withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 1 });
this.defer(() => cursor.close());

cursor.next((err, doc) => {
expect(err).to.not.exist;
expect(doc).to.exist;

const session = cursor.session;
expect(session).property('hasEnded').to.be.false;
cursor.rewind();
expect(session).property('hasEnded').to.be.true;
done();
});
})
});

it('should not end an explicit session on rewind', {
metadata: { requires: { mongodb: '>=3.6' } },
test: withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const session = client.startSession();

const cursor = coll.find({}, { batchSize: 1, session });
this.defer(() => cursor.close());

cursor.next((err, doc) => {
expect(err).to.not.exist;
expect(doc).to.exist;

const session = cursor.session;
expect(session).property('hasEnded').to.be.false;
cursor.rewind();
expect(session).property('hasEnded').to.be.false;

session.endSession(done);
});
})
});
});
});
3 changes: 1 addition & 2 deletions test/functional/operation_example.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4908,8 +4908,7 @@ describe('Operation Examples', function () {
* @example-class Cursor
* @example-method rewind
*/
// NOTE: unclear whether we should continue to support `rewind`
it.skip('Should correctly rewind and restart cursor', {
it('Should correctly rewind and restart cursor', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
metadata: {
Expand Down

0 comments on commit a5154fb

Please sign in to comment.