Permalink
Browse files

Expose setting for ClientSession timeout

See tcpHeartbeatTimeoutMilliseconds in sample.conf for docs.

This affects both clients and servers, so clients can now pass settings
in on the Client::Cluster constructor.
  • Loading branch information...
ongardie committed Jan 27, 2015
1 parent ac3aae0 commit 2c645dea85312e8476972ae17ed6d4671b7c1a24
@@ -433,14 +433,16 @@ Tree::getTreeDetails() const

////////// Cluster //////////

Cluster::Cluster(ForTesting t)
Cluster::Cluster(ForTesting t,
const std::map<std::string, std::string>& options)
: clientImpl(std::make_shared<MockClientImpl>())
{
clientImpl->init("-MOCK-");
}

Cluster::Cluster(const std::string& hosts)
: clientImpl(std::make_shared<ClientImpl>())
Cluster::Cluster(const std::string& hosts,
const std::map<std::string, std::string>& options)
: clientImpl(std::make_shared<ClientImpl>(options))
{
#if DEBUG // for testing purposes only
if (hosts == "-MOCK-SKIP-INIT-")
@@ -307,8 +307,9 @@ ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain()

////////// class ClientImpl //////////

ClientImpl::ClientImpl()
: eventLoop()
ClientImpl::ClientImpl(const std::map<std::string, std::string>& options)
: config(options)
, eventLoop()
, sessionCreationBackoff(5, // 5 new connections per
100UL * 1000 * 1000) // 100 ms
, hosts()
@@ -342,7 +343,8 @@ ClientImpl::initDerived()
leaderRPC.reset(new LeaderRPC(
RPC::Address(hosts, Protocol::Common::DEFAULT_PORT),
eventLoop,
sessionCreationBackoff));
sessionCreationBackoff,
config));
}
if (rpcProtocolVersion == ~0U)
rpcProtocolVersion = negotiateRPCVersion();
@@ -423,7 +425,8 @@ ClientImpl::getServerStats(const std::string& host,
eventLoop,
address,
Protocol::Common::MAX_MESSAGE_LENGTH,
timeout);
timeout,
config);

Protocol::Client::GetServerStats::Request request;
RPC::ClientRPC rpc(session,
@@ -23,6 +23,7 @@
#include "Client/Backoff.h"
#include "Client/LeaderRPC.h"
#include "Core/ConditionVariable.h"
#include "Core/Config.h"
#include "Core/Mutex.h"
#include "Core/Time.h"
#include "Event/Loop.h"
@@ -54,7 +55,8 @@ class ClientImpl {
typedef LeaderRPC::TimePoint TimePoint;

/// Constructor.
ClientImpl();
explicit ClientImpl(const std::map<std::string, std::string>& options =
std::map<std::string, std::string>());
/// Destructor.
virtual ~ClientImpl();

@@ -147,6 +149,11 @@ class ClientImpl {
*/
uint32_t negotiateRPCVersion();

/**
* Options/settings.
*/
const Core::Config config;

/**
* The Event::Loop used to drive the underlying RPC mechanism.
*/
@@ -155,9 +155,11 @@ LeaderRPC::Call::wait(google::protobuf::Message& response,

LeaderRPC::LeaderRPC(const RPC::Address& hosts,
Event::Loop& eventLoop,
Backoff& sessionCreationBackoff)
Backoff& sessionCreationBackoff,
const Core::Config& config)
: eventLoop(eventLoop)
, sessionCreationBackoff(sessionCreationBackoff)
, config(config)
, mutex()
, hosts(hosts)
, leaderHint()
@@ -222,7 +224,8 @@ LeaderRPC::getSession(TimePoint timeout)
eventLoop,
address,
Protocol::Common::MAX_MESSAGE_LENGTH,
timeout);
timeout,
config);

return leaderSession;
}
@@ -28,6 +28,11 @@

namespace LogCabin {

// forward declaration
namespace Core {
class Config;
}

// forward declaration
namespace Event {
class Loop;
@@ -216,10 +221,13 @@ class LeaderRPC : public LeaderRPCBase {
* Used to invoke RPCs.
* \param sessionCreationBackoff
* Used to rate-limit new TCP connections.
* \param config
* Settings for the client library. This object keeps a reference.
*/
LeaderRPC(const RPC::Address& hosts,
Event::Loop& eventLoop,
Backoff& sessionCreationBackoff);
Backoff& sessionCreationBackoff,
const Core::Config& config);

/// Destructor.
~LeaderRPC();
@@ -306,6 +314,11 @@ class LeaderRPC : public LeaderRPCBase {
*/
Backoff& sessionCreationBackoff;

/**
* Settings for client library.
*/
const Core::Config& config;

/**
* Protects all of the following member variables in this class.
* Threads hang on to this mutex while initiating new sessions to possible
@@ -42,6 +42,7 @@ class ClientLeaderRPCTest : public ::testing::Test {
, service()
, server()
, eventLoopThread()
, config()
, leaderRPC()
, request()
, response()
@@ -57,7 +58,8 @@ class ClientLeaderRPCTest : public ::testing::Test {
service, 1);
leaderRPC.reset(new LeaderRPC(address,
eventLoop,
sessionCreationBackoff));
sessionCreationBackoff,
config));


request.mutable_read()->set_path("foo");
@@ -81,6 +83,7 @@ class ClientLeaderRPCTest : public ::testing::Test {
std::shared_ptr<RPC::ServiceMock> service;
std::unique_ptr<RPC::Server> server;
std::thread eventLoopThread;
Core::Config config;
std::unique_ptr<LeaderRPC> leaderRPC;
Protocol::Client::ReadOnlyTree::Request request;
Protocol::Client::ReadOnlyTree::Response response;
@@ -27,6 +27,7 @@
* It was subsequently modified:
*
* Copyright (c) 2012 Stanford University
* Copyright (c) 2015 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -136,6 +137,14 @@ Config::Config(const string& delimiter,
{
}

Config::Config(const std::map<string, string>& options)
: delimiter("=")
, comment("#")
, contents(options)
{
}


void
Config::readFile(const string& filename)
{
@@ -27,6 +27,7 @@
* It was subsequently modified:
*
* Copyright (c) 2012 Stanford University
* Copyright (c) 2015 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -90,6 +91,11 @@ class Config {
explicit Config(const string& delimiter = "=",
const string& comment = "#");

/**
* Construct a Config from the given map of options.
*/
explicit Config(const std::map<string, string>& options);

/**
* Load a Config from a file.
* This is a convenience wrapper around operator>>.
@@ -1,4 +1,5 @@
/* Copyright (c) 2012 Stanford University
* Copyright (c) 2015 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -57,7 +58,7 @@ class CoreConfigTest : public ::testing::Test {
std::string tmpdir;
};

TEST_F(CoreConfigTest, constructor) {
TEST_F(CoreConfigTest, constructor_delim) {
Config config;
EXPECT_EQ("=", config.delimiter);
EXPECT_EQ("#", config.comment);
@@ -69,6 +70,16 @@ TEST_F(CoreConfigTest, constructor) {
EXPECT_TRUE(config2.contents.empty());
}

TEST_F(CoreConfigTest, constructor_withOptions) {
map<string, string> options = {
{"double", "3.14"},
{"int", "314"},
{"string", "a = b = c = d"}
};
Config config(options);
EXPECT_EQ(options, config.contents);
}

TEST_F(CoreConfigTest, readFile) {
Config config;
config.readFile(tmpdir + "/a");
@@ -74,7 +74,8 @@ class RPCClientRPCTest : public ::testing::Test {
EXPECT_EQ("", server.bind(address));
session = ClientSession::makeSession(eventLoop, address,
ProtocolCommon::MAX_MESSAGE_LENGTH,
TimePoint::max());
TimePoint::max(),
Core::Config());
payload.set_field_a(3);
payload.set_field_b(4);
}
@@ -92,11 +92,14 @@ class RPCClientServerTest : public ::testing::Test {
, server(rpcHandler, serverEventLoop, 1024)
, clientSession()
{
Core::Config config;
config.set("tcpHeartbeatTimeoutMilliseconds", 24);
address.refresh(RPC::Address::TimePoint::max());
EXPECT_EQ("", server.bind(address));
clientSession = RPC::ClientSession::makeSession(
clientEventLoop, address, 1024,
RPC::ClientSession::TimePoint::max());
RPC::ClientSession::TimePoint::max(),
config);
}
~RPCClientServerTest()
{
@@ -133,9 +136,9 @@ TEST_F(RPCClientServerTest, echo) {
}

// Test the RPC timeout (ping) mechanism.
// This test assumes TIMEOUT_MS is set to 100ms in ClientSession.
TEST_F(RPCClientServerTest, timeout) {
rpcHandler.delayMicros = 110 * 1000;
EXPECT_EQ(12U, clientSession->PING_TIMEOUT_MS);
rpcHandler.delayMicros = 14 * 1000;

// The server should not time out, since the serverEventLoopThread should
// respond to pings.
@@ -27,20 +27,6 @@
#include "Protocol/Common.h"
#include "RPC/ClientSession.h"

/**
* The number of milliseconds to wait until the client gets suspicious about
* the server not responding. After this amount of time elapses, the client
* will send a ping to the server. If no response is received within another
* TIMEOUT_MS milliseconds, the session is closed.
*
* TODO(ongaro): How should this value be chosen?
* Ideally, you probably want this to be set to something like the 99-th
* percentile of your RPC latency.
*
* TODO(ongaro): How does this interact with TCP?
*/
enum { TIMEOUT_MS = 100 };

namespace LogCabin {
namespace RPC {

@@ -101,9 +87,9 @@ ClientSession::MessageSocketHandler::handleReceivedMessage(
if (messageId == Protocol::Common::PING_MESSAGE_ID) {
if (session.numActiveRPCs > 0 && session.activePing) {
// The server has shown that it is alive for now.
// Let's get suspicious again in another TIMEOUT_MS.
// Let's get suspicious again in another PING_TIMEOUT_MS.
session.activePing = false;
session.timer.schedule(TIMEOUT_MS * 1000 * 1000);
session.timer.schedule(session.PING_TIMEOUT_MS * 1000 * 1000);
} else {
VERBOSE("Received an unexpected ping response. This can happen "
"for a number of reasons and is no cause for alarm. For "
@@ -138,7 +124,7 @@ ClientSession::MessageSocketHandler::handleReceivedMessage(
if (session.numActiveRPCs == 0)
session.timer.deschedule();
else
session.timer.schedule(TIMEOUT_MS * 1000 * 1000);
session.timer.schedule(session.PING_TIMEOUT_MS * 1000 * 1000);

// Fill in the response
response.status = Response::HAS_REPLY;
@@ -202,7 +188,7 @@ ClientSession::Timer::handleTimerEvent()
session.activePing = true;
session.messageSocket->sendMessage(Protocol::Common::PING_MESSAGE_ID,
Buffer());
schedule(TIMEOUT_MS * 1000 * 1000);
schedule(session.PING_TIMEOUT_MS * 1000 * 1000);
} else {
VERBOSE("ClientSession to %s timed out.",
session.address.toString().c_str());
@@ -230,8 +216,11 @@ std::function<
ClientSession::ClientSession(Event::Loop& eventLoop,
const Address& address,
uint32_t maxMessageLength,
TimePoint timeout)
TimePoint timeout,
const Core::Config& config)
: self() // makeSession will fill this in shortly
, PING_TIMEOUT_MS(config.read<uint64_t>(
"tcpHeartbeatTimeoutMilliseconds", 200) / 2)
, eventLoop(eventLoop)
, address(address)
, messageSocketHandler(*this)
@@ -339,10 +328,15 @@ std::shared_ptr<ClientSession>
ClientSession::makeSession(Event::Loop& eventLoop,
const Address& address,
uint32_t maxMessageLength,
TimePoint timeout)
TimePoint timeout,
const Core::Config& config)
{
std::shared_ptr<ClientSession> session(
new ClientSession(eventLoop, address, maxMessageLength, timeout));
new ClientSession(eventLoop,
address,
maxMessageLength,
timeout,
config));
session->self = session;
return session;
}
@@ -370,7 +364,7 @@ ClientSession::sendRequest(Buffer request)
if (numActiveRPCs == 1) {
// activePing's value was undefined while numActiveRPCs = 0
activePing = false;
timer.schedule(TIMEOUT_MS * 1000 * 1000);
timer.schedule(PING_TIMEOUT_MS * 1000 * 1000);
}
}
// Release the mutex before sending so that receives can be processed
Oops, something went wrong.

0 comments on commit 2c645de

Please sign in to comment.