Permalink
Browse files

proxyserver: add stats endpoint for monitoring

  • Loading branch information...
AmarOk1412 committed Aug 1, 2018
1 parent c794781 commit f54fcd768f98704eb8fe5b791110bec87d66402d
Showing with 87 additions and 15 deletions.
  1. +1 −0 include/opendht/callbacks.h
  2. +44 −1 include/opendht/dht_proxy_server.h
  3. +2 −4 src/callbacks.cpp
  4. +39 −9 src/dht_proxy_server.cpp
  5. +1 −1 tools/dhtnode.cpp
@@ -51,6 +51,7 @@ struct OPENDHT_PUBLIC NodeStats {
incoming_nodes {0};
unsigned table_depth {0};
unsigned getKnownNodes() const { return good_nodes + dubious_nodes; }
unsigned long getNetworkSizeEstimation() const { return 8 * std::exp2(table_depth); }
std::string toString() const;
#ifdef OPENDHT_JSONCPP
@@ -34,6 +34,10 @@
#include <mutex>
#include <restbed>
#ifdef OPENDHT_JSONCPP
#include <json/json.h>
#endif
namespace Json {
class Value;
}
@@ -73,15 +77,44 @@ class OPENDHT_PUBLIC DhtProxyServer
size_t pushListenersCount;
/** Average requests per second */
double requestRate;
/** Node Info **/
NodeInfo nodeInfo;
std::string toString() const {
std::ostringstream ss;
ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
ss << "Requests: " << requestRate << " per second." << std::endl;
auto& ni = nodeInfo;
auto& ipv4 = ni.ipv4;
if (ipv4.table_depth > 1) {
ss << "IPv4 Network estimation: " << ipv4.getNetworkSizeEstimation() << std::endl;;
}
auto& ipv6 = ni.ipv6;
if (ipv6.table_depth > 1) {
ss << "IPv6 Network estimation: " << ipv6.getNetworkSizeEstimation() << std::endl;;
}
return ss.str();
}
#ifdef OPENDHT_JSONCPP
/**
* Build a json object from a NodeStats
*/
Json::Value toJson() const {
Json::Value result;
result["listenCount"] = static_cast<Json::UInt64>(listenCount);
result["putCount"] = static_cast<Json::UInt64>(putCount);
result["pushListenersCount"] = static_cast<Json::UInt64>(pushListenersCount);
result["requestRate"] = requestRate;
result["nodeInfo"] = nodeInfo.toJson();
return result;
}
#endif
};
ServerStats getStats() const;
ServerStats stats() const { return stats_; }
void updateStats() const;
std::shared_ptr<DhtRunner> getNode() const { return dht_; }
@@ -100,6 +133,14 @@ class OPENDHT_PUBLIC DhtProxyServer
*/
void getNodeInfo(const std::shared_ptr<restbed::Session>& session) const;
/**
* Return ServerStats in JSON format
* Method: STATS "/"
* Result: HTTP 200, body: Node infos in JSON format
* @param session
*/
void getStats(const std::shared_ptr<restbed::Session>& session) const;
/**
* Return Values of an infoHash
* Method: GET "/{InfoHash: .*}"
@@ -262,6 +303,8 @@ class OPENDHT_PUBLIC DhtProxyServer
const std::string pushServer_;
mutable ServerStats stats_;
#if OPENDHT_PUSH_NOTIFICATIONS
struct Listener;
struct PushListener;
View
@@ -62,8 +62,7 @@ NodeStats::toString() const
ss << "Known nodes: " << good_nodes << " good, " << dubious_nodes << " dubious, " << incoming_nodes << " incoming." << std::endl;
if (table_depth > 1) {
ss << "Routing table depth: " << table_depth << std::endl;
unsigned long tot_nodes = 8 * std::exp2(table_depth);
ss << "Network size estimation: " << tot_nodes << " nodes" << std::endl;
ss << "Network size estimation: " << getNetworkSizeEstimation() << " nodes" << std::endl;
}
return ss.str();
}
@@ -81,8 +80,7 @@ NodeStats::toJson() const
val["incoming"] = static_cast<Json::LargestUInt>(incoming_nodes);
if (table_depth > 1) {
val["table_depth"] = static_cast<Json::LargestUInt>(table_depth);
unsigned long tot_nodes = 8 * std::exp2(table_depth);
val["network_size_estimation"] = static_cast<Json::LargestUInt>(tot_nodes);
val["network_size_estimation"] = static_cast<Json::LargestUInt>(getNetworkSizeEstimation());
}
return val;
}
View
@@ -70,6 +70,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,
auto resource = std::make_shared<restbed::Resource>();
resource->set_path("/");
resource->set_method_handler("GET", std::bind(&DhtProxyServer::getNodeInfo, this, _1));
resource->set_method_handler("STATS", std::bind(&DhtProxyServer::getStats, this, _1));
service_->publish(resource);
resource = std::make_shared<restbed::Resource>();
resource->set_path("/{hash: .*}");
@@ -137,7 +138,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port ,
printStatsJob_ = scheduler_.add(scheduler_.time() + PRINT_STATS_PERIOD, [this] {
if (stopListeners) return;
if (service_->is_up())
std::cout << getStats().toString() << std::endl;
updateStats();
// Refresh stats cache
auto newInfo = dht_->getNodeInfo();
{
@@ -178,21 +179,20 @@ DhtProxyServer::stop()
server_thread.join();
}
DhtProxyServer::ServerStats
DhtProxyServer::getStats() const
void
DhtProxyServer::updateStats() const
{
ServerStats ret {};
auto now = clock::now();
auto last = lastStatsReset_.exchange(now);
auto count = requestNum_.exchange(0);
auto dt = std::chrono::duration<double>(now - last);
ret.requestRate = count / dt.count();
stats_.requestRate = count / dt.count();
#if OPENDHT_PUSH_NOTIFICATIONS
ret.pushListenersCount = pushListeners_.size();
stats_.pushListenersCount = pushListeners_.size();
#endif
ret.putCount = puts_.size();
ret.listenCount = currentListeners_.size();
return ret;
stats_.putCount = puts_.size();
stats_.listenCount = currentListeners_.size();
stats_.nodeInfo = nodeInfo_;
}
void
@@ -234,6 +234,36 @@ DhtProxyServer::getNodeInfo(const std::shared_ptr<restbed::Session>& session) co
);
}
void
DhtProxyServer::getStats(const std::shared_ptr<restbed::Session>& session) const
{
requestNum_++;
const auto request = session->get_request();
int content_length = std::stoi(request->get_header("Content-Length", "0"));
session->fetch(content_length,
[this](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b*/) mutable
{
try {
if (dht_) {
#ifdef OPENDHT_JSONCPP
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
auto output = Json::writeString(wbuilder, stats_.toJson()) + "\n";
s->close(restbed::OK, output);
#else
s->close(restbed::NotFound, "{\"err\":\"JSON not enabled on this instance\"}");
#endif
}
else
s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
} catch (...) {
s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
}
}
);
}
void
DhtProxyServer::get(const std::shared_ptr<restbed::Session>& session) const
{
View
@@ -144,7 +144,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params
#if OPENDHT_PROXY_SERVER
for (const auto& proxy : proxies) {
std::cout << "Stats for proxy on port " << proxy.first << std::endl;
std::cout << " " << proxy.second->getStats().toString() << std::endl;
std::cout << " " << proxy.second->stats().toString() << std::endl;
}
#endif
continue;

0 comments on commit f54fcd7

Please sign in to comment.