Permalink
Browse files

SERVER-7612 explicit primary read pref does not work well with shard versioning

Ensure that selectNodeUsingTags uses the same connection to the primary as checkMaster.
  • Loading branch information...
1 parent 75d8b0b commit 3d3f7e28b06d42b3f7432d834ac7056e53c41c88 @renctan renctan committed Nov 12, 2012
@@ -0,0 +1,34 @@
+var st = new ShardingTest({ shards: { rs0: { quiet: '' }, rs1: { quiet: '' }}, mongos: 2 });
+
+var testDB1 = st.s0.getDB('test');
+var testDB2 = st.s1.getDB('test');
+
+// Trigger a query through the first mongos (st.s0) so that it views test.user as unsharded.
+testDB1.user.findOne();
+
+testDB2.adminCommand({ enableSharding: 'test' });
+testDB2.adminCommand({ shardCollection: 'test.user', key: { x: 1 }});
+
+testDB2.adminCommand({ split: 'test.user', middle: { x: 100 }});
+
+var configDB2 = st.s1.getDB('config');
+var chunkToMove = configDB2.chunks.find().sort({ min: 1 }).next();
+var toShard = configDB2.shards.findOne({ _id: { $ne: chunkToMove.shard }})._id;
+testDB2.adminCommand({ moveChunk: 'test.user', to: toShard, find: { x: 50 }});
+
+for (var x = 0; x < 200; x++) {
+ testDB2.user.insert({ x: x });
+}
+
+testDB2.runCommand({ getLastError: 1 });
+
+var cursor = testDB1.user.find({ x: 30 }).readPref('primary');
+assert(cursor.hasNext());
+assert.eq(30, cursor.next().x);
+
+cursor = testDB1.user.find({ x: 130 }).readPref('primary');
+assert(cursor.hasNext());
+assert.eq(130, cursor.next().x);
+
+st.stop();
+
@@ -46,15 +46,19 @@ namespace mongo {
* @param lastHost the last host returned (mainly used for doing round-robin).
* Will be overwritten with the newly returned host if not empty. Should
* never be NULL.
+ * @param isPrimarySelected out parameter that is set to true if the returned host
+ * is a primary. Cannot be NULL and valid only if returned host is not
+ * empty.
*
* @return the host object of the node selected. If none of the nodes are
* eligible, returns an empty host.
*/
HostAndPort _selectNode(const vector<ReplicaSetMonitor::Node>& nodes,
const BSONObj& readPreferenceTag,
bool secOnly,
int localThresholdMillis,
- HostAndPort* lastHost /* in/out */) {
+ HostAndPort* lastHost /* in/out */,
+ bool* isPrimarySelected) {
HostAndPort fallbackHost;
// Implicit: start from index 0 if lastHost doesn't exist anymore
@@ -85,6 +89,7 @@ namespace mongo {
if (node.matchesTag(readPreferenceTag)) {
// found an ok candidate; may not be local.
fallbackHost = node.addr;
+ *isPrimarySelected = node.ismaster;
if (node.isLocalSecondary(localThresholdMillis)) {
// found a local node. return early.
@@ -1006,14 +1011,15 @@ namespace mongo {
}
HostAndPort ReplicaSetMonitor::selectAndCheckNode(ReadPreference preference,
- TagSet* tags) {
+ TagSet* tags,
+ bool* isPrimarySelected) {
HostAndPort candidate;
{
scoped_lock lk(_lock);
candidate = ReplicaSetMonitor::selectNode(_nodes, preference, tags,
- _localThresholdMillis, &_lastReadPrefHost);
+ _localThresholdMillis, &_lastReadPrefHost, isPrimarySelected);
}
if (candidate.empty()) {
@@ -1022,7 +1028,7 @@ namespace mongo {
scoped_lock lk(_lock);
return ReplicaSetMonitor::selectNode(_nodes, preference, tags, _localThresholdMillis,
- &_lastReadPrefHost);
+ &_lastReadPrefHost, isPrimarySelected);
}
return candidate;
@@ -1033,11 +1039,15 @@ namespace mongo {
ReadPreference preference,
TagSet* tags,
int localThresholdMillis,
- HostAndPort* lastHost) {
+ HostAndPort* lastHost,
+ bool* isPrimarySelected) {
+ *isPrimarySelected = false;
+
switch (preference) {
case ReadPreference_PrimaryOnly:
for (vector<Node>::const_iterator iter = nodes.begin(); iter != nodes.end(); ++iter) {
if (iter->ismaster && iter->ok) {
+ *isPrimarySelected = true;
return iter->addr;
}
}
@@ -1047,14 +1057,14 @@ namespace mongo {
case ReadPreference_PrimaryPreferred:
{
HostAndPort candidatePri = selectNode(nodes, ReadPreference_PrimaryOnly, tags,
- localThresholdMillis, lastHost);
+ localThresholdMillis, lastHost, isPrimarySelected);
if (!candidatePri.empty()) {
return candidatePri;
}
return selectNode(nodes, ReadPreference_SecondaryOnly, tags,
- localThresholdMillis, lastHost);
+ localThresholdMillis, lastHost, isPrimarySelected);
}
case ReadPreference_SecondaryOnly:
@@ -1063,7 +1073,7 @@ namespace mongo {
while (!tags->isExhausted()) {
candidate = _selectNode(nodes, tags->getCurrentTag(), true, localThresholdMillis,
- lastHost);
+ lastHost, isPrimarySelected);
if (candidate.empty()) {
tags->next();
@@ -1079,14 +1089,14 @@ namespace mongo {
case ReadPreference_SecondaryPreferred:
{
HostAndPort candidateSec = selectNode(nodes, ReadPreference_SecondaryOnly, tags,
- localThresholdMillis, lastHost);
+ localThresholdMillis, lastHost, isPrimarySelected);
if (!candidateSec.empty()) {
return candidateSec;
}
return selectNode(nodes, ReadPreference_PrimaryOnly, tags,
- localThresholdMillis, lastHost);
+ localThresholdMillis, lastHost, isPrimarySelected);
}
case ReadPreference_Nearest:
@@ -1095,7 +1105,7 @@ namespace mongo {
while (!tags->isExhausted()) {
candidate = _selectNode(nodes, tags->getCurrentTag(), false, localThresholdMillis,
- lastHost);
+ lastHost, isPrimarySelected);
if (candidate.empty()) {
tags->next();
@@ -1559,12 +1569,24 @@ namespace mongo {
}
ReplicaSetMonitorPtr monitor = _getMonitor();
- _lastSlaveOkHost = monitor->selectAndCheckNode(preference, tags);
+ bool isPrimarySelected = false;
+ _lastSlaveOkHost = monitor->selectAndCheckNode(preference, tags, &isPrimarySelected);
if ( _lastSlaveOkHost.empty() ){
return NULL;
}
+ // Primary connection is special because it is the only connection that is
+ // versioned in mongos. Therefore, we have to make sure that this object
+ // maintains only one connection to the primary and use that connection
+ // every time we need to talk to the primary.
+ if (isPrimarySelected) {
+ checkMaster();
+ _lastSlaveOkConn = _master;
+ _lastSlaveOkHost = _masterHost; // implied, but still assign just to be safe
+ return _master.get();
+ }
+
_lastSlaveOkConn.reset(new DBClientConnection(true , this , _so_timeout));
_lastSlaveOkConn->connect(_lastSlaveOkHost);
@@ -147,6 +147,8 @@ namespace mongo {
* robin, starting from the node next to this lastHost. This will be overwritten
* with the newly chosen host if not empty, not primary and when preference
* is not Nearest.
+ * @param isPrimarySelected out parameter that is set to true if the returned host
+ * is a primary. Cannot be NULL and valid only if returned host is not empty.
*
* @return the host object of the node selected. If none of the nodes are
* eligible, returns an empty host.
@@ -155,22 +157,26 @@ namespace mongo {
ReadPreference preference,
TagSet* tags,
int localThresholdMillis,
- HostAndPort* lastHost);
+ HostAndPort* lastHost,
+ bool* isPrimarySelected);
/**
* Selects the right node given the nodes to pick from and the preference. This
* will also attempt to refresh the local view of the replica set configuration
* if the primary node needs to be returned but is not currently available (except
* for ReadPreference_Nearest).
*
- * @param preference the read mode to use
- * @param tags the tags used for filtering nodes
+ * @param preference the read mode to use.
+ * @param tags the tags used for filtering nodes.
+ * @param isPrimarySelected out parameter that is set to true if the returned host
+ * is a primary. Cannot be NULL and valid only if returned host is not empty.
*
* @return the host object of the node selected. If none of the nodes are
* eligible, returns an empty host.
*/
HostAndPort selectAndCheckNode(ReadPreference preference,
- TagSet* tags);
+ TagSet* tags,
+ bool* isPrimarySelected);
/**
* Creates a new ReplicaSetMonitor, if it doesn't already exist.
@@ -270,6 +276,8 @@ namespace mongo {
*/
bool isAnyNodeOk() const;
+ bool isHostPrimary(const std::string& hostName) const;
+
private:
/**
* This populates a list of hosts from the list of seeds (discarding the
@@ -556,12 +564,18 @@ namespace mongo {
string _setName;
HostAndPort _masterHost;
- scoped_ptr<DBClientConnection> _master;
+ // Note: this is a shared_ptr because we want _lastSlaveOkConn to keep a
+ // reference to the _master connection when a primary node has been selected.
+ // The primary connection is special in mongos - it is the only connection
+ // that is versioned.
+ // WARNING: do not assign this variable (which will increment the internal ref
+ // counter) to any variable other than _lastSlaveOkConn.
+ boost::shared_ptr<DBClientConnection> _master;
// Last used host in a slaveOk query (can be a primary)
HostAndPort _lastSlaveOkHost;
// Last used connection in a slaveOk query (can be a primary)
- scoped_ptr<DBClientConnection> _lastSlaveOkConn;
+ boost::shared_ptr<DBClientConnection> _lastSlaveOkConn;
double _so_timeout;
Oops, something went wrong.

0 comments on commit 3d3f7e2

Please sign in to comment.