Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated MongoDB driver to 4.8 #12097

Merged
merged 6 commits into from Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 2 additions & 17 deletions packages/mongo/collection.js
Expand Up @@ -96,15 +96,7 @@ Mongo.Collection = function Collection(name, options) {
else if (Meteor.isClient) this._connection = Meteor.connection;
else this._connection = Meteor.server;

if (options._driver) {
if (typeof options._driver.open !== 'function') {
throw new Error('If you are creating the driver manually using new ' +
'MongoInternals.RemoteCollectionDriver then you need to use ' +
'Promise.await() or await on it since it is async in recent ' +
'versions of Meteor. ' +
'Read more: https://docs.meteor.com/changelog.html.');
}
} else {
if (!options._driver) {
// XXX This check assumes that webapp is loaded so that Meteor.server !==
// null. We should fully support the case of "want to use a Mongo-backed
// collection from Node code without webapp", but we don't yet.
Expand All @@ -115,7 +107,7 @@ Mongo.Collection = function Collection(name, options) {
typeof MongoInternals !== 'undefined' &&
MongoInternals.defaultRemoteCollectionDriver
) {
options._driver = MongoInternals.getDefaultRemoteCollectionDriver();
options._driver = MongoInternals.defaultRemoteCollectionDriver();
} else {
const { LocalCollectionDriver } = require('./local_collection_driver.js');
options._driver = LocalCollectionDriver;
Expand Down Expand Up @@ -903,10 +895,3 @@ ASYNC_COLLECTION_METHODS.forEach(methodName => {
return Promise.resolve(this[methodName](...args));
};
});

if (Meteor.isServer) {
const userOptions = Meteor.settings?.packages?.mongo || {};
if (!userOptions?.skipStartupConnection && !process.env.METEOR_TEST_FAKE_MONGOD_CONTROL_PORT) {
Promise.await(MongoInternals.defaultRemoteCollectionDriver());
}
}
2 changes: 1 addition & 1 deletion packages/mongo/collection_async_tests.js
@@ -1,7 +1,7 @@
Tinytest.add('async collection - check for methods presence', function (test) {
const isFunction = fn => test.equal(typeof fn, 'function');

const collection = new Mongo.Collection('myAsyncCollection');
const collection = new Mongo.Collection('myAsyncCollection' + test.id);
isFunction(collection.createCappedCollectionAsync);
isFunction(collection.createIndexAsync);
isFunction(collection.dropCollectionAsync);
Expand Down
4 changes: 2 additions & 2 deletions packages/mongo/collection_tests.js
Expand Up @@ -159,7 +159,7 @@ Tinytest.add('collection - calling find with a valid readPreference',
if (Meteor.isServer) {
const defaultReadPreference = 'primary';
const customReadPreference = 'secondaryPreferred';
const collection = new Mongo.Collection('readPreferenceTest');
const collection = new Mongo.Collection('readPreferenceTest' + test.id);
const defaultCursor = collection.find();
const customCursor = collection.find(
{},
Expand Down Expand Up @@ -190,7 +190,7 @@ Tinytest.add('collection - calling find with an invalid readPreference',
function(test) {
if (Meteor.isServer) {
const invalidReadPreference = 'INVALID';
const collection = new Mongo.Collection('readPreferenceTest2');
const collection = new Mongo.Collection('readPreferenceTest2' + test.id);
const cursor = collection.find(
{},
{ readPreference: invalidReadPreference }
Expand Down
4 changes: 2 additions & 2 deletions packages/mongo/doc_fetcher_tests.js
Expand Up @@ -3,15 +3,15 @@ var Future = Npm.require('fibers/future');
import { DocFetcher } from "./doc_fetcher.js";

testAsyncMulti("mongo-livedata - doc fetcher", [
async function (test, expect) {
function (test, expect) {
var self = this;
var collName = "docfetcher-" + Random.id();
var collection = new Mongo.Collection(collName);
var id1 = collection.insert({x: 1});
var id2 = collection.insert({y: 2});

var fetcher = new DocFetcher(
(await MongoInternals.defaultRemoteCollectionDriver()).mongo);
MongoInternals.defaultRemoteCollectionDriver().mongo);

// Test basic operation.
const fakeOp1 = {};
Expand Down
15 changes: 0 additions & 15 deletions packages/mongo/mongoAsyncUtils.js

This file was deleted.

40 changes: 14 additions & 26 deletions packages/mongo/mongo_driver.js
Expand Up @@ -12,6 +12,7 @@ import { normalizeProjection } from "./mongo_utils";
const path = require("path");
const util = require("util");

/** @type {import('mongodb')} */
var MongoDB = NpmModuleMongodb;
var Future = Npm.require('fibers/future');
import { DocFetcher } from "./doc_fetcher.js";
Expand Down Expand Up @@ -140,7 +141,7 @@ var replaceTypes = function (document, atomTransformer) {
};


MongoConnection = async function (url, options) {
MongoConnection = function (url, options) {
var self = this;
options = options || {};
self._observeMultiplexers = {};
Expand Down Expand Up @@ -184,29 +185,22 @@ MongoConnection = async function (url, options) {
self._oplogHandle = null;
self._docFetcher = null;

const connect = util.promisify(MongoDB.MongoClient.connect);
const client = await connect(
url,
mongoOptions);

var db = client.db();
self.client = new MongoDB.MongoClient(url, mongoOptions);
self.db = self.client.db();

try {
const helloDocument = await db.admin().command({hello: 1});
// First, figure out what the current primary is, if any.
// Figure out what the current primary is, if any. This operation will fail,
// if the connection fails, as `connect` is implicit since version 4.7 of the
// MongoDB driver. It's not a problem, as the connection may break at anytime
// anyway, and all errors have to be handled properly.
self.db.admin().command({hello: 1}).then(helloDocument => {
if (helloDocument.primary) {
self._primary = helloDocument.primary;
}
}catch(_){
// ismaster command is supported on older mongodb versions
const isMasterDocument = await db.admin().command({ismaster:1});
// First, figure out what the current primary is, if any.
if (isMasterDocument.primary) {
self._primary = isMasterDocument.primary;
}
}
}, () => {
// Ignore the error entirely.
});

client.topology.on(
self.client.topology.on(
'joined', Meteor.bindEnvironment(function (kind, doc) {
if (kind === 'primary') {
if (doc.primary !== self._primary) {
Expand All @@ -226,16 +220,10 @@ MongoConnection = async function (url, options) {
}
}));

// Wait for the connection to be successful (throws on failure) and assign the
// results (`client` and `db`) to `self`.
Object.assign(self, { client, db });

if (options.oplogUrl && ! Package['disable-oplog']) {
self._oplogHandle = await new OplogHandle(options.oplogUrl, self.db.databaseName);
self._oplogHandle = new OplogHandle(options.oplogUrl, self.db.databaseName);
self._docFetcher = new DocFetcher(self);
}

return self;
};

MongoConnection.prototype.close = function() {
Expand Down
45 changes: 18 additions & 27 deletions packages/mongo/mongo_livedata_tests.js
Expand Up @@ -873,7 +873,7 @@ if (Meteor.isServer) {

// This test mainly checks the correctness of oplog code dealing with limited
// queries. Compitablity with poll-diff is added as well.
Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, async function (test) {
Tinytest.add("mongo-livedata - observe sorted, limited " + idGeneration, function (test) {
var run = test.runId();
var coll = new Mongo.Collection("observeLimit-"+run, collectionOptions);

Expand Down Expand Up @@ -936,7 +936,7 @@ if (Meteor.isServer) {

// Insert a doc and start observing.
var docId1 = ins({foo: 22, bar: 5});
await waitUntilOplogCaughtUp();
waitUntilOplogCaughtUp();

// State: [ 5:1 | ]!
var o = observer();
Expand Down Expand Up @@ -1218,7 +1218,7 @@ if (Meteor.isServer) {
onComplete();
});

Tinytest.addAsync("mongo-livedata - observe sorted, limited, big initial set" + idGeneration, async function (test) {
Tinytest.add("mongo-livedata - observe sorted, limited, big initial set" + idGeneration, function (test) {
var run = test.runId();
var coll = new Mongo.Collection("observeLimit-"+run, collectionOptions);

Expand Down Expand Up @@ -1276,7 +1276,7 @@ if (Meteor.isServer) {

// Ensure that we are past all the 'i' entries before we run the query, so
// that we get the expected phase transitions.
await waitUntilOplogCaughtUp();
waitUntilOplogCaughtUp();

var o = observer();
var usesOplog = o.handle._multiplexer._observeDriver._usesOplog;
Expand Down Expand Up @@ -2802,10 +2802,10 @@ if (Meteor.isServer) {
}

// This is a VERY white-box test.
Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - _disableOplog", async function (test) {
Meteor.isServer && Tinytest.add("mongo-livedata - oplog - _disableOplog", function (test) {
var collName = Random.id();
var coll = new Mongo.Collection(collName);
if ((await MongoInternals.defaultRemoteCollectionDriver()).mongo._oplogHandle) {
if (MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle) {
var observeWithOplog = coll.find({x: 5})
.observeChanges({added: function () {}});
test.isTrue(observeWithOplog._multiplexer._observeDriver._usesOplog);
Expand All @@ -2817,7 +2817,7 @@ Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - _disableOplog", a
observeWithoutOplog.stop();
});

Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - include selector fields", async function (test) {
Meteor.isServer && Tinytest.add("mongo-livedata - oplog - include selector fields", function (test) {
var collName = "includeSelector" + Random.id();
var coll = new Mongo.Collection(collName);

Expand All @@ -2828,7 +2828,7 @@ Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - include selector
// during the observeChanges, the bug in question is not consistently
// reproduced.) We don't have to do this for polling observe (eg
// --disable-oplog).
await waitUntilOplogCaughtUp();
waitUntilOplogCaughtUp();

var output = [];
var handle = coll.find({a: 1, b: 2}, {fields: {c: 1}}).observeChanges({
Expand Down Expand Up @@ -2859,7 +2859,7 @@ Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - include selector
handle.stop();
});

Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - transform", async function (test) {
Meteor.isServer && Tinytest.add("mongo-livedata - oplog - transform", function (test) {
var collName = "oplogTransform" + Random.id();
var coll = new Mongo.Collection(collName);

Expand All @@ -2870,7 +2870,7 @@ Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - transform", async
// during the observeChanges, the bug in question is not consistently
// reproduced.) We don't have to do this for polling observe (eg
// --disable-oplog).
await waitUntilOplogCaughtUp();
waitUntilOplogCaughtUp();

var cursor = coll.find({}, {transform: function (doc) {
return doc.x;
Expand Down Expand Up @@ -2899,17 +2899,17 @@ Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - transform", async
});


Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - drop collection/db", async function (test) {
Meteor.isServer && Tinytest.add("mongo-livedata - oplog - drop collection/db", function (test) {
// This test uses a random database, so it can be dropped without affecting
// anything else.
var mongodbUri = Npm.require('mongodb-uri');
var parsedUri = mongodbUri.parse(process.env.MONGO_URL);
parsedUri.database = 'dropDB' + Random.id();
var driver = Promise.await(new MongoInternals.RemoteCollectionDriver(
var driver = new MongoInternals.RemoteCollectionDriver(
mongodbUri.format(parsedUri), {
oplogUrl: process.env.MONGO_OPLOG_URL
}
));
);

var collName = "dropCollection" + Random.id();
var coll = new Mongo.Collection(collName, { _driver: driver });
Expand Down Expand Up @@ -2943,7 +2943,7 @@ Meteor.isServer && Tinytest.addAsync("mongo-livedata - oplog - drop collection/d

// Wait until we've processed the insert oplog entry, so that we are in a
// steady state (and we don't see the dropped docs because we are FETCHING).
await waitUntilOplogCaughtUp();
waitUntilOplogCaughtUp();

// Drop the collection. Should remove all docs.
runInFence(function () {
Expand Down Expand Up @@ -3105,9 +3105,9 @@ testAsyncMulti("mongo-livedata - oplog - update EJSON", [
]);


async function waitUntilOplogCaughtUp() {
function waitUntilOplogCaughtUp() {
var oplogHandle =
(await MongoInternals.defaultRemoteCollectionDriver()).mongo._oplogHandle;
MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle;
if (oplogHandle)
oplogHandle.waitUntilCaughtUp();
}
Expand Down Expand Up @@ -3229,15 +3229,6 @@ Meteor.isServer && testAsyncMulti("mongo-livedata - update with replace forbidde
}
]);

Meteor.isServer && Tinytest.add(
"mongo-livedata - connection failure throws",
function (test) {
test.throws(function () {
Promise.await(new MongoInternals.Connection('mongodb://this-does-not-exist.test/asdf'));
});
}
);

Meteor.isServer && Tinytest.add("mongo-livedata - npm modules", function (test) {
// Make sure the version number looks like a version number.
test.matches(MongoInternals.NpmModules.mongodb.version, /^4\.(\d+)\.(\d+)/);
Expand Down Expand Up @@ -3433,8 +3424,8 @@ if (Meteor.isServer) {
}

if (Meteor.isServer) {
Tinytest.addAsync("mongo-livedata - transaction", async function (test) {
const { client } = (await MongoInternals.defaultRemoteCollectionDriver()).mongo;
Tinytest.addAsync("mongo-livedata - transaction", function (test) {
const { client } = MongoInternals.defaultRemoteCollectionDriver().mongo;

const Collection = new Mongo.Collection(`transaction_test_${test.runId()}`);
const rawCollection = Collection.rawCollection();
Expand Down
11 changes: 5 additions & 6 deletions packages/mongo/oplog_tailing.js
Expand Up @@ -26,7 +26,7 @@ idForOp = function (op) {
throw Error("Unknown op: " + EJSON.stringify(op));
};

OplogHandle = async function (oplogUrl, dbName) {
OplogHandle = function (oplogUrl, dbName) {
var self = this;
self._oplogUrl = oplogUrl;
self._dbName = dbName;
Expand Down Expand Up @@ -82,8 +82,7 @@ OplogHandle = async function (oplogUrl, dbName) {
self._entryQueue = new Meteor._DoubleEndedQueue();
self._workerActive = false;

await self._startTailing();
return self;
self._startTailing();
};

Object.assign(OplogHandle.prototype, {
Expand Down Expand Up @@ -186,7 +185,7 @@ Object.assign(OplogHandle.prototype, {
self._catchingUpFutures.splice(insertAfter, 0, {ts: ts, future: f});
f.wait();
},
_startTailing: async function () {
_startTailing: function () {
var self = this;
// First, make sure that we're talking to the local database.
var mongodbUri = Npm.require('mongodb-uri');
Expand All @@ -206,12 +205,12 @@ Object.assign(OplogHandle.prototype, {
//
// The tail connection will only ever be running a single tail command, so
// it only needs to make one underlying TCP connection.
self._oplogTailConnection = await new MongoConnection(
self._oplogTailConnection = new MongoConnection(
self._oplogUrl, {maxPoolSize: 1});
// XXX better docs, but: it's to get monotonic results
// XXX is it safe to say "if there's an in flight query, just use its
// results"? I don't think so but should consider that
self._oplogLastEntryConnection = await new MongoConnection(
self._oplogLastEntryConnection = new MongoConnection(
self._oplogUrl, {maxPoolSize: 1});

// Now, make sure that there actually is a repl set here. If not, oplog
Expand Down