Permalink
Browse files

Limit rate of connections in Client/LeaderRPC

The current limit is 5 connections every 100 ms: enough to allow clients
to find the cluster leader without delay in most cases, yet keep CPU
usage and other waste low.

Close #53: Client/LeaderRPC needs rate limiting
  • Loading branch information...
ongardie committed Dec 8, 2014
1 parent 0927631 commit 37a53ec9287528f20dabebfa19ffde2c42f9f037
Showing with 42 additions and 4 deletions.
  1. +17 −1 Client/LeaderRPC.cc
  2. +25 −3 Client/LeaderRPC.h
@@ -1,4 +1,5 @@
/* Copyright (c) 2012 Stanford University
* Copyright (c) 2014 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -17,6 +18,7 @@

#include "Client/LeaderRPC.h"
#include "Core/Debug.h"
#include "Core/Time.h"
#include "Protocol/Common.h"
#include "RPC/ClientSession.h"
#include "RPC/ClientRPC.h"
@@ -25,13 +27,18 @@ namespace LogCabin {
namespace Client {

/**
 * Constructor.
 * \param hosts
 *      Address referring to the hosts in the LogCabin cluster; a random
 *      host is picked from here when the leader is unknown.
 *
 * Rate limiting: at most #windowCount connection attempts are permitted in
 * any #windowNanos period (5 per 100 ms), enforced by connect().
 */
LeaderRPC::LeaderRPC(const RPC::Address& hosts)
    : windowCount(5)
    , windowNanos(1000 * 1000 * 100) // 100 ms expressed in nanoseconds
    , hosts(hosts)
    , eventLoop()
    , eventLoopThread(&Event::Loop::runForever, &eventLoop)
    , mutex()
    , leaderSession() // set by connect()
    , lastConnectTimes()
{
    std::unique_lock<std::mutex> lockGuard(mutex);
    // Pad the history with zeros so the first windowCount connection
    // attempts are never delayed (a zero timestamp is always outside the
    // rate-limiting window).
    for (uint64_t i = 0; i < windowCount; ++i)
        lastConnectTimes.push_back(0);
    connect(hosts, lockGuard);
}

@@ -123,6 +130,15 @@ void
LeaderRPC::connect(const RPC::Address& address,
// lockGuard: caller must already hold #mutex; it protects #leaderSession
// and #lastConnectTimes while the new session is set up.
std::unique_lock<std::mutex>& lockGuard)
{
// Rate limiting: lastConnectTimes holds the windowCount most recent
// connect times (oldest first, zero-padded initially). If the oldest is
// still inside the last windowNanos, we have used up the window's budget.
uint64_t nowNanos = Core::Time::getTimeNanos();
if (lastConnectTimes.front() > nowNanos - windowNanos) {
// Sleep until the oldest attempt falls out of the window; the
// std::min clamps the sleep to at most one full window as a
// safety bound. Divide by 1000 to convert nanoseconds to the
// microseconds usleep() expects.
usleep(unsigned(
std::min(lastConnectTimes.front() + windowNanos - nowNanos,
windowNanos) / 1000));
nowNanos = Core::Time::getTimeNanos();
}
// Record this attempt: drop the oldest timestamp, append the newest.
lastConnectTimes.pop_front();
lastConnectTimes.push_back(nowNanos);
leaderSession = RPC::ClientSession::makeSession(
eventLoop,
address,
@@ -1,4 +1,5 @@
/* Copyright (c) 2012 Stanford University
* Copyright (c) 2014 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -14,6 +15,7 @@
*/

#include <cinttypes>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
@@ -146,6 +148,18 @@ class LeaderRPC : public LeaderRPCBase {
connectHost(const std::string& host,
std::shared_ptr<RPC::ClientSession> cachedSession);

/**
* As a backoff mechanism, at most #windowCount connections are allowed in
* any #windowNanos period of time.
*/
const uint64_t windowCount;

/**
* The length in nanoseconds of the sliding window in which at most
* #windowCount connections are allowed, as a backoff mechanism.
*/
const uint64_t windowNanos;

/**
* An address referring to the hosts in the LogCabin cluster. A random host
* is selected from here when this class doesn't know who the cluster
@@ -164,9 +178,9 @@ class LeaderRPC : public LeaderRPCBase {
std::thread eventLoopThread;

/**
* Protects #leaderSession and #lastConnectTimes.
* Threads hang on to this mutex while initiating new sessions to possible
* cluster leaders, in case other threads are already handling the problem.
*/
std::mutex mutex;

@@ -175,6 +189,14 @@ class LeaderRPC : public LeaderRPCBase {
* This is never null, but it might sometimes point to the wrong host.
*/
std::shared_ptr<RPC::ClientSession> leaderSession;

/**
* The time in nanoseconds since the Unix epoch when the last #windowCount
* connections were initiated. If fewer than #windowCount connections have
* been initiated, this is padded with zeros. The first time is the
* oldest, and the last is the most recent.
*/
std::deque<uint64_t> lastConnectTimes;
};

} // namespace LogCabin::Client

0 comments on commit 37a53ec

Please sign in to comment.