Skip to content

Commit

Permalink
feat: implements promise provider
Browse files Browse the repository at this point in the history
Adds `PromiseProvider` storage class to save the user-provided promise library. Eliminates the use of `promiseLibrary` stores within class constructors. Creates `PromiseProvider.get()` hook to retrieve the latest promise-library state. Allows setter of promise-library on module export `mongodb.Promise`. Preserves `promiseLibrary` option for `MongoClient`. Adds "no-native-promise" eslint rule to prevent direct internal use of native promises.

NODE-2579
  • Loading branch information
Thomas Reggi committed May 8, 2020
1 parent 491e23b commit e5b762c
Show file tree
Hide file tree
Showing 26 changed files with 167 additions and 156 deletions.
13 changes: 11 additions & 2 deletions .eslintrc
Expand Up @@ -8,7 +8,7 @@
"parserOptions": {
"ecmaVersion": 2018
},
"plugins": ["prettier", "jsdoc"],
"plugins": ["prettier", "promise", "jsdoc"],
"rules": {
"prettier/prettier": "error",

Expand All @@ -30,8 +30,17 @@

"no-console": "off",
"eqeqeq": ["error", "always", { "null": "ignore" }],
"strict": ["error", "global"]
"strict": ["error", "global"],
"promise/no-native": "error"
},
"overrides": [
{
"files": ["test/**/*.js"],
"rules": {
"promise/no-native": "off"
}
}
],
"settings": {
"jsdoc": {
"check-types": false,
Expand Down
11 changes: 11 additions & 0 deletions index.js
@@ -1,12 +1,23 @@
'use strict';

const error = require('./lib/error');
const Instrumentation = require('./lib/apm');
const { BSON } = require('./lib/deps');
const { Cursor, AggregationCursor, CommandCursor } = require('./lib/cursor');
const PromiseProvider = require('./lib/promise_provider');

// Set up the connect function
const connect = require('./lib/mongo_client').connect;

Object.defineProperty(connect, 'Promise', {
get: function() {
return PromiseProvider.get();
},
set: function(lib) {
PromiseProvider.set(lib);
}
});

// Expose error class
connect.MongoError = error.MongoError;
connect.MongoNetworkError = error.MongoNetworkError;
Expand Down
6 changes: 2 additions & 4 deletions lib/admin.js
Expand Up @@ -44,16 +44,14 @@ const executeOperation = require('./operations/execute_operation');
* @returns {Admin} a collection instance.
* @param {any} db
* @param {any} topology
* @param {any} promiseLibrary
*/
function Admin(db, topology, promiseLibrary) {
function Admin(db, topology) {
if (!(this instanceof Admin)) return new Admin(db, topology);

// Internal state
this.s = {
db: db,
topology: topology,
promiseLibrary: promiseLibrary
topology: topology
};
}

Expand Down
10 changes: 4 additions & 6 deletions lib/bulk/common.js
@@ -1,5 +1,6 @@
'use strict';

const PromiseProvider = require('../promise_provider');
const {
BSON: { Long, ObjectId }
} = require('../deps');
Expand Down Expand Up @@ -781,9 +782,6 @@ class BulkOperationBase {
finalOptions = applyWriteConcern(finalOptions, { collection: collection }, options);
const writeConcern = finalOptions.writeConcern;

// Get the promiseLibrary
const promiseLibrary = options.promiseLibrary || Promise;

// Final results
const bulkResult = {
ok: 1,
Expand Down Expand Up @@ -832,8 +830,6 @@ class BulkOperationBase {
executed: executed,
// Collection
collection: collection,
// Promise Library
promiseLibrary: promiseLibrary,
// Fundamental error
err: null,
// check keys
Expand Down Expand Up @@ -1027,12 +1023,14 @@ class BulkOperationBase {
* @param {any} callback
*/
_handleEarlyError(err, callback) {
const Promise = PromiseProvider.get();

if (typeof callback === 'function') {
callback(err, null);
return;
}

return this.s.promiseLibrary.reject(err);
return Promise.reject(err);
}

/**
Expand Down
7 changes: 3 additions & 4 deletions lib/change_stream.js
Expand Up @@ -85,7 +85,6 @@ class ChangeStream extends EventEmitter {
);
}

this.promiseLibrary = parent.s.promiseLibrary;
if (!this.options.readPreference && parent.s.readPreference) {
this.options.readPreference = parent.s.readPreference;
}
Expand Down Expand Up @@ -130,7 +129,7 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
hasNext(callback) {
return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb));
return maybePromise(callback, cb => this.cursor.hasNext(cb));
}

/**
Expand All @@ -142,7 +141,7 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
next(callback) {
return maybePromise(this.parent, callback, cb => {
return maybePromise(callback, cb => {
if (this.isClosed()) {
return cb(new MongoError('ChangeStream is closed'));
}
Expand Down Expand Up @@ -170,7 +169,7 @@ class ChangeStream extends EventEmitter {
* @returns {Promise} returns Promise if no callback passed
*/
close(callback) {
return maybePromise(this.parent, callback, cb => {
return maybePromise(callback, cb => {
if (this.closed) return cb();

// flag the change stream as explicitly closed
Expand Down
27 changes: 11 additions & 16 deletions lib/collection.js
@@ -1,5 +1,7 @@
'use strict';

const { emitDeprecatedOptionWarning } = require('./utils');
const PromiseProvider = require('./promise_provider');
const ReadPreference = require('./read_preference');
const { deprecate } = require('util');
const {
Expand Down Expand Up @@ -111,6 +113,7 @@ const mergeKeys = ['ignoreUndefined'];
*/
function Collection(db, topology, dbName, name, pkFactory, options) {
checkCollectionName(name);
emitDeprecatedOptionWarning(options, ['promiseLibrary']);

// Unpack variables
const internalHint = null;
Expand All @@ -136,9 +139,6 @@ function Collection(db, topology, dbName, name, pkFactory, options) {

const namespace = new MongoDBNamespace(dbName, name);

// Get the promiseLibrary
const promiseLibrary = options.promiseLibrary || Promise;

// Set custom primary key factory if provided
pkFactory = pkFactory == null ? ObjectId : pkFactory;

Expand Down Expand Up @@ -172,8 +172,6 @@ function Collection(db, topology, dbName, name, pkFactory, options) {
internalHint: internalHint,
// collectionHint
collectionHint: collectionHint,
// Promise library
promiseLibrary: promiseLibrary,
// Read Concern
readConcern: ReadConcern.fromOptions(options),
// Write Concern
Expand Down Expand Up @@ -453,9 +451,6 @@ Collection.prototype.find = deprecateOptions(
// Add db object to the new options
newOptions.db = this.s.db;

// Add the promise library
newOptions.promiseLibrary = this.s.promiseLibrary;

// Set raw if available at collection level
if (newOptions.raw == null && typeof this.s.raw === 'boolean') newOptions.raw = this.s.raw;
// Set promoteLongs if available at collection level
Expand Down Expand Up @@ -757,11 +752,12 @@ Collection.prototype.insert = deprecate(function(docs, options, callback) {
Collection.prototype.updateOne = function(filter, update, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};
const Promise = PromiseProvider.get();

const err = checkForAtomicOperators(update);
if (err) {
if (typeof callback === 'function') return callback(err);
return this.s.promiseLibrary.reject(err);
return Promise.reject(err);
}

options = Object.assign({}, options);
Expand Down Expand Up @@ -836,13 +832,14 @@ Collection.prototype.replaceOne = function(filter, doc, options, callback) {
* @returns {Promise<Collection~updateWriteOpResult>} returns Promise if no callback passed
*/
Collection.prototype.updateMany = function(filter, update, options, callback) {
const Promise = PromiseProvider.get();
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

const err = checkForAtomicOperators(update);
if (err) {
if (typeof callback === 'function') return callback(err);
return this.s.promiseLibrary.reject(err);
return Promise.reject(err);
}

options = Object.assign({}, options);
Expand Down Expand Up @@ -1786,6 +1783,8 @@ Collection.prototype.findOneAndReplace = function(filter, replacement, options,
* @returns {Promise<Collection~findAndModifyWriteOpResultObject>} returns Promise if no callback passed
*/
Collection.prototype.findOneAndUpdate = function(filter, update, options, callback) {
const Promise = PromiseProvider.get();

if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

Expand All @@ -1798,7 +1797,7 @@ Collection.prototype.findOneAndUpdate = function(filter, update, options, callba
const err = checkForAtomicOperators(update);
if (err) {
if (typeof callback === 'function') return callback(err);
return this.s.promiseLibrary.reject(err);
return Promise.reject(err);
}

const findOneAndUpdateOperation = new FindOneAndUpdateOperation(this, filter, update, options);
Expand Down Expand Up @@ -2018,9 +2017,6 @@ Collection.prototype.parallelCollectionScan = deprecate(function(options, callba
// Ensure we have the right read preference inheritance
options.readPreference = resolveReadPreference(this, options);

// Add a promiseLibrary
options.promiseLibrary = this.s.promiseLibrary;

if (options.session) {
options.session = undefined;
}
Expand Down Expand Up @@ -2174,7 +2170,6 @@ Collection.prototype.initializeUnorderedBulkOp = function(options) {
options.ignoreUndefined = this.s.options.ignoreUndefined;
}

options.promiseLibrary = this.s.promiseLibrary;
return unordered(this.s.topology, this, options);
};

Expand All @@ -2196,7 +2191,7 @@ Collection.prototype.initializeOrderedBulkOp = function(options) {
if (options.ignoreUndefined == null) {
options.ignoreUndefined = this.s.options.ignoreUndefined;
}
options.promiseLibrary = this.s.promiseLibrary;

return ordered(this.s.topology, this, options);
};

Expand Down
26 changes: 14 additions & 12 deletions lib/cursor/cursor.js
@@ -1,5 +1,7 @@
'use strict';

const { emitDeprecatedOptionWarning } = require('../utils');
const PromiseProvider = require('../promise_provider');
const ReadPreference = require('../read_preference');
const { Transform, PassThrough } = require('stream');
const { deprecate } = require('util');
Expand Down Expand Up @@ -101,14 +103,13 @@ class Cursor extends CoreCursor {
options = this.operation.options;
}

emitDeprecatedOptionWarning(options, ['promiseLibrary']);

// Tailable cursor options
const numberOfRetries = options.numberOfRetries || 5;
const tailableRetryInterval = options.tailableRetryInterval || 500;
const currentNumberOfRetries = numberOfRetries;

// Get the promiseLibrary
const promiseLibrary = options.promiseLibrary || Promise;

// Internal cursor state
this.s = {
// Tailable cursor options
Expand All @@ -117,8 +118,6 @@ class Cursor extends CoreCursor {
currentNumberOfRetries: currentNumberOfRetries,
// State
state: CursorState.INIT,
// Promise library
promiseLibrary,
// explicitlyIgnoreSession
explicitlyIgnoreSession: !!options.explicitlyIgnoreSession
};
Expand Down Expand Up @@ -193,7 +192,7 @@ class Cursor extends CoreCursor {
throw MongoError.create({ message: 'Cursor is closed', driver: true });
}

return maybePromise(this, callback, cb => {
return maybePromise(callback, cb => {
const cursor = this;
if (cursor.isNotified()) {
return cb(null, false);
Expand Down Expand Up @@ -221,7 +220,7 @@ class Cursor extends CoreCursor {
* @returns {Promise} returns Promise if no callback passed
*/
next(callback) {
return maybePromise(this, callback, cb => {
return maybePromise(callback, cb => {
const cursor = this;
if (cursor.s.state === CursorState.CLOSED || (cursor.isDead && cursor.isDead())) {
cb(MongoError.create({ message: 'Cursor is closed', driver: true }));
Expand Down Expand Up @@ -746,6 +745,7 @@ class Cursor extends CoreCursor {
* @returns {Promise} if no callback supplied
*/
forEach(iterator, callback) {
const Promise = PromiseProvider.get();
// Rewind cursor state
this.rewind();

Expand All @@ -770,7 +770,7 @@ class Cursor extends CoreCursor {
}
});
} else {
return new this.s.promiseLibrary((fulfill, reject) => {
return new Promise((fulfill, reject) => {
each(this, (err, doc) => {
if (err) {
reject(err);
Expand Down Expand Up @@ -840,7 +840,7 @@ class Cursor extends CoreCursor {
driver: true
});
}
return maybePromise(this, callback, cb => {
return maybePromise(callback, cb => {
const cursor = this;
const items = [];

Expand Down Expand Up @@ -934,6 +934,8 @@ class Cursor extends CoreCursor {
* @returns {Promise} returns Promise if no callback passed
*/
close(options, callback) {
const Promise = PromiseProvider.get();

if (typeof options === 'function') (callback = options), (options = {});
options = Object.assign({}, { skipKillCursors: false }, options);

Expand All @@ -953,7 +955,7 @@ class Cursor extends CoreCursor {
}

// Return a Promise
return new this.s.promiseLibrary(resolve => {
return new Promise(resolve => {
resolve();
});
};
Expand All @@ -963,7 +965,7 @@ class Cursor extends CoreCursor {
return this._endSession(() => completeClose());
}

return new this.s.promiseLibrary(resolve => {
return new Promise(resolve => {
this._endSession(() => completeClose().then(resolve));
});
}
Expand Down Expand Up @@ -1069,7 +1071,7 @@ class Cursor extends CoreCursor {
if (this.cmd.readConcern) {
delete this.cmd['readConcern'];
}
return maybePromise(this, callback, cb => {
return maybePromise(callback, cb => {
CoreCursor.prototype._next.apply(this, [cb]);
});
}
Expand Down

0 comments on commit e5b762c

Please sign in to comment.