Skip to content

Commit

Permalink
feat!: adds async iterator for custom promises
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Reggi committed Oct 20, 2020
1 parent 8aad134 commit 16d6572
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 73 deletions.
14 changes: 13 additions & 1 deletion src/cursor/cursor.ts
Expand Up @@ -9,6 +9,7 @@ import { CountOperation, CountOptions } from '../operations/count';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import { Callback, emitDeprecatedOptionWarning, maybePromise, MongoDBNamespace } from '../utils';
import { Sort, SortDirection, formatSort } from '../sort';
import { PromiseProvider } from '../promise_provider';
import type { OperationTime, ResumeToken } from '../change_stream';
import type { CloseOptions } from '../cmap/connection_pool';
import type { CollationOptions } from '../cmap/wire_protocol/write_command';
Expand Down Expand Up @@ -1227,7 +1228,7 @@ export class Cursor<
}

/** Close the cursor, sending a KillCursor command and emitting close. */
close(): void;
close(): Promise<void>;
close(callback: Callback): void;
close(options: CursorCloseOptions): Promise<void>;
close(options: CursorCloseOptions, callback: Callback): void;
Expand Down Expand Up @@ -1320,6 +1321,17 @@ export class Cursor<
return this.logger;
}

[Symbol.asyncIterator](): AsyncIterator<Document> {
const Promise = PromiseProvider.get();
return {
next: () => {
if (this.isClosed()) {
return Promise.resolve({ value: null, done: true });
}
return this.next().then(value => ({ value, done: value === null }));
}
};
}
// Internal methods

/** @internal */
Expand Down
7 changes: 3 additions & 4 deletions src/mongo_client.ts
Expand Up @@ -273,6 +273,9 @@ export class MongoClient extends EventEmitter implements OperationParent {

if (options && options.promiseLibrary) {
PromiseProvider.set(options.promiseLibrary);
// TODO NODE-2530: this will go away when client options are sorted out
// NOTE: need this to prevent deprecation notice from being inherited in Db, Collection
delete options.promiseLibrary;
}

// The internal state
Expand Down Expand Up @@ -440,10 +443,6 @@ export class MongoClient extends EventEmitter implements OperationParent {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

if (options && options.promiseLibrary) {
PromiseProvider.set(options.promiseLibrary);
}

// Create client
const mongoClient = new MongoClient(url, options);
// Execute the connect method
Expand Down
179 changes: 111 additions & 68 deletions test/node-next/es2018/cursor_async_iterator.test.js
@@ -1,91 +1,134 @@
'use strict';

const { expect } = require('chai');
const Sinon = require('sinon');

// TODO: unskip as part of NODE-2590
describe.skip('Cursor Async Iterator Tests', function () {
let client, collection;
before(async function () {
client = this.configuration.newClient();
describe('Cursor Async Iterator Tests', function () {
context('default promise library', function () {
let client, collection;
before(async function () {
client = this.configuration.newClient();

await client.connect();
const docs = Array.from({ length: 1000 }).map((_, index) => ({ foo: index, bar: 1 }));
await client.connect();
const docs = Array.from({ length: 1000 }).map((_, index) => ({ foo: index, bar: 1 }));

collection = client.db(this.configuration.db).collection('async_cursor_tests');
collection = client.db(this.configuration.db).collection('async_cursor_tests');

await collection.deleteMany({});
await collection.insertMany(docs);
await client.close();
});
await collection.deleteMany({});
await collection.insertMany(docs);
await client.close();
});

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();
collection = client.db(this.configuration.db).collection('async_cursor_tests');
});
beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();
collection = client.db(this.configuration.db).collection('async_cursor_tests');
});

afterEach(() => client.close());

afterEach(() => client.close());
it('should be able to use a for-await loop on a find command cursor', {
metadata: { requires: { node: '>=10.5.0' } },
test: async function () {
const cursor = collection.find({ bar: 1 });

it('should be able to use a for-await loop on a find command cursor', {
metadata: { requires: { node: '>=10.5.0' } },
test: async function () {
const cursor = collection.find({ bar: 1 });
let counter = 0;
for await (const doc of cursor) {
expect(doc).to.have.property('bar', 1);
counter += 1;
}

let counter = 0;
for await (const doc of cursor) {
expect(doc).to.have.property('bar', 1);
counter += 1;
expect(counter).to.equal(1000);
}
});

expect(counter).to.equal(1000);
}
});
it('should be able to use a for-await loop on an aggregation cursor', {
metadata: { requires: { node: '>=10.5.0' } },
test: async function () {
const cursor = collection.aggregate([{ $match: { bar: 1 } }]);

it('should be able to use a for-await loop on an aggregation cursor', {
metadata: { requires: { node: '>=10.5.0' } },
test: async function () {
const cursor = collection.aggregate([{ $match: { bar: 1 } }]);
let counter = 0;
for await (const doc of cursor) {
expect(doc).to.have.property('bar', 1);
counter += 1;
}

let counter = 0;
for await (const doc of cursor) {
expect(doc).to.have.property('bar', 1);
counter += 1;
expect(counter).to.equal(1000);
}

expect(counter).to.equal(1000);
}
});

it('should be able to use a for-await loop on a command cursor', {
metadata: { requires: { node: '>=10.5.0', mongodb: '>=3.0.0' } },
test: async function () {
const cursor1 = collection.listIndexes();
const cursor2 = collection.listIndexes();

const indexes = await cursor1.toArray();
let counter = 0;
for await (const doc of cursor2) {
expect(doc).to.exist;
counter += 1;
});

it('should be able to use a for-await loop on a command cursor', {
metadata: { requires: { node: '>=10.5.0', mongodb: '>=3.0.0' } },
test: async function () {
const cursor1 = collection.listIndexes();
const cursor2 = collection.listIndexes();

const indexes = await cursor1.toArray();
let counter = 0;
for await (const doc of cursor2) {
expect(doc).to.exist;
counter += 1;
}

expect(counter).to.equal(indexes.length);
}
});

expect(counter).to.equal(indexes.length);
}
});
it('should properly stop when cursor is closed', {
metadata: { requires: { node: '>=10.5.0' } },
test: async function () {
const cursor = collection.find();

it('should properly stop when cursor is closed', {
metadata: { requires: { node: '>=10.5.0' } },
test: async function () {
const cursor = collection.find();
let count = 0;
for await (const doc of cursor) {
expect(doc).to.exist;
count++;
await cursor.close();
}

let count = 0;
for await (const doc of cursor) {
expect(doc).to.exist;
count++;
await cursor.close();
expect(count).to.equal(1);
}

expect(count).to.equal(1);
}
});
});
context('custom promise library', () => {
let client, collection, promiseSpy;
before(async function () {
class CustomPromise extends Promise {}
promiseSpy = Sinon.spy(CustomPromise.prototype, 'then');
client = this.configuration.newClient({}, { promiseLibrary: CustomPromise });

await client.connect();
const docs = Array.from({ length: 1 }).map((_, index) => ({ foo: index, bar: 1 }));

collection = client.db(this.configuration.db).collection('async_cursor_tests');

await collection.deleteMany({});
await collection.insertMany(docs);
await client.close();
});

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();
collection = client.db(this.configuration.db).collection('async_cursor_tests');
});

afterEach(() => {
promiseSpy.restore();
return client.close();
});

it('should properly use custom promise', {
metadata: { requires: { node: '>=10.5.0' } },
test: async function () {
const cursor = collection.find();
const countBeforeIteration = promiseSpy.callCount;
for await (const doc of cursor) {
expect(doc).to.exist;
}
expect(countBeforeIteration).to.not.equal(promiseSpy.callCount);
expect(promiseSpy.called).to.equal(true);
}
});
});
});

0 comments on commit 16d6572

Please sign in to comment.