SERVER-31755 Create intermediate $lookup stage document size limit knob
Brigitte Lamarche committed Dec 14, 2018
1 parent dcc8168 commit dfc46be
Showing 4 changed files with 135 additions and 6 deletions.
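The knob introduced here caps the size of the document built by an intermediate $lookup stage; once the running total passes the cap, the server aborts the aggregation with error code 4568. The default is 100 MB (see the query_knobs.cpp hunk below), and it can be tuned at runtime the same way the test does. A minimal shell sketch, assuming a connection to the target node:

// Runtime tuning via setParameter; 30 MB mirrors the value the test uses.
assert.commandWorked(db.adminCommand({
    setParameter: 1,
    internalLookupStageIntermediateDocumentMaxSizeBytes: 30 * 1024 * 1024
}));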
111 changes: 111 additions & 0 deletions jstests/noPassthrough/lookup_max_intermediate_size.js
@@ -0,0 +1,111 @@
// This test verifies that an intermediate $lookup stage can grow larger than 16MB,
// but no larger than internalLookupStageIntermediateDocumentMaxSizeBytes.
// @tags: [requires_sharding]

load("jstests/aggregation/extras/utils.js");  // For assertErrorCode.

(function() {
    "use strict";

    // Used by testPipeline to sort result documents. All _ids must be primitives.
    function compareId(a, b) {
        if (a._id < b._id) {
            return -1;
        }
        if (a._id > b._id) {
            return 1;
        }
        return 0;
    }

    // Helper for testing that pipeline returns correct set of results.
    function testPipeline(pipeline, expectedResult, collection) {
        assert.eq(collection.aggregate(pipeline).toArray().sort(compareId),
                  expectedResult.sort(compareId));
    }

    function runTest(coll, from) {
        const db = null;  // Using the db variable is banned in this function.

        //
        // Confirm aggregation will not fail if intermediate $lookup stage exceeds 16 MB.
        //
        assert.commandWorked(coll.insert([
            {"_id": 3, "same": 1},
        ]));

        const bigString = new Array(1025).toString();
        const doc = {_id: new ObjectId(), x: bigString, same: 1};
        const docSize = Object.bsonsize(doc);

        // Number of documents in lookup to exceed maximum BSON document size.
        // Using 20 MB instead to be safe.
        let numDocs = Math.floor(20 * 1024 * 1024 / docSize);

        let bulk = from.initializeUnorderedBulkOp();
        for (let i = 0; i < numDocs; ++i) {
            bulk.insert({x: bigString, same: 1});
        }
        assert.commandWorked(bulk.execute());

        let pipeline = [
            {$lookup: {from: "from", localField: "same", foreignField: "same", as: "arr20mb"}},
            {$project: {_id: 1}}
        ];

        let expectedResults = [{_id: 3}];

        testPipeline(pipeline, expectedResults, coll);

        //
        // Confirm aggregation will fail if intermediate $lookup stage exceeds
        // internalLookupStageIntermediateDocumentMaxSizeBytes, set to 30 MB.
        //

        // Number of documents to exceed maximum intermediate $lookup stage document size.
        // Using 35 MB total to be safe (20 MB from previous test + 15 MB).
        numDocs = Math.floor(15 * 1024 * 1024 / docSize);

        bulk = from.initializeUnorderedBulkOp();
        for (let i = 0; i < numDocs; ++i) {
            bulk.insert({x: bigString, same: 1});
        }
        assert.commandWorked(bulk.execute());

        pipeline = [
            {$lookup: {from: "from", localField: "same", foreignField: "same", as: "arr35mb"}},
            {$project: {_id: 1}}
        ];

        assertErrorCode(coll, pipeline, 4568);
    }

    // Run tests on single node.
    const standalone = MongoRunner.runMongod();
    const db = standalone.getDB("test");

    assert.commandWorked(db.adminCommand(
        {setParameter: 1, internalLookupStageIntermediateDocumentMaxSizeBytes: 30 * 1024 * 1024}));

    runTest(db.lookUp, db.from);

    MongoRunner.stopMongod(standalone);

    // Run tests in a sharded environment.
    const sharded = new ShardingTest({
        mongos: 1,
        shards: 2,
        rs: {
            nodes: 1,
            setParameter:
                {internalLookupStageIntermediateDocumentMaxSizeBytes: 30 * 1024 * 1024}
        }
    });

    assert(sharded.adminCommand({enableSharding: "test"}));

    assert(sharded.adminCommand({shardCollection: "test.lookUp", key: {_id: 'hashed'}}));
    runTest(sharded.getDB('test').lookUp, sharded.getDB('test').from);

    sharded.stop();
}());
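The failing case above goes through the assertErrorCode helper; a direct equivalent via runCommand, assuming the collections and the 30 MB knob value that runTest sets up, might look like this sketch:

// Hypothetical direct invocation: the oversized $lookup should fail with code 4568.
const res = db.runCommand({
    aggregate: "lookUp",
    pipeline:
        [{$lookup: {from: "from", localField: "same", foreignField: "same", as: "big"}}],
    cursor: {}
});
assert.commandFailedWithCode(res, 4568);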
10 changes: 6 additions & 4 deletions src/mongo/db/pipeline/document_source_lookup.cpp
@@ -266,14 +266,16 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {

     std::vector<Value> results;
     int objsize = 0;
+    const auto maxBytes = internalLookupStageIntermediateDocumentMaxSizeBytes.load();
     while (auto result = pipeline->getNext()) {
         objsize += result->getApproximateSize();
         uassert(4568,
                 str::stream() << "Total size of documents in " << _fromNs.coll()
-                              << " matching pipeline "
-                              << getUserPipelineDefinition()
-                              << " exceeds maximum document size",
-                objsize <= BSONObjMaxInternalSize);
+                              << " matching pipeline's $lookup stage exceeds "
+                              << maxBytes
+                              << " bytes",
+
+                objsize <= maxBytes);
         results.emplace_back(std::move(*result));
     }
     for (auto&& source : pipeline->getSources()) {
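The check above keeps a running sum of getApproximateSize() over every matched foreign document and compares it against the knob on each iteration. A back-of-envelope shell calculation with the roughly 1 KB documents the test inserts shows why ~20 MB of matches passes a 30 MB cap while ~35 MB trips uassert 4568:

// Each foreign document carries a 1024-character string, so it is a bit over 1 KB.
const docSize =
    Object.bsonsize({_id: new ObjectId(), x: new Array(1025).toString(), same: 1});
const cap = 30 * 1024 * 1024;  // knob value used by the test
print("documents that fit under the cap: " + Math.floor(cap / docSize));
print("documents in the failing case: " + Math.floor(35 * 1024 * 1024 / docSize));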
18 changes: 16 additions & 2 deletions src/mongo/db/query/query_knobs.cpp
@@ -29,6 +29,8 @@
  */

 #include "mongo/db/query/query_knobs.h"
+
+#include "mongo/bson/util/builder.h"
 #include "mongo/db/server_options.h"
 #include "mongo/db/server_parameters.h"

@@ -89,7 +91,19 @@ MONGO_EXPORT_SERVER_PARAMETER(internalDocumentSourceSortMaxBlockingSortBytes,
     ->withValidator([](const long long& newVal) {
         if (newVal <= 0) {
             return Status(ErrorCodes::BadValue,
-                          "internalDocumentSourceSortMaxBlockingSortBytes must be >= 0");
+                          "internalDocumentSourceSortMaxBlockingSortBytes must be > 0");
         }
         return Status::OK();
     });
+
+MONGO_EXPORT_SERVER_PARAMETER(internalLookupStageIntermediateDocumentMaxSizeBytes,
+                              long long,
+                              100 * 1024 * 1024)
+    ->withValidator([](const long long& newVal) {
+        if (newVal < BSONObjMaxInternalSize) {
+            return Status(ErrorCodes::BadValue,
+                          "internalLookupStageIntermediateDocumentMaxSizeBytes must be >= " +
+                              std::to_string(BSONObjMaxInternalSize));
+        }
+        return Status::OK();
+    });
@@ -100,7 +114,7 @@ MONGO_EXPORT_SERVER_PARAMETER(internalDocumentSourceGroupMaxMemoryBytes,
     ->withValidator([](const long long& newVal) {
         if (newVal <= 0) {
             return Status(ErrorCodes::BadValue,
-                          "internalDocumentSourceGroupMaxMemoryBytes must be >= 0");
+                          "internalDocumentSourceGroupMaxMemoryBytes must be > 0");
         }
         return Status::OK();
     });
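The new validator enforces a floor of BSONObjMaxInternalSize (slightly above the 16 MB user document limit), so the knob can loosen the old behavior but never tighten it below a single maximum-size document. A sketch of the rejection path, assuming a running node:

// Hypothetical check: values below BSONObjMaxInternalSize are refused as BadValue.
assert.commandFailed(db.adminCommand(
    {setParameter: 1, internalLookupStageIntermediateDocumentMaxSizeBytes: 1024 * 1024}));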
2 changes: 2 additions & 0 deletions src/mongo/db/query/query_knobs.h
@@ -130,6 +130,8 @@ extern AtomicInt32 internalQueryFacetBufferSizeBytes;

 extern AtomicInt64 internalDocumentSourceSortMaxBlockingSortBytes;

+extern AtomicInt64 internalLookupStageIntermediateDocumentMaxSizeBytes;
+
 extern AtomicInt64 internalDocumentSourceGroupMaxMemoryBytes;

 extern AtomicInt32 internalInsertMaxBatchSize;
