Skip to content
This repository has been archived by the owner on Dec 11, 2019. It is now read-only.

Commit

Permalink
SERVER-7408 Correctly handle $skip and $limit in sharded agg
Browse files Browse the repository at this point in the history
This bug only comes up if the first $skip or $limit precedes the first
$sort or $group. This is very rare, but should still be handled
correctly.
  • Loading branch information
RedBeard0531 authored and milkie committed Nov 6, 2012
1 parent f43972d commit f4270ab
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 8 deletions.
22 changes: 22 additions & 0 deletions jstests/aggregation/testshard1.js
Expand Up @@ -131,5 +131,27 @@ for(i = 0; i < 6; ++i) {
'agg sharded test simple match failed');
}

/**
 * Runs the given pipeline stages against the "ts1" collection and asserts how
 * many documents survive them.
 *
 * The surviving documents are counted by appending a $group stage that sums 1
 * per document into a single group.
 *
 * @param {Array} ops - pipeline stages under test; the array is not modified.
 * @param {number} expectedCount - number of documents expected to come out of `ops`.
 */
function testSkipLimit(ops, expectedCount) {
    // Work on a copy so the caller's array is left untouched.
    var pipeline = ops.slice();

    if (expectedCount > 10) {
        // make shard -> mongos intermediate results less than 16MB
        pipeline.unshift({$project: {_id: 1}});
    }

    pipeline.push({$group: {_id: 1, count: {$sum: 1}}});

    var out = db.runCommand({aggregate: "ts1", pipeline: pipeline});
    assert.commandWorked(out);

    // $group emits no document at all when nothing reaches it, so an empty
    // result array means a count of zero rather than a missing element.
    var actualCount = (out.result.length > 0) ? out.result[0].count : 0;
    assert.eq(actualCount, expectedCount);
}

// Each call passes the $skip/$limit stages under test and the number of
// documents expected to survive them.
// NOTE(review): the expected values assume nItems is comfortably larger than
// the skips and limits used here; nItems is defined outside this excerpt.
testSkipLimit([], nItems); // control
// a single stage on its own
testSkipLimit([{$skip:10}], nItems - 10);
testSkipLimit([{$limit:10}], 10);
// both orderings of one $skip and one $limit
testSkipLimit([{$skip:5}, {$limit:10}], 10);
testSkipLimit([{$limit:10}, {$skip:5}], 10 - 5);
// two $skip stages placed before, around and after the $limit
testSkipLimit([{$skip:5}, {$skip: 3}, {$limit:10}], 10);
testSkipLimit([{$skip:5}, {$limit:10}, {$skip: 3}], 10 - 3);
testSkipLimit([{$limit:10}, {$skip:5}, {$skip: 3}], 10 - 3 - 5);

// shut everything down
shardedAggTest.stop();
20 changes: 18 additions & 2 deletions src/mongo/db/pipeline/document_source.h
Expand Up @@ -965,7 +965,7 @@ namespace mongo {


class DocumentSourceLimit :
public DocumentSource {
public SplittableDocumentSource {
public:
// virtuals from DocumentSource
virtual ~DocumentSourceLimit();
Expand All @@ -988,6 +988,14 @@ namespace mongo {
static intrusive_ptr<DocumentSourceLimit> create(
const intrusive_ptr<ExpressionContext> &pExpCtx);

// Virtuals for SplittableDocumentSource
// Needs to run on the router. Running on the shards as well is an optimization,
// so this same object is returned for both halves of the split.
virtual intrusive_ptr<DocumentSource> getShardSource() { return this; }
virtual intrusive_ptr<DocumentSource> getRouterSource() { return this; }

// Read and overwrite the `limit` member. The setter lets the pipeline
// optimization enlarge a $limit when it moves that $limit ahead of a $skip.
long long getLimit() const { return limit; }
void setLimit(long long newLimit) { limit = newLimit; }

/**
Create a limiting DocumentSource from BSON.
Expand Down Expand Up @@ -1019,7 +1027,7 @@ namespace mongo {
};

class DocumentSourceSkip :
public DocumentSource {
public SplittableDocumentSource {
public:
// virtuals from DocumentSource
virtual ~DocumentSourceSkip();
Expand All @@ -1042,6 +1050,14 @@ namespace mongo {
static intrusive_ptr<DocumentSourceSkip> create(
const intrusive_ptr<ExpressionContext> &pExpCtx);

// Virtuals for SplittableDocumentSource
// Needs to run on the router. Can't run on the shards, so the shard half is
// NULL and only the router half returns this object.
virtual intrusive_ptr<DocumentSource> getShardSource() { return NULL; }
virtual intrusive_ptr<DocumentSource> getRouterSource() { return this; }

// Read and overwrite the `skip` member. The getter is read by the pipeline
// optimization that adds the skip amount to a $limit it moves ahead of a $skip.
long long getSkip() const { return skip; }
void setSkip(long long newSkip) { skip = newSkip; }

/**
Create a skipping DocumentSource from BSON.
Expand Down
5 changes: 2 additions & 3 deletions src/mongo/db/pipeline/document_source_limit.cpp
Expand Up @@ -27,9 +27,8 @@
namespace mongo {
const char DocumentSourceLimit::limitName[] = "$limit";

DocumentSourceLimit::DocumentSourceLimit(
const intrusive_ptr<ExpressionContext> &pExpCtx):
DocumentSource(pExpCtx),
DocumentSourceLimit::DocumentSourceLimit(const intrusive_ptr<ExpressionContext> &pExpCtx):
SplittableDocumentSource(pExpCtx),
limit(0),
count(0) {
}
Expand Down
5 changes: 2 additions & 3 deletions src/mongo/db/pipeline/document_source_skip.cpp
Expand Up @@ -28,9 +28,8 @@ namespace mongo {

const char DocumentSourceSkip::skipName[] = "$skip";

DocumentSourceSkip::DocumentSourceSkip(
const intrusive_ptr<ExpressionContext> &pExpCtx):
DocumentSource(pExpCtx),
DocumentSourceSkip::DocumentSourceSkip(const intrusive_ptr<ExpressionContext> &pExpCtx):
SplittableDocumentSource(pExpCtx),
skip(0),
count(0) {
}
Expand Down
29 changes: 29 additions & 0 deletions src/mongo/db/pipeline/pipeline.cpp
Expand Up @@ -227,6 +227,35 @@ namespace mongo {
}
}

/* Move limits in front of skips. This is more optimal for sharding
* since currently, we can only split the pipeline at a single source
* and it is better to limit the results coming from each shard
*/
for(int i = pSourceVector->size() - 1; i >= 1 /* not looking at 0 */; i--) {
DocumentSourceLimit* limit =
dynamic_cast<DocumentSourceLimit*>((*pSourceVector)[i].get());
DocumentSourceSkip* skip =
dynamic_cast<DocumentSourceSkip*>((*pSourceVector)[i-1].get());
if (limit && skip) {
// Increase limit by skip since the skipped docs now pass through the $limit
limit->setLimit(limit->getLimit() + skip->getSkip());
swap((*pSourceVector)[i], (*pSourceVector)[i-1]);

// Start at back again. This is needed to handle cases with more than 1 $limit
// (S means skip, L means limit)
//
// These two would work without second pass (assuming back to front ordering)
// SL -> LS
// SSL -> LSS
//
// The following cases need a second pass to handle the second limit
// SLL -> LLS
// SSLL -> LLSS
// SLSL -> LLSS
i = pSourceVector->size(); // decremented before next pass
}
}

/*
Coalesce adjacent filters where possible. Two adjacent filters
are equivalent to one filter whose predicate is the conjunction of
Expand Down

0 comments on commit f4270ab

Please sign in to comment.