Skip to content

Commit

Permalink
SERVER-16322: Make sure that RecoveryUnit::commitUnitOfWork can throw…
Browse files Browse the repository at this point in the history
… WCE

Map reduce did not handle WriteConflictExceptions at commit time well.
  • Loading branch information
GeertBosch committed Jul 31, 2015
1 parent ce1019e commit ba6cfc1
Showing 1 changed file with 65 additions and 50 deletions.
115 changes: 65 additions & 50 deletions src/mongo/db/commands/mr.cpp
Expand Up @@ -41,6 +41,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
Expand Down Expand Up @@ -346,13 +347,16 @@ void State::dropTempCollections() {
_txn->setReplicatedWrites(false);
ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);

ScopedTransaction scopedXact(_txn, MODE_IX);
Lock::DBLock lk(_txn->lockState(), nsToDatabaseSubstring(_config.incLong), MODE_X);
if (Database* db = dbHolder().get(_txn, _config.incLong)) {
WriteUnitOfWork wunit(_txn);
db->dropCollection(_txn, _config.incLong);
wunit.commit();
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction scopedXact(_txn, MODE_IX);
Lock::DBLock lk(_txn->lockState(), nsToDatabaseSubstring(_config.incLong), MODE_X);
if (Database* db = dbHolder().get(_txn, _config.incLong)) {
WriteUnitOfWork wunit(_txn);
db->dropCollection(_txn, _config.incLong);
wunit.commit();
}
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R dropTempCollections", _config.incLong)

ShardConnection::forgetNS(_config.incLong);
}
Expand All @@ -373,26 +377,30 @@ void State::prepTempCollection() {
_txn->setReplicatedWrites(false);
ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);

OldClientWriteContext incCtx(_txn, _config.incLong);
WriteUnitOfWork wuow(_txn);
Collection* incColl = incCtx.getCollection();
invariant(!incColl);

CollectionOptions options;
options.setNoIdIndex();
options.temp = true;
incColl = incCtx.db()->createCollection(_txn, _config.incLong, options);
invariant(incColl);

BSONObj indexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong << "name"
<< "_temp_0");
Status status = incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, indexSpec);
if (!status.isOK()) {
uasserted(17305,
str::stream() << "createIndex failed for mr incLong ns: " << _config.incLong
<< " err: " << status.code());
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
OldClientWriteContext incCtx(_txn, _config.incLong);
WriteUnitOfWork wuow(_txn);
Collection* incColl = incCtx.getCollection();
invariant(!incColl);

CollectionOptions options;
options.setNoIdIndex();
options.temp = true;
incColl = incCtx.db()->createCollection(_txn, _config.incLong, options);
invariant(incColl);

BSONObj indexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong << "name"
<< "_temp_0");
Status status =
incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, indexSpec);
if (!status.isOK()) {
uasserted(17305,
str::stream() << "createIndex failed for mr incLong ns: "
<< _config.incLong << " err: " << status.code());
}
wuow.commit();
}
wuow.commit();
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.incLong);
}

CollectionOptions finalOptions;
Expand Down Expand Up @@ -426,7 +434,7 @@ void State::prepTempCollection() {
}
}

{
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
// create temp collection and insert the indexes from temporary storage
OldClientWriteContext tempCtx(_txn, _config.tempNamespace);
WriteUnitOfWork wuow(_txn);
Expand Down Expand Up @@ -456,6 +464,7 @@ void State::prepTempCollection() {
}
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.tempNamespace)
}

/**
Expand Down Expand Up @@ -669,24 +678,26 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
void State::insert(const string& ns, const BSONObj& o) {
verify(_onDisk);

MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
OldClientWriteContext ctx(_txn, ns);
WriteUnitOfWork wuow(_txn);
NamespaceString nss(ns);
uassert(ErrorCodes::NotMaster,
"no longer master",
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss));
Collection* coll = getCollectionOrUassert(ctx.db(), ns);

OldClientWriteContext ctx(_txn, ns);
WriteUnitOfWork wuow(_txn);
NamespaceString nss(ns);
uassert(ErrorCodes::NotMaster,
"no longer master",
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss));
Collection* coll = getCollectionOrUassert(ctx.db(), ns);
BSONObjBuilder b;
if (!o.hasField("_id")) {
b.appendOID("_id", NULL, true);
}
b.appendElements(o);
BSONObj bo = b.obj();

BSONObjBuilder b;
if (!o.hasField("_id")) {
b.appendOID("_id", NULL, true);
uassertStatusOK(coll->insertDocument(_txn, bo, true).getStatus());
wuow.commit();
}
b.appendElements(o);
BSONObj bo = b.obj();

uassertStatusOK(coll->insertDocument(_txn, bo, true).getStatus());
wuow.commit();
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insert", ns);
}

/**
Expand All @@ -695,14 +706,17 @@ void State::insert(const string& ns, const BSONObj& o) {
void State::_insertToInc(BSONObj& o) {
verify(_onDisk);

OldClientWriteContext ctx(_txn, _config.incLong);
WriteUnitOfWork wuow(_txn);
Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
bool shouldReplicateWrites = _txn->writesAreReplicated();
_txn->setReplicatedWrites(false);
ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus());
wuow.commit();
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
OldClientWriteContext ctx(_txn, _config.incLong);
WriteUnitOfWork wuow(_txn);
Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
bool shouldReplicateWrites = _txn->writesAreReplicated();
_txn->setReplicatedWrites(false);
ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus());
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insertToInc", _config.incLong);
}

State::State(OperationContext* txn, const Config& c)
Expand Down Expand Up @@ -967,7 +981,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) {
verify(_temp->size() == 0);
BSONObj sortKey = BSON("0" << 1);

{
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
OldClientWriteContext incCtx(_txn, _config.incLong);
WriteUnitOfWork wuow(_txn);
Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong);
Expand All @@ -987,6 +1001,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) {
verify(foundIndex);
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "finalReduce", _config.incLong);

unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong));

Expand Down

0 comments on commit ba6cfc1

Please sign in to comment.