Skip to content

Commit

Permalink
SERVER-4399 make replica set updating more reliable in c++ driver, re…
Browse files Browse the repository at this point in the history
…move nodes when no longer present in config
  • Loading branch information
Greg Studer authored and erh committed Feb 1, 2012
1 parent 9a09c9f commit 6b2231a
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 37 deletions.
192 changes: 156 additions & 36 deletions client/dbclient_rs.cpp
Expand Up @@ -72,6 +72,15 @@ namespace mongo {

} replicaSetMonitorWatcher;

/**
 * Joins the string form of every seed host into one comma-separated list,
 * preserving the order of `servers`. An empty vector yields an empty string.
 */
string seedString( const vector<HostAndPort>& servers ){
    string joined;

    for ( vector<HostAndPort>::const_iterator it = servers.begin(); it != servers.end(); ++it ){
        // The separator goes in front of every entry except the first one
        if ( it != servers.begin() ) joined += ",";
        joined += it->toString();
    }

    return joined;
}

ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers )
: _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1), _nextSlave(0) {
Expand All @@ -82,28 +91,36 @@ namespace mongo {
warning() << "replica set name empty, first node: " << servers[0] << endl;
}

string errmsg;
log() << "starting new replica set monitor for replica set " << _name << " with seed of " << seedString( servers ) << endl;

for ( unsigned i=0; i<servers.size(); i++ ) {
string errmsg;
for ( unsigned i = 0; i < servers.size(); i++ ) {

bool haveAlready = false;
for ( unsigned n = 0; n < _nodes.size() && ! haveAlready; n++ )
haveAlready = ( _nodes[n].addr == servers[i] );
if( haveAlready ) continue;
// Don't check servers we have already
if( _find_inlock( servers[i] ) >= 0 ) continue;

auto_ptr<DBClientConnection> conn( new DBClientConnection( true , 0, 5.0 ) );
if (!conn->connect( servers[i] , errmsg ) ) {
log(1) << "error connecting to seed " << servers[i] << ": " << errmsg << endl;
try{
if( ! conn->connect( servers[i] , errmsg ) ){
throw DBException( errmsg, 15928 );
}
log() << "successfully connected to seed " << servers[i] << " for replica set " << this->_name << endl;
}
catch( DBException& e ){
log() << "error connecting to seed " << servers[i] << causedBy( e ) << endl;
// skip seeds that don't work
continue;
}

_nodes.push_back( Node( servers[i] , conn.release() ) );

int myLoc = _nodes.size() - 1;
string maybePrimary;
_checkConnection( _nodes[myLoc].conn.get() , maybePrimary, false, myLoc );
_checkConnection( conn.get(), maybePrimary, false, -1 );
}

// Check everything to get the first data
_check( true );

log() << "replica set monitor for replica set " << _name << " started, address is " << getServerAddress() << endl;

}

ReplicaSetMonitor::~ReplicaSetMonitor() {
Expand Down Expand Up @@ -164,18 +181,21 @@ namespace mongo {
}

/**
 * Public accessor for the set's address string: acquires _lock and then
 * delegates the formatting to _getServerAddress_inlock().
 */
string ReplicaSetMonitor::getServerAddress() const {
    scoped_lock guard( _lock );
    string address = _getServerAddress_inlock();
    return address;
}

/**
 * Formats the monitored set as "<name>/<host1>,<host2>,..." using the current
 * contents of _nodes. The "<name>/" prefix is left out when _name is empty.
 *
 * This function does not take _lock itself; the callers visible in this file
 * acquire _lock before calling it.
 */
string ReplicaSetMonitor::_getServerAddress_inlock() const {
    StringBuilder ss;
    if ( _name.size() )
        ss << _name << "/";

    // Exactly one pass over the nodes, and no locking here: a second locked
    // pass would both re-acquire _lock and emit every address twice.
    for ( unsigned i=0; i<_nodes.size(); i++ ) {
        if ( i > 0 )
            ss << ",";
        ss << _nodes[i].addr.toString();
    }

    return ss.str();
}

Expand Down Expand Up @@ -313,29 +333,126 @@ namespace mongo {
}
}

void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) {
/**
 * Compares the host names in `hostList` against the current _nodes vector.
 *
 * @param hostList BSON object whose elements are host strings
 * @return a NodeDiff where
 *         - first  holds the host strings that are in hostList but not in _nodes
 *         - second holds the indices into _nodes of entries absent from hostList
 *
 * Uses _find_inlock() and reads _nodes without locking, so it is meant to be
 * called with _lock already held (as the _inlock suffix indicates).
 */
NodeDiff ReplicaSetMonitor::_getHostDiff_inlock( const BSONObj& hostList ){

    NodeDiff diff;
    set<int> nodesFound;

    BSONObjIterator hi( hostList );
    while( hi.more() ){

        string toCheck = hi.next().String();
        int nodeIndex = _find_inlock( toCheck );

        // Unknown host -> node-to-add; known host -> remember its index
        if( nodeIndex < 0 ) diff.first.insert( toCheck );
        else nodesFound.insert( nodeIndex );
    }

    // Every existing node that was not mentioned in hostList is a node-to-remove
    for( size_t i = 0; i < _nodes.size(); i++ ){
        const int nodeIndex = static_cast<int>(i);
        if( nodesFound.find( nodeIndex ) == nodesFound.end() ) diff.second.insert( nodeIndex );
    }

    return diff;
}

/**
 * Cheap check for whether `hostList` differs from the hosts currently tracked
 * in _nodes.
 *
 * @param hostList BSON object whose elements are host strings
 * @param inlock   true when the caller already holds _lock; in that case only
 *                 the _inlock lookup is used and _lock is not acquired here
 * @return true if hostList names a host that is not in _nodes, or if the
 *         number of hosts in hostList differs from _nodes.size()
 */
bool ReplicaSetMonitor::_shouldChangeHosts( const BSONObj& hostList, bool inlock ){

    int origHosts = 0;
    if( ! inlock ){
        scoped_lock lk( _lock );
        origHosts = _nodes.size();
    }
    else origHosts = _nodes.size();

    int numHosts = 0;

    BSONObjIterator hi(hostList);
    while ( hi.more() ) {
        string toCheck = hi.next().String();

        // Count every listed host (known or not) so the size comparison below
        // is meaningful, and pick the lookup that matches our locking state.
        numHosts++;
        const int index = inlock ? _find_inlock( toCheck ) : _find( toCheck );

        // A host we do not know about yet means the set definitely changed
        if ( index < 0 ) return true;
    }

    // All listed hosts are known; a count mismatch means some were removed
    return origHosts != numHosts;

}

void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) {

// Fast path, still requires intermittent locking
if( ! _shouldChangeHosts( hostList, false ) ){
changed = false;
return;
}

// Slow path, double-checked though
scoped_lock lk( _lock );

// Our host list may have changed while waiting for another thread in the meantime,
// so double-check here
// TODO: Do we really need this much protection, this should be pretty rare and not triggered
// from lots of threads, duping old behavior for safety
if( ! _shouldChangeHosts( hostList, true ) ){
changed = false;
return;
}

// LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we
// want to record our changes
log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl;

HostAndPort h( toCheck );
NodeDiff diff = _getHostDiff_inlock( hostList );
set<string> added = diff.first;
set<int> removed = diff.second;

assert( added.size() > 0 || removed.size() > 0 );
changed = true;

// Delete from the end so we don't invalidate as we delete, delete indices are ascending
for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){

log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl;

_nodes.erase( _nodes.begin() + *i );
}

// Add new nodes
for( set<string>::iterator i = added.begin(), end = added.end(); i != end; ++i ){

log() << "trying to add new host " << *i << " to replica set " << this->_name << endl;

// Connect to new node
HostAndPort h( *i );
DBClientConnection * newConn = new DBClientConnection( true, 0, 5.0 );
string temp;
newConn->connect( h , temp );
{
scoped_lock lk( _lock );
if ( _find_inlock( toCheck ) >= 0 ) {
// we need this check inside the lock so there isn't thread contention on adding to vector
continue;

string errmsg;
try{
if( ! newConn->connect( h , errmsg ) ){
throw DBException( errmsg, 15927 );
}
_nodes.push_back( Node( h , newConn ) );
log() << "successfully connected to new host " << *i << " in replica set " << this->_name << endl;
}
log() << "updated set (" << _name << ") to: " << getServerAddress() << endl;
changed = true;
catch( DBException& e ){
warning() << "cannot connect to new host " << *i << " to replica set " << this->_name << causedBy( e ) << endl;
delete newConn;
newConn = NULL;
}

_nodes.push_back( Node( h , newConn ) );
}

}


Expand All @@ -348,7 +465,6 @@ namespace mongo {
Timer t;
BSONObj o;
c->isMaster(isMaster, &o);

if ( o["setName"].type() != String || o["setName"].String() != _name ) {
warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name
<< " ismaster: " << o << endl;
Expand All @@ -369,16 +485,20 @@ namespace mongo {
log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;

// add other nodes
BSONArrayBuilder b;
if ( o["hosts"].type() == Array ) {
if ( o["primary"].type() == String )
maybePrimary = o["primary"].String();

_checkHosts(o["hosts"].Obj(), changed);
BSONObjIterator it( o["hosts"].Obj() );
while( it.more() ) b.append( it.next() );
}
if (o.hasField("passives") && o["passives"].type() == Array) {
_checkHosts(o["passives"].Obj(), changed);
BSONObjIterator it( o["passives"].Obj() );
while( it.more() ) b.append( it.next() );
}

_checkHosts( b.arr(), changed);
_checkStatus(c);


Expand Down
10 changes: 9 additions & 1 deletion client/dbclient_rs.h
Expand Up @@ -24,6 +24,7 @@ namespace mongo {

class ReplicaSetMonitor;
typedef shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr;
typedef pair<set<string>,set<int> > NodeDiff;

/**
* manages state about a replica set for client
Expand Down Expand Up @@ -92,7 +93,7 @@ namespace mongo {
string getName() const { return _name; }

string getServerAddress() const;

bool contains( const string& server ) const;

void appendInfo( BSONObjBuilder& b ) const;
Expand Down Expand Up @@ -132,6 +133,12 @@ namespace mongo {
*/
bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset );

string _getServerAddress_inlock() const;

NodeDiff _getHostDiff_inlock( const BSONObj& hostList );
bool _shouldChangeHosts( const BSONObj& hostList, bool inlock );


int _find( const string& server ) const ;
int _find_inlock( const string& server ) const ;
int _find( const HostAndPort& server ) const ;
Expand All @@ -144,6 +151,7 @@ namespace mongo {
/**
 * Builds a node record for host `a` using connection `c`.
 * `c` may be NULL (no connection available); the node is marked ok only when
 * a connection was actually supplied. All other state flags start out false
 * and the ping time starts at 0.
 */
Node( const HostAndPort& a , DBClientConnection* c )
    : addr( a ) , conn(c) , ok( c != NULL ) ,
      ismaster(false), secondary( false ) , hidden( false ) , pingTimeMillis(0) {
}

bool okForSecondaryQueries() const {
Expand Down

0 comments on commit 6b2231a

Please sign in to comment.