Skip to content

Commit

Permalink
SERVER-5218 Batch oplog writes always wait for journal.
Browse files Browse the repository at this point in the history
  • Loading branch information
visualzhou committed Apr 30, 2015
1 parent 9da229e commit 673bb29
Show file tree
Hide file tree
Showing 15 changed files with 46 additions and 73 deletions.
2 changes: 1 addition & 1 deletion src/mongo/db/commands/cleanup_orphaned_cmd.cpp
Expand Up @@ -53,7 +53,7 @@ namespace {
using mongo::WriteConcernOptions;

const int kDefaultWTimeoutMs = 60 * 1000;
const WriteConcernOptions DefaultWriteConcern("majority",
const WriteConcernOptions DefaultWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::NONE,
kDefaultWTimeoutMs);
}
Expand Down
34 changes: 1 addition & 33 deletions src/mongo/db/commands/write_commands/batch_executor.cpp
Expand Up @@ -735,32 +735,13 @@ namespace mongo {
std::vector<BatchedUpsertDetail*>* upsertedIds,
std::vector<WriteErrorDetail*>* errors ) {

WriteConcernOptions originalWC = _txn->getWriteConcern();

// We adjust the write concern attached to the OperationContext to not wait for
// journal. Later, the code will restore the write concern to wait for journal on
// the last write of the batch.
if (request.sizeWriteOps() > 1
&& originalWC.syncMode == WriteConcernOptions::JOURNAL)
{
WriteConcernOptions writeConcern = originalWC;
writeConcern.syncMode = WriteConcernOptions::NONE;
_txn->setWriteConcern(writeConcern);
}

if ( request.getBatchType() == BatchedCommandRequest::BatchType_Insert ) {
execInserts( request, originalWC, errors );
execInserts( request, errors );
}
else if ( request.getBatchType() == BatchedCommandRequest::BatchType_Update ) {
for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) {

if ( i + 1 == request.sizeWriteOps() ) {
// For the last write in the batch, restore the write concern back to the
// original provided one; this may set WriteConcernOptions::JOURNAL back
// to true.
_txn->setWriteConcern(originalWC);
// Use the original write concern to possibly await the commit of this write,
// in order to flush the journal as requested.
setupSynchronousCommit( _txn );
}

Expand All @@ -787,12 +768,6 @@ namespace mongo {
for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) {

if ( i + 1 == request.sizeWriteOps() ) {
// For the last write in the batch, restore the write concern back to the
// original provided one; this may set WriteConcernOptions::JOURNAL back
// to true.
_txn->setWriteConcern(originalWC);
// Use the original write concern to possibly await the commit of this write,
// in order to flush the journal as requested.
setupSynchronousCommit( _txn );
}

Expand Down Expand Up @@ -837,7 +812,6 @@ namespace mongo {
}

void WriteBatchExecutor::execInserts( const BatchedCommandRequest& request,
const WriteConcernOptions& originalWC,
std::vector<WriteErrorDetail*>* errors ) {

// Theory of operation:
Expand Down Expand Up @@ -882,12 +856,6 @@ namespace mongo {
++state.currIndex) {

if (state.currIndex + 1 == state.request->sizeWriteOps()) {
// For the last write in the batch, restore the write concern back to the
// original provided one; this may set WriteConcernOptions::JOURNAL back
// to true.
_txn->setWriteConcern(originalWC);
// Use the original write concern to possibly await the commit of this write,
// in order to flush the journal as requested.
setupSynchronousCommit(_txn);
}

Expand Down
2 changes: 0 additions & 2 deletions src/mongo/db/commands/write_commands/batch_executor.h
Expand Up @@ -33,7 +33,6 @@

#include "mongo/base/disallow_copying.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/batched_delete_document.h"
Expand Down Expand Up @@ -95,7 +94,6 @@ namespace mongo {
* times.
*/
void execInserts( const BatchedCommandRequest& request,
const WriteConcernOptions& originalWC,
std::vector<WriteErrorDetail*>* errors );

/**
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/range_deleter.cpp
Expand Up @@ -269,7 +269,7 @@ namespace {
const int kWTimeoutMillis = 60 * 60 * 1000;

bool _waitForMajority(OperationContext* txn, std::string* errMsg) {
const WriteConcernOptions writeConcern("majority",
const WriteConcernOptions writeConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::NONE,
kWTimeoutMillis);

Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/initial_sync.h
Expand Up @@ -48,7 +48,7 @@ namespace repl {
*/
void oplogApplication(OperationContext* txn, const Timestamp& endOpTime);

// Initial sync will ignore all journal requirement flags and dones't await commit
// Initial sync will ignore all journal requirement flags and doesn't await commit
// before updating last OpTime.
virtual bool supportsAwaitingCommit() {
    // Unconditionally reports false for this class.
    return false;
}
};
Expand Down
6 changes: 0 additions & 6 deletions src/mongo/db/repl/oplog.cpp
Expand Up @@ -302,12 +302,6 @@ namespace {
b.appendBool("fromMigrate", true);
}

if (txn->getWriteConcern().shouldWaitForOtherNodes()
&& txn->getWriteConcern().syncMode == WriteConcernOptions::JOURNAL)
{
b.appendBool("j", true);
}

if ( o2 ) {
b.append("o2", *o2);
}
Expand Down
4 changes: 2 additions & 2 deletions src/mongo/db/repl/replica_set_config.cpp
Expand Up @@ -359,7 +359,7 @@ namespace {
}
}
else {
if ("majority" != _defaultWriteConcern.wMode &&
if (WriteConcernOptions::kMajority != _defaultWriteConcern.wMode &&
!findCustomWriteMode(_defaultWriteConcern.wMode).isOK()) {
return Status(ErrorCodes::BadValue, str::stream() <<
"Default write concern requires undefined write mode " <<
Expand All @@ -377,7 +377,7 @@ namespace {

Status ReplicaSetConfig::checkIfWriteConcernCanBeSatisfied(
const WriteConcernOptions& writeConcern) const {
if (!writeConcern.wMode.empty() && writeConcern.wMode != "majority") {
if (!writeConcern.wMode.empty() && writeConcern.wMode != WriteConcernOptions::kMajority) {
StatusWith<ReplicaSetTagPattern> tagPatternStatus =
findCustomWriteMode(writeConcern.wMode);
if (!tagPatternStatus.isOK()) {
Expand Down
4 changes: 2 additions & 2 deletions src/mongo/db/repl/replication_coordinator_impl.cpp
Expand Up @@ -859,7 +859,7 @@ namespace {

if (!writeConcern.wMode.empty()) {
StringData patternName;
if (writeConcern.wMode == "majority") {
if (writeConcern.wMode == WriteConcernOptions::kMajority) {
patternName = ReplicaSetConfig::kMajorityWriteConcernModeName;
}
else {
Expand Down Expand Up @@ -960,7 +960,7 @@ namespace {
return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
}

if (replMode == modeMasterSlave && writeConcern.wMode == "majority") {
if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) {
// with master/slave, majority is equivalent to w=1
return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
}
Expand Down
10 changes: 5 additions & 5 deletions src/mongo/db/repl/replication_coordinator_impl_test.cpp
Expand Up @@ -432,7 +432,7 @@ namespace {


writeConcern.wNumNodes = 0;
writeConcern.wMode = "majority";
writeConcern.wMode = WriteConcernOptions::kMajority;
// w:majority always works on master/slave
ReplicationCoordinator::StatusAndDuration statusAndDur = getReplCoord()->awaitReplication(
&txn, time, writeConcern);
Expand Down Expand Up @@ -574,7 +574,7 @@ namespace {
// Set up valid write concerns for the rest of the test
WriteConcernOptions majorityWriteConcern;
majorityWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
majorityWriteConcern.wMode = "majority";
majorityWriteConcern.wMode = WriteConcernOptions::kMajority;

WriteConcernOptions multiDCWriteConcern;
multiDCWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
Expand Down Expand Up @@ -1817,7 +1817,7 @@ namespace {
// majority nodes waiting for time
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
writeConcern.wMode = "majority";
writeConcern.wMode = WriteConcernOptions::kMajority;

ReplicationAwaiter awaiter(getReplCoord(), &txn);
awaiter.setOpTime(time);
Expand All @@ -1827,7 +1827,7 @@ namespace {
// demonstrate that majority cannot currently be satisfied
WriteConcernOptions writeConcern2;
writeConcern2.wTimeout = WriteConcernOptions::kNoWaiting;
writeConcern2.wMode = "majority";
writeConcern2.wMode = WriteConcernOptions::kMajority;
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit,
getReplCoord()->awaitReplication(&txn, time, writeConcern2).status);

Expand Down Expand Up @@ -1884,7 +1884,7 @@ namespace {

WriteConcernOptions majorityWriteConcern;
majorityWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
majorityWriteConcern.wMode = "majority";
majorityWriteConcern.wMode = WriteConcernOptions::kMajority;

ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit,
getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
Expand Down
21 changes: 8 additions & 13 deletions src/mongo/db/repl/sync_tail.cpp
Expand Up @@ -53,6 +53,7 @@
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/util/exit.h"
Expand Down Expand Up @@ -276,9 +277,8 @@ namespace repl {
}

std::vector< std::vector<BSONObj> > writerVectors(replWriterThreadCount);
bool mustAwaitCommit = false;

fillWriterVectors(ops, &writerVectors, &mustAwaitCommit);
fillWriterVectors(ops, &writerVectors);
LOG(2) << "replication batch size is " << ops.size() << endl;
// We must grab this because we're going to grab write locks later.
// We hold this mutex the entire time we're writing; it doesn't matter
Expand All @@ -302,12 +302,14 @@ namespace repl {
return Timestamp();
}

if (supportsAwaitingCommit() && mustAwaitCommit) {
const bool mustAwaitCommit = replCoord->isV1ElectionProtocol() && supportsAwaitingCommit();
if (mustAwaitCommit) {
txn->recoveryUnit()->goingToAwaitCommit();
}

Timestamp lastOpTime = writeOpsToOplog(txn, ops);
// Wait for journal before setting last op time if any op in batch had j:true
if (supportsAwaitingCommit() && mustAwaitCommit) {

if (mustAwaitCommit) {
txn->recoveryUnit()->awaitCommit();
}
ReplClientInfo::forClient(txn->getClient()).setLastOp(lastOpTime);
Expand All @@ -320,8 +322,7 @@ namespace repl {
}

void SyncTail::fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerVectors,
bool* mustAwaitCommit) {
std::vector< std::vector<BSONObj> >* writerVectors) {

for (std::deque<BSONObj>::const_iterator it = ops.begin();
it != ops.end();
Expand All @@ -335,12 +336,6 @@ namespace repl {

const char* opType = it->getField( "op" ).valuestrsafe();

// Check if any entry needs journaling, and if so return the need
const bool foundJournal = it->getField("j").trueValue();
if (foundJournal) {
*mustAwaitCommit = true;
}

if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() &&
isCrudOpType(opType)) {
BSONElement id;
Expand Down
7 changes: 2 additions & 5 deletions src/mongo/db/repl/sync_tail.h
Expand Up @@ -128,11 +128,8 @@ namespace repl {
// Doles out all the work to the writer pool threads and waits for them to complete
void applyOps(const std::vector< std::vector<BSONObj> >& writerVectors);

// mustAwaitCommit is an out-parameter and indicates that at least one of the ops
// in 'ops' had j:true.
void fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerVectors,
bool* mustAwaitCommit);
void fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerVectors);
void handleSlaveDelay(const BSONObj& op);

// persistent pool of worker threads for writing ops to the databases
Expand Down
19 changes: 18 additions & 1 deletion src/mongo/db/write_concern.cpp
Expand Up @@ -62,6 +62,18 @@ namespace mongo {
}
}

namespace {
    /**
     * Promotes 'writeConcern' to syncMode JOURNAL when all of the following hold:
     *   - the global replication coordinator reports the V1 election protocol,
     *   - the write mode is WriteConcernOptions::kMajority, and
     *   - no sync mode has been chosen yet (syncMode is NONE).
     * Otherwise 'writeConcern' is left untouched.
     *
     * Rationale: the consensus protocol requires that w: majority implies j: true
     * on all nodes.
     */
    void addJournalSyncForWMajority(WriteConcernOptions* writeConcern) {
        // Checks are performed in this order on purpose: the coordinator is consulted
        // first, then the mode, then the sync mode.
        if (!repl::getGlobalReplicationCoordinator()->isV1ElectionProtocol()) {
            return;
        }
        if (writeConcern->wMode != WriteConcernOptions::kMajority) {
            return;
        }
        if (writeConcern->syncMode != WriteConcernOptions::NONE) {
            return;
        }

        writeConcern->syncMode = WriteConcernOptions::JOURNAL;
    }
} // namespace

StatusWith<WriteConcernOptions> extractWriteConcern(const BSONObj& cmdObj) {
// The default write concern if empty is w : 1
// Specifying w : 0 is/was allowed, but is interpreted identically to w : 1
Expand All @@ -70,6 +82,8 @@ namespace mongo {
if (writeConcern.wNumNodes == 0 && writeConcern.wMode.empty()) {
writeConcern.wNumNodes = 1;
}
// Upgrade default write concern if necessary.
addJournalSyncForWMajority(&writeConcern);

BSONElement writeConcernElement;
Status wcStatus = bsonExtractTypedField(cmdObj,
Expand Down Expand Up @@ -100,6 +114,9 @@ namespace mongo {
return wcStatus;
}

// Upgrade parsed write concern if necessary.
addJournalSyncForWMajority(&writeConcern);

return writeConcern;
}

Expand Down Expand Up @@ -130,7 +147,7 @@ namespace mongo {

if ( replMode != repl::ReplicationCoordinator::modeReplSet &&
!writeConcern.wMode.empty() &&
writeConcern.wMode != "majority" ) {
writeConcern.wMode != WriteConcernOptions::kMajority ) {
return Status( ErrorCodes::BadValue,
string( "cannot use non-majority 'w' mode " ) + writeConcern.wMode
+ " when a host is not a member of a replica set" );
Expand Down
2 changes: 2 additions & 0 deletions src/mongo/db/write_concern_options.cpp
Expand Up @@ -39,6 +39,8 @@ namespace mongo {
const BSONObj WriteConcernOptions::Acknowledged(BSON("w" << W_NORMAL));
const BSONObj WriteConcernOptions::Unacknowledged(BSON("w" << W_NONE));

// The "w" mode string that denotes majority write concern; call sites compare
// WriteConcernOptions::wMode against this constant instead of a string literal.
const char WriteConcernOptions::kMajority[] = "majority";

static const BSONField<bool> mongosSecondaryThrottleField("_secondaryThrottle", true);
static const BSONField<bool> secondaryThrottleField("secondaryThrottle", true);
static const BSONField<BSONObj> writeConcernField("writeConcern");
Expand Down
2 changes: 2 additions & 0 deletions src/mongo/db/write_concern_options.h
Expand Up @@ -46,6 +46,8 @@ namespace mongo {
static const BSONObj Acknowledged;
static const BSONObj Unacknowledged;

static const char kMajority[]; // = "majority"

WriteConcernOptions() { reset(); }

WriteConcernOptions(int numNodes,
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/s/d_migrate.cpp
Expand Up @@ -2508,7 +2508,7 @@ namespace mongo {
const WriteConcernOptions& writeConcern) {
WriteConcernOptions majorityWriteConcern;
majorityWriteConcern.wTimeout = -1;
majorityWriteConcern.wMode = "majority";
majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
Status majorityStatus = repl::getGlobalReplicationCoordinator()->awaitReplication(
txn, lastOpApplied, majorityWriteConcern).status;

Expand Down

0 comments on commit 673bb29

Please sign in to comment.