Skip to content

Commit

Permalink
fix(aggregate): do not send batchSize for aggregation with $out
Browse files Browse the repository at this point in the history
Sending a batchSize of 0 will break the cursor, and in general we
should not be sending batchSize for aggregation pipelines
that contain a $out stage.

Fixes NODE-1850
  • Loading branch information
daprahamian committed Mar 8, 2019
1 parent f73bd06 commit ddb8d90
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 5 deletions.
10 changes: 5 additions & 5 deletions lib/operations/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ function aggregate(db, coll, pipeline, options, callback) {
const isDbAggregate = typeof coll === 'string';
const target = isDbAggregate ? db : coll;
const topology = target.s.topology;
let ignoreReadConcern = false;
let hasOutStage = false;

if (typeof options.out === 'string') {
pipeline = pipeline.concat({ $out: options.out });
ignoreReadConcern = true;
hasOutStage = true;
} else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
ignoreReadConcern = true;
hasOutStage = true;
}

let command;
Expand All @@ -52,7 +52,7 @@ function aggregate(db, coll, pipeline, options, callback) {

const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern;

if (!ignoreReadConcern) {
if (!hasOutStage) {
decorateWithReadConcern(command, target, options);
}

Expand Down Expand Up @@ -96,7 +96,7 @@ function aggregate(db, coll, pipeline, options, callback) {
}

options.cursor = options.cursor || {};
if (options.batchSize) options.cursor.batchSize = options.batchSize;
if (options.batchSize && !hasOutStage) options.cursor.batchSize = options.batchSize;
command.cursor = options.cursor;

// promiseLibrary
Expand Down
67 changes: 67 additions & 0 deletions test/functional/aggregation_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1354,4 +1354,71 @@ describe('Aggregation', function() {
// DOC_END
}
});

it('should not send a batchSize for aggregations with an out stage', {
metadata: { requires: { topology: ['single', 'replicaset'] } },
test: function(done) {
const databaseName = this.configuration.db;
const client = this.configuration.newClient(this.configuration.writeConcernMax(), {
poolSize: 1,
monitorCommands: true
});

let err;
let coll1;
let coll2;
const events = [];

client.on('commandStarted', e => {
if (e.commandName === 'aggregate') {
events.push(e);
}
});

client
.connect()
.then(() => {
coll1 = client.db(databaseName).collection('coll1');
coll2 = client.db(databaseName).collection('coll2');

return Promise.all([coll1.remove({}), coll2.remove({})]);
})
.then(() => {
const docs = Array.from({ length: 10 }).map(() => ({ a: 1 }));

return coll1.insertMany(docs);
})
.then(() => {
return Promise.all(
[
coll1.aggregate([{ $out: 'coll2' }]),
coll1.aggregate([{ $out: 'coll2' }], { batchSize: 0 }),
coll1.aggregate([{ $out: 'coll2' }], { batchSize: 1 }),
coll1.aggregate([{ $out: 'coll2' }], { batchSize: 30 }),
coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }]),
coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }], { batchSize: 0 }),
coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }], { batchSize: 1 }),
coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }], { batchSize: 30 })
].map(cursor => cursor.toArray())
);
})
.then(() => {
expect(events)
.to.be.an('array')
.with.a.lengthOf(8);
events.forEach(event => {
expect(event).to.have.property('commandName', 'aggregate');
expect(event)
.to.have.property('command')
.that.has.property('cursor')
.that.does.not.have.property('batchSize');
});
})
.catch(_err => {
err = _err;
})
.then(() => client.close())
.then(() => done(err));
}
});
});

0 comments on commit ddb8d90

Please sign in to comment.