Commit

SERVER-31674 Rename misleading functions and structs in cluster_aggregate.cpp
gormanb committed Oct 26, 2017
1 parent 81c5724 commit 253d4c5
Showing 1 changed file with 45 additions and 46 deletions.
src/mongo/s/commands/cluster_aggregate.cpp: 45 additions & 46 deletions
@@ -245,15 +245,15 @@ BSONObj createCommandForMergingShard(
return mergeCmd.freeze().toBson();
}

-StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>
-establishShardCursorsWithoutRetrying(OperationContext* opCtx,
-const NamespaceString& nss,
-const LiteParsedPipeline& litePipe,
-CachedCollectionRoutingInfo* routingInfo,
-const BSONObj& cmdObj,
-const ReadPreferenceSetting& readPref,
-const BSONObj& shardQuery,
-const BSONObj& collation) {
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors(
+OperationContext* opCtx,
+const NamespaceString& nss,
+const LiteParsedPipeline& litePipe,
+CachedCollectionRoutingInfo* routingInfo,
+const BSONObj& cmdObj,
+const ReadPreferenceSetting& readPref,
+const BSONObj& shardQuery,
+const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";

std::set<ShardId> shardIds =
@@ -306,7 +306,7 @@ establishShardCursorsWithoutRetrying(OperationContext* opCtx,
return swCursors;
}

-struct EstablishShardCursorsResults {
+struct DispatchShardPipelineResults {
// True if this pipeline was split, and the second half of the pipeline needs to be run on the
// primary shard for the database.
bool needsPrimaryShardMerge;
@@ -331,7 +331,7 @@ struct EstablishShardCursorsResults {
* the pipeline that will need to be executed to merge the results from the remotes. If a stale
* shard version is encountered, refreshes the routing table and tries again.
*/
-StatusWith<EstablishShardCursorsResults> establishShardCursors(
+StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
BSONObj originalCmdObj,
@@ -341,8 +341,7 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors(
// The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
-// - Call establishShardCursorsWithoutRetrying to dispatch the aggregation to the targeted
-// shards.
+// - Call establishShardCursors to dispatch the aggregation to the targeted shards.
// - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
// the refreshed routing table data.
// - If the pipeline is not split and we now need to target multiple shards, split it. If the
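(The comment above continues in the collapsed lines of this hunk.) It describes the dispatch loop only in prose, and the surrounding hunks show it in fragments. Below is a minimal, self-contained sketch of that control flow; every type and helper in it (ShardId, RoutingTable, the targeting helpers, the hard-coded retry limit, and the simulated stale-version failure) is a simplified stand-in for illustration, not the real MongoDB code.

    #include <iostream>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    // Simplified stand-ins; the real code uses CachedCollectionRoutingInfo, Pipeline,
    // StatusWith and ClusterClientCursorParams::RemoteCursor.
    using ShardId = std::string;
    struct RemoteCursor { ShardId shard; long long cursorId; };
    struct Pipeline { bool splitForShards = false; };
    struct RoutingTable { int version; };

    enum class ErrorCode { OK, StaleShardVersion };
    struct CursorsResult {  // a poor man's StatusWith<std::vector<RemoteCursor>>
        ErrorCode code = ErrorCode::OK;
        std::vector<RemoteCursor> cursors;
        bool isOK() const { return code == ErrorCode::OK; }
    };

    // Pretend targeting/dispatch helpers: dispatching against the stale routing table
    // (version 1) fails with StaleShardVersion; after a refresh it succeeds.
    std::set<ShardId> targetShards(const RoutingTable& rt) {
        return rt.version < 2 ? std::set<ShardId>{"shardA"} : std::set<ShardId>{"shardA", "shardB"};
    }
    RoutingTable refreshRoutingTable() { return RoutingTable{2}; }
    CursorsResult establishShardCursors(const RoutingTable& rt, const std::set<ShardId>& shards) {
        if (rt.version < 2)
            return {ErrorCode::StaleShardVersion, {}};
        std::vector<RemoteCursor> cursors;
        long long id = 100;
        for (const auto& s : shards)
            cursors.push_back({s, id++});
        return {ErrorCode::OK, std::move(cursors)};
    }

    int main() {
        RoutingTable routing{1};
        Pipeline pipeline;
        CursorsResult result;

        // The flow the comment describes: target, split the pipeline if more than one
        // shard is involved, dispatch, and on a stale-config error refresh the routing
        // table, re-evaluate the split, and try again.
        for (int attempt = 0; attempt < 5; ++attempt) {
            std::set<ShardId> shards = targetShards(routing);
            if (shards.size() > 1)
                pipeline.splitForShards = true;  // second half will run on the merging node

            result = establishShardCursors(routing, shards);
            if (result.isOK())
                break;
            if (result.code == ErrorCode::StaleShardVersion)
                routing = refreshRoutingTable();  // stale shardVersion: refresh and retry
        }

        std::cout << "split: " << pipeline.splitForShards
                  << ", cursors: " << result.cursors.size() << "\n";  // prints "split: 1, cursors: 2"
        return 0;
    }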
@@ -431,14 +430,14 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors(
nullptr /* viewDefinition */);
}
} else {
-swCursors = establishShardCursorsWithoutRetrying(opCtx,
-executionNss,
-liteParsedPipeline,
-&executionNsRoutingInfo,
-targetedCommand,
-ReadPreferenceSetting::get(opCtx),
-shardQuery,
-aggRequest.getCollation());
+swCursors = establishShardCursors(opCtx,
+executionNss,
+liteParsedPipeline,
+&executionNsRoutingInfo,
+targetedCommand,
+ReadPreferenceSetting::get(opCtx),
+shardQuery,
+aggRequest.getCollation());

if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
@@ -455,7 +454,7 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors(
if (!swCursors.isOK()) {
return swCursors.getStatus();
}
-return EstablishShardCursorsResults{needsPrimaryShardMerge,
+return DispatchShardPipelineResults{needsPrimaryShardMerge,
std::move(swCursors.getValue()),
std::move(swShardResults.getValue()),
std::move(pipelineForTargetedShards),
@@ -662,32 +661,32 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return getStatusFromCommandResult(result->asTempObj());
}

-auto targetingResults = uassertStatusOK(establishShardCursors(mergeCtx,
-namespaces.executionNss,
-cmdObj,
-request,
-liteParsedPipeline,
-std::move(pipeline)));
+auto dispatchResults = uassertStatusOK(dispatchShardPipeline(mergeCtx,
+namespaces.executionNss,
+cmdObj,
+request,
+liteParsedPipeline,
+std::move(pipeline)));

if (mergeCtx->explain) {
// If we reach here, we've either succeeded in running the explain or exhausted all
// attempts. In either case, attempt to append the explain results to the output builder.
-uassertAllShardsSupportExplain(targetingResults.remoteExplainOutput);
+uassertAllShardsSupportExplain(dispatchResults.remoteExplainOutput);

-return appendExplainResults(std::move(targetingResults.remoteExplainOutput),
+return appendExplainResults(std::move(dispatchResults.remoteExplainOutput),
mergeCtx,
-targetingResults.pipelineForTargetedShards,
-targetingResults.pipelineForMerging,
+dispatchResults.pipelineForTargetedShards,
+dispatchResults.pipelineForMerging,
result);
}


-invariant(targetingResults.remoteCursors.size() > 0);
+invariant(dispatchResults.remoteCursors.size() > 0);

// If we dispatched to a single shard, store the remote cursor and return immediately.
-if (!targetingResults.pipelineForTargetedShards->isSplitForShards()) {
-invariant(targetingResults.remoteCursors.size() == 1);
-const auto& remoteCursor = targetingResults.remoteCursors[0];
+if (!dispatchResults.pipelineForTargetedShards->isSplitForShards()) {
+invariant(dispatchResults.remoteCursors.size() == 1);
+const auto& remoteCursor = dispatchResults.remoteCursors[0];
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
@@ -703,7 +702,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}

// If we reach here, we have a merge pipeline to dispatch.
-auto mergingPipeline = std::move(targetingResults.pipelineForMerging);
+auto mergingPipeline = std::move(dispatchResults.pipelineForMerging);
invariant(mergingPipeline);

// First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
@@ -716,7 +715,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
request,
namespaces.requestedNss,
std::move(mergingPipeline),
-std::move(targetingResults.remoteCursors));
+std::move(dispatchResults.remoteCursors));

// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -726,16 +725,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,

// If we cannot merge on mongoS, establish the merge cursor on a shard.
mergingPipeline->addInitialSource(
-DocumentSourceMergeCursors::create(parseCursors(targetingResults.remoteCursors), mergeCtx));
+DocumentSourceMergeCursors::create(parseCursors(dispatchResults.remoteCursors), mergeCtx));
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);

-auto mergeResponse = uassertStatusOK(establishMergingShardCursor(
-opCtx,
-namespaces.executionNss,
-targetingResults.remoteCursors,
-mergeCmdObj,
-boost::optional<ShardId>{targetingResults.needsPrimaryShardMerge,
-executionNsRoutingInfo.primaryId()}));
+auto mergeResponse = uassertStatusOK(
+establishMergingShardCursor(opCtx,
+namespaces.executionNss,
+dispatchResults.remoteCursors,
+mergeCmdObj,
+boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge,
+executionNsRoutingInfo.primaryId()}));

auto mergingShardId = mergeResponse.first;
auto response = mergeResponse.second;
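Downstream of the dispatch, runAggregate picks where the merge runs: explain output is appended directly, a single remote cursor is handed back as-is, a split pipeline merges on mongoS when it is allowed to, and otherwise the merge half is sent to a shard, forced to the database's primary shard when needsPrimaryShardMerge is set. The sketch below compresses that decision ladder into one function; all of the types and the shard-selection fallback are simplified stand-ins for illustration, not the real ClusterAggregate implementation.

    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    // Simplified stand-ins for the dispatch results consumed by runAggregate().
    using ShardId = std::string;
    struct RemoteCursor { ShardId shard; long long cursorId; };
    struct DispatchResults {
        bool needsPrimaryShardMerge = false;  // second half must run on the database's primary shard
        bool isExplain = false;
        bool splitForShards = false;          // was the pipeline split at all?
        bool mergeCanRunOnMongoS = false;
        std::vector<RemoteCursor> remoteCursors;
    };

    // Returns a short description of where the merge (if any) would be executed.
    std::string routeMerge(const DispatchResults& r, const ShardId& primaryShard) {
        if (r.isExplain)
            return "append explain output from every targeted shard";
        if (!r.splitForShards)  // one shard ran the whole pipeline: just return its cursor
            return "return cursor from " + r.remoteCursors.front().shard;
        if (r.mergeCanRunOnMongoS)  // the merge half may run on the router itself
            return "merge on mongoS";
        // Otherwise a shard hosts the merge; mirror boost::optional<ShardId>{cond, value}:
        // the merging shard is forced to the primary only when the flag is set.
        std::optional<ShardId> forced =
            r.needsPrimaryShardMerge ? std::optional<ShardId>(primaryShard) : std::nullopt;
        return "merge on shard " + forced.value_or(r.remoteCursors.front().shard);
    }

    int main() {
        DispatchResults results{/*needsPrimaryShardMerge=*/true, /*isExplain=*/false,
                                /*splitForShards=*/true, /*mergeCanRunOnMongoS=*/false,
                                {{"shard-B", 7}, {"shard-C", 9}}};
        std::cout << routeMerge(results, "shard-A") << "\n";  // prints "merge on shard shard-A"
        return 0;
    }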