Skip to content
Permalink
Browse files

dht: allow to configure rate limiter

  • Loading branch information...
aberaud committed Nov 7, 2019
1 parent f5a6801 commit cb7d7f6614d68b84aa72b76931af3a50a4cc4e79
@@ -106,6 +106,12 @@ struct OPENDHT_PUBLIC Config {

/** If set, the dht will load its state from this file on start and save its state in this file on shutdown */
std::string persist_path {};

/** If non-0, overrides the default global rate-limit.-1 means no limit. */
ssize_t max_req_per_sec {0};

/** If non-0, overrides the default per-IP address rate-limit. -1 means no limit. */
ssize_t max_peer_req_per_sec {0};
};

/**
@@ -48,6 +48,12 @@ struct TransId;
#define MSG_CONFIRM 0
#endif

struct NetworkConfig {
NetId network {0};
ssize_t max_req_per_sec {0};
ssize_t max_peer_req_per_sec {0};
};

class DhtProtocolException : public DhtException {
public:
// sent to another peer (http-like).
@@ -207,7 +213,12 @@ class NetworkEngine final
using RequestExpiredCb = std::function<void(const Request&, bool)>;

NetworkEngine(Logger& log, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock);
NetworkEngine(InfoHash& myid, NetId net, std::unique_ptr<DatagramSocket>&& sock, Logger& log, Scheduler& scheduler,
NetworkEngine(
InfoHash& myid,
NetworkConfig config,
std::unique_ptr<DatagramSocket>&& sock,
Logger& log,
Scheduler& scheduler,
decltype(NetworkEngine::onError)&& onError,
decltype(NetworkEngine::onNewNode)&& onNewNode,
decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
@@ -425,7 +436,6 @@ class NetworkEngine final
/***************
* Constants *
***************/
static constexpr size_t MAX_REQUESTS_PER_SEC {1600};
/* the length of a node info buffer in ipv4 format */
static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN + sizeof(in_addr) + sizeof(in_port_t)};
/* the length of a node info buffer in ipv6 format */
@@ -513,17 +523,17 @@ class NetworkEngine final

/* DHT info */
const InfoHash& myid;
const NetId network {0};
const NetworkConfig config {};
const std::unique_ptr<DatagramSocket> dht_socket;
const Logger& DHT_LOG;

NodeCache cache {};

// global limiting should be triggered by at least 8 different IPs
using IpLimiter = RateLimiter<MAX_REQUESTS_PER_SEC/8>;
using IpLimiter = RateLimiter;
using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
IpLimiterMap address_rate_limiter {};
RateLimiter<MAX_REQUESTS_PER_SEC> rate_limiter {};
IpLimiterMap address_rate_limiter;
RateLimiter rate_limiter;
size_t limiter_maintenance {0};

// requests handling
@@ -23,27 +23,35 @@

namespace dht {

template<size_t Quota, unsigned long Period=1>
class RateLimiter {
public:
RateLimiter() = delete;
RateLimiter(size_t quota) : quota_(quota) {}

/** Clear outdated records and return current quota usage */
size_t maintain(const time_point& now) {
auto limit = now - std::chrono::seconds(Period);
auto limit = now - std::chrono::seconds(1);
while (not records.empty() and records.front() < limit)
records.pop();
return records.size();
}
/** Return false if quota is reached, insert record and return true otherwise. */
bool limit(const time_point& now) {
if (maintain(now) >= Quota)
if (quota_ == (size_t)-1)
return true;
if (maintain(now) >= quota_)
return false;
records.emplace(now);
return true;
}
bool empty() const {
return records.empty();
}
size_t quota() const {
return quota_;
}
private:
const size_t quota_;
std::queue<time_point> records {};
};

@@ -458,6 +458,9 @@ cdef class DhtConfig(object):
self._config.dht_config.node_config.network = netid
def setMaintainStorage(self, bool maintain_storage):
self._config.dht_config.node_config.maintain_storage = maintain_storage
def setRateLimit(self, ssize_t max_req_per_sec, ssize_t max_peer_req_per_sec):
self._config.dht_config.node_config.max_req_per_sec = max_req_per_sec
self._config.dht_config.node_config.max_peer_req_per_sec = max_peer_req_per_sec

cdef class DhtRunner(_WithID):
cdef cpp.shared_ptr[cpp.DhtRunner] thisptr
@@ -211,6 +211,9 @@ cdef extern from "opendht/callbacks.h" namespace "dht":
uint32_t network
bool is_bootstrap
bool maintain_storage
string persist_path
size_t max_req_per_sec
size_t max_peer_req_per_sec
cppclass SecureDhtConfig:
Config node_config
Identity id
@@ -19,13 +19,16 @@
import time
import asyncio

config = dht.DhtConfig()
config.setRateLimit(-1, -1)

ping_node = dht.DhtRunner()
ping_node.run()
ping_node.run(config=config)
#ping_node.enableLogging()
#ping_node.bootstrap("bootstrap.ring.cx", "4222")

pong_node = dht.DhtRunner()
pong_node.run()
pong_node.run(config=config)
#pong_node.enableLogging()
pong_node.ping(ping_node.getBound())

@@ -42,7 +45,6 @@ def done(h, ok):

def ping(node, h):
global i
time.sleep(0.0075)
i += 1
if i < MAX:
node.put(h, dht.Value(b"hey"), lambda ok, nodes: done(node.getNodeId().decode(), ok))
@@ -39,6 +39,7 @@ constexpr std::chrono::minutes Dht::MAX_STORAGE_MAINTENANCE_EXPIRE_TIME;
constexpr std::chrono::minutes Dht::SEARCH_EXPIRE_TIME;
constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME;
constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN;
static constexpr size_t MAX_REQUESTS_PER_SEC {1600};

NodeStatus
Dht::getStatus(sa_family_t af) const
@@ -1688,11 +1689,21 @@ Dht::~Dht()
s.second->clear();
}

net::NetworkConfig
fromDhtConfig(const Config& config)
{
net::NetworkConfig netConf;
netConf.network = config.network;
netConf.max_req_per_sec = config.max_req_per_sec ? config.max_req_per_sec : MAX_REQUESTS_PER_SEC;
netConf.max_peer_req_per_sec = config.max_peer_req_per_sec ? config.max_peer_req_per_sec : MAX_REQUESTS_PER_SEC/8;
return netConf;
}

Dht::Dht() : store(), network_engine(DHT_LOG, scheduler, {}) {}

Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l)
: DhtInterface(l), myid(config.node_id ? config.node_id : InfoHash::getRandom()), store(), store_quota(),
network_engine(myid, config.network, std::move(sock), DHT_LOG, scheduler,
network_engine(myid, fromDhtConfig(config), std::move(sock), DHT_LOG, scheduler,
std::bind(&Dht::onError, this, _1, _2),
std::bind(&Dht::onNewNode, this, _1, _2),
std::bind(&Dht::onReportedAddr, this, _1, _2),

0 comments on commit cb7d7f6

Please sign in to comment.
You can’t perform that action at this time.