Skip to content

Commit

Permalink
SERVER-27123 Only update the commit point as a secondary from oplog q…
Browse files Browse the repository at this point in the history
…ueries against your sync source
  • Loading branch information
stbrody committed Jan 4, 2017
1 parent a59962f commit 87f4948
Show file tree
Hide file tree
Showing 22 changed files with 421 additions and 112 deletions.
33 changes: 22 additions & 11 deletions jstests/libs/write_concern_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,53 @@ function shardCollectionWithChunks(st, coll) {
assert.eq(coll.count(), numberDoc);
}

// Stops replication at a server.
// Stops replication on the given server(s).
function stopServerReplication(conn) {
var errMsg = 'Failed to enable rsSyncApplyStop failpoint.';
if (conn.length) {
conn.forEach(function(n) {
stopServerReplication(n);
});
return;
}
var errMsg = 'Failed to enable stopOplogFetcher failpoint.';
assert.commandWorked(
conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}),
conn.getDB('admin').runCommand({configureFailPoint: 'stopOplogFetcher', mode: 'alwaysOn'}),
errMsg);
}

// Stops replication at all replicaset secondaries.
function stopReplicationOnSecondaries(rs) {
var secondaries = rs.getSecondaries();
secondaries.forEach(stopServerReplication);
stopServerReplication(rs.getSecondaries());
}

// Stops replication at all shard secondaries.
function stopReplicationOnSecondariesOfAllShards(st) {
st._rsObjects.forEach(stopReplicationOnSecondaries);
}

// Restarts replication at a server.
// Restarts replication on the given server(s).
function restartServerReplication(conn) {
var errMsg = 'Failed to disable rsSyncApplyStop failpoint.';
if (conn.length) {
conn.forEach(function(n) {
restartServerReplication(n);
});
return;
}

var errMsg = 'Failed to disable stopOplogFetcher failpoint.';
assert.commandWorked(
conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}),
conn.getDB('admin').runCommand({configureFailPoint: 'stopOplogFetcher', mode: 'off'}),
errMsg);
}

// Restarts replication at all nodes in a replicaset.
function restartReplSetReplication(rs) {
rs.nodes.forEach(restartServerReplication);
restartServerReplication(rs.nodes);
}

// Restarts replication at all replicaset secondaries.
function restartReplicationOnSecondaries(rs) {
var secondaries = rs.getSecondaries();
secondaries.forEach(restartServerReplication);
restartServerReplication(rs.getSecondaries());
}

// Restarts replication at all nodes in a sharded cluster.
Expand Down
10 changes: 2 additions & 8 deletions jstests/replsets/double_rollback.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

(function() {
'use strict';

load("jstests/libs/check_log.js");
load("jstests/replsets/rslib.js");

var name = "double_rollback";
var dbName = "test";
Expand All @@ -32,14 +34,6 @@
var nodes = rst.startSet();
rst.initiate();

function waitForState(node, state) {
assert.soonNoExcept(function() {
assert.commandWorked(node.adminCommand(
{replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
return true;
});
}

function stepUp(rst, node) {
var primary = rst.getPrimary();
if (primary != node) {
Expand Down
8 changes: 4 additions & 4 deletions jstests/replsets/read_committed_after_rollback.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
// now be visible as a committed read to both oldPrimary and newPrimary.
assert.commandWorked(
pureSecondary.adminCommand({configureFailPoint: "rsSyncApplyStop", mode: "off"}));
var gleResponse =
newPrimaryColl.runCommand({getLastError: 1, w: 'majority', wtimeout: 5 * 1000 * 60});
assert.commandWorked(gleResponse);
assert.eq(null, gleResponse.err, "GLE detected write error: " + tojson(gleResponse));
// Do a write to the new primary so that the old primary can establish a sync source to learn
// about the new commit.
assert.writeOK(newPrimary.getDB(name).unrelatedCollection.insert(
{a: 1}, {writeConcern: {w: 'majority', wtimeout: replTest.kDefaultTimeoutMS}}));
assert.eq(doCommittedRead(newPrimaryColl), 'new');
assert.eq(doCommittedRead(oldPrimaryColl), 'new');
}());
148 changes: 148 additions & 0 deletions jstests/replsets/read_committed_stale_history.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Tests that a node on a stale branch of history won't incorrectly mark its ops as committed even
* when hearing about a commit point with a higher optime from a new primary.
*/
(function() {
'use strict';

load("jstests/libs/check_log.js");
load("jstests/libs/write_concern_util.js");
load("jstests/replsets/rslib.js");

var name = "readCommittedStaleHistory";
var dbName = "wMajorityCheck";
var collName = "stepdown";

var rst = new ReplSetTest({
name: name,
nodes: [
{},
{},
{rsConfig: {priority: 0}},
],
nodeOptions: {enableMajorityReadConcern: ""},
useBridge: true
});

if (!startSetIfSupportsReadMajority(rst)) {
jsTest.log("skipping test since storage engine doesn't support committed reads");
return;
}

var nodes = rst.nodes;
rst.initiate();

/**
* Waits for the given node to be in state primary *and* have finished drain mode and thus
* be available for writes.
*/
function waitForPrimary(node) {
assert.soon(function() {
return node.adminCommand('ismaster').ismaster;
});
}

function stepUp(node) {
var primary = rst.getPrimary();
if (primary != node) {
assert.throws(function() {
primary.adminCommand({replSetStepDown: 60 * 5});
});
}
waitForPrimary(node);
}

// Asserts that the given document is not visible in the committed snapshot on the given node.
function checkDocNotCommitted(node, doc) {
var docs =
node.getDB(dbName).getCollection(collName).find(doc).readConcern('majority').toArray();
assert.eq(0, docs.length, tojson(docs));
}

jsTestLog("Make sure node 0 is primary.");
rst.getPrimary();
stepUp(nodes[0]);
var primary = rst.getPrimary();
var secondaries = rst.getSecondaries();
assert.eq(nodes[0], primary);
// Wait for all data bearing nodes to get up to date.
assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
{a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));

// Stop the secondaries from replicating.
stopServerReplication(secondaries);
// Stop the primary from being able to complete stepping down.
assert.commandWorked(
nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'}));

jsTestLog("Do a write that won't ever reach a majority of nodes");
assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert({a: 2}));

// Ensure that the write that was just done is not visible in the committed snapshot.
checkDocNotCommitted(nodes[0], {a: 2});

// Prevent the primary from rolling back later on.
assert.commandWorked(
nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'alwaysOn'}));

jsTest.log("Disconnect primary from all secondaries");
nodes[0].disconnect(nodes[1]);
nodes[0].disconnect(nodes[2]);

jsTest.log("Wait for a new primary to be elected");
// Allow the secondaries to replicate again.
restartServerReplication(secondaries);

waitForPrimary(nodes[1]);

jsTest.log("Do a write to the new primary");
assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
{a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));

jsTest.log("Reconnect the old primary to the rest of the nodes");
nodes[1].reconnect(nodes[0]);
nodes[2].reconnect(nodes[0]);

// Sleep 10 seconds to allow some heartbeats to be processed, so we can verify that the
// heartbeats don't cause the stale primary to incorrectly advance the commit point.
sleep(10000);

checkDocNotCommitted(nodes[0], {a: 2});

jsTest.log("Allow the old primary to finish stepping down and become secondary");
var res = null;
try {
res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'});
} catch (e) {
// Expected - once we disable the fail point the stepdown will proceed and it's racy whether
// the stepdown closes all connections before or after the configureFailPoint command
// returns
}
if (res) {
assert.commandWorked(res);
}
rst.waitForState(nodes[0], ReplSetTest.State.SECONDARY);

// At this point the former primary will attempt to go into rollback, but the
// 'rollbackHangBeforeStart' will prevent it from doing so.
checkDocNotCommitted(nodes[0], {a: 2});
checkLog.contains(nodes[0], 'rollback - rollbackHangBeforeStart fail point enabled');
checkDocNotCommitted(nodes[0], {a: 2});

jsTest.log("Allow the original primary to roll back its write and catch up to the new primary");
assert.commandWorked(
nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'off'}));

assert.soonNoExcept(function() {
return null == nodes[0].getDB(dbName).getCollection(collName).findOne({a: 2});
}, "Original primary never rolled back its write");

rst.awaitReplication();

// Ensure that the old primary got the write that the new primary did and sees it as committed.
assert.neq(
null,
nodes[0].getDB(dbName).getCollection(collName).find({a: 3}).readConcern('majority').next());

rst.stopSet();
}());
12 changes: 12 additions & 0 deletions jstests/replsets/rslib.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var reconfig;
var awaitOpTime;
var startSetIfSupportsReadMajority;
var waitUntilAllNodesCaughtUp;
var waitForState;
var reInitiateWithoutThrowingOnAbortedMember;
var awaitRSClientHosts;
var getLastOpTime;
Expand Down Expand Up @@ -211,6 +212,17 @@ var getLastOpTime;
timeout);
};

/**
* Waits for the given node to reach the given state, ignoring network errors.
*/
waitForState = function(node, state) {
assert.soonNoExcept(function() {
assert.commandWorked(node.adminCommand(
{replSetTest: 1, waitForMemberState: state, timeoutMillis: 60 * 1000 * 5}));
return true;
});
};

/**
* Starts each node in the given replica set if the storage engine supports readConcern
*'majority'.
Expand Down
21 changes: 5 additions & 16 deletions jstests/replsets/write_concern_after_stepdown.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
(function() {
'use strict';

load("jstests/replsets/rslib.js");
load("jstests/libs/write_concern_util.js");

var name = "writeConcernStepDownAndBackUp";
var dbName = "wMajorityCheck";
var collName = "stepdownAndBackUp";
Expand All @@ -21,14 +24,6 @@
var nodes = rst.startSet();
rst.initiate();

function waitForState(node, state) {
assert.soonNoExcept(function() {
assert.commandWorked(node.adminCommand(
{replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
return true;
});
}

function waitForPrimary(node) {
assert.soon(function() {
return node.adminCommand('ismaster').ismaster;
Expand All @@ -55,10 +50,7 @@
{a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));

// Stop the secondaries from replicating.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
});
stopServerReplication(secondaries);
// Stop the primary from being able to complete stepping down.
assert.commandWorked(
nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'}));
Expand All @@ -79,10 +71,7 @@

jsTest.log("Wait for a new primary to be elected");
// Allow the secondaries to replicate again.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
});
restartServerReplication(secondaries);

waitForPrimary(nodes[1]);

Expand Down
21 changes: 5 additions & 16 deletions jstests/replsets/write_concern_after_stepdown_and_stepup.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
(function() {
'use strict';

load("jstests/replsets/rslib.js");
load("jstests/libs/write_concern_util.js");

var name = "writeConcernStepDownAndBackUp";
var dbName = "wMajorityCheck";
var collName = "stepdownAndBackUp";
Expand All @@ -22,14 +25,6 @@
var nodes = rst.startSet();
rst.initiate();

function waitForState(node, state) {
assert.soonNoExcept(function() {
assert.commandWorked(node.adminCommand(
{replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
return true;
});
}

function waitForPrimary(node) {
assert.soon(function() {
return node.adminCommand('ismaster').ismaster;
Expand All @@ -56,10 +51,7 @@
{a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));

// Stop the secondaries from replicating.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'stopOplogFetcher', mode: 'alwaysOn'}));
});
stopServerReplication(secondaries);
// Stop the primary from calling into awaitReplication()
assert.commandWorked(nodes[0].adminCommand(
{configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'alwaysOn'}));
Expand All @@ -86,10 +78,7 @@

jsTest.log("Wait for a new primary to be elected");
// Allow the secondaries to replicate again.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'stopOplogFetcher', mode: 'off'}));
});
restartServerReplication(secondaries);

waitForPrimary(nodes[1]);

Expand Down
Loading

0 comments on commit 87f4948

Please sign in to comment.