Skip to content

Commit

Permalink
Fix locking issues, updated httpserver
Browse files Browse the repository at this point in the history
Masternode layer was locking cs_main in a bunch of places causing deadlocks intermitently. ReadBlocksFromDisk also was locking which we fixed by removing cs_main. HTTPServer was updated by upstream bitcoin merges.


Former-commit-id: dbe0afd
  • Loading branch information
sidhujag committed May 4, 2018
1 parent bba1508 commit b0266d3
Show file tree
Hide file tree
Showing 18 changed files with 179 additions and 179 deletions.
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ dnl require autoconf 2.60 (AS_ECHO/AS_ECHO_N)
AC_PREREQ([2.60])
define(_CLIENT_VERSION_MAJOR, 3)
define(_CLIENT_VERSION_MINOR, 0)
define(_CLIENT_VERSION_REVISION, 0)
define(_CLIENT_VERSION_REVISION, 2)
define(_CLIENT_VERSION_BUILD, 0)
define(_CLIENT_VERSION_IS_RELEASE, true)
define(_DASH_VERSION_MAJOR, 0)
Expand Down
4 changes: 3 additions & 1 deletion src/clientversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! These need to be macros, as clientversion.cpp's and syscoin*-res.rc's voodoo requires it
#define CLIENT_VERSION_MAJOR 3
#define CLIENT_VERSION_MINOR 0
#define CLIENT_VERSION_REVISION 0
#define CLIENT_VERSION_REVISION 2
#define CLIENT_VERSION_BUILD 0

//! Set to true for release, false for prerelease or test build
Expand Down Expand Up @@ -64,12 +64,14 @@
#include <vector>
// SYSCOIN
static const std::string SYSCOIN_CLIENT_VERSION = SYSCOIN_VERSION;

static const int CLIENT_VERSION =
1000000 * CLIENT_VERSION_MAJOR
+ 10000 * CLIENT_VERSION_MINOR
+ 100 * CLIENT_VERSION_REVISION
+ 1 * CLIENT_VERSION_BUILD;

static const int CLIENT_MASTERNODE_VERSION = 3000000;
extern const std::string CLIENT_NAME;
extern const std::string CLIENT_BUILD;

Expand Down
50 changes: 24 additions & 26 deletions src/governance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm
else {
LogPrint("gobject", "MNGOVERNANCEOBJECTVOTE -- Rejected vote, error = %s\n", exception.what());
if((exception.GetNodePenalty() != 0) && masternodeSync.IsSynced()) {
LOCK(cs_main);
Misbehaving(pfrom->GetId(), exception.GetNodePenalty());
}
return;
Expand Down Expand Up @@ -702,7 +701,6 @@ void CGovernanceManager::SyncAll(CNode* pnode, CConnman& connman) const
if(!masternodeSync.IsSynced()) return;

if(netfulfilledman.HasFulfilledRequest(pnode->addr, NetMsgType::MNGOVERNANCESYNC)) {
LOCK(cs_main);
// Asking for the whole list multiple times in a short period of time is no good
LogPrint("gobject", "CGovernanceManager::%s -- peer already asked me for the list\n", __func__);
Misbehaving(pnode->GetId(), 20);
Expand All @@ -716,28 +714,28 @@ void CGovernanceManager::SyncAll(CNode* pnode, CConnman& connman) const
// SYNC GOVERNANCE OBJECTS WITH OTHER CLIENT

LogPrint("gobject", "CGovernanceManager::%s -- syncing all objects to peer=%d\n", __func__, pnode->id);

LOCK2(cs_main, cs);

// all valid objects, no votes
for(object_m_cit it = mapObjects.begin(); it != mapObjects.end(); ++it) {
const CGovernanceObject& govobj = it->second;
std::string strHash = it->first.ToString();

LogPrint("gobject", "CGovernanceManager::%s -- attempting to sync govobj: %s, peer=%d\n", __func__, strHash, pnode->id);

if(govobj.IsSetCachedDelete() || govobj.IsSetExpired()) {
LogPrintf("CGovernanceManager::%s -- not syncing deleted/expired govobj: %s, peer=%d\n", __func__,
strHash, pnode->id);
continue;
}

// Push the inventory budget proposal message over to the other client
LogPrint("gobject", "CGovernanceManager::%s -- syncing govobj: %s, peer=%d\n", __func__, strHash, pnode->id);
pnode->PushInventory(CInv(MSG_GOVERNANCE_OBJECT, it->first));
++nObjCount;
}

{
LOCK2(cs_main, cs);

// all valid objects, no votes
for (object_m_cit it = mapObjects.begin(); it != mapObjects.end(); ++it) {
const CGovernanceObject& govobj = it->second;
std::string strHash = it->first.ToString();

LogPrint("gobject", "CGovernanceManager::%s -- attempting to sync govobj: %s, peer=%d\n", __func__, strHash, pnode->id);

if (govobj.IsSetCachedDelete() || govobj.IsSetExpired()) {
LogPrintf("CGovernanceManager::%s -- not syncing deleted/expired govobj: %s, peer=%d\n", __func__,
strHash, pnode->id);
continue;
}

// Push the inventory budget proposal message over to the other client
LogPrint("gobject", "CGovernanceManager::%s -- syncing govobj: %s, peer=%d\n", __func__, strHash, pnode->id);
pnode->PushInventory(CInv(MSG_GOVERNANCE_OBJECT, it->first));
++nObjCount;
}
}
CNetMsgMaker msgMaker(pnode->GetSendVersion());
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ, nObjCount));
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ_VOTE, nVoteCount));
Expand Down Expand Up @@ -1073,9 +1071,9 @@ int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector<CNode*>&
// On mainnet nMaxObjRequestsPerNode is always set to 1.
int nMaxObjRequestsPerNode = 1;
size_t nProjectedVotes = 2000;
// if(Params().NetworkIDString() != CBaseChainParams::MAIN) {
if(Params().NetworkIDString() != CBaseChainParams::MAIN) {
nMaxObjRequestsPerNode = std::max(1, int(nProjectedVotes / std::max(1, mnodeman.size())));
/// }
}

{
LOCK2(cs_main, cs);
Expand Down
10 changes: 4 additions & 6 deletions src/graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using namespace boost;
using namespace std;
typedef typename std::vector<int> container;
extern CCriticalSection cs_main;
bool OrderBasedOnArrivalTime(std::vector<CTransactionRef>& blockVtx) {
std::vector<vector<unsigned char> > vvchArgs;
std::vector<vector<unsigned char> > vvchAliasArgs;
Expand Down Expand Up @@ -94,12 +93,11 @@ bool CreateGraphFromVTX(const std::vector<CTransactionRef>& blockVtx, Graph &gra
{
if (DecodeAssetAllocationTx(tx, op, vvchArgs))
{
{
LOCK(cs_main);
if (!FindAliasInTx(tx, vvchAliasArgs)) {
continue;
}

if (!FindAliasInTx(tx, vvchAliasArgs)) {
continue;
}

const string& sender = stringFromVch(vvchAliasArgs[0]);
AliasMap::const_iterator it = mapAliasIndex.find(sender);
if (it == mapAliasIndex.end()) {
Expand Down
118 changes: 68 additions & 50 deletions src/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "sync.h"
#include "ui_interface.h"

#include <memory>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -25,9 +26,12 @@
#include <event2/http.h>
#include <event2/thread.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/util.h>
#include <event2/keyvalq_struct.h>

#include <support/events.h>

#ifdef EVENT__HAVE_NETINET_IN_H
#include <netinet/in.h>
#ifdef _XOPEN_SOURCE_EXTENDED
Expand Down Expand Up @@ -249,6 +253,16 @@ static std::string RequestMethodString(HTTPRequest::RequestMethod m)
/** HTTP request callback */
static void http_request_cb(struct evhttp_request* req, void* arg)
{
// Disable reading to work around a libevent bug, fixed in 2.2.0.
if (event_get_version_number() >= 0x02010600 && event_get_version_number() < 0x02020001) {
evhttp_connection* conn = evhttp_request_get_connection(req);
if (conn) {
bufferevent* bev = evhttp_connection_get_bufferevent(conn);
if (bev) {
bufferevent_disable(bev, EV_READ);
}
}
}
std::unique_ptr<HTTPRequest> hreq(new HTTPRequest(req));

LogPrint("http", "Received a %s request for %s from %s\n",
Expand Down Expand Up @@ -377,9 +391,6 @@ static void libevent_log_cb(int severity, const char *msg)

bool InitHTTPServer()
{
struct evhttp* http = 0;
struct event_base* base = 0;

if (!InitHTTPAllowList())
return false;

Expand All @@ -406,19 +417,16 @@ bool InitHTTPServer()
evthread_use_pthreads();
#endif

base = event_base_new(); // XXX RAII
if (!base) {
LogPrintf("Couldn't create an event_base: exiting\n");
return false;
}
raii_event_base base_ctr = obtain_event_base();

/* Create a new evhttp object to handle requests. */
raii_evhttp http_ctr = obtain_evhttp(base_ctr.get());
struct evhttp* http = http_ctr.get();
if (!http) {
LogPrintf("couldn't create evhttp. Exiting.\n");
return false;
}

/* Create a new evhttp object to handle requests. */
http = evhttp_new(base); // XXX RAII
if (!http) {
LogPrintf("couldn't create evhttp. Exiting.\n");
event_base_free(base);
return false;
}

evhttp_set_timeout(http, GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT));
evhttp_set_max_headers_size(http, MAX_HEADERS_SIZE);
Expand All @@ -427,8 +435,6 @@ bool InitHTTPServer()

if (!HTTPBindAddresses(http)) {
LogPrintf("Unable to bind any endpoint for RPC server\n");
evhttp_free(http);
event_base_free(base);
return false;
}

Expand All @@ -437,13 +443,15 @@ bool InitHTTPServer()
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);

workQueue = new WorkQueue<HTTPClosure>(workQueueDepth);
eventBase = base;
eventHTTP = http;
// transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release();
eventHTTP = http_ctr.release();
return true;
}

std::thread threadHTTP;
std::future<bool> threadResult;
static std::vector<std::thread> g_thread_http_workers;

bool StartHTTPServer()
{
Expand All @@ -454,10 +462,9 @@ bool StartHTTPServer()
threadResult = task.get_future();
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);

for (int i = 0; i < rpcThreads; i++) {
std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
rpc_worker.detach();
}
for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);
}
return true;
}

Expand All @@ -479,16 +486,15 @@ void InterruptHTTPServer()
void StopHTTPServer()
{
LogPrint("http", "Stopping HTTP server\n");
if (workQueue) {
LogPrint("http", "Waiting for HTTP worker threads to exit\n");
#ifndef WIN32
// ToDo: Disabling WaitExit() for Windows platforms is an ugly workaround for the wallet not
// closing during a repair-restart. It doesn't hurt, though, because threadHTTP.timed_join
// below takes care of this and sends a loopbreak.
workQueue->WaitExit();
#endif
delete workQueue;
}
if (workQueue) {
LogPrint("http", "Waiting for HTTP worker threads to exit\n");
for (auto& thread : g_thread_http_workers) {
thread.join();
}
g_thread_http_workers.clear();
delete workQueue;
workQueue = nullptr;
}
if (eventBase) {
LogPrint("http", "Waiting for HTTP event thread to exit\n");
// Give event loop a few seconds to exit (to send back last RPC responses), then break it
Expand All @@ -505,11 +511,11 @@ void StopHTTPServer()
}
if (eventHTTP) {
evhttp_free(eventHTTP);
eventHTTP = 0;
eventHTTP = nullptr;
}
if (eventBase) {
event_base_free(eventBase);
eventBase = 0;
eventBase = nullptr;
}
LogPrint("http", "Stopped HTTP server\n");
}
Expand Down Expand Up @@ -596,24 +602,36 @@ void HTTPRequest::WriteHeader(const std::string& hdr, const std::string& value)
assert(headers);
evhttp_add_header(headers, hdr.c_str(), value.c_str());
}

/** Closure sent to main thread to request a reply to be sent to
* a HTTP request.
* Replies must be sent in the main loop in the main http thread,
* this cannot be done from worker threads.
*/
* a HTTP request.
* Replies must be sent in the main loop in the main http thread,
* this cannot be done from worker threads.
*/
void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
{
assert(!replySent && req);
// Send event to main http thread to send reply message
struct evbuffer* evb = evhttp_request_get_output_buffer(req);
assert(evb);
evbuffer_add(evb, strReply.data(), strReply.size());
HTTPEvent* ev = new HTTPEvent(eventBase, true,
std::bind(evhttp_send_reply, req, nStatus, (const char*)NULL, (struct evbuffer *)NULL));
ev->trigger(0);
replySent = true;
req = 0; // transferred back to main thread
assert(!replySent && req);
// Send event to main http thread to send reply message
struct evbuffer* evb = evhttp_request_get_output_buffer(req);
assert(evb);
evbuffer_add(evb, strReply.data(), strReply.size());
auto req_copy = req;
HTTPEvent* ev = new HTTPEvent(eventBase, true, [req_copy, nStatus] {
evhttp_send_reply(req_copy, nStatus, nullptr, nullptr);
// Re-enable reading from the socket. This is the second part of the libevent
// workaround above.
if (event_get_version_number() >= 0x02010600 && event_get_version_number() < 0x02020001) {
evhttp_connection* conn = evhttp_request_get_connection(req_copy);
if (conn) {
bufferevent* bev = evhttp_connection_get_bufferevent(conn);
if (bev) {
bufferevent_enable(bev, EV_READ | EV_WRITE);
}
}
}
});
ev->trigger(nullptr);
replySent = true;
req = nullptr; // transferred back to main thread
}

CService HTTPRequest::GetPeer()
Expand Down
Loading

0 comments on commit b0266d3

Please sign in to comment.