Skip to content
Permalink
Browse files

net: make threads interruptible

  • Loading branch information...
theuni committed Dec 5, 2016
1 parent 7d5d449 commit 7b4d8f608cca17fba7311c310e74c4cbd7671b73
Showing with 137 additions and 43 deletions.
  1. +1 −0 src/Makefile.am
  2. +3 −1 src/init.cpp
  3. +70 −40 src/net.cpp
  4. +19 −2 src/net.h
  5. +44 −0 src/threadinterrupt.h
@@ -136,6 +136,7 @@ BITCOIN_CORE_H = \
support/lockedpool.h \
sync.h \
threadsafety.h \
threadinterrupt.h \
timedata.h \
torcontrol.h \
txdb.h \
@@ -175,6 +175,8 @@ void Interrupt(boost::thread_group& threadGroup)
InterruptRPC();
InterruptREST();
InterruptTorControl();
if(g_connman)
g_connman->Interrupt();
threadGroup.interrupt_all();
}

@@ -1554,7 +1556,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe;
connOptions.nMaxOutboundLimit = nMaxOutboundLimit;

if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions))
if(!connman.Start(scheduler, strNodeError, connOptions))
return InitError(strNodeError);

// ********************************************************* Step 12: finished
@@ -35,9 +35,6 @@
#include <miniupnpc/upnperrors.h>
#endif

#include <boost/filesystem.hpp>
#include <boost/thread.hpp>

#include <math.h>

// Dump addresses to peers.dat and banlist.dat every 15 minutes (900s)
@@ -1042,7 +1039,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
void CConnman::ThreadSocketHandler()
{
unsigned int nPrevNodeCount = 0;
while (true)
while (!interruptNet)
{
//
// Disconnect nodes
@@ -1180,7 +1177,8 @@ void CConnman::ThreadSocketHandler()

int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
boost::this_thread::interruption_point();
if(interruptNet)
return;

if (nSelect == SOCKET_ERROR)
{
@@ -1219,7 +1217,8 @@ void CConnman::ThreadSocketHandler()
}
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
boost::this_thread::interruption_point();
if(interruptNet)
return;

//
// Receive
@@ -1241,7 +1240,7 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
if(notify)
messageHandlerCondition.notify_one();
condMsgProc.notify_one();
pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
@@ -1469,7 +1468,8 @@ void CConnman::ThreadDNSAddressSeed()
// less influence on the network topology, and reduces traffic to the seeds.
if ((addrman.size() > 0) &&
(!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
MilliSleep(11 * 1000);
if(!InterruptibleSleep(std::chrono::seconds(11), interruptNet))
return;

LOCK(cs_vNodes);
int nRelevant = 0;
@@ -1488,6 +1488,8 @@ void CConnman::ThreadDNSAddressSeed()
LogPrintf("Loading addresses from DNS seeds (could take a while)\n");

BOOST_FOREACH(const CDNSSeedData &seed, vSeeds) {
if(interruptNet)
break;
if (HaveNameProxy()) {
AddOneShot(seed.host);
} else {
@@ -1580,10 +1582,12 @@ void CConnman::ThreadOpenConnections()
OpenNetworkConnection(addr, false, NULL, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++)
{
MilliSleep(500);
if(!InterruptibleSleep(std::chrono::milliseconds(500), interruptNet))
return;
}
}
MilliSleep(500);
if(!InterruptibleSleep(std::chrono::milliseconds(500), interruptNet))
return;
}
}

@@ -1592,14 +1596,16 @@ void CConnman::ThreadOpenConnections()

// Minimum time before next feeler connection (in microseconds).
int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL);
while (true)
while (!interruptNet)
{
ProcessOneShot();

MilliSleep(500);
if(!InterruptibleSleep(std::chrono::milliseconds(500), interruptNet))
return;

CSemaphoreGrant grant(*semOutbound);
boost::this_thread::interruption_point();
if(interruptNet)
return;

// Add seed nodes if DNS seeds are all down (an infrastructure attack?).
if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
@@ -1657,7 +1663,7 @@ void CConnman::ThreadOpenConnections()

int64_t nANow = GetAdjustedTime();
int nTries = 0;
while (true)
while (!interruptNet)
{
CAddrInfo addr = addrman.Select(fFeeler);

@@ -1700,7 +1706,8 @@ void CConnman::ThreadOpenConnections()
if (fFeeler) {
// Add small amount of random noise before connection to avoid synchronization.
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
MilliSleep(randsleep);
if(!InterruptibleSleep(std::chrono::milliseconds(randsleep), interruptNet))
return;
LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString());
}

@@ -1778,11 +1785,12 @@ void CConnman::ThreadOpenAddedConnections()
// OpenNetworkConnection can detect existing connections to that IP/port.
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
MilliSleep(500);
if(!InterruptibleSleep(std::chrono::milliseconds(500), interruptNet))
return;
}
}

MilliSleep(120000); // Retry every 2 minutes
if(!InterruptibleSleep(std::chrono::minutes(2), interruptNet))
return;
}
}

@@ -1792,7 +1800,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
//
// Initiate outbound network connection
//
boost::this_thread::interruption_point();
if (interruptNet) {
return false;
}
if (!fNetworkActive) {
return false;
}
@@ -1805,7 +1815,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
return false;

CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure);
boost::this_thread::interruption_point();

if (!pnode)
return false;
@@ -1819,13 +1828,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
return true;
}


void CConnman::ThreadMessageHandler()
{
boost::mutex condition_mutex;
boost::unique_lock<boost::mutex> lock(condition_mutex);

while (true)
while (!interruptMsgProc)
{
std::vector<CNode*> vNodesCopy;
{
@@ -1860,15 +1865,17 @@ void CConnman::ThreadMessageHandler()
}
}
}
boost::this_thread::interruption_point();
if(interruptMsgProc)
return;

// Send messages
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
GetNodeSignals().SendMessages(pnode, *this);
}
boost::this_thread::interruption_point();
if(interruptMsgProc)
return;
}

{
@@ -1877,8 +1884,10 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}

if (fSleep)
messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100));
if(fSleep) {
std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
}
}
}

@@ -2057,7 +2066,7 @@ void CConnman::SetNetworkActive(bool active)
uiInterface.NotifyNetworkActiveChanged(fNetworkActive);
}

CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSeed1(nSeed1In)
CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSeed1(nSeed1In), interruptMsgProc(condMsgProc, mutexMsgProc), interruptNet(condNet, mutexNet)
{
fNetworkActive = true;
setBannedIsDirty = false;
@@ -2077,7 +2086,7 @@ NodeId CConnman::GetNewNodeId()
return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
}

bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions)
bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
{
nTotalBytesRecv = 0;
nTotalBytesSent = 0;
@@ -2145,23 +2154,26 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
// Start threads
//

interruptNet.reset();
interruptMsgProc.reset();

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

if (!GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n");
else
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this))));

// Send and receive from sockets, accept connections
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));

// Initiate outbound connections from -addnode
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this))));
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));

// Initiate outbound connections unless connect=0
if (!mapArgs.count("-connect") || mapMultiArgs["-connect"].size() != 1 || mapMultiArgs["-connect"][0] != "0")
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this))));
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));

// Process messages
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this))));
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));

// Dump network addresses
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
@@ -2183,13 +2195,30 @@ class CNetCleanup
}
}
instance_of_cnetcleanup;

void CConnman::Stop()
void CConnman::Interrupt()
{
LogPrintf("%s\n",__func__);
interruptMsgProc();
interruptNet();

if (semOutbound)
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
semOutbound->post();
}

void CConnman::Stop()
{
LogPrintf("%s\n",__func__);
if (threadMessageHandler.joinable())
threadMessageHandler.join();
if (threadOpenConnections.joinable())
threadOpenConnections.join();
if (threadOpenAddedConnections.joinable())
threadOpenAddedConnections.join();
if (threadDNSAddressSeed.joinable())
threadDNSAddressSeed.join();
if (threadSocketHandler.joinable())
threadSocketHandler.join();

if (fAddressesInitialized)
{
@@ -2232,6 +2261,7 @@ void CConnman::DeleteNode(CNode* pnode)

CConnman::~CConnman()
{
Interrupt();
Stop();
}

@@ -19,11 +19,14 @@
#include "streams.h"
#include "sync.h"
#include "uint256.h"
#include "threadinterrupt.h"

#include <atomic>
#include <deque>
#include <stdint.h>
#include <thread>
#include <memory>
#include <condition_variable>

#ifndef WIN32
#include <arpa/inet.h>
@@ -142,8 +145,9 @@ class CConnman
};
CConnman(uint64_t seed0, uint64_t seed1);
~CConnman();
bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options);
bool Start(CScheduler& scheduler, std::string& strNodeError, Options options);
void Stop();
void Interrupt();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
bool GetNetworkActive() const { return fNetworkActive; };
void SetNetworkActive(bool active);
@@ -402,7 +406,6 @@ class CConnman
std::list<CNode*> vNodesDisconnected;
mutable CCriticalSection cs_vNodes;
std::atomic<NodeId> nLastNodeId;
boost::condition_variable messageHandlerCondition;

/** Services this instance offers */
ServiceFlags nLocalServices;
@@ -419,6 +422,20 @@ class CConnman

/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1;

std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
CThreadInterrupt interruptMsgProc;

std::condition_variable condNet;
std::mutex mutexNet;
CThreadInterrupt interruptNet;

std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
std::thread threadOpenConnections;
std::thread threadMessageHandler;
};
extern std::unique_ptr<CConnman> g_connman;
void Discover(boost::thread_group& threadGroup);

0 comments on commit 7b4d8f6

Please sign in to comment.
You can’t perform that action at this time.