Permalink
Browse files

need to have waitable ticket holder, actually wait for ticket for con…

…fig server
  • Loading branch information...
1 parent cc61e1f commit 22ffe27906c23d1b8567dfc1ab7276f579019b78 Greg Studer committed Mar 8, 2012
Showing with 128 additions and 18 deletions.
  1. +85 −0 dbtests/threadedtests.cpp
  2. +4 −2 s/d_state.cpp
  3. +39 −16 util/goodies.h
View
@@ -544,6 +544,15 @@ namespace ThreadedTests {
};
#endif
+ void sleepalittle() {
+ Timer t;
+ while( 1 ) {
+ boost::this_thread::yield();
+ if( t.micros() > 8 )
+ break;
+ }
+ }
+
class WriteLocksAreGreedy : public ThreadedTest<3> {
public:
WriteLocksAreGreedy() : m("gtest") {}
@@ -579,6 +588,81 @@ namespace ThreadedTests {
}
};
+ // Tests waiting on the TicketHolder by running many more threads than can fit into the "hotel", but only
+ // max _nRooms threads should ever get in at once
+ class TicketHolderWaits : public ThreadedTest<10> {
+
+ static const int checkIns = 1000;
+ static const int rooms = 3;
+
+ public:
+ TicketHolderWaits() : _hotel( rooms ), _tickets( _hotel._nRooms ) {}
+
+ private:
+
+ class Hotel {
+ public:
+ Hotel( int nRooms ) : _frontDesk( "frontDesk" ), _nRooms( nRooms ), _checkedIn( 0 ), _maxRooms( 0 ) {}
+
+ void checkIn(){
+ scoped_lock lk( _frontDesk );
+ _checkedIn++;
+ assert( _checkedIn <= _nRooms );
+ if( _checkedIn > _maxRooms ) _maxRooms = _checkedIn;
+ }
+
+ void checkOut(){
+ scoped_lock lk( _frontDesk );
+ _checkedIn--;
+ assert( _checkedIn >= 0 );
+ }
+
+ mongo::mutex _frontDesk;
+ int _nRooms;
+ int _checkedIn;
+ int _maxRooms;
+ };
+
+ Hotel _hotel;
+ TicketHolder _tickets;
+
+ virtual void subthread(int x) {
+
+ string threadName = ( str::stream() << "ticketHolder" << x );
+ Client::initThread( threadName.c_str() );
+
+ for( int i = 0; i < checkIns; i++ ){
+
+ _tickets.waitForTicket();
+ TicketHolderReleaser whenDone( &_tickets );
+
+ _hotel.checkIn();
+
+ sleepalittle();
+ if( i == checkIns - 1 ) sleepsecs( 2 );
+
+ _hotel.checkOut();
+
+ if( ( i % ( checkIns / 10 ) ) == 0 )
+ log() << "checked in " << i << " times..." << endl;
+
+ }
+
+ cc().shutdown();
+
+ }
+
+ virtual void validate() {
+
+ // This should always be true, assuming that it takes < 1 sec for the hardware to process a check-out/check-in
+ // Time for test is then ~ #threads / _nRooms * 2 seconds
+ assert( _hotel._maxRooms == _hotel._nRooms );
+
+ }
+
+ };
+
+
class All : public Suite {
public:
All() : Suite( "threading" ) { }
@@ -600,6 +684,7 @@ namespace ThreadedTests {
add< RWLockTest4 >();
add< MongoMutexTest >();
+ add< TicketHolderWaits >();
}
} myall;
}
View
@@ -193,6 +193,7 @@ namespace mongo {
LOG( 2 ) << "trying to set shard version of " << version.toString() << " for '" << ns << "'" << endl;
+ _configServerTickets.waitForTicket();
TicketHolderReleaser needTicketFrom( &_configServerTickets );
// fast path - double-check if requested version is at the same version as this chunk manager before verifying
@@ -207,14 +208,15 @@ namespace mongo {
// + two clients reloaded
// one triggered the 'slow path' (below)
// when the second's request gets here, the version is already current
+ ConfigVersion storedVersion;
{
scoped_lock lk( _mutex );
ChunkManagersMap::const_iterator it = _chunks.find( ns );
- if ( it != _chunks.end() && it->second->getVersion() == version )
+ if ( it != _chunks.end() && ( storedVersion = it->second->getVersion() ) == version )
return true;
}
- LOG( 2 ) << "verifying remote version against " << version.toString() << " for '" << ns << "'" << endl;
+ LOG( 2 ) << "verifying cached version " << storedVersion.toString() << " and new version " << version.toString() << " for '" << ns << "'" << endl;
// slow path - requested version is different than the current chunk manager's, if one exists, so must check for
// newest version in the config server
View
@@ -324,31 +324,41 @@ namespace mongo {
bool tryAcquire() {
scoped_lock lk( _mutex );
- if ( _num <= 0 ) {
- if ( _num < 0 ) {
- cerr << "DISASTER! in TicketHolder" << endl;
- }
- return false;
+ return _tryAcquire();
+ }
+
+ void waitForTicket() {
+ scoped_lock lk( _mutex );
+
+ while( ! _tryAcquire() ) {
+ _newTicket.wait( lk.boost() );
}
- _num--;
- return true;
}
void release() {
- scoped_lock lk( _mutex );
- _num++;
+ {
+ scoped_lock lk( _mutex );
+ _num++;
+ }
+ _newTicket.notify_one();
}
void resize( int newSize ) {
- scoped_lock lk( _mutex );
- int used = _outof - _num;
- if ( used > newSize ) {
- cout << "ERROR: can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << endl;
- return;
+ {
+ scoped_lock lk( _mutex );
+
+ int used = _outof - _num;
+ if ( used > newSize ) {
+ cout << "ERROR: can't resize since we're using (" << used << ") more than newSize(" << newSize << ")" << endl;
+ return;
+ }
+
+ _outof = newSize;
+ _num = _outof - used;
}
- _outof = newSize;
- _num = _outof - used;
+ // Potentially wasteful, but easier to see is correct
+ _newTicket.notify_all();
}
int available() const {
@@ -362,9 +372,22 @@ namespace mongo {
int outof() const { return _outof; }
private:
+
+ bool _tryAcquire(){
+ if ( _num <= 0 ) {
+ if ( _num < 0 ) {
+ cerr << "DISASTER! in TicketHolder" << endl;
+ }
+ return false;
+ }
+ _num--;
+ return true;
+ }
+
int _outof;
int _num;
mongo::mutex _mutex;
+ boost::condition_variable_any _newTicket;
};
class TicketHolderReleaser {

0 comments on commit 22ffe27

Please sign in to comment.