SERVER-24638 Move command processing from Pipeline to AggregationRequest
cswanson310 committed Jun 24, 2016 · 1 parent 5bdf5d6 · commit 20e9b27
Showing 23 changed files with 918 additions and 415 deletions.
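In outline, the commit inverts the old flow: instead of Pipeline::parseCommand() consuming the raw command object, an AggregationRequest is parsed first and everything downstream (the ExpressionContext, then Pipeline::parse()) is built from it. A condensed sketch of the new flow, lifted from the pipeline_command.cpp hunks below (error handling and the debug-build round-trip check elided):

    // Parse command-level options first; a bad request fails before any pipeline work.
    auto request = AggregationRequest::parseFromBSON(nss, cmdObj);
    if (!request.isOK())
        return appendCommandStatus(result, request.getStatus());

    // The ExpressionContext is now built from the parsed request, not from raw BSON.
    intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(txn, request.getValue());

    // Pipeline::parse() now takes only the stage array; command-level options stay
    // with the AggregationRequest.
    auto statusWithPipeline = Pipeline::parse(request.getValue().getPipeline(), expCtx);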
7 changes: 3 additions & 4 deletions jstests/aggregation/bugs/server7781.js
@@ -11,10 +11,9 @@
     db[coll].insert({loc: [0, 0]});
 
     // $geoNear is only allowed as the first stage in a pipeline, nowhere else.
-    assertErrorCode(
-        db[coll],
-        [{$match: {x: 1}}, {$geoNear: {near: [1, 1], spherical: true, distanceField: 'dis'}}],
-        28837);
+    assert.throws(
+        () => db[coll].aggregate(
+            [{$match: {x: 1}}, {$geoNear: {near: [1, 1], spherical: true, distanceField: 'dis'}}]));
 
     function checkOutput(cmdOut, aggOut, expectedNum) {
         assert.commandWorked(cmdOut, "geoNear command");
7 changes: 3 additions & 4 deletions jstests/aggregation/bugs/server9444.js
@@ -29,17 +29,16 @@
     assert.eq(res.code, outOfMemoryCode);
 
     // ensure allowDiskUse: false does what it says
-    var res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: false});
+    res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: false});
     assert.commandFailed(res);
     assert.eq(res.code, outOfMemoryCode);
 
     // allowDiskUse only supports bool. In particular, numbers aren't allowed.
-    var res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: 1});
+    res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: 1});
     assert.commandFailed(res);
     assert.eq(res.code, 16949);
 
     // ensure we work when allowDiskUse === true
-    var res = t.aggregate(pipeline, {allowDiskUse: true});
+    res = t.aggregate(pipeline, {allowDiskUse: true});
     assert.eq(res.itcount(), t.count());  // all tests output one doc per input doc
 }
4 changes: 2 additions & 2 deletions jstests/noPassthrough/read_majority.js
@@ -41,8 +41,8 @@ load("jstests/libs/analyze_plan.js");
     }
 
     function getReadMajorityAggCursor() {
-        var res =
-            t.runCommand('aggregate', {cursor: {batchSize: 2}, readConcern: {level: "majority"}});
+        var res = t.runCommand(
+            'aggregate', {pipeline: [], cursor: {batchSize: 2}, readConcern: {level: "majority"}});
         assert.commandWorked(res);
         return new DBCommandCursor(db.getMongo(), res, 2);
     }
3 changes: 1 addition & 2 deletions src/mongo/db/SConscript
@@ -726,8 +726,7 @@ serveronlyLibdeps = [
     "matcher/expressions_mongod_only",
     "ops/update_driver",
     "ops/write_ops_parsers",
-    "pipeline/document_source",
-    "pipeline/pipeline",
+    "pipeline/aggregation",
     "query/query",
     "range_deleter",
     "repl/bgsync",
103 changes: 71 additions & 32 deletions src/mongo/db/commands/pipeline_command.cpp
@@ -43,6 +43,7 @@
 #include "mongo/db/exec/pipeline_proxy.h"
 #include "mongo/db/exec/working_set_common.h"
 #include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/aggregation_request.h"
 #include "mongo/db/pipeline/document.h"
 #include "mongo/db/pipeline/document_source.h"
 #include "mongo/db/pipeline/expression.h"
@@ -68,16 +69,18 @@ using std::stringstream;
 using std::unique_ptr;
 using stdx::make_unique;
 
+namespace {
+
 /**
  * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
  * requests). Otherwise, returns false.
  */
-static bool handleCursorCommand(OperationContext* txn,
-                                const string& ns,
-                                ClientCursorPin* pin,
-                                PlanExecutor* exec,
-                                const BSONObj& cmdObj,
-                                BSONObjBuilder& result) {
+bool handleCursorCommand(OperationContext* txn,
+                         const string& ns,
+                         ClientCursorPin* pin,
+                         PlanExecutor* exec,
+                         const BSONObj& cmdObj,
+                         BSONObjBuilder& result) {
     ClientCursor* cursor = pin ? pin->c() : NULL;
     if (pin) {
         invariant(cursor);
@@ -156,10 +159,42 @@ static bool handleCursorCommand(OperationContext* txn,
     return static_cast<bool>(cursor);
 }
 
+/**
+ * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse().
+ * fasserts if it fails to parse after being serialized.
+ */
+boost::intrusive_ptr<Pipeline> reparsePipeline(
+    const boost::intrusive_ptr<Pipeline>& pipeline,
+    const AggregationRequest& request,
+    const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+    auto serialized = pipeline->serialize();
+
+    // Convert vector<Value> to vector<BSONObj>.
+    std::vector<BSONObj> parseableSerialization;
+    parseableSerialization.reserve(serialized.size());
+    for (auto&& serializedStage : serialized) {
+        invariant(serializedStage.getType() == BSONType::Object);
+        parseableSerialization.push_back(serializedStage.getDocument().toBson());
+    }
+
+    auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx);
+    if (!reparsedPipeline.isOK()) {
+        error() << "Aggregation command did not round trip through parsing and serialization "
+                   "correctly. Input pipeline: "
+                << Value(request.getPipeline()).toString()
+                << ", serialized pipeline: " << Value(serialized).toString();
+        fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus());
+    }
+
+    return reparsedPipeline.getValue();
+}
+
+}  // namespace
+
 class PipelineCommand : public Command {
 public:
-    PipelineCommand() : Command(Pipeline::commandName) {}  // command is called "aggregate"
+    PipelineCommand()
+        : Command(AggregationRequest::kCommandName) {}  // command is called "aggregate"
 
     // Locks are managed manually, in particular by DocumentSourceCursor.
     virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
@@ -210,24 +245,29 @@
         }
         NamespaceString nss(ns);
 
-        intrusive_ptr<ExpressionContext> pCtx = new ExpressionContext(txn, nss);
-        pCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
+        // Parse the options for this request.
+        auto request = AggregationRequest::parseFromBSON(nss, cmdObj);
+        if (!request.isOK()) {
+            return appendCommandStatus(result, request.getStatus());
+        }
 
-        /* try to parse the command; if this fails, then we didn't run */
-        intrusive_ptr<Pipeline> pPipeline = Pipeline::parseCommand(errmsg, cmdObj, pCtx);
-        if (!pPipeline.get())
-            return false;
+        // Set up the ExpressionContext.
+        intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(txn, request.getValue());
+        expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
 
-        // This is outside of the if block to keep the object alive until the pipeline is finished.
-        BSONObj parsed;
-        if (kDebugBuild && !pPipeline->isExplain() && !pCtx->inShard) {
-            // Make sure all operations round-trip through Pipeline::toBson() correctly by
-            // reparsing every command in debug builds. This is important because sharded
-            // aggregations rely on this ability. Skipping when inShard because this has
-            // already been through the transformation (and this unsets pCtx->inShard).
-            parsed = pPipeline->serialize().toBson();
-            pPipeline = Pipeline::parseCommand(errmsg, parsed, pCtx);
-            verify(pPipeline);
+        // Parse the pipeline.
+        auto statusWithPipeline = Pipeline::parse(request.getValue().getPipeline(), expCtx);
+        if (!statusWithPipeline.isOK()) {
+            return appendCommandStatus(result, statusWithPipeline.getStatus());
+        }
+        auto pipeline = std::move(statusWithPipeline.getValue());
+
+        if (kDebugBuild && !expCtx->isExplain && !expCtx->inShard) {
+            // Make sure all operations round-trip through Pipeline::serialize() correctly by
+            // re-parsing every command in debug builds. This is important because sharded
+            // aggregations rely on this ability. Skipping when inShard because this has already
+            // been through the transformation (and this un-sets expCtx->inShard).
+            pipeline = reparsePipeline(pipeline, request.getValue(), expCtx);
         }
 
         unique_ptr<ClientCursorPin> pin;  // either this OR the exec will be non-null
Expand All @@ -246,22 +286,21 @@ class PipelineCommand : public Command {

// If the pipeline does not have a user-specified collation, set it from the
// collection default.
if (pPipeline->getContext()->collation.isEmpty() && collection &&
if (request.getValue().getCollation().isEmpty() && collection &&
collection->getDefaultCollator()) {
pPipeline->setCollator(collection->getDefaultCollator()->clone());
pipeline->setCollator(collection->getDefaultCollator()->clone());
}

// This does mongod-specific stuff like creating the input PlanExecutor and adding
// it to the front of the pipeline if needed.
std::shared_ptr<PlanExecutor> input =
PipelineD::prepareCursorSource(txn, collection, nss, pPipeline, pCtx);
pPipeline->stitch();
PipelineD::prepareCursorSource(txn, collection, nss, pipeline, expCtx);

// Create the PlanExecutor which returns results from the pipeline. The WorkingSet
// ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
// PlanExecutor.
auto ws = make_unique<WorkingSet>();
auto proxy = make_unique<PipelineProxyStage>(txn, pPipeline, input, ws.get());
auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, input, ws.get());

auto statusWithPlanExecutor = (NULL == collection)
? PlanExecutor::make(
@@ -327,8 +366,8 @@
         }
 
         // If both explain and cursor are specified, explain wins.
-        if (pPipeline->isExplain()) {
-            result << "stages" << Value(pPipeline->writeExplainOps());
+        if (expCtx->isExplain) {
+            result << "stages" << Value(pipeline->writeExplainOps());
         } else if (isCursorCommand) {
             keepCursor = handleCursorCommand(txn,
                                              nss.ns(),
@@ -337,10 +376,10 @@
                                              cmdObj,
                                              result);
         } else {
-            pPipeline->run(result);
+            pipeline->run(result);
         }
 
-        if (!pPipeline->isExplain()) {
+        if (!expCtx->isExplain) {
             PlanSummaryStats stats;
             Explain::getSummaryStats(pin ? *pin->c()->getExecutor() : *exec.get(), &stats);
             curOp->debug().setPlanSummaryMetrics(stats);
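The test changes above pin down some of the validation the new request parser must perform: 'pipeline' is now a required array (hence the explicit pipeline: [] added in read_majority.js), and allowDiskUse accepts only a real boolean (server9444.js expects code 16949 when the number 1 is passed). A hypothetical sketch of that validation, assuming the StatusWith-returning signature seen in this commit; the actual aggregation_request.cpp is not part of this excerpt, and the constructor and setter names here are illustrative:

    // Illustrative sketch only -- the real implementation lives in
    // src/mongo/db/pipeline/aggregation_request.cpp, which this diff does not show.
    StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(NamespaceString nss,
                                                                     const BSONObj& cmdObj) {
        // 'pipeline' is mandatory and must be an array of stage objects.
        BSONElement pipelineElem = cmdObj["pipeline"];
        if (pipelineElem.type() != BSONType::Array) {
            return Status(ErrorCodes::TypeMismatch, "'pipeline' must be specified as an array");
        }
        std::vector<BSONObj> pipeline;
        for (auto&& stage : pipelineElem.Obj()) {
            if (stage.type() != BSONType::Object) {
                return Status(ErrorCodes::TypeMismatch, "each pipeline stage must be an object");
            }
            pipeline.push_back(stage.Obj().getOwned());
        }

        AggregationRequest request(std::move(nss), std::move(pipeline));

        // allowDiskUse tolerates only true/false; numbers are rejected
        // (server9444.js asserts error code 16949 for allowDiskUse: 1).
        BSONElement diskUseElem = cmdObj["allowDiskUse"];
        if (!diskUseElem.eoo()) {
            if (diskUseElem.type() != BSONType::Bool) {
                return Status(ErrorCodes::TypeMismatch, "allowDiskUse must be a boolean");
            }
            request.setAllowDiskUse(diskUseElem.Bool());
        }

        return request;
    }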
52 changes: 51 additions & 1 deletion src/mongo/db/pipeline/SConscript
@@ -1,6 +1,19 @@
 # -*- mode: python -*-
 
 Import('env')
 
+env.Library(
+    target='aggregation',
+    source=[
+    ],
+    LIBDEPS=[
+        'aggregation_request',
+        'document_source',
+        'expression_context',
+        'pipeline',
+    ]
+)
+
 env.Library(
     target='field_path',
     source=[
@@ -41,6 +54,42 @@ env.CppUnitTest(
     ],
 )
 
+env.Library(
+    target='aggregation_request',
+    source=[
+        'aggregation_request.cpp',
+    ],
+    LIBDEPS=[
+        'document_value',
+        '$BUILD_DIR/mongo/base',
+        '$BUILD_DIR/mongo/db/namespace_string',
+        '$BUILD_DIR/mongo/db/query/query_request',
+        '$BUILD_DIR/mongo/db/repl/read_concern_args',
+        '$BUILD_DIR/mongo/db/storage/storage_options',
+    ]
+)
+
+env.CppUnitTest(
+    target='aggregation_request_test',
+    source='aggregation_request_test.cpp',
+    LIBDEPS=[
+        'aggregation_request',
+    ],
+)
+
+env.Library(
+    target='expression_context',
+    source=[
+        'expression_context.cpp',
+    ],
+    LIBDEPS=[
+        'aggregation_request',
+        '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface',
+        '$BUILD_DIR/mongo/db/service_context',
+        '$BUILD_DIR/mongo/util/intrusive_counter',
+    ]
+)
+
 env.CppUnitTest(
     target='document_source_test',
     source='document_source_test.cpp',
@@ -130,6 +179,7 @@ docSourceEnv.Library(
         'dependencies',
         'document_value',
         'expression',
+        'expression_context',
         '$BUILD_DIR/mongo/client/clientdriver',
         '$BUILD_DIR/mongo/db/bson/dotted_path_support',
         '$BUILD_DIR/mongo/db/matcher/expressions',
@@ -145,7 +195,6 @@
         # which is not uniquely defined
         'incomplete'
     ],
-
 )
 
 env.Library(
@@ -157,6 +206,7 @@ env.Library(
         'dependencies',
         'document_source',
         'document_value',
+        'expression_context',
         '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
         '$BUILD_DIR/mongo/db/auth/authcore',
         '$BUILD_DIR/mongo/db/bson/dotted_path_support',
