
SERVER-28948 open up secondaries to checking shardVersion

EshaMaharishi committed Jul 12, 2017
1 parent 6ed160d commit 32da960b3d4f80e4005bfcc21dd56067353d64fb
@@ -25,11 +25,20 @@ function getProfilerProtocolStringForCommand(conn) {
     doassert(`Unknown protocol string ${protocols}`);
 }

-// Throws an assertion if the profiler does not contain exactly one entry matching <filter>.
+// Throws an assertion if the profiler does not contain *exactly one* entry matching <filter>.
 // Optional arguments <errorMsgFilter> and <errorMsgProj> limit profiler output if this asserts.
 function profilerHasSingleMatchingEntryOrThrow(inputDb, filter, errorMsgFilter, errorMsgProj) {
     assert.eq(inputDb.system.profile.find(filter).itcount(),
               1,
               "Expected exactly one op matching: " + tojson(filter) + " in profiler " +
                   tojson(inputDb.system.profile.find(errorMsgFilter, errorMsgProj).toArray()));
 }
+
+// Throws an assertion if the profiler does not contain *at least one* entry matching <filter>.
+// Optional arguments <errorMsgFilter> and <errorMsgProj> limit profiler output if this asserts.
+function profilerHasMatchingEntryOrThrow(inputDb, filter, errorMsgFilter, errorMsgProj) {
+    assert.gte(inputDb.system.profile.find(filter).itcount(),
+               1,
+               "Expected at least one op matching: " + tojson(filter) + " in profiler " +
+                   tojson(inputDb.system.profile.find(errorMsgFilter, errorMsgProj).toArray()));
+}
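For context, a minimal usage sketch of these two helpers (illustrative only, not part of this commit; the connection variable conn and the collection foo are assumptions):

// Illustrative sketch: enable profiling, run an operation once, then assert
// on the profiler contents. `conn` and the collection `foo` are hypothetical.
const testDB = conn.getDB("test");
assert.commandWorked(testDB.setProfilingLevel(2));
assert.eq(0, testDB.foo.find({x: 1}).itcount());

// An operation that ran exactly once should appear exactly once.
profilerHasSingleMatchingEntryOrThrow(testDB, {"ns": "test.foo"});

// When retries may duplicate an operation, require only at least one entry.
profilerHasMatchingEntryOrThrow(testDB, {"ns": "test.foo"});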
@@ -4,7 +4,7 @@
 (function() {
     "use strict";

-    load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow.
+    load("jstests/libs/profiler.js"); // For profilerHasMatchingEntryOrThrow.

     const st = new ShardingTest({
         name: "agg_explain_readPref",
@@ -81,7 +81,12 @@
     // Look for an operation without an exception, since the shard throws a stale config
     // exception if the shard or mongos has stale routing metadata, and the operation
     // gets retried.
-    profilerHasSingleMatchingEntryOrThrow(target, {
+    // Note that we look for *at least one* (not exactly one) matching entry: mongos cancels
+    // its requests to all shards when it receives a stale version error from any shard.
+    // However, a request may have reached the other shards before it was canceled.
+    // If those shards were already fresh, they will receive the request again on the next
+    // attempt, so the request can show up more than once in the profiler.
+    profilerHasMatchingEntryOrThrow(target, {
         "ns": coll.getFullName(),
         "command.explain.aggregate": coll.getName(),
         "command.explain.comment": comment,
@@ -109,7 +114,12 @@
     // Look for an operation without an exception, since the shard throws a stale config
     // exception if the shard or mongos has stale routing metadata, and the operation
     // gets retried.
-    profilerHasSingleMatchingEntryOrThrow(target, {
+    // Note that we look for *at least one* (not exactly one) matching entry: mongos cancels
+    // its requests to all shards when it receives a stale version error from any shard.
+    // However, a request may have reached the other shards before it was canceled.
+    // If those shards were already fresh, they will receive the request again on the next
+    // attempt, so the request can show up more than once in the profiler.
+    profilerHasMatchingEntryOrThrow(target, {
         "ns": coll.getFullName(),
         "command.explain.aggregate": coll.getName(),
         "command.explain.comment": comment,
@@ -67,6 +67,18 @@ function authDBUsers(conn) {
     return conn;
 }

+// Secondaries do not refresh their in-memory routing table until a request with a higher version
+// is received, and refreshing requires communication with the primary to obtain the newest version.
+// Read from the secondaries once before taking down primaries to ensure they have loaded the
+// routing table into memory.
+// TODO SERVER-30148: replace this with calls to awaitReplication() on each shard owning data for
+// the sharded collection once secondaries refresh proactively.
+var mongosSetupConn = new Mongo(mongos.host);
+mongosSetupConn.setReadPref("secondary");
+assert(!mongosSetupConn.getCollection(collSharded.toString()).find({}).hasNext());
+
+gc(); // Clean up connections
+
 //
 // Setup is complete
 //
//
// Setup is complete
//
@@ -39,6 +39,18 @@
     assert.commandWorked(admin.runCommand(
         {moveChunk: collSharded.toString(), find: {_id: 0}, to: st.shard1.shardName}));

+    // Secondaries do not refresh their in-memory routing table until a request with a higher
+    // version is received, and refreshing requires communication with the primary to obtain the
+    // newest version. Read from the secondaries once before taking down primaries to ensure they
+    // have loaded the routing table into memory.
+    // TODO SERVER-30148: replace this with calls to awaitReplication() on each shard owning data
+    // for the sharded collection once secondaries refresh proactively.
+    var mongosSetupConn = new Mongo(mongos.host);
+    mongosSetupConn.setReadPref("secondary");
+    assert(!mongosSetupConn.getCollection(collSharded.toString()).find({}).hasNext());
+
+    gc(); // Clean up connections
+
     st.printShardingStatus();

//
@@ -0,0 +1,82 @@
+/**
+ * Tests that secondaries participate in the shard versioning protocol.
+ */
+(function() {
+    "use strict";
+
+    load('jstests/libs/profiler.js');  // for profilerHasSingleMatchingEntryOrThrow()
+
+    // Set the secondaries to priority 0 and votes 0 to prevent the primaries from stepping down.
+    let rsOpts = {nodes: [{rsConfig: {votes: 1}}, {rsConfig: {priority: 0, votes: 0}}]};
+    let st = new ShardingTest({mongos: 2, shards: {rs0: rsOpts, rs1: rsOpts}});
+
+    assert.commandWorked(st.s0.adminCommand({enableSharding: 'test'}));
+    st.ensurePrimaryShard('test', st.shard0.shardName);
+
+    assert.commandWorked(st.s0.adminCommand({shardCollection: 'test.foo', key: {x: 1}}));
+    assert.commandWorked(st.s0.adminCommand({split: 'test.foo', middle: {x: 0}}));
+
+    let freshMongos = st.s0;
+    let staleMongos = st.s1;
+
+    jsTest.log("do insert from stale mongos to make it load the routing table before the move");
+    assert.writeOK(staleMongos.getDB('test').foo.insert({x: 1}));
+
+    jsTest.log("do moveChunk from fresh mongos");
+    assert.commandWorked(freshMongos.adminCommand({
+        moveChunk: 'test.foo',
+        find: {x: 0},
+        to: st.shard1.shardName,
+    }));
+
+    // Turn on system profiler on secondaries to collect data on all future operations on the db.
+    let donorShardSecondary = st.rs0.getSecondary();
+    let recipientShardSecondary = st.rs1.getSecondary();
+    assert.commandWorked(donorShardSecondary.getDB('test').setProfilingLevel(2));
+    assert.commandWorked(recipientShardSecondary.getDB('test').setProfilingLevel(2));
+
+    // Use the mongos with the stale routing table to send read requests to the secondaries.
+    // Check that the donor shard returns a stale shardVersion error, which provokes mongos to
+    // refresh its routing table and re-target; that the recipient shard secondary refreshes its
+    // routing table on hearing the fresh version from mongos; and that the recipient shard
+    // secondary returns the results.
+
+    jsTest.log("do secondary read from stale mongos");
+    let res = staleMongos.getDB('test').runCommand(
+        {count: 'foo', query: {x: 1}, $readPreference: {mode: "secondary"}});
+    assert(res.ok);
+    assert.eq(1, res.n, tojson(res));
+
+    // Check that the donor shard secondary returned stale shardVersion.
+    profilerHasSingleMatchingEntryOrThrow(donorShardSecondary.getDB('test'), {
+        "ns": "test.foo",
+        "command.count": "foo",
+        "command.query": {x: 1},
+        "command.shardVersion": {"$exists": true},
+        "command.$readPreference": {"mode": "secondary"},
+        "exceptionCode": ErrorCodes.SendStaleConfig
+    });
+
+    // The recipient shard secondary will also return stale shardVersion once, even though the
+    // mongos is fresh, because the recipient shard secondary was stale.
+    profilerHasSingleMatchingEntryOrThrow(recipientShardSecondary.getDB('test'), {
+        "ns": "test.foo",
+        "command.count": "foo",
+        "command.query": {x: 1},
+        "command.shardVersion": {"$exists": true},
+        "command.$readPreference": {"mode": "secondary"},
+        "exceptionCode": ErrorCodes.SendStaleConfig
+    });

+    // Check that the recipient shard secondary received the query and returned results.
+    profilerHasSingleMatchingEntryOrThrow(recipientShardSecondary.getDB('test'), {
+        "ns": "test.foo",
+        "command.count": "foo",
+        "command.query": {x: 1},
+        "command.shardVersion": {"$exists": true},
+        "command.$readPreference": {"mode": "secondary"},
+        "exceptionCode": {"$exists": false}
+    });
+
+    st.stop();
+})();
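If one of the profiler assertions above trips, it helps to dump the secondary's profiled operations for the namespace. A small illustrative helper, not part of the commit (the 'test' database and 'test.foo' namespace match the test above):

// Illustrative debugging helper (not from this commit): print every profiled
// operation against test.foo on the given node, oldest first.
function dumpProfile(conn) {
    conn.getDB('test').system.profile.find({ns: 'test.foo'}).sort({ts: 1}).forEach(function(doc) {
        printjson(doc);
    });
}
// Example: dumpProfile(donorShardSecondary);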
@@ -150,7 +150,8 @@
"ns": coll.getFullName(),
"command.aggregate": coll.getName(),
"command.comment": "agg_readPref",
"command.pipeline.$mergeCursors": {"$exists": false}
"command.pipeline.$mergeCursors": {"$exists": false},
"nreturned": {"$exists": true}
});

// Find
@@ -163,7 +164,8 @@
"ns": coll.getFullName(),
"command.aggregate": coll.getName(),
"command.comment": "find_readPref",
"command.pipeline.$mergeCursors": {"$exists": false}
"command.pipeline.$mergeCursors": {"$exists": false},
"nreturned": {"$exists": true}
});

// Count
@@ -176,7 +178,8 @@
"ns": coll.getFullName(),
"command.aggregate": coll.getName(),
"command.comment": "count_readPref",
"command.pipeline.$mergeCursors": {"$exists": false}
"command.pipeline.$mergeCursors": {"$exists": false},
"nreturned": {"$exists": true}
});

// Distinct
@@ -189,7 +192,8 @@
"ns": coll.getFullName(),
"command.aggregate": coll.getName(),
"command.comment": "distinct_readPref",
"command.pipeline.$mergeCursors": {"$exists": false}
"command.pipeline.$mergeCursors": {"$exists": false},
"nreturned": {"$exists": true}
});

assert.commandWorked(shardSecondary.setProfilingLevel(0));
@@ -458,11 +458,6 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
                                              ChunkVersion* actualShardVersion) {
     Client* client = opCtx->getClient();

-    if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, _nss.db())) {
-        // Right now connections to secondaries aren't versioned at all.
-        return true;
-    }
-
     auto& oss = OperationShardingState::get(opCtx);

     // If there is a version attached to the OperationContext, use it as the received version.
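Removing this early return is the core behavioral change: a versioned command sent to a secondary is no longer treated as unversioned, so a secondary with stale routing metadata can now answer with a stale-config error. A hypothetical shell-level sketch of the new behavior (the connection host, namespace, and version values are all assumptions, not from this commit):

// Hypothetical sketch: send a versioned read directly to a shard secondary
// whose routing table is older than the version we attach.
const conn = new Mongo(shardSecondaryHost);  // assumed host string
conn.setSlaveOk(true);
const res = conn.getDB("test").runCommand({
    count: "foo",
    query: {x: 1},
    // In this era a shardVersion is encoded as [version Timestamp, epoch ObjectId];
    // the concrete values here are made up for illustration.
    shardVersion: [Timestamp(2, 0), ObjectId("585283bd4d37863a2e3ea4a5")],
});
// Before this commit the secondary skipped the check and returned results;
// now it compares versions and fails if its metadata is stale.
assert.commandFailedWithCode(res, ErrorCodes.SendStaleConfig);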
@@ -653,9 +653,8 @@ void execCommandDatabase(OperationContext* opCtx,
         opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS});
     }

-    // Operations are only versioned against the primary. We also make sure not to redo shard
-    // version handling if this command was issued via the direct client.
-    if (iAmPrimary && !opCtx->getClient()->isInDirectClient()) {
+    // We do not redo shard version handling if this command was issued via the direct client.
+    if (!opCtx->getClient()->isInDirectClient()) {
         // Handle a shard version that may have been sent along with the command.
         auto commandNS = NamespaceString(command->parseNs(dbname, request.body));
         auto& oss = OperationShardingState::get(opCtx);
