diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 46f9af9f6eeb7..62b17b562f605 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" @@ -346,13 +347,16 @@ void State::dropTempCollections() { _txn->setReplicatedWrites(false); ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites); - ScopedTransaction scopedXact(_txn, MODE_IX); - Lock::DBLock lk(_txn->lockState(), nsToDatabaseSubstring(_config.incLong), MODE_X); - if (Database* db = dbHolder().get(_txn, _config.incLong)) { - WriteUnitOfWork wunit(_txn); - db->dropCollection(_txn, _config.incLong); - wunit.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction scopedXact(_txn, MODE_IX); + Lock::DBLock lk(_txn->lockState(), nsToDatabaseSubstring(_config.incLong), MODE_X); + if (Database* db = dbHolder().get(_txn, _config.incLong)) { + WriteUnitOfWork wunit(_txn); + db->dropCollection(_txn, _config.incLong); + wunit.commit(); + } } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R dropTempCollections", _config.incLong) ShardConnection::forgetNS(_config.incLong); } @@ -373,26 +377,30 @@ void State::prepTempCollection() { _txn->setReplicatedWrites(false); ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites); - OldClientWriteContext incCtx(_txn, _config.incLong); - WriteUnitOfWork wuow(_txn); - Collection* incColl = incCtx.getCollection(); - invariant(!incColl); - - CollectionOptions options; - options.setNoIdIndex(); - options.temp = true; - incColl = incCtx.db()->createCollection(_txn, _config.incLong, options); - invariant(incColl); - - BSONObj indexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong << "name" - << "_temp_0"); - Status status = incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, indexSpec); - if (!status.isOK()) { - uasserted(17305, - str::stream() << "createIndex failed for mr incLong ns: " << _config.incLong - << " err: " << status.code()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + OldClientWriteContext incCtx(_txn, _config.incLong); + WriteUnitOfWork wuow(_txn); + Collection* incColl = incCtx.getCollection(); + invariant(!incColl); + + CollectionOptions options; + options.setNoIdIndex(); + options.temp = true; + incColl = incCtx.db()->createCollection(_txn, _config.incLong, options); + invariant(incColl); + + BSONObj indexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong << "name" + << "_temp_0"); + Status status = + incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, indexSpec); + if (!status.isOK()) { + uasserted(17305, + str::stream() << "createIndex failed for mr incLong ns: " + << _config.incLong << " err: " << status.code()); + } + wuow.commit(); } - wuow.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.incLong); } CollectionOptions finalOptions; @@ -426,7 +434,7 @@ void State::prepTempCollection() { } } - { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { // create temp collection and insert the indexes from temporary storage OldClientWriteContext tempCtx(_txn, _config.tempNamespace); WriteUnitOfWork wuow(_txn); @@ -456,6 +464,7 @@ void State::prepTempCollection() { } wuow.commit(); } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.tempNamespace) } /** @@ -669,24 +678,26 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, void State::insert(const string& ns, const BSONObj& o) { verify(_onDisk); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + OldClientWriteContext ctx(_txn, ns); + WriteUnitOfWork wuow(_txn); + NamespaceString nss(ns); + uassert(ErrorCodes::NotMaster, + "no longer master", + repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)); + Collection* coll = getCollectionOrUassert(ctx.db(), ns); - OldClientWriteContext ctx(_txn, ns); - WriteUnitOfWork wuow(_txn); - NamespaceString nss(ns); - uassert(ErrorCodes::NotMaster, - "no longer master", - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)); - Collection* coll = getCollectionOrUassert(ctx.db(), ns); + BSONObjBuilder b; + if (!o.hasField("_id")) { + b.appendOID("_id", NULL, true); + } + b.appendElements(o); + BSONObj bo = b.obj(); - BSONObjBuilder b; - if (!o.hasField("_id")) { - b.appendOID("_id", NULL, true); + uassertStatusOK(coll->insertDocument(_txn, bo, true).getStatus()); + wuow.commit(); } - b.appendElements(o); - BSONObj bo = b.obj(); - - uassertStatusOK(coll->insertDocument(_txn, bo, true).getStatus()); - wuow.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insert", ns); } /** @@ -695,14 +706,17 @@ void State::insert(const string& ns, const BSONObj& o) { void State::_insertToInc(BSONObj& o) { verify(_onDisk); - OldClientWriteContext ctx(_txn, _config.incLong); - WriteUnitOfWork wuow(_txn); - Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong); - bool shouldReplicateWrites = _txn->writesAreReplicated(); - _txn->setReplicatedWrites(false); - ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites); - uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus()); - wuow.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + OldClientWriteContext ctx(_txn, _config.incLong); + WriteUnitOfWork wuow(_txn); + Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong); + bool shouldReplicateWrites = _txn->writesAreReplicated(); + _txn->setReplicatedWrites(false); + ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites); + uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus()); + wuow.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insertToInc", _config.incLong); } State::State(OperationContext* txn, const Config& c) @@ -967,7 +981,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) { verify(_temp->size() == 0); BSONObj sortKey = BSON("0" << 1); - { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { OldClientWriteContext incCtx(_txn, _config.incLong); WriteUnitOfWork wuow(_txn); Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong); @@ -987,6 +1001,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) { verify(foundIndex); wuow.commit(); } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "finalReduce", _config.incLong); unique_ptr ctx(new AutoGetCollectionForRead(_txn, _config.incLong));