Skip to content

Commit

Permalink
rpc: applied PR bitcoin/bitcoin#12274, in attempt to fix crashes
Browse files Browse the repository at this point in the history
  • Loading branch information
Nico205 committed May 17, 2018
1 parent ec28170 commit 52d38c9
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 22 deletions.
178 changes: 156 additions & 22 deletions src/httpserver.cpp
Expand Up @@ -26,6 +26,7 @@
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/util.h>
#include <event2/listener.h>
#include <event2/keyvalq_struct.h>

#include <support/events.h>
Expand All @@ -40,6 +41,55 @@
/** Maximum size of http request (request line + headers) */
static const size_t MAX_HEADERS_SIZE = 8192;

class ConnectionLimiter
{
public:
ConnectionLimiter(std::vector<evconnlistener*> listeners, unsigned int limit) : m_limit(limit), m_listeners(std::move(listeners))
{
assert(m_limit > 0);
}
void AddConnection(evutil_socket_t fd)
{
// Disable socket accepting if adding this connection puts us equal to the limit
if (!Interrupted() && m_sockets.insert(fd).second && m_sockets.size() == m_limit) {
LogPrint(BCLog::HTTP, "Suspending new connections");
for (const auto& listener : m_listeners) {
evconnlistener_disable(listener);
}
}
}
void RemoveConnection(evutil_socket_t fd)
{
// Re-enable socket accepting if removing this connection brings us
// back down under the limit
if (m_sockets.erase(fd) && m_sockets.size() + 1 == m_limit && !Interrupted()) {
LogPrint(BCLog::HTTP, "Resuming new connections\n");
for (const auto& listener : m_listeners) {
evconnlistener_enable(listener);
}
}
}
bool IsReady() const
{
return m_sockets.size() < m_limit && !Interrupted();
}
void Interrupt()
{
m_interrupted.store(true, std::memory_order_release);
}
private:

inline bool Interrupted() const
{
return m_interrupted.load(std::memory_order_acquire);
}

const unsigned int m_limit;
std::vector<evconnlistener*> m_listeners;
std::set<evutil_socket_t> m_sockets;
std::atomic<bool> m_interrupted{false};
};

/** HTTP request work item */
class HTTPWorkItem final : public HTTPClosure
{
Expand Down Expand Up @@ -208,33 +258,71 @@ static std::string RequestMethodString(HTTPRequest::RequestMethod m)
}
}

std::unique_ptr<ConnectionLimiter> g_limiter;

static void connection_close_cb(evhttp_connection* conn, void *arg)
{
ConnectionLimiter* limiter = static_cast<ConnectionLimiter*>(arg);
assert(limiter);
auto* bev = evhttp_connection_get_bufferevent(conn);
if (bev) {
evutil_socket_t fd = bufferevent_getfd(bev);
limiter->RemoveConnection(fd);
}
}

#if LIBEVENT_VERSION_NUMBER >= 0x02020001
static int http_newreq_cb(evhttp_request* req, void *arg)
{
/*
A return value of -1 here forces the connection to close immediately.
Otherwise, the connection's fd will be added to the limiter in the
normal request callback.
*/
ConnectionLimiter* limiter = static_cast<ConnectionLimiter*>(arg);
if (limiter && !limiter->IsReady()) {
return -1;
}
return 0;
}
#endif

/** HTTP request callback */
static void http_request_cb(struct evhttp_request* req, void* arg)
{
// Disable reading to work around a libevent bug, fixed in 2.2.0.
if (event_get_version_number() >= 0x02010600 && event_get_version_number() < 0x02020001) {
evhttp_connection* conn = evhttp_request_get_connection(req);
if (conn) {
bufferevent* bev = evhttp_connection_get_bufferevent(conn);
if (bev) {
bufferevent_disable(bev, EV_READ);
}
}
}
std::unique_ptr<HTTPRequest> hreq(new HTTPRequest(req));

LogPrint(MCLog::HTTP, "Received a %s request for %s from %s\n",
RequestMethodString(hreq->GetRequestMethod()), hreq->GetURI(), hreq->GetPeer().ToString());

bufferevent* bev = nullptr;
evhttp_connection* conn = evhttp_request_get_connection(req);
if (conn) {
bev = evhttp_connection_get_bufferevent(conn);
}
if (!bev) {
hreq->WriteHeader("Connection", "close");
hreq->WriteReplyImmediate(HTTP_INTERNAL, "Unknown error\n");
return;
}
ConnectionLimiter* limiter = static_cast<ConnectionLimiter*>(arg);
assert(limiter);
evhttp_connection_set_closecb(conn, connection_close_cb, limiter);
limiter->AddConnection(bufferevent_getfd(bev));
if (!limiter->IsReady()) {
hreq->WriteHeader("Connection", "close");
hreq->WriteReplyImmediate(HTTP_SERVUNAVAIL, "No connection slots available\n");
return;
}

// Early address-based allow check
if (!ClientAllowed(hreq->GetPeer())) {
hreq->WriteReply(HTTP_FORBIDDEN);
hreq->WriteReplyImmediate(HTTP_FORBIDDEN);
return;
}

// Early reject unknown HTTP methods
if (hreq->GetRequestMethod() == HTTPRequest::UNKNOWN) {
hreq->WriteReply(HTTP_BADMETHOD);
hreq->WriteReplyImmediate(HTTP_BADMETHOD);
return;
}

Expand All @@ -259,14 +347,19 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
if (i != iend) {
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
assert(workQueue);
if (workQueue->Enqueue(item.get()))
if (workQueue->Enqueue(item.get())) {
// Disable reading to work around a libevent bug, fixed in 2.2.0.
if (event_get_version_number() >= 0x02010600 && event_get_version_number() < 0x02020001) {
bufferevent_disable(bev, EV_READ);
}
item.release(); /* if true, queue took ownership */
else {
} else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");
item->req->WriteHeader("Connection", "close");
item->req->WriteReplyImmediate(HTTP_INTERNAL, "Work queue depth exceeded");
}
} else {
hreq->WriteReply(HTTP_NOTFOUND);
hreq->WriteReplyImmediate(HTTP_NOTFOUND);
}
}

Expand All @@ -289,7 +382,7 @@ static bool ThreadHTTP(struct event_base* base, struct evhttp* http)
}

/** Bind HTTP server to specified addresses */
static bool HTTPBindAddresses(struct evhttp* http)
static std::vector<evhttp_bound_socket*> HTTPBindAddresses(struct evhttp* http)
{
int defaultPort = gArgs.GetArg("-rpcport", BaseParams().RPCPort());
std::vector<std::pair<std::string, uint16_t> > endpoints;
Expand All @@ -313,17 +406,18 @@ static bool HTTPBindAddresses(struct evhttp* http)
endpoints.push_back(std::make_pair("0.0.0.0", defaultPort));
}

std::vector<evhttp_bound_socket*> bound_sockets;
// Bind addresses
for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) {
LogPrint(MCLog::HTTP, "Binding RPC on address %s port %i\n", i->first, i->second);
evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? nullptr : i->first.c_str(), i->second);
if (bind_handle) {
boundSockets.push_back(bind_handle);
bound_sockets.push_back(bind_handle);
} else {
LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second);
}
}
return !boundSockets.empty();
return bound_sockets;
}

/** Simple wrapper to set thread name and run work queue */
Expand Down Expand Up @@ -386,9 +480,9 @@ bool InitHTTPServer()
evhttp_set_timeout(http, gArgs.GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT));
evhttp_set_max_headers_size(http, MAX_HEADERS_SIZE);
evhttp_set_max_body_size(http, MAX_SIZE);
evhttp_set_gencb(http, http_request_cb, nullptr);

if (!HTTPBindAddresses(http)) {
boundSockets = HTTPBindAddresses(http);
if (boundSockets.empty()) {
LogPrintf("Unable to bind any endpoint for RPC server\n");
return false;
}
Expand All @@ -397,6 +491,26 @@ bool InitHTTPServer()
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);

std::vector<evconnlistener*> listeners;
for (const auto& bind_handle : boundSockets) {
evconnlistener* listener = evhttp_bound_socket_get_listener(bind_handle);
evutil_socket_t sock = evhttp_bound_socket_get_fd(bind_handle);
SetListenSocketDeferred(sock);
listeners.push_back(listener);
}
g_limiter = MakeUnique<ConnectionLimiter>(std::move(listeners), workQueueDepth * 2);
evhttp_set_gencb(http, http_request_cb, g_limiter.get());

#if LIBEVENT_VERSION_NUMBER >= 0x02020001
/* If the runtime libevent is new enough to have evhttp_set_newreqcb, use
it. http_newreq_cb will be called for each new request, and allows us to
reject the request (which closes the connection) immediately.
*/
if (event_get_version_number() >= 0x02020001) {
evhttp_set_newreqcb(http, http_newreq_cb, g_limiter.get());
}
#endif

workQueue = new WorkQueue<HTTPClosure>(workQueueDepth);
// transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release();
Expand Down Expand Up @@ -442,6 +556,14 @@ void InterruptHTTPServer()
LogPrint(MCLog::HTTP, "Interrupting HTTP server\n");
if (eventHTTP) {
// Unlisten sockets
#if LIBEVENT_VERSION_NUMBER >= 0x02020001
if (event_get_version_number() >= 0x02020001) {
evhttp_set_newreqcb(http, nullptr, nullptr);
}
#endif
if (g_limiter) {
g_limiter->Interrupt();
}
for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
Expand All @@ -464,6 +586,7 @@ void StopHTTPServer()
delete workQueue;
workQueue = nullptr;
}
g_limiter.reset();
if (eventBase) {
LogPrint(MCLog::HTTP, "Waiting for HTTP event thread to exit\n");
// Exit the event loop as soon as there are no active events.
Expand Down Expand Up @@ -606,6 +729,17 @@ void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
req = nullptr; // transferred back to main thread
}

void HTTPRequest::WriteReplyImmediate(int nStatus, const std::string& strReply)
{
assert(!replySent && req);
struct evbuffer* evb = evhttp_request_get_output_buffer(req);
assert(evb);
evbuffer_add(evb, strReply.data(), strReply.size());
evhttp_send_reply(req, nStatus, nullptr, nullptr);
replySent = true;
req = nullptr;
}

CService HTTPRequest::GetPeer()
{
evhttp_connection* con = evhttp_request_get_connection(req);
Expand Down
10 changes: 10 additions & 0 deletions src/httpserver.h
Expand Up @@ -114,6 +114,16 @@ class HTTPRequest
* main thread, do not call any other HTTPRequest methods after calling this.
*/
void WriteReply(int nStatus, const std::string& strReply = "");


/**
* Write HTTP reply from the callback thread
*
* @note Behavior is exactly the same as WriteReply, except that the send queue
* is bypassed. This should _only_ be called from inside the request
* callback, the from any other thread is undefined.
*/
void WriteReplyImmediate(int nStatus, const std::string& strReply = "");
};

/** Event handler closure.
Expand Down
10 changes: 10 additions & 0 deletions src/netbase.cpp
Expand Up @@ -727,3 +727,13 @@ void InterruptSocks5(bool interrupt)
{
interruptSocks5Recv = interrupt;
}

bool SetListenSocketDeferred(const SOCKET& sock)
{
bool ret = false;
#ifdef TCP_DEFER_ACCEPT
static constexpr int set = 1;
ret = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &set, sizeof(set)) == 0;
#endif
return ret;
}
1 change: 1 addition & 0 deletions src/netbase.h
Expand Up @@ -62,6 +62,7 @@ bool CloseSocket(SOCKET& hSocket);
bool SetSocketNonBlocking(const SOCKET& hSocket, bool fNonBlocking);
/** Set the TCP_NODELAY flag on a socket */
bool SetSocketNoDelay(const SOCKET& hSocket);
bool SetListenSocketDeferred(const SOCKET& sock);
/**
* Convert milliseconds to a struct timeval for e.g. select.
*/
Expand Down

0 comments on commit 52d38c9

Please sign in to comment.