Skip to content

Commit

Permalink
Make doingMerge a property of DocumentSourceGroup rather than Express…
Browse files Browse the repository at this point in the history
…ionContext

Prep for sharded $out (SERVER-10097). Will need to be able to send a
merging $group to a shard.
  • Loading branch information
RedBeard0531 committed Aug 8, 2013
1 parent 17b3f0f commit 8e93f57
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 47 deletions.
4 changes: 4 additions & 0 deletions src/mongo/db/pipeline/document_source.h
Expand Up @@ -636,6 +636,9 @@ namespace mongo {
intrusive_ptr<Accumulator> (*pAccumulatorFactory)(),
const intrusive_ptr<Expression> &pExpression);

/// Tell this source if it is doing a merge from shards. Defaults to false.
void setDoingMerge(bool doingMerge) { _doingMerge = doingMerge; }

/**
Create a grouping DocumentSource from BSON.
Expand Down Expand Up @@ -704,6 +707,7 @@ namespace mongo {

Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);

bool _doingMerge;
bool _spilled;
const bool _extSortAllowed;
const int _maxMemoryUsageBytes;
Expand Down
10 changes: 4 additions & 6 deletions src/mongo/db/pipeline/document_source_group.cpp
Expand Up @@ -177,6 +177,7 @@ namespace mongo {
DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& pExpCtx)
: SplittableDocumentSource(pExpCtx)
, populated(false)
, _doingMerge(false)
, _spilled(false)
, _extSortAllowed(pExpCtx->getExtSortAllowed() && !pExpCtx->getInRouter())
, _maxMemoryUsageBytes(100*1024*1024)
Expand Down Expand Up @@ -352,8 +353,6 @@ namespace mongo {
const size_t numAccumulators = vpAccumulatorFactory.size();
dassert(numAccumulators == vpExpression.size());

const bool mergeInputs = pExpCtx->getDoingMerge();

// pushed to on spill()
vector<shared_ptr<Sorter<Value, Value>::Iterator> > sortedFiles;
int memoryUsageBytes = 0;
Expand Down Expand Up @@ -403,7 +402,7 @@ namespace mongo {
/* tickle all the accumulators for the group we found */
dassert(numAccumulators == group.size());
for (size_t i = 0; i < numAccumulators; i++) {
group[i]->process(vpExpression[i]->evaluate(vars), mergeInputs);
group[i]->process(vpExpression[i]->evaluate(vars), _doingMerge);
memoryUsageBytes += group[i]->memUsageForSorter();
}

Expand Down Expand Up @@ -528,9 +527,8 @@ namespace mongo {
}

intrusive_ptr<DocumentSource> DocumentSourceGroup::getRouterSource() {
intrusive_ptr<ExpressionContext> pMergerExpCtx = pExpCtx->clone();
pMergerExpCtx->setDoingMerge(true);
intrusive_ptr<DocumentSourceGroup> pMerger(DocumentSourceGroup::create(pMergerExpCtx));
intrusive_ptr<DocumentSourceGroup> pMerger(DocumentSourceGroup::create(pExpCtx));
pMerger->setDoingMerge(true);

/* the merger will use the same grouping key */
pMerger->setIdExpression(ExpressionFieldPath::parse("$$ROOT._id"));
Expand Down
4 changes: 1 addition & 3 deletions src/mongo/db/pipeline/expression_context.cpp
Expand Up @@ -26,8 +26,7 @@ namespace mongo {
}

inline ExpressionContext::ExpressionContext(InterruptStatus *pS, const NamespaceString& ns)
: doingMerge(false)
, inShard(false)
: inShard(false)
, inRouter(false)
, extSortAllowed(false)
, intCheckCounter(1)
Expand All @@ -47,7 +46,6 @@ namespace mongo {

ExpressionContext* ExpressionContext::clone() {
ExpressionContext* newContext = create(pStatus, getNs());
newContext->setDoingMerge(getDoingMerge());
newContext->setInShard(getInShard());
newContext->setInRouter(getInRouter());
newContext->setExtSortAllowed(getExtSortAllowed());
Expand Down
42 changes: 4 additions & 38 deletions src/mongo/db/pipeline/expression_context.h
Expand Up @@ -30,15 +30,13 @@ namespace mongo {
public:
virtual ~ExpressionContext();

void setDoingMerge(bool b);
void setInShard(bool b);
void setInRouter(bool b);
void setInShard(bool b) { inShard = b; }
void setInRouter(bool b) { inRouter = b; }
void setExtSortAllowed(bool b) { extSortAllowed = b; }
void setNs(NamespaceString ns) { _ns = ns; }

bool getDoingMerge() const;
bool getInShard() const;
bool getInRouter() const;
bool getInShard() const { return inShard; }
bool getInRouter() const { return inRouter; }
bool getExtSortAllowed() const { return extSortAllowed; }
const NamespaceString& getNs() const { return _ns; }

Expand All @@ -56,7 +54,6 @@ namespace mongo {
private:
ExpressionContext(InterruptStatus *pStatus, const NamespaceString& ns);

bool doingMerge;
bool inShard;
bool inRouter;
bool extSortAllowed;
Expand All @@ -65,34 +62,3 @@ namespace mongo {
NamespaceString _ns;
};
}


/* ======================= INLINED IMPLEMENTATIONS ========================== */

namespace mongo {

inline void ExpressionContext::setDoingMerge(bool b) {
doingMerge = b;
}

inline void ExpressionContext::setInShard(bool b) {
inShard = b;
}

inline void ExpressionContext::setInRouter(bool b) {
inRouter = b;
}

inline bool ExpressionContext::getDoingMerge() const {
return doingMerge;
}

inline bool ExpressionContext::getInShard() const {
return inShard;
}

inline bool ExpressionContext::getInRouter() const {
return inRouter;
}

};

0 comments on commit 8e93f57

Please sign in to comment.