From 08b089c0fbee8dea534f54a01bde096aee4352a2 Mon Sep 17 00:00:00 2001 From: SDCDev Date: Mon, 11 May 2015 10:23:41 +0200 Subject: [PATCH] Deadlock fixes --- shadow.pro | 2 +- src/clientversion.h | 2 +- src/main.cpp | 29 ++++--- src/main.h | 2 +- src/net.cpp | 4 +- src/net.h | 1 + src/smessage.cpp | 182 +++++++++++++++++++++++++------------------- src/wallet.cpp | 23 +++--- 8 files changed, 135 insertions(+), 110 deletions(-) diff --git a/shadow.pro b/shadow.pro index b565c7f32b..670bdea6ef 100644 --- a/shadow.pro +++ b/shadow.pro @@ -1,6 +1,6 @@ TEMPLATE = app TARGET = shadow -VERSION = 1.3.2.3 +VERSION = 1.3.2.4 INCLUDEPATH += src src/json src/qt DEFINES += BOOST_THREAD_USE_LIB BOOST_SPIRIT_THREADSAFE CONFIG += no_include_pwd diff --git a/src/clientversion.h b/src/clientversion.h index 7de9d24558..e33241637d 100644 --- a/src/clientversion.h +++ b/src/clientversion.h @@ -9,7 +9,7 @@ #define CLIENT_VERSION_MAJOR 1 #define CLIENT_VERSION_MINOR 3 #define CLIENT_VERSION_REVISION 2 -#define CLIENT_VERSION_BUILD 3 +#define CLIENT_VERSION_BUILD 4 // Converts the parameter X to a string after macro replacement on X has been performed. // Don't merge these into one macro! diff --git a/src/main.cpp b/src/main.cpp index afe5142eb7..a0565d1337 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6166,7 +6166,7 @@ bool ProcessMessages(CNode* pfrom) } -bool SendMessages(CNode* pto, bool fSendTrickle) +bool SendMessages(CNode* pto, std::vector &vNodesCopy, bool fSendTrickle) { // Don't send anything until we get their version message if (pto->nVersion == 0) @@ -6214,23 +6214,20 @@ bool SendMessages(CNode* pto, bool fSendTrickle) static int64_t nLastRebroadcast; if (!IsInitialBlockDownload() && (GetTime() - nLastRebroadcast > 24 * 60 * 60)) { + BOOST_FOREACH(CNode* pnode, vNodesCopy) { - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - { - // Periodically clear setAddrKnown to allow refresh broadcasts - if (nLastRebroadcast) - pnode->setAddrKnown.clear(); + // Periodically clear setAddrKnown to allow refresh broadcasts + if (nLastRebroadcast) + pnode->setAddrKnown.clear(); - // Rebroadcast our address - if (!fNoListen) - { - CAddress addr = GetLocalAddress(&pnode->addr); - if (addr.IsRoutable()) - pnode->PushAddress(addr); - }; + // Rebroadcast our address + if (!fNoListen) + { + CAddress addr = GetLocalAddress(&pnode->addr); + if (addr.IsRoutable()) + pnode->PushAddress(addr); }; - } + }; nLastRebroadcast = GetTime(); }; @@ -6268,7 +6265,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle) std::vector vInvWait; { - LOCK(pto->cs_inventory); + LOCK2(pwalletMain->cs_wallet, pto->cs_inventory); vInv.reserve(pto->vInventoryToSend.size()); vInvWait.reserve(pto->vInventoryToSend.size()); BOOST_FOREACH(const CInv& inv, pto->vInventoryToSend) diff --git a/src/main.h b/src/main.h index 06ad9f5727..a727d21ded 100644 --- a/src/main.h +++ b/src/main.h @@ -117,7 +117,7 @@ void PrintBlockTree(); CBlockIndex* FindBlockByHeight(int nHeight); CBlockThinIndex* FindBlockThinByHeight(int nHeight); bool ProcessMessages(CNode* pfrom); -bool SendMessages(CNode* pto, bool fSendTrickle); +bool SendMessages(CNode* pto, std::vector &vNodesCopy, bool fSendTrickle); bool LoadExternalBlockFile(int nFile, FILE* fileIn); diff --git a/src/net.cpp b/src/net.cpp index 0bd205c3fb..73d3a7926e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1520,7 +1520,7 @@ void ThreadMessageHandler() for (;;) { boost::this_thread::interruption_point(); - vector vNodesCopy; + std::vector vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; @@ -1559,7 +1559,7 @@ void ThreadMessageHandler() { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - SendMessages(pnode, pnode == pnodeTrickle); + SendMessages(pnode, vNodesCopy, pnode == pnodeTrickle); } }; diff --git a/src/net.h b/src/net.h index 1fa2f4627a..01379f92fa 100644 --- a/src/net.h +++ b/src/net.h @@ -245,6 +245,7 @@ class SecMsgNode ~SecMsgNode() {}; + CCriticalSection cs_smsg_net; int64_t lastSeen; int64_t lastMatched; int64_t ignoreUntil; diff --git a/src/smessage.cpp b/src/smessage.cpp index 1e1e86313a..3096b23a51 100644 --- a/src/smessage.cpp +++ b/src/smessage.cpp @@ -577,18 +577,21 @@ bool SecMsgDB::EraseSmesg(uint8_t* chKey) void ThreadSecureMsg() { // -- bucket management thread - + + std::vector > vTimedOutLocks; while (fSecMsgEnabled) { int64_t now = GetTime(); if (fDebugSmsg) LogPrintf("SecureMsgThread %d \n", now); - + + vTimedOutLocks.resize(0); + int64_t cutoffTime = now - SMSG_RETENTION; { LOCK(cs_smsg); - + for (std::map::iterator it(smsgBuckets.begin()); it != smsgBuckets.end(); it++) { //if (fDebugSmsg) @@ -603,8 +606,7 @@ void ThreadSecureMsg() fs::path fullPath = GetDataDir() / "smsgStore" / (fileName + "_01.dat"); if (fs::exists(fullPath)) { - try { - fs::remove(fullPath); + try { fs::remove(fullPath); } catch (const fs::filesystem_error& ex) { LogPrintf("Error removing bucket file %s.\n", ex.what()); @@ -618,8 +620,7 @@ void ThreadSecureMsg() fullPath = GetDataDir() / "smsgStore" / (fileName + "_01_wl.dat"); if (fs::exists(fullPath)) { - try { - fs::remove(fullPath); + try { fs::remove(fullPath); } catch (const fs::filesystem_error& ex) { LogPrintf("Error removing wallet locked file %s.\n", ex.what()); @@ -634,35 +635,46 @@ void ThreadSecureMsg() if (it->second.nLockCount == 0) // lock timed out { - NodeId nPeerId = it->second.nLockPeerId; - int64_t ignoreUntil = GetTime() + SMSG_TIME_IGNORE; - - if (fDebugSmsg) - LogPrintf("Lock on bucket %d for peer %d timed out.\n", it->first, nPeerId); - - // -- look through the nodes for the peer that locked this bucket - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - { - if (pnode->id != nPeerId) - continue; - pnode->smsgData.ignoreUntil = ignoreUntil; - - // -- alert peer that they are being ignored - std::vector vchData; - vchData.resize(8); - memcpy(&vchData[0], &ignoreUntil, 8); - pnode->PushMessage("smsgIgnore", vchData); - - if (fDebugSmsg) - LogPrintf("This node will ignore peer %d until %d.\n", nPeerId, ignoreUntil); - break; - }; + vTimedOutLocks.push_back(std::make_pair(it->first, it->second.nLockPeerId)); // cs_vNodes + it->second.nLockPeerId = 0; }; // if (it->second.nLockCount == 0) + }; // ! if (it->first < cutoffTime) }; - }; // LOCK(cs_smsg); + } // cs_smsg + + for (std::vector >::iterator it(vTimedOutLocks.begin()); it != vTimedOutLocks.end(); it++) + { + NodeId nPeerId = it->second; + int64_t ignoreUntil = GetTime() + SMSG_TIME_IGNORE; + + if (fDebugSmsg) + LogPrintf("Lock on bucket %d for peer %d timed out.\n", it->first, nPeerId); + + // -- look through the nodes for the peer that locked this bucket + + { + LOCK(cs_vNodes); + BOOST_FOREACH(CNode* pnode, vNodes) + { + if (pnode->id != nPeerId) + continue; + LOCK2(pnode->cs_vSend, pnode->smsgData.cs_smsg_net); + pnode->smsgData.ignoreUntil = ignoreUntil; + + // -- alert peer that they are being ignored + std::vector vchData; + vchData.resize(8); + memcpy(&vchData[0], &ignoreUntil, 8); + pnode->PushMessage("smsgIgnore", vchData); + + if (fDebugSmsg) + LogPrintf("This node will ignore peer %d until %d.\n", nPeerId, ignoreUntil); + break; + }; + } // cs_vNodes + }; MilliSleep(SMSG_THREAD_DELAY * 1000); // // check every SMSG_THREAD_DELAY seconds }; @@ -1120,6 +1132,7 @@ bool SecureMsgStart(bool fDontStart, bool fScanChain) threadGroupSmsg.create_thread(boost::bind(&TraceThread, "smsg", &ThreadSecureMsg)); threadGroupSmsg.create_thread(boost::bind(&TraceThread, "smsg-pow", &ThreadSecureMsgPow)); + /* // -- start threads if (!NewThread(ThreadSecureMsg, NULL) @@ -1192,11 +1205,12 @@ bool SecureMsgEnable() return false; }; - }; // LOCK(cs_smsg); - + } // cs_smsg + // -- start threads threadGroupSmsg.create_thread(boost::bind(&TraceThread, "smsg", &ThreadSecureMsg)); threadGroupSmsg.create_thread(boost::bind(&TraceThread, "smsg-pow", &ThreadSecureMsgPow)); + /* if (!NewThread(ThreadSecureMsg, NULL) || !NewThread(ThreadSecureMsgPow, NULL)) @@ -1214,8 +1228,7 @@ bool SecureMsgEnable() pnode->PushMessage("smsgPing"); pnode->PushMessage("smsgPong"); // Send pong as have missed initial ping sent by peer when it connected }; - } - + } // cs_vNodes LogPrintf("Secure messaging enabled.\n"); return true; }; @@ -1228,7 +1241,7 @@ bool SecureMsgDisable() LogPrintf("SecureMsgDisable: secure messaging is already disabled.\n"); return false; }; - + { LOCK(cs_smsg); fSecMsgEnabled = false; @@ -1244,26 +1257,25 @@ bool SecureMsgDisable() it->second.setTokens.clear(); }; smsgBuckets.clear(); - - // -- tell each smsg enabled peer that this node is disabling + smsgAddresses.clear(); + } // cs_smsg + + // -- tell each smsg enabled peer that this node is disabling + { + LOCK(cs_vNodes); + BOOST_FOREACH(CNode* pnode, vNodes) { - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - { - if (!pnode->smsgData.fEnabled) - continue; - - pnode->PushMessage("smsgDisabled"); - pnode->smsgData.fEnabled = false; - }; - } - - if (SecureMsgWriteIni() != 0) - LogPrintf("Failed to save smsg.ini\n"); + if (!pnode->smsgData.fEnabled) + continue; + LOCK2(pnode->cs_vSend, pnode->smsgData.cs_smsg_net); + pnode->PushMessage("smsgDisabled"); + pnode->smsgData.fEnabled = false; + }; + } // cs_vNodes - smsgAddresses.clear(); - } // LOCK(cs_smsg); + if (SecureMsgWriteIni() != 0) + LogPrintf("Failed to save smsg.ini\n"); // -- allow time for threads to stop MilliSleep(3000); // seconds @@ -1291,11 +1303,9 @@ bool SecureMsgReceiveData(CNode* pfrom, std::string strCommand, CDataStream& vRe if (fDebugSmsg) LogPrintf("SecureMsgReceiveData() %s %s.\n", pfrom->addrName.c_str(), strCommand.c_str()); - - { - + if (strCommand == "smsgInv") { std::vector vchData; @@ -1308,14 +1318,18 @@ bool SecureMsgReceiveData(CNode* pfrom, std::string strCommand, CDataStream& vRe }; int64_t now = GetTime(); - - if (now < pfrom->smsgData.ignoreUntil) + { - if (fDebugSmsg) - LogPrintf("Node is ignoring peer %d until %d.\n", pfrom->id, pfrom->smsgData.ignoreUntil); - return false; - }; - + LOCK(pfrom->smsgData.cs_smsg_net); + + if (now < pfrom->smsgData.ignoreUntil) + { + if (fDebugSmsg) + LogPrintf("Node is ignoring peer %d until %d.\n", pfrom->id, pfrom->smsgData.ignoreUntil); + return false; + }; + } + uint32_t nBuckets = smsgBuckets.size(); uint32_t nLocked = 0; // no. of locked buckets on this node uint32_t nInvBuckets; // no. of bucket headers sent by peer in smsgInv @@ -1701,8 +1715,10 @@ bool SecureMsgReceiveData(CNode* pfrom, std::string strCommand, CDataStream& vRe time = now; }; - pfrom->smsgData.lastMatched = time; - + { + LOCK(pfrom->smsgData.cs_smsg_net); + pfrom->smsgData.lastMatched = time; + } if (fDebugSmsg) LogPrintf("Peer buckets matched at %d.\n", time); @@ -1716,15 +1732,22 @@ bool SecureMsgReceiveData(CNode* pfrom, std::string strCommand, CDataStream& vRe { if (fDebugSmsg) LogPrintf("Peer replied, secure messaging enabled.\n"); - - pfrom->smsgData.fEnabled = true; + + { + LOCK(pfrom->smsgData.cs_smsg_net); + pfrom->smsgData.fEnabled = true; + } + } else if (strCommand == "smsgDisabled") { // -- peer has disabled secure messaging. - - pfrom->smsgData.fEnabled = false; - + + { + LOCK(pfrom->smsgData.cs_smsg_net); + pfrom->smsgData.fEnabled = false; + } + if (fDebugSmsg) LogPrintf("Peer %d has disabled secure messaging.\n", pfrom->id); @@ -1745,8 +1768,12 @@ bool SecureMsgReceiveData(CNode* pfrom, std::string strCommand, CDataStream& vRe int64_t time; memcpy(&time, &vchData[0], 8); - - pfrom->smsgData.ignoreUntil = time; + + { + LOCK(pfrom->smsgData.cs_smsg_net); + pfrom->smsgData.ignoreUntil = time; + } + if (fDebugSmsg) LogPrintf("Peer %d is ignoring this node until %d, ignore peer too.\n", pfrom->id, time); @@ -1755,8 +1782,6 @@ bool SecureMsgReceiveData(CNode* pfrom, std::string strCommand, CDataStream& vRe // Unknown message }; - }; // LOCK(cs_smsg); - return true; }; @@ -1766,10 +1791,11 @@ bool SecureMsgSendData(CNode* pto, bool fSendTrickle) Called from ProcessMessage Runs in ThreadMessageHandler2 */ + + LOCK(pto->smsgData.cs_smsg_net); //LogPrintf("SecureMsgSendData() %s.\n", pto->addrName.c_str()); - - + int64_t now = GetTime(); if (pto->smsgData.lastSeen == 0) @@ -2453,7 +2479,7 @@ int SecureMsgWalletKeyChanged(std::string sAddress, std::string sLabel, ChangeTy break; } - }; // LOCK(cs_smsg); + } // cs_smsg return 0; diff --git a/src/wallet.cpp b/src/wallet.cpp index 1c4bcac1a9..72840dcddd 100644 --- a/src/wallet.cpp +++ b/src/wallet.cpp @@ -1433,11 +1433,11 @@ void CWallet::ResendWalletTransactions(bool fForce) // Rebroadcast any of our txes that aren't in a block yet LogPrintf("ResendWalletTransactions()\n"); CTxDB txdb("r"); - + + multimap mapSorted; { LOCK(cs_wallet); // Sort them in chronological order - multimap mapSorted; BOOST_FOREACH(PAIRTYPE(const uint256, CWalletTx)& item, mapWallet) { CWalletTx& wtx = item.second; @@ -1446,16 +1446,17 @@ void CWallet::ResendWalletTransactions(bool fForce) if (fForce || nTimeBestReceived - (int64_t)wtx.nTimeReceived > 5 * 60) mapSorted.insert(make_pair(wtx.nTimeReceived, &wtx)); }; + } // cs_wallet - BOOST_FOREACH(PAIRTYPE(const unsigned int, CWalletTx*)& item, mapSorted) - { - CWalletTx& wtx = *item.second; - if (wtx.CheckTransaction()) - wtx.RelayWalletTransaction(txdb); - else - LogPrintf("ResendWalletTransactions() : CheckTransaction failed for transaction %s\n", wtx.GetHash().ToString().c_str()); - }; - } + BOOST_FOREACH(PAIRTYPE(const unsigned int, CWalletTx*)& item, mapSorted) + { + CWalletTx& wtx = *item.second; + if (wtx.CheckTransaction()) + wtx.RelayWalletTransaction(txdb); + else + LogPrintf("ResendWalletTransactions() : CheckTransaction failed for transaction %s\n", wtx.GetHash().ToString().c_str()); + }; + }