Skip to content

Commit

Permalink
feat(Aggregate): support ReadConcern in aggregates with $out
Browse files Browse the repository at this point in the history
Adds support for adding a ReadConcern to aggregates that
have a $out stage when run against 4.2 servers.
Some ReadConcerns (like linearizable) are still
not supported by the server, but will still be added on to
the outbound request.

Fixes NODE-1880
  • Loading branch information
daprahamian committed Aug 13, 2019
1 parent 46b526e commit 21cdcf0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
5 changes: 4 additions & 1 deletion lib/operations/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const toError = require('../utils').toError;

const DB_AGGREGATE_COLLECTION = 1;

const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8;

/**
* Perform an aggregate operation. See Collection.prototype.aggregate or Db.prototype.aggregate for more information.
*
Expand Down Expand Up @@ -51,8 +53,9 @@ function aggregate(db, coll, pipeline, options, callback) {
}

const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern;
const ismaster = topology.lastIsMaster() || {};

if (!hasOutStage) {
if (!hasOutStage || ismaster.maxWireVersion >= MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
decorateWithReadConcern(command, target, options);
}

Expand Down
60 changes: 59 additions & 1 deletion test/functional/readconcern_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ describe('ReadConcern', function() {
});

it('Should set majority readConcern aggregate command but ignore due to out', {
metadata: { requires: { topology: 'replicaset', mongodb: '>= 3.2' } },
metadata: { requires: { topology: 'replicaset', mongodb: '>= 3.2 < 4.1' } },

test: function(done) {
var listener = require('../..').instrument(function(err) {
Expand Down Expand Up @@ -500,6 +500,64 @@ describe('ReadConcern', function() {
}
});

it('Should set majority readConcern aggregate command against server >= 4.1', {
metadata: { requires: { topology: 'replicaset', mongodb: '>= 4.1' } },

test: function(done) {
// Contains all the apm events
const started = [];
const succeeded = [];
// Get a new instance
const client = this.configuration.newClient(
{ w: 1 },
{ poolSize: 1, readConcern: { level: 'majority' }, monitorCommands: true }
);

client
.connect()
.then(() => {
// Get a collection
const collection = client
.db(this.configuration.db)
.collection('readConcernCollectionAggregate1');

// Listen to apm events
client.on('commandStarted', event => {
if (event.commandName === 'aggregate') started.push(event);
});
client.on('commandSucceeded', event => {
if (event.commandName === 'aggregate') succeeded.push(event);
});

// Execute find
return collection
.aggregate([{ $match: {} }, { $out: 'readConcernCollectionAggregate1Output' }])
.toArray()
.then(() => {
expect(started).to.have.a.lengthOf(1);
expect(started[0]).to.have.property('commandName', 'aggregate');
expect(succeeded[0]).to.have.property('commandName', 'aggregate');
expect(started[0]).to.have.nested.property('command.readConcern.level', 'majority');

// Execute find
return collection
.aggregate([{ $match: {} }], { out: 'readConcernCollectionAggregate2Output' })
.toArray()
.then(() => {
expect(started).to.have.a.lengthOf(2);
expect(started[1]).to.have.property('commandName', 'aggregate');
expect(succeeded[1]).to.have.property('commandName', 'aggregate');
expect(started[1]).to.have.nested.property(
'command.readConcern.level',
'majority'
);
});
});
})
.then(() => client.close(done), e => client.close(() => done(e)));
}
});

it('Should set majority readConcern mapReduce command but be ignored', {
metadata: { requires: { topology: 'replicaset', mongodb: '>= 3.2' } },

Expand Down

0 comments on commit 21cdcf0

Please sign in to comment.