Skip to content

Commit

Permalink
Merge pull request #1033 from sipa/wait
Browse files Browse the repository at this point in the history
Condition variables instead of polling
  • Loading branch information
sipa committed Apr 6, 2012
2 parents 9682087 + 092631f commit 9362da7
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 122 deletions.
41 changes: 22 additions & 19 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ map<CInv, int64> mapAlreadyAskedFor;
set<CNetAddr> setservAddNodeAddresses;
CCriticalSection cs_setservAddNodeAddresses;

static CWaitableCriticalSection csOutbound;
static int nOutbound = 0;
static CConditionVariable condOutbound;


unsigned short GetListenPort()
Expand Down Expand Up @@ -361,6 +364,8 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
pnode->AddRef();
CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode);
WAITABLE_CRITICAL_BLOCK(csOutbound)
nOutbound++;

pnode->nTimeConnected = GetTime();
return pnode;
Expand Down Expand Up @@ -504,6 +509,15 @@ void ThreadSocketHandler2(void* parg)
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());

if (!pnode->fInbound)
WAITABLE_CRITICAL_BLOCK(csOutbound)
{
nOutbound--;

// Connection slot(s) were removed, notify connection creator(s)
NOTIFY(condOutbound);
}

// close socket and cleanup
pnode->CloseSocketDisconnect();
pnode->Cleanup();
Expand Down Expand Up @@ -1172,32 +1186,20 @@ void ThreadOpenConnections2(void* parg)
int64 nStart = GetTime();
loop
{
int nOutbound = 0;

vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(500);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;

// Limit outbound connections
loop
{
nOutbound = 0;
CRITICAL_BLOCK(cs_vNodes)
BOOST_FOREACH(CNode* pnode, vNodes)
if (!pnode->fInbound)
nOutbound++;
int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
if (nOutbound < nMaxOutboundConnections)
break;
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(2000);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;
}
int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
WAITABLE_CRITICAL_BLOCK(csOutbound)
WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;

bool fAddSeeds = false;

Expand Down Expand Up @@ -1646,6 +1648,7 @@ bool StopNode()
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
NOTIFY_ALL(condOutbound);
do
{
int nThreadsRunning = 0;
Expand Down
56 changes: 4 additions & 52 deletions src/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1183,62 +1183,14 @@ static void pop_lock()
dd_mutex.unlock();
}

void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs)
{
push_lock(this, CLockLocation(pszName, pszFile, nLine));
#ifdef DEBUG_LOCKCONTENTION
bool result = mutex.try_lock();
if (!result)
{
printf("LOCKCONTENTION: %s\n", pszName);
printf("Locker: %s:%d\n", pszFile, nLine);
mutex.lock();
printf("Locked\n");
}
#else
mutex.lock();
#endif
}
void CCriticalSection::Leave()
{
mutex.unlock();
pop_lock();
}
bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine)
{
push_lock(this, CLockLocation(pszName, pszFile, nLine));
bool result = mutex.try_lock();
if (!result) pop_lock();
return result;
push_lock(cs, CLockLocation(pszName, pszFile, nLine));
}

#else

void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
void LeaveCritical()
{
#ifdef DEBUG_LOCKCONTENTION
bool result = mutex.try_lock();
if (!result)
{
printf("LOCKCONTENTION: %s\n", pszName);
printf("Locker: %s:%d\n", pszFile, nLine);
mutex.lock();
}
#else
mutex.lock();
#endif
}

void CCriticalSection::Leave()
{
mutex.unlock();
}

bool CCriticalSection::TryEnter(const char*, const char*, int)
{
bool result = mutex.try_lock();
return result;
pop_lock();
}

#endif /* DEBUG_LOCKORDER */

157 changes: 106 additions & 51 deletions src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ typedef int pid_t; /* define for windows compatiblity */

#include <boost/thread.hpp>
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/lock_options.hpp>
#include <boost/date_time/gregorian/gregorian_types.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

Expand Down Expand Up @@ -180,82 +183,134 @@ void AddTimeData(const CNetAddr& ip, int64 nTime);



/** Wrapped boost mutex: supports recursive locking, but no waiting */
typedef boost::interprocess::interprocess_recursive_mutex CCriticalSection;

/** Wrapper to automatically initialize mutex. */
class CCriticalSection
{
protected:
boost::interprocess::interprocess_recursive_mutex mutex;
public:
explicit CCriticalSection() { }
~CCriticalSection() { }
void Enter(const char* pszName, const char* pszFile, int nLine);
void Leave();
bool TryEnter(const char* pszName, const char* pszFile, int nLine);
};
/** Wrapped boost mutex: supports waiting but not recursive locking */
typedef boost::interprocess::interprocess_mutex CWaitableCriticalSection;

/** RAII object that acquires mutex. Needed for exception safety. */
class CCriticalBlock
{
protected:
CCriticalSection* pcs;
#ifdef DEBUG_LOCKORDER
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs);
void LeaveCritical();
#else
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) {}
void static inline LeaveCritical() {}
#endif

/** Wrapper around boost::interprocess::scoped_lock */
template<typename Mutex>
class CMutexLock
{
private:
boost::interprocess::scoped_lock<Mutex> lock;
public:
CCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)

void Enter(const char* pszName, const char* pszFile, int nLine)
{
pcs = &csIn;
pcs->Enter(pszName, pszFile, nLine);
if (!lock.owns())
{
EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
#ifdef DEBUG_LOCKCONTENTION
if (!lock.try_lock())
{
printf("LOCKCONTENTION: %s\n", pszName);
printf("Locker: %s:%d\n", pszFile, nLine);
}
#endif
lock.lock();
}
}

operator bool() const
void Leave()
{
return true;
if (lock.owns())
{
lock.unlock();
LeaveCritical();
}
}

~CCriticalBlock()
bool TryEnter(const char* pszName, const char* pszFile, int nLine)
{
pcs->Leave();
if (!lock.owns())
{
EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
lock.try_lock();
if (!lock.owns())
LeaveCritical();
}
return lock.owns();
}
};

#define CRITICAL_BLOCK(cs) \
if (CCriticalBlock criticalblock = CCriticalBlock(cs, #cs, __FILE__, __LINE__))

#define ENTER_CRITICAL_SECTION(cs) \
(cs).Enter(#cs, __FILE__, __LINE__)

#define LEAVE_CRITICAL_SECTION(cs) \
(cs).Leave()

/** RAII object that tries to acquire mutex. Needed for exception safety. */
class CTryCriticalBlock
{
protected:
CCriticalSection* pcs;
CMutexLock(Mutex& mutexIn, const char* pszName, const char* pszFile, int nLine, bool fTry = false) : lock(mutexIn, boost::interprocess::defer_lock)
{
if (fTry)
TryEnter(pszName, pszFile, nLine);
else
Enter(pszName, pszFile, nLine);
}

public:
CTryCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)
~CMutexLock()
{
pcs = (csIn.TryEnter(pszName, pszFile, nLine) ? &csIn : NULL);
if (lock.owns())
LeaveCritical();
}

operator bool() const
operator bool()
{
return Entered();
return lock.owns();
}

~CTryCriticalBlock()
boost::interprocess::scoped_lock<Mutex> &GetLock()
{
if (pcs)
{
pcs->Leave();
}
return lock;
}
bool Entered() const { return pcs != NULL; }
};

typedef CMutexLock<CCriticalSection> CCriticalBlock;
typedef CMutexLock<CWaitableCriticalSection> CWaitableCriticalBlock;
typedef boost::interprocess::interprocess_condition CConditionVariable;

/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */
#define WAIT(name,condition) \
do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0)

/** Notify waiting threads that a condition may hold now */
#define NOTIFY(name) \
do { (name).notify_one(); } while(0)

#define NOTIFY_ALL(name) \
do { (name).notify_all(); } while(0)

#define CRITICAL_BLOCK(cs) \
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)

#define WAITABLE_CRITICAL_BLOCK(cs) \
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by WAITABLE_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
for (CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)

#define ENTER_CRITICAL_SECTION(cs) \
{ \
EnterCritical(#cs, __FILE__, __LINE__, (void*)(&cs)); \
(cs).lock(); \
}

#define LEAVE_CRITICAL_SECTION(cs) \
{ \
(cs).unlock(); \
LeaveCritical(); \
}

#define TRY_CRITICAL_BLOCK(cs) \
if (CTryCriticalBlock criticalblock = CTryCriticalBlock(cs, #cs, __FILE__, __LINE__))
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by TRY_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__, true); fcriticalblockonce && (fcriticalblockonce = criticalblock); fcriticalblockonce=false)


// This is exactly like std::string, but with a custom allocator.
// (secure_allocator<> is defined in serialize.h)
typedef std::basic_string<char, std::char_traits<char>, secure_allocator<char> > SecureString;




Expand Down

0 comments on commit 9362da7

Please sign in to comment.