Skip to content

Commit

Permalink
SERVER-17805 logOp / OperationObserver should always check shardversion
Browse files Browse the repository at this point in the history
  • Loading branch information
renctan committed Apr 1, 2015
1 parent 827d709 commit fe8a416
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
63 changes: 63 additions & 0 deletions src/mongo/db/commands/write_commands/batch_executor.cpp
Expand Up @@ -69,6 +69,7 @@
#include "mongo/s/collection_metadata.h"
#include "mongo/s/d_state.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/write_ops/batched_upsert_detail.h"
#include "mongo/s/write_ops/write_error_detail.h"
#include "mongo/util/elapsed_tracker.h"
Expand Down Expand Up @@ -826,6 +827,17 @@ namespace mongo {
ExecInsertsState state(_txn, &request);
normalizeInserts(request, &state.normalizedInserts);

ShardedConnectionInfo* info = ShardedConnectionInfo::get(false);
if (info) {
if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) {
info->setVersion(request.getTargetingNS(),
request.getMetadata()->getShardVersion());
}
else {
info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED());
}
}

// Yield frequency is based on the same constants used by PlanYieldPolicy.
ElapsedTracker elapsedTracker(internalQueryExecYieldIterations,
internalQueryExecYieldPeriodMS);
Expand Down Expand Up @@ -874,6 +886,20 @@ namespace mongo {
beginCurrentOp( &currentOp, _txn->getClient(), updateItem );
incOpStats( updateItem );

ShardedConnectionInfo* info = ShardedConnectionInfo::get(false);
if (info) {
auto rootRequest = updateItem.getRequest();
if (!updateItem.getUpdate()->getMulti() &&
rootRequest->isMetadataSet() &&
rootRequest->getMetadata()->isShardVersionSet()) {
info->setVersion(rootRequest->getTargetingNS(),
rootRequest->getMetadata()->getShardVersion());
}
else {
info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
}
}

WriteOpResult result;

multiUpdate( _txn, updateItem, &result );
Expand Down Expand Up @@ -904,6 +930,20 @@ namespace mongo {
beginCurrentOp( &currentOp, _txn->getClient(), removeItem );
incOpStats( removeItem );

ShardedConnectionInfo* info = ShardedConnectionInfo::get(false);
if (info) {
auto rootRequest = removeItem.getRequest();
if (removeItem.getDelete()->getLimit() == 1 &&
rootRequest->isMetadataSet() &&
rootRequest->getMetadata()->isShardVersionSet()) {
info->setVersion(rootRequest->getTargetingNS(),
rootRequest->getMetadata()->getShardVersion());
}
else {
info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
}
}

WriteOpResult result;

multiRemove( _txn, removeItem, &result );
Expand Down Expand Up @@ -1059,6 +1099,14 @@ namespace mongo {
state->getCollection()->ns().ns() :
"index" );
}
catch (const StaleConfigException& staleExcep) {
result->setError(new WriteErrorDetail);
result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
buildStaleError(staleExcep.getVersionReceived(),
staleExcep.getVersionWanted(),
result->getError());
break;
}
catch (const DBException& ex) {
Status status(ex.toStatus());
if (ErrorCodes::isInterruption(status.code()))
Expand Down Expand Up @@ -1316,6 +1364,13 @@ namespace mongo {

WriteConflictException::logAndBackoff( attempt++, "update", nsString.ns() );
}
catch (const StaleConfigException& staleExcep) {
result->setError(new WriteErrorDetail);
result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
buildStaleError(staleExcep.getVersionReceived(),
staleExcep.getVersionWanted(),
result->getError());
}
catch (const DBException& ex) {
Status status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
Expand Down Expand Up @@ -1399,6 +1454,14 @@ namespace mongo {
txn->getCurOp()->debug().writeConflicts++;
WriteConflictException::logAndBackoff( attempt++, "delete", nss.ns() );
}
catch (const StaleConfigException& staleExcep) {
result->setError(new WriteErrorDetail);
result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
buildStaleError(staleExcep.getVersionReceived(),
staleExcep.getVersionWanted(),
result->getError());
return;
}
catch ( const DBException& ex ) {
Status status = ex.toStatus();
if (ErrorCodes::isInterruption(status.code())) {
Expand Down
4 changes: 3 additions & 1 deletion src/mongo/s/d_migrate.cpp
Expand Up @@ -332,6 +332,8 @@ namespace mongo {
const BSONObj& obj,
BSONObj* patt,
bool notInActiveChunk) {
ensureShardVersionOKOrThrow(ns);

const char op = opstr[0];

if (notInActiveChunk) {
Expand Down Expand Up @@ -376,7 +378,7 @@ namespace mongo {

if (op == 'u') {
BSONObj fullDoc;
OldClientContext ctx(txn, _ns);
OldClientContext ctx(txn, _ns, false);
if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) {
warning() << "logOpForSharding couldn't find: " << idObj
<< " even though should have" << migrateLog;
Expand Down
5 changes: 5 additions & 0 deletions src/mongo/s/d_state.cpp
Expand Up @@ -1338,6 +1338,11 @@ namespace mongo {
// TODO : all collections at some point, be sharded or not, will have a version
// (and a CollectionMetadata)
received = info->getVersion( ns );

if (ChunkVersion::isIgnoredVersion(received)) {
return true;
}

wanted = shardingState.getVersion( ns );

if( received.isWriteCompatibleWith( wanted ) ) return true;
Expand Down

0 comments on commit fe8a416

Please sign in to comment.