diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js index 3bd0ff5f1bf6f..8d5d4f376c466 100644 --- a/jstests/aggregation/testshard1.js +++ b/jstests/aggregation/testshard1.js @@ -131,5 +131,27 @@ for(i = 0; i < 6; ++i) { 'agg sharded test simple match failed'); } +function testSkipLimit(ops, expectedCount) { + if (expectedCount > 10) { + // make shard -> mongos intermediate results less than 16MB + ops.unshift({$project: {_id:1}}) + } + + ops.push({$group: {_id:1, count: {$sum: 1}}}); + + var out = db.runCommand({aggregate:"ts1", pipeline:ops}); + assert.commandWorked(out); + assert.eq(out.result[0].count, expectedCount); +} + +testSkipLimit([], nItems); // control +testSkipLimit([{$skip:10}], nItems - 10); +testSkipLimit([{$limit:10}], 10); +testSkipLimit([{$skip:5}, {$limit:10}], 10); +testSkipLimit([{$limit:10}, {$skip:5}], 10 - 5); +testSkipLimit([{$skip:5}, {$skip: 3}, {$limit:10}], 10); +testSkipLimit([{$skip:5}, {$limit:10}, {$skip: 3}], 10 - 3); +testSkipLimit([{$limit:10}, {$skip:5}, {$skip: 3}], 10 - 3 - 5); + // shut everything down shardedAggTest.stop(); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 329564ac490b9..e1e2cffcf967d 100755 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -965,7 +965,7 @@ namespace mongo { class DocumentSourceLimit : - public DocumentSource { + public SplittableDocumentSource { public: // virtuals from DocumentSource virtual ~DocumentSourceLimit(); @@ -988,6 +988,14 @@ namespace mongo { static intrusive_ptr create( const intrusive_ptr &pExpCtx); + // Virtuals for SplittableDocumentSource + // Need to run on router. Running on shard as well is an optimization. 
+ virtual intrusive_ptr getShardSource() { return this; } + virtual intrusive_ptr getRouterSource() { return this; } + + long long getLimit() const { return limit; } + void setLimit(long long newLimit) { limit = newLimit; } + /** Create a limiting DocumentSource from BSON. @@ -1019,7 +1027,7 @@ namespace mongo { }; class DocumentSourceSkip : - public DocumentSource { + public SplittableDocumentSource { public: // virtuals from DocumentSource virtual ~DocumentSourceSkip(); @@ -1042,6 +1050,14 @@ namespace mongo { static intrusive_ptr create( const intrusive_ptr &pExpCtx); + // Virtuals for SplittableDocumentSource + // Need to run on router. Can't run on shards. + virtual intrusive_ptr getShardSource() { return NULL; } + virtual intrusive_ptr getRouterSource() { return this; } + + long long getSkip() const { return skip; } + void setSkip(long long newSkip) { skip = newSkip; } + /** Create a skipping DocumentSource from BSON. diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index 8bbcaff211336..48d12d5850ffe 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -27,9 +27,8 @@ namespace mongo { const char DocumentSourceLimit::limitName[] = "$limit"; - DocumentSourceLimit::DocumentSourceLimit( - const intrusive_ptr &pExpCtx): - DocumentSource(pExpCtx), + DocumentSourceLimit::DocumentSourceLimit(const intrusive_ptr &pExpCtx): + SplittableDocumentSource(pExpCtx), limit(0), count(0) { } diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp index d4c1fc2caa687..5024b817a9086 100644 --- a/src/mongo/db/pipeline/document_source_skip.cpp +++ b/src/mongo/db/pipeline/document_source_skip.cpp @@ -28,9 +28,8 @@ namespace mongo { const char DocumentSourceSkip::skipName[] = "$skip"; - DocumentSourceSkip::DocumentSourceSkip( - const intrusive_ptr &pExpCtx): - DocumentSource(pExpCtx), + 
DocumentSourceSkip::DocumentSourceSkip(const intrusive_ptr &pExpCtx): + SplittableDocumentSource(pExpCtx), skip(0), count(0) { } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 0bd4096fe78c8..52f725887fc0c 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -227,6 +227,35 @@ namespace mongo { } } + /* Move limits in front of skips. This is more optimal for sharding + * since currently, we can only split the pipeline at a single source + * and it is better to limit the results coming from each shard + */ + for(int i = pSourceVector->size() - 1; i >= 1 /* not looking at 0 */; i--) { + DocumentSourceLimit* limit = + dynamic_cast((*pSourceVector)[i].get()); + DocumentSourceSkip* skip = + dynamic_cast((*pSourceVector)[i-1].get()); + if (limit && skip) { + // Increase limit by skip since the skipped docs now pass through the $limit + limit->setLimit(limit->getLimit() + skip->getSkip()); + swap((*pSourceVector)[i], (*pSourceVector)[i-1]); + + // Start at back again. This is needed to handle cases with more than 1 $limit + // (S means skip, L means limit) + // + // These two would work without second pass (assuming back to front ordering) + // SL -> LS + // SSL -> LSS + // + // The following cases need a second pass to handle the second limit + // SLL -> LLS + // SSLL -> LLSS + // SLSL -> LLSS + i = pSourceVector->size(); // decremented before next pass + } + } + /* Coalesce adjacent filters where possible. Two adjacent filters are equivalent to one filter whose predicate is the conjunction of