Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

SERVER-5477 DocumentSourceGroup now uses the shardKey to check if it should regroup on the router. #294

Closed
wants to merge 1 commit into from

2 participants

@samuelgmartinez

SERVER-5477 DocumentSourceGroup now uses the shardKey to check if it should regroup on the router.

samuelgarciamartinez SERVER-5477 DocumentSourceGroup now uses the shardKey to check if
it should regroup on the router.
9155866
@alerner

Thank you for your contribution and sorry for the late feedback.

The router (mongos) and the aggregation framework aspects that concern
it are undergoing a few changes so the code region this patches
touches is a bit dynamic. In particular we're reviewing the
dependencies/interactions between these two modules. We'd prefer to
change code once that revision is complete.

@alerner alerner closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Aug 29, 2012
  1. SERVER-5477 DocumentSourceGroup now uses the shardKey to check if

    samuelgarciamartinez authored
    it should regroup on the router.
This page is out of date. Refresh to see the latest.
View
8 src/mongo/db/commands/pipeline_command.cpp
@@ -229,7 +229,7 @@ namespace mongo {
/* on the shard servers, create the local pipeline */
intrusive_ptr<ExpressionContext> pShardCtx(
- ExpressionContext::create(&InterruptStatusMongod::status));
+ ExpressionContext::create((BSONObj*)NULL, &InterruptStatusMongod::status));
intrusive_ptr<Pipeline> pShardPipeline(
Pipeline::parseCommand(errmsg, shardBson, pShardCtx));
if (!pShardPipeline.get()) {
@@ -272,8 +272,10 @@ namespace mongo {
BSONObjBuilder &result, bool fromRepl) {
// PRINT(cmdObj); // uncomment when debugging
+ string ns(parseNs(db, cmdObj));
+
intrusive_ptr<ExpressionContext> pCtx(
- ExpressionContext::create(&InterruptStatusMongod::status));
+ ExpressionContext::create((BSONObj*)NULL, &InterruptStatusMongod::status));
/* try to parse the command; if this fails, then we didn't run */
intrusive_ptr<Pipeline> pPipeline(
@@ -281,8 +283,6 @@ namespace mongo {
if (!pPipeline.get())
return false;
- string ns(parseNs(db, cmdObj));
-
if (pPipeline->isExplain())
return runExplain(result, errmsg, ns, db, pPipeline, pCtx);
else
View
43 src/mongo/db/pipeline/document_source_group.cpp
@@ -24,6 +24,7 @@
#include "db/pipeline/expression.h"
#include "db/pipeline/expression_context.h"
#include "db/pipeline/value.h"
+#include "mongo/s/grid.h"
namespace mongo {
const char DocumentSourceGroup::groupName[] = "$group";
@@ -202,6 +203,7 @@ namespace mongo {
Expression::removeFieldPrefix(groupString));
intrusive_ptr<ExpressionFieldPath> pFieldPath(
ExpressionFieldPath::create(pathString));
+
pGroup->setIdExpression(pFieldPath);
idSet = true;
}
@@ -394,6 +396,47 @@ namespace mongo {
}
intrusive_ptr<DocumentSource> DocumentSourceGroup::getRouterSource() {
+ BSONObj* shardKey(pExpCtx->getShardKey());
+
+ if(shardKey) {
+ ExpressionObject* objExpression(dynamic_cast<ExpressionObject *>(pIdExpression.get()));
+
+ if(objExpression) {
+ BSONObjBuilder objBuilder;
+ objExpression->documentToBson(&objBuilder, true);
+ BSONObj groupId = objBuilder.done();
+
+ //group _id must meet ((shardKey UNION groupId) EQ groupId) so, if groupId has lower fields than the shardKey
+ //we already know that groupId doesn't meet the requirements.
+ if(groupId.nFields() >= shardKey->nFields()) {
+ BSONObjIterator iter(groupId);
+ int fieldCount = 0;
+ //group key must be equal that shard key or be a super set which contains the shard key, never can be a subset of it
+ while(iter.more() && fieldCount < shardKey->nFields()) {
+ BSONElement fieldElement(iter.next());
+
+ //there is no need to validate type, since its already done by the Expression parsing process.
+ if(shardKey->hasField(Expression::removeFieldPrefix(fieldElement.String()).c_str())) {
+ fieldCount++;
+ }
+ }
+
+ if(fieldCount == shardKey->nFields()) {
+ return NULL;
+ }
+ }
+ }
+ else {
+ ExpressionFieldPath* fieldExpression(dynamic_cast<ExpressionFieldPath *>(pIdExpression.get()));
+ //field path expression can only references one field.
+ //group _id must meet ((shardKey UNION groupId) EQ groupId)
+ if(fieldExpression && shardKey->nFields() == 1 &&
+ shardKey->hasField(fieldExpression->getFieldPath(false).c_str())) {
+ return NULL;
+ }
+ }
+ }
+
intrusive_ptr<ExpressionContext> pMergerExpCtx = pExpCtx->clone();
pMergerExpCtx->setDoingMerge(true);
intrusive_ptr<DocumentSourceGroup> pMerger(DocumentSourceGroup::create(pMergerExpCtx));
View
9 src/mongo/db/pipeline/expression_context.cpp
@@ -24,11 +24,12 @@ namespace mongo {
ExpressionContext::~ExpressionContext() {
}
- inline ExpressionContext::ExpressionContext(InterruptStatus *pS):
+ inline ExpressionContext::ExpressionContext(BSONObj* shardKeyObj, InterruptStatus *pS):
doingMerge(false),
inShard(false),
inRouter(false),
intCheckCounter(1),
+ shardKey(shardKeyObj),
pStatus(pS) {
}
@@ -43,15 +44,15 @@ namespace mongo {
}
ExpressionContext* ExpressionContext::clone() {
- ExpressionContext* newContext = create(pStatus);
+ ExpressionContext* newContext = create(getShardKey(), pStatus);
newContext->setDoingMerge(getDoingMerge());
newContext->setInShard(getInShard());
newContext->setInRouter(getInRouter());
return newContext;
}
- ExpressionContext *ExpressionContext::create(InterruptStatus *pStatus) {
- return new ExpressionContext(pStatus);
+ ExpressionContext *ExpressionContext::create(BSONObj* shardKey, InterruptStatus *pStatus) {
+ return new ExpressionContext(shardKey, pStatus);
}
}
View
13 src/mongo/db/pipeline/expression_context.h
@@ -17,8 +17,8 @@
#pragma once
#include "mongo/pch.h"
-
#include "util/intrusive_counter.h"
+#include "bson/bsonobj.h"
namespace mongo {
@@ -32,10 +32,12 @@ namespace mongo {
void setDoingMerge(bool b);
void setInShard(bool b);
void setInRouter(bool b);
+ void setShardKey(BSONObj* shardKey);
bool getDoingMerge() const;
bool getInShard() const;
bool getInRouter() const;
+ BSONObj* getShardKey();
/**
Used by a pipeline to check for interrupts so that killOp() works.
@@ -46,15 +48,16 @@ namespace mongo {
ExpressionContext* clone();
- static ExpressionContext *create(InterruptStatus *pStatus);
+ static ExpressionContext *create(BSONObj* shardKey, InterruptStatus *pStatus);
private:
- ExpressionContext(InterruptStatus *pStatus);
+ ExpressionContext(BSONObj* shardKey, InterruptStatus *pStatus);
bool doingMerge;
bool inShard;
bool inRouter;
unsigned intCheckCounter; // interrupt check counter
+ BSONObj* shardKey;
InterruptStatus *const pStatus;
};
}
@@ -88,4 +91,8 @@ namespace mongo {
return inRouter;
}
+ inline BSONObj* ExpressionContext::getShardKey() {
+ return shardKey;
+ }
+
};
View
9 src/mongo/dbtests/accumulatortests.cpp
@@ -28,12 +28,15 @@
namespace AccumulatorTests {
+ static const char* const ns = "unittests";
+ static BSONObj* shardKey = NULL;
+
class Base {
public:
Base() :
- _standalone( ExpressionContext::create( &InterruptStatusMongod::status ) ),
- _shard( ExpressionContext::create( &InterruptStatusMongod::status ) ),
- _router( ExpressionContext::create( &InterruptStatusMongod::status ) ) {
+ _standalone( ExpressionContext::create(shardKey, &InterruptStatusMongod::status ) ),
+ _shard( ExpressionContext::create(shardKey, &InterruptStatusMongod::status ) ),
+ _router( ExpressionContext::create(shardKey, &InterruptStatusMongod::status ) ) {
_standalone->setInShard( false );
_standalone->setDoingMerge( false );
_shard->setInShard( true );
View
175 src/mongo/dbtests/documentsourcetests.cpp
@@ -29,6 +29,7 @@
namespace DocumentSourceTests {
static const char* const ns = "unittests.documentsourcetests";
+ static BSONObj* shardKey = NULL;
static DBDirectClient client;
BSONObj toBson( const intrusive_ptr<DocumentSource>& source ) {
@@ -52,7 +53,7 @@ namespace DocumentSourceTests {
public:
Base() :
CollectionBase(),
- _ctx( ExpressionContext::create( &InterruptStatusMongod::status ) ) {
+ _ctx( ExpressionContext::create(shardKey, &InterruptStatusMongod::status ) ) {
}
protected:
void createSource() {
@@ -322,16 +323,19 @@ namespace DocumentSourceTests {
class Base : public DocumentSourceCursor::Base {
protected:
void createGroup( const BSONObj &spec, bool inShard = false ) {
- BSONObj namedSpec = BSON( "$group" << spec );
- BSONElement specElement = namedSpec.firstElement();
- intrusive_ptr<ExpressionContext> expressionContext =
- ExpressionContext::create( &InterruptStatusMongod::status );
- if ( inShard ) {
- expressionContext->setInShard( true );
- }
- _group = DocumentSourceGroup::createFromBson( &specElement, expressionContext );
- assertRoundTrips( _group );
- _group->setSource( source() );
+ createGroup(spec, (BSONObj *) NULL, inShard);
+ }
+ void createGroup(const BSONObj &spec, BSONObj* shardKey, bool inShard = false) {
+ BSONObj namedSpec = BSON( "$group" << spec );
+ BSONElement specElement = namedSpec.firstElement();
+ intrusive_ptr<ExpressionContext> expressionContext =
+ ExpressionContext::create(shardKey, &InterruptStatusMongod::status );
+ if ( inShard ) {
+ expressionContext->setInShard( true );
+ }
+ _group = DocumentSourceGroup::createFromBson( &specElement, expressionContext );
+ assertRoundTrips( _group );
+ _group->setSource( source() );
}
DocumentSource* group() { return _group.get(); }
/** Assert that iterator state accessors consistently report the source is exhausted. */
@@ -767,7 +771,148 @@ namespace DocumentSourceTests {
virtual string expectedResultSetString() { return "[{_id:0}]"; }
};
- /** Simulate merging sharded results in the router. */
+ class CheckSplittingBase : public Base {
+ public:
+ virtual ~CheckSplittingBase() {
+ }
+
+ void run() {
+ BSONObj shardingKey(shardKey());
+ createGroup(groupSpec(), &shardingKey, true);
+
+ // Check if this source is splittable
+ SplittableDocumentSource* splittable =
+ dynamic_cast<SplittableDocumentSource *>(group());
+
+ ASSERT_EQUALS((bool)splittable->getShardSource(), shouldSplitMongod());
+ ASSERT_EQUALS((bool)splittable->getRouterSource(), shouldSplitRouter());
+ }
+ protected:
+ virtual BSONObj shardKey() = 0;
+
+ virtual BSONObj groupSpec() = 0;
+
+ virtual bool shouldSplitMongod() = 0;
+
+ virtual bool shouldSplitRouter() = 0;
+ };
+
+ class FieldPathEqualsShardKey : public CheckSplittingBase {
+ protected:
+ virtual BSONObj shardKey() {
+ return fromjson("{'x.y': 1}");
+ }
+
+ virtual BSONObj groupSpec() {
+ return fromjson( "{ _id: '$x.y', count:{'$sum':1}}");
+ }
+
+ virtual bool shouldSplitMongod() {
+ return true;
+ }
+
+ virtual bool shouldSplitRouter() {
+ return false;
+ }
+ };
+
+ class FieldPathNotEqualsShardKey : public CheckSplittingBase {
+ protected:
+ virtual BSONObj shardKey() {
+ return fromjson("{'x.y': 1}");
+ }
+
+ virtual BSONObj groupSpec() {
+ return fromjson( "{ _id: '$x.z', count:{'$sum':1}}");
+ }
+
+ virtual bool shouldSplitMongod() {
+ return true;
+ }
+
+ virtual bool shouldSplitRouter() {
+ return true;
+ }
+ };
+
+ class FieldPathShardKeyMultiple : public CheckSplittingBase {
+ protected:
+ virtual BSONObj shardKey() {
+ return fromjson("{'x.y': 1, 'x.z': 1}");
+ }
+
+ virtual BSONObj groupSpec() {
+ return fromjson( "{ _id: '$x.z', count:{'$sum':1}}");
+ }
+
+ virtual bool shouldSplitMongod() {
+ return true;
+ }
+
+ virtual bool shouldSplitRouter() {
+ return true;
+ }
+ };
+
+ class NullGroupId: public CheckSplittingBase {
+ protected:
+ virtual BSONObj shardKey() {
+ return fromjson("{'x.y': 1}");
+ }
+
+ virtual BSONObj groupSpec() {
+ return fromjson( "{ _id: null, count:{'$sum':1}}");
+ }
+
+ virtual bool shouldSplitMongod() {
+ return true;
+ }
+
+ virtual bool shouldSplitRouter() {
+ return true;
+ }
+ };
+
+ class ObjectGroupIdEqualsShardKey : public CheckSplittingBase {
+ protected:
+ virtual BSONObj shardKey() {
+ return fromjson("{'x': 1, 'y': 1}");
+ }
+
+ virtual BSONObj groupSpec() {
+ return fromjson( "{ _id: {'key1':'$x', 'key2': '$y'}, count:{'$sum':1}}");
+ }
+
+ virtual bool shouldSplitMongod() {
+ return true;
+ }
+
+ virtual bool shouldSplitRouter() {
+ return false;
+ }
+ };
+
+ class ObjectGroupIdNotEqualsShardKey : public CheckSplittingBase {
+ protected:
+ virtual BSONObj shardKey() {
+ return fromjson("{'z': 1, 'y': 1}");
+ }
+
+ virtual BSONObj groupSpec() {
+ return fromjson( "{ _id: {'key1':'$x', 'key2': '$y'}, count:{'$sum':1}}");
+ }
+
+ virtual bool shouldSplitMongod() {
+ return true;
+ }
+
+ virtual bool shouldSplitRouter() {
+ return true;
+ }
+ };
+
+
+ /** Simulate merging sharded results in the router. */
class RouterMerger : public CheckResultsBase {
public:
void run() {
@@ -1734,6 +1879,12 @@ namespace DocumentSourceTests {
add<DocumentSourceGroup::GroupNullUndefinedIds>();
//add<DocumentSourceGroup::ComplexId>(); uncomment after 6195
add<DocumentSourceGroup::UndefinedAccumulatorValue>();
+ add<DocumentSourceGroup::FieldPathEqualsShardKey>();
+ add<DocumentSourceGroup::FieldPathNotEqualsShardKey>();
+ add<DocumentSourceGroup::FieldPathShardKeyMultiple>();
+ add<DocumentSourceGroup::NullGroupId>();
+ add<DocumentSourceGroup::ObjectGroupIdEqualsShardKey>();
+ add<DocumentSourceGroup::ObjectGroupIdNotEqualsShardKey>();
add<DocumentSourceGroup::RouterMerger>();
add<DocumentSourceGroup::Dependencies>();
add<DocumentSourceGroup::StringConstantIdAndAccumulatorExpressions>();
View
20 src/mongo/s/commands_public.cpp
@@ -1564,8 +1564,21 @@ namespace mongo {
BSONObjBuilder &result, bool fromRepl) {
//const string shardedOutputCollection = getTmpName( collection );
- intrusive_ptr<ExpressionContext> pExpCtx(
- ExpressionContext::create(&InterruptStatusMongos::status));
+ string fullns(parseNs(dbName, cmdObj));
+ intrusive_ptr<ExpressionContext> pExpCtx;
+
+ DBConfigPtr conf(grid.getDBConfig(dbName, false));
+ ChunkManagerPtr chunkManager(conf->getChunkManagerIfExists(fullns, false, false));
+ BSONObj shardKey;
+
+ if(conf->isSharded(fullns) && chunkManager) {
+ shardKey = chunkManager->getShardKey().key();
+ pExpCtx = ExpressionContext::create(&shardKey, &InterruptStatusMongos::status);
+ }
+ else {
+ pExpCtx = ExpressionContext::create((BSONObj*) NULL, &InterruptStatusMongos::status);
+ }
+
pExpCtx->setInRouter(true);
/* parse the pipeline specification */
@@ -1574,13 +1587,10 @@ namespace mongo {
if (!pPipeline.get())
return false; // there was some parsing error
- string fullns(dbName + "." + pPipeline->getCollectionName());
-
/*
If the system isn't running sharded, or the target collection
isn't sharded, pass this on to a mongod.
*/
- DBConfigPtr conf(grid.getDBConfig(dbName , false));
if (!conf || !conf->isShardingEnabled() || !conf->isSharded(fullns))
return passthrough(conf, cmdObj, result);
Something went wrong with that request. Please try again.