Permalink
Browse files

SERVER-27053 Don't acknowledge writes if the term has changed.

  • Loading branch information...
stbrody committed Nov 16, 2016
1 parent f948e73 commit 8347e322cd46e8ee847e1730a7e94ea8e3981c53
@@ -16,6 +16,8 @@ selector:
- jstests/replsets/stepup.js
# The combination of new bridges and PV0 can lead to an improper spanning tree in sync2.js.
- jstests/replsets/sync2.js
+ # PV0's w:majority guarantees aren't strong enough for this test to pass.
+ - jstests/replsets/write_concern_after_stepdown_and_stepup.js
executor:
js_test:
@@ -0,0 +1,120 @@
+/*
+ * Tests that heartbeats containing writes from a different branch of history can't cause a stale
+ * primary to incorrectly acknowledge a w:majority write that's about to be rolled back.
+ */
+(function() {
+ 'use strict';
+
+ var name = "writeConcernStepDownAndBackUp";
+ var dbName = "wMajorityCheck";
+ var collName = "stepdownAndBackUp";
+
+ // Three data-bearing nodes; node 2 is priority:0 so only nodes 0 and 1 can become primary.
+ // useBridge lets the test sever and restore connectivity between specific node pairs.
+ var rst = new ReplSetTest({
+ name: name,
+ nodes: [
+ {},
+ {},
+ {rsConfig: {priority: 0}},
+ ],
+ useBridge: true
+ });
+ var nodes = rst.startSet();
+ rst.initiate();
+
+ // Blocks until 'node' reports being in replica set member state 'state'.
+ // (Not called in this test.)
+ function waitForState(node, state) {
+ assert.soonNoExcept(function() {
+ assert.commandWorked(node.adminCommand(
+ {replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
+ return true;
+ });
+ }
+
+ // Blocks until 'node' reports itself as master via the ismaster command.
+ function waitForPrimary(node) {
+ assert.soon(function() {
+ return node.adminCommand('ismaster').ismaster;
+ });
+ }
+
+ // Forces 'node' to become primary by stepping down the current primary (if different)
+ // and waiting for 'node' to win the ensuing election.
+ function stepUp(node) {
+ var primary = rst.getPrimary();
+ if (primary != node) {
+ // replSetStepDown closes the connection on success, so the command is
+ // expected to throw rather than return.
+ assert.throws(function() {
+ primary.adminCommand({replSetStepDown: 60 * 5});
+ });
+ }
+ waitForPrimary(node);
+ }
+
+ jsTestLog("Make sure node 0 is primary.");
+ stepUp(nodes[0]);
+ var primary = rst.getPrimary();
+ var secondaries = rst.getSecondaries();
+ assert.eq(nodes[0], primary);
+ // Wait for all data bearing nodes to get up to date.
+ assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
+ {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ // Stop the secondaries from replicating.
+ secondaries.forEach(function(node) {
+ assert.commandWorked(
+ node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
+ });
+ // Stop the primary from being able to complete stepping down.
+ assert.commandWorked(
+ nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'}));
+
+ jsTestLog("Do w:majority write that will block waiting for replication.");
+ // Runs in a parallel shell: the insert cannot be majority-replicated (secondaries are
+ // paused), so it should eventually fail with PrimarySteppedDown rather than be acknowledged.
+ var doMajorityWrite = function() {
+ var res = db.getSiblingDB('wMajorityCheck').stepdownAndBackUp.insert({a: 2}, {
+ writeConcern: {w: 'majority'}
+ });
+ assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown);
+ };
+
+ var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
+
+ jsTest.log("Disconnect primary from all secondaries");
+ nodes[0].disconnect(nodes[1]);
+ nodes[0].disconnect(nodes[2]);
+
+ jsTest.log("Wait for a new primary to be elected");
+ // Allow the secondaries to replicate again.
+ secondaries.forEach(function(node) {
+ assert.commandWorked(
+ node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
+ });
+
+ waitForPrimary(nodes[1]);
+
+ jsTest.log("Do a write to the new primary");
+ // This write creates the "other branch of history" that the old primary's pending
+ // {a: 2} write diverges from and must be rolled back against.
+ assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
+ {a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ jsTest.log("Reconnect the old primary to the rest of the nodes");
+ // Only allow the old primary to connect to the other nodes, not the other way around.
+ // This is so that the old primary will detect that it needs to step down and step itself down,
+ // rather than one of the other nodes detecting this and sending it a replSetStepDown command,
+ // which would cause the old primary to kill all operations and close all connections, making
+ // the way that the insert in the parallel shell fails be nondeterministic. Rather than
+ // handling all possible failure modes in the parallel shell, allowing heartbeat connectivity in
+ // only one direction makes it easier for the test to fail deterministically.
+ nodes[1].acceptConnectionsFrom(nodes[0]);
+ nodes[2].acceptConnectionsFrom(nodes[0]);
+
+ // Wait for the parallel shell to observe the PrimarySteppedDown write error.
+ joinMajorityWriter();
+
+ // Allow the old primary to finish stepping down so that shutdown can finish.
+ var res = null;
+ try {
+ res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'});
+ } catch (e) {
+ // Expected - once we disable the fail point the stepdown will proceed and it's racy whether
+ // the stepdown closes all connections before or after the configureFailPoint command
+ // returns
+ }
+ if (res) {
+ assert.commandWorked(res);
+ }
+
+ rst.stopSet();
+}());
@@ -0,0 +1,134 @@
+/*
+ * Tests that heartbeats containing writes from a different branch of history can't cause a stale
+ * primary to incorrectly acknowledge a w:majority write that's about to be rolled back, even if the
+ * stale primary is re-elected primary before waiting for the write concern acknowledgement.
+ */
+(function() {
+ 'use strict';
+
+ var name = "writeConcernStepDownAndBackUp";
+ var dbName = "wMajorityCheck";
+ var collName = "stepdownAndBackUp";
+
+ // Three data-bearing nodes; node 2 is priority:0 so only nodes 0 and 1 can become primary.
+ // useBridge lets the test sever and restore connectivity between specific node pairs.
+ var rst = new ReplSetTest({
+ name: name,
+ nodes: [
+ {},
+ {},
+ {rsConfig: {priority: 0}},
+ ],
+ useBridge: true
+ });
+ var nodes = rst.startSet();
+ rst.initiate();
+
+ // Blocks until 'node' reports being in replica set member state 'state'.
+ function waitForState(node, state) {
+ assert.soonNoExcept(function() {
+ assert.commandWorked(node.adminCommand(
+ {replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
+ return true;
+ });
+ }
+
+ // Blocks until 'node' reports itself as master via the ismaster command.
+ function waitForPrimary(node) {
+ assert.soon(function() {
+ return node.adminCommand('ismaster').ismaster;
+ });
+ }
+
+ // Forces 'node' to become primary by stepping down the current primary (if different)
+ // and waiting for 'node' to win the ensuing election.
+ function stepUp(node) {
+ var primary = rst.getPrimary();
+ if (primary != node) {
+ // replSetStepDown closes the connection on success, so the command is
+ // expected to throw rather than return.
+ assert.throws(function() {
+ primary.adminCommand({replSetStepDown: 60 * 5});
+ });
+ }
+ waitForPrimary(node);
+ }
+
+ jsTestLog("Make sure node 0 is primary.");
+ stepUp(nodes[0]);
+ var primary = rst.getPrimary();
+ var secondaries = rst.getSecondaries();
+ assert.eq(nodes[0], primary);
+ // Wait for all data bearing nodes to get up to date.
+ assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
+ {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ // Stop the secondaries from replicating.
+ secondaries.forEach(function(node) {
+ assert.commandWorked(
+ node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
+ });
+ // Stop the primary from calling into awaitReplication()
+ assert.commandWorked(nodes[0].adminCommand(
+ {configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'alwaysOn'}));
+
+ jsTestLog("Do w:majority write that won't enter awaitReplication() until after the primary " +
+ "has stepped down and back up");
+ var doMajorityWrite = function() {
+ // Run ismaster command with 'hangUpOnStepDown' set to false to mark this connection as
+ // one that shouldn't be closed when the node steps down. This simulates the scenario where
+ // the write was coming from a mongos.
+ assert.commandWorked(db.adminCommand({ismaster: 1, hangUpOnStepDown: false}));
+
+ var res = db.getSiblingDB('wMajorityCheck').stepdownAndBackUp.insert({a: 2}, {
+ writeConcern: {w: 'majority'}
+ });
+ assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown);
+ };
+
+ var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
+
+ jsTest.log("Disconnect primary from all secondaries");
+ nodes[0].disconnect(nodes[1]);
+ nodes[0].disconnect(nodes[2]);
+
+ jsTest.log("Wait for a new primary to be elected");
+ // Allow the secondaries to replicate again.
+ secondaries.forEach(function(node) {
+ assert.commandWorked(
+ node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
+ });
+
+ waitForPrimary(nodes[1]);
+
+ jsTest.log("Do a write to the new primary");
+ // This write creates the "other branch of history" that the old primary's pending
+ // {a: 2} write diverges from and must be rolled back against.
+ assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
+ {a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ jsTest.log("Reconnect the old primary to the rest of the nodes");
+ nodes[0].reconnect(nodes[1]);
+ nodes[0].reconnect(nodes[2]);
+
+ jsTest.log("Wait for the old primary to step down, roll back its write, and apply the " +
+ "new writes from the new primary");
+ waitForState(nodes[0], ReplSetTest.State.SECONDARY);
+ rst.awaitReplication();
+
+ // At this point all 3 nodes should have the same data
+ assert.soonNoExcept(function() {
+ nodes.forEach(function(node) {
+ assert.eq(null,
+ node.getDB(dbName).getCollection(collName).findOne({a: 2}),
+ "Node " + node.host + " contained op that should have been rolled back");
+ assert.neq(null,
+ node.getDB(dbName).getCollection(collName).findOne({a: 3}),
+ "Node " + node.host +
+ " was missing op from branch of history that should have persisted");
+ });
+ return true;
+ });
+
+ jsTest.log("Make the original primary become primary once again");
+ stepUp(nodes[0]);
+
+ jsTest.log("Unblock the thread waiting for replication of the now rolled-back write, ensure " +
+ "that the write concern failed");
+ // The insert now enters awaitReplication() on a node that is primary again but in a new
+ // term; the write must still fail with PrimarySteppedDown (checked in the parallel shell).
+ assert.commandWorked(nodes[0].adminCommand(
+ {configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'off'}));
+
+ // Wait for the parallel shell to observe the PrimarySteppedDown write error.
+ joinMajorityWriter();
+
+ rst.stopSet();
+}());
@@ -1617,9 +1617,35 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
return Status::OK();
}
- if (replMode == modeReplSet && !_memberState.primary()) {
- return {ErrorCodes::PrimarySteppedDown,
- "Primary stepped down while waiting for replication"};
+ auto checkForStepDown = [&]() -> Status {
+ if (replMode == modeReplSet && !_memberState.primary()) {
+ return {ErrorCodes::PrimarySteppedDown,
+ "Primary stepped down while waiting for replication"};
+ }
+
+ if (opTime.getTerm() != _cachedTerm) {
+ return {
+ ErrorCodes::PrimarySteppedDown,
+ str::stream() << "Term changed from " << opTime.getTerm() << " to " << _cachedTerm
+ << " while waiting for replication, indicating that this node must "
+ "have stepped down."};
+ }
+
+ if (_stepDownPending) {
+ return {ErrorCodes::PrimarySteppedDown,
+ "Received stepdown request while waiting for replication"};
+ }
+ return Status::OK();
+ };
+
+ Status stepdownStatus = checkForStepDown();
+ if (!stepdownStatus.isOK()) {
+ return stepdownStatus;
+ }
+
+ auto interruptStatus = txn->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ return interruptStatus;
}
if (writeConcern.wMode.empty()) {
@@ -1647,10 +1673,6 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
WaiterInfoGuard waitInfo(
&_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
- if (replMode == modeReplSet && !_getMemberState_inlock().primary()) {
- return {ErrorCodes::PrimarySteppedDown,
- "Not primary anymore while waiting for replication - primary stepped down"};
- }
if (_inShutdown) {
return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
@@ -1672,6 +1694,11 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
}
return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
}
+
+ stepdownStatus = checkForStepDown();
+ if (!stepdownStatus.isOK()) {
+ return stepdownStatus;
+ }
}
return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
@@ -2520,6 +2547,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
_canAcceptNonLocalWrites = false;
_isCatchingUp = false;
_isWaitingForDrainToComplete = false;
+ _stepDownPending = false;
_drainFinishedCond_forTest.notify_all();
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
result = kActionCloseAllConnections;
@@ -3367,7 +3395,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback(
if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
log() << "stepping down from primary, because a new term has begun: " << term;
_topCoord->prepareForStepDown();
- return _stepDownStart();
+ return _stepDownStart(false);
}
return EventHandle();
}
@@ -910,7 +910,10 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator {
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
- ReplicationExecutor::EventHandle _stepDownStart();
+ /**
+ * Schedules stepdown to run with the global exclusive lock.
+ */
+ ReplicationExecutor::EventHandle _stepDownStart(bool hasMutex);
/**
* Completes a step-down of the current node. Must be run with a global
@@ -949,9 +952,11 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator {
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
* returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
* value of "responseStatus".
+ * 'hasMutex' is true if the caller is holding _mutex. TODO(SERVER-27083): Remove this.
*/
void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action,
- const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
+ const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
+ bool hasMutex);
/**
* Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
@@ -1180,6 +1185,14 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator {
// TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
int _rbid; // (M)
+ // Indicates that we've received a request to stepdown from PRIMARY (likely via a heartbeat)
+ // TODO(SERVER-27083): This bool is redundant of the same-named bool in TopologyCoordinatorImpl,
+ // but due to mutex ordering between _mutex and _topoMutex we can't inspect the
+ // TopologyCoordinator field in awaitReplication() where this bool is used. Once we get rid
+ // of topoMutex and start guarding access to the TopologyCoordinator via _mutex we should
+ // consolidate the two bools.
+ bool _stepDownPending = false; // (M)
+
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
WaiterList _replicationWaiterList; // (M)
Oops, something went wrong.

0 comments on commit 8347e32

Please sign in to comment.