Skip to content

Commit

Permalink
Add mechanism for clients to close their sessions cleanly on shutdown
Browse files Browse the repository at this point in the history
From the release notes:
Clients now make a best effort attempt to close their sessions when they
shut down gracefully (issue logcabin#116). Before, client sessions were only
ever expired after a timeout. This state could accumulate quickly when
running short-lived clients in a tight loop. Enabling this change
requires all servers to be updated (so that the state machine is
updated); new clients talking to old clusters will issue a warning that
they are unable to close their sessions.

Close logcabin#116: client library should close session on exit
  • Loading branch information
ongardie authored and Nate Hardt committed Aug 20, 2015
1 parent 02acb96 commit 88b9583
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 48 deletions.
46 changes: 41 additions & 5 deletions Client/ClientImpl.cc
Expand Up @@ -221,7 +221,13 @@ ClientImpl::ExactlyOnceRPCHelper::ExactlyOnceRPCHelper(ClientImpl* client)
, exiting(false)
, lastKeepAliveStart(TimePoint::min())
// TODO(ongaro): set dynamically based on cluster configuration
, keepAliveIntervalMs(60 * 1000)
, keepAliveInterval(std::chrono::milliseconds(60 * 1000))
, sessionCloseTimeout(std::chrono::milliseconds(
client->config.read<uint64_t>(
"sessionCloseTimeoutMilliseconds",
client->config.read<uint64_t>(
"tcpConnectTimeoutMilliseconds",
1000))))
, keepAliveCall()
, keepAliveThread()
{
Expand All @@ -240,6 +246,37 @@ ClientImpl::ExactlyOnceRPCHelper::exit()
keepAliveCV.notify_all();
if (keepAliveCall)
keepAliveCall->cancel();
if (clientId > 0) {
Protocol::Client::StateMachineCommand::Request request;
Protocol::Client::StateMachineCommand::Response response;
request.mutable_close_session()->set_client_id(clientId);
LeaderRPC::Status status = client->leaderRPC->call(
OpCode::STATE_MACHINE_COMMAND,
request,
response,
ClientImpl::Clock::now() + sessionCloseTimeout);
switch (status) {
case LeaderRPC::Status::OK:
break;
case LeaderRPC::Status::TIMEOUT:
using Core::StringUtil::toString;
WARNING("Could not definitively close client session %lu "
"within timeout (%s). It may remain open until it "
"expires.",
clientId,
toString(sessionCloseTimeout).c_str());
break;
case LeaderRPC::Status::INVALID_REQUEST:
WARNING("The server and/or replicated state machine "
"doesn't support the CloseSession command or "
"claims the request is malformed. This client's "
"session (%lu) will remain open until it expires. "
"Consider upgrading your servers (this command "
"was introduced in state machine version 2).",
clientId);
break;
}
}
}
if (keepAliveThread.joinable())
keepAliveThread.join();
Expand Down Expand Up @@ -324,9 +361,8 @@ ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain()
std::unique_lock<Core::Mutex> lockGuard(mutex);
while (!exiting) {
TimePoint nextKeepAlive;
if (keepAliveIntervalMs > 0) {
nextKeepAlive = (lastKeepAliveStart +
std::chrono::milliseconds(keepAliveIntervalMs));
if (keepAliveInterval.count() > 0) {
nextKeepAlive = lastKeepAliveStart + keepAliveInterval;
} else {
nextKeepAlive = TimePoint::max();
}
Expand Down Expand Up @@ -408,10 +444,10 @@ ClientImpl::ClientImpl(const std::map<std::string, std::string>& options)

ClientImpl::~ClientImpl()
{
exactlyOnceRPCHelper.exit();
eventLoop.exit();
if (eventLoopThread.joinable())
eventLoopThread.join();
exactlyOnceRPCHelper.exit();
}

void
Expand Down
8 changes: 6 additions & 2 deletions Client/ClientImpl.h
Expand Up @@ -286,9 +286,13 @@ class ClientImpl {
TimePoint lastKeepAliveStart;
/**
* How often session keep-alive requests are sent during periods of
* inactivity, in milliseconds.
* inactivity.
*/
uint64_t keepAliveIntervalMs;
std::chrono::milliseconds keepAliveInterval;
/**
* How long to wait for the CloseSession RPC before giving up.
*/
std::chrono::milliseconds sessionCloseTimeout;

/**
* If set, this is an ongoing keep-alive RPC. This call is canceled to
Expand Down
88 changes: 84 additions & 4 deletions Client/ClientImplTest.cc
Expand Up @@ -20,10 +20,12 @@
#include "Client/LeaderRPCMock.h"
#include "Core/ProtoBuf.h"
#include "Core/StringUtil.h"
#include "Core/Time.h"
#include "Protocol/Common.h"
#include "RPC/Server.h"
#include "RPC/ServiceMock.h"
#include "build/Protocol/Client.pb.h"
#include "include/LogCabin/Debug.h"

// Most of the tests for ClientImpl are in ClientTest.cc.

Expand All @@ -32,6 +34,7 @@ namespace {

using Core::ProtoBuf::fromString;
typedef Client::ClientImpl::TimePoint TimePoint;
using std::chrono::milliseconds;

class ClientClientImplExactlyOnceTest : public ::testing::Test {
public:
Expand All @@ -52,6 +55,15 @@ class ClientClientImplExactlyOnceTest : public ::testing::Test {
rpcInfo1 = client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max());
rpcInfo2 = client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max());
}

/**
 * Queue a canned CloseSession reply for teardown: the fixture's client
 * attempts to close its session when it is destroyed (see the
 * "prevent destructor from calling CloseSession again" note elsewhere in
 * this file), and the mock RPC layer needs a response ready for that call.
 */
~ClientClientImplExactlyOnceTest()
{
    mockRPC->expect(OpCode::STATE_MACHINE_COMMAND,
        fromString<Protocol::Client::StateMachineCommand::Response>(
            "close_session { }"));
}


Client::ClientImpl client;
Client::LeaderRPCMock* mockRPC;
RPCInfo rpcInfo1;
Expand All @@ -63,6 +75,35 @@ class ClientClientImplExactlyOnceTest : public ::testing::Test {
operator=(const ClientClientImplExactlyOnceTest&) = delete;
};

// A client that never obtained a session ID has no session to close:
// constructing and tearing down an idle client must generate no RPC traffic.
TEST_F(ClientClientImplExactlyOnceTest, exit_readonly) {
    Client::ClientImpl idleClient;
    Client::LeaderRPCMock* idleRPC = new Client::LeaderRPCMock();
    idleClient.leaderRPC = std::unique_ptr<Client::LeaderRPCBase>(idleRPC);
    EXPECT_EQ(0U, idleRPC->requestLog.size());
}

// exit() should issue a CloseSession state machine command carrying the
// session's client ID (3 for this fixture, per the assertion below).
TEST_F(ClientClientImplExactlyOnceTest, exit_normal) {
    // Canned reply for the CloseSession command that exit() will send.
    mockRPC->expect(OpCode::STATE_MACHINE_COMMAND,
        fromString<Protocol::Client::StateMachineCommand::Response>(
            "close_session { }"));
    client.exactlyOnceRPCHelper.exit();
    // Discard the first logged request (presumably the OpenSession issued
    // during fixture setup — confirm against the fixture's constructor).
    mockRPC->popRequest();
    EXPECT_EQ("close_session { client_id: 3 }", *mockRPC->popRequest());
}

// If the CloseSession RPC cannot complete within sessionCloseTimeout,
// exit() must give up (with a warning) instead of blocking shutdown.
TEST_F(ClientClientImplExactlyOnceTest, exit_timeout) {
    // expect warnings: raise this file's log threshold so the expected
    // timeout warning does not pollute test output
    LogCabin::Core::Debug::setLogPolicy({
        {"Client/ClientImpl.cc", "ERROR"}
    });
    // A negative timeout puts the CloseSession deadline in the past,
    // forcing the timeout path immediately.
    client.exactlyOnceRPCHelper.sessionCloseTimeout =
        std::chrono::milliseconds(-1);
    client.exactlyOnceRPCHelper.exit();
    // Only one request was recorded — the timed-out exit did not manage to
    // deliver a CloseSession command.
    EXPECT_EQ(1U, mockRPC->requestLog.size());
}

// exit with a server that doesn't understand CloseSession is tested as
// ClientClientImplServiceMockTest::exactlyOnceRPCInfo_exit_invalidRequest.

TEST_F(ClientClientImplExactlyOnceTest, getRPCInfo) {
EXPECT_EQ((std::set<uint64_t>{1, 2}),
Expand Down Expand Up @@ -92,6 +133,10 @@ TEST_F(ClientClientImplExactlyOnceTest, getRPCInfo_timeout) {
rpcInfo2 = client2.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max());
EXPECT_EQ(4U, client2.exactlyOnceRPCHelper.clientId);
EXPECT_EQ(4U, rpcInfo2.client_id());

mockRPC2->expect(OpCode::STATE_MACHINE_COMMAND,
fromString<Protocol::Client::StateMachineCommand::Response>(
"close_session { }"));
}

TEST_F(ClientClientImplExactlyOnceTest, doneWithRPC) {
Expand Down Expand Up @@ -125,21 +170,21 @@ TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain_TimingSensitive) {
" error: 'err' "
"}"));
}
client.exactlyOnceRPCHelper.keepAliveIntervalMs = 2;
client.exactlyOnceRPCHelper.keepAliveInterval = milliseconds(2);
client.exactlyOnceRPCHelper.keepAliveCV.notify_all();
// in 2ms, 4ms, 6ms, 8ms, 10ms
usleep(11000);
EXPECT_EQ(6U, mockRPC->requestLog.size()) << disclaimer;

// Disable heartbeat.
client.exactlyOnceRPCHelper.keepAliveIntervalMs = 0;
client.exactlyOnceRPCHelper.keepAliveInterval = milliseconds(0);
client.exactlyOnceRPCHelper.keepAliveCV.notify_all();
usleep(3000);
EXPECT_EQ(6U, mockRPC->requestLog.size()) << disclaimer;

// Now enable but "make a request" ourselves to prevent heartbeat.
client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max());
client.exactlyOnceRPCHelper.keepAliveIntervalMs = 10;
client.exactlyOnceRPCHelper.keepAliveInterval = milliseconds(10);
client.exactlyOnceRPCHelper.keepAliveCV.notify_all();
usleep(7500);
client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max());
Expand Down Expand Up @@ -183,6 +228,34 @@ class ClientClientImplServiceMockTest : public ClientClientImplTest {
std::unique_ptr<RPC::Server> server;
};

// A server whose state machine predates version 2 rejects CloseSession as an
// invalid request; the client should warn and continue shutting down rather
// than fail (the session is left to expire by timeout).
TEST_F(ClientClientImplServiceMockTest, exactlyOnceRPCInfo_exit_invalidRequest)
{
    // Establish a session (client_id 3) so that exit() has one to close.
    Protocol::Client::StateMachineCommand::Request request1;
    Protocol::Client::StateMachineCommand::Response response1;
    request1.mutable_open_session();
    response1.mutable_open_session()->set_client_id(3);
    service->reply(Protocol::Client::OpCode::STATE_MACHINE_COMMAND,
                   request1, response1);
    client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max());

    // Arrange for the mock service to reject the upcoming CloseSession
    // command as malformed/unsupported.
    Protocol::Client::StateMachineCommand::Request request2;
    request2.mutable_close_session()->set_client_id(3);
    service->rejectInvalidRequest(
        Protocol::Client::OpCode::STATE_MACHINE_COMMAND,
        request2);
    // expect warning: suppress the anticipated INVALID_REQUEST warning
    LogCabin::Core::Debug::setLogPolicy({
        {"Client/ClientImpl.cc", "ERROR"}
    });
    client.exactlyOnceRPCHelper.exit();

    // Restore the default log policy for subsequent tests.
    LogCabin::Core::Debug::setLogPolicy({
        {"", "WARNING"}
    });
    // prevent destructor from calling CloseSession again
    client.exactlyOnceRPCHelper.clientId = 0;
}

TEST_F(ClientClientImplServiceMockTest, getServerInfo) {
Protocol::Client::GetServerInfo::Request request;
Protocol::Client::GetServerInfo::Response response;
Expand Down Expand Up @@ -265,6 +338,9 @@ TEST_F(ClientClientImplTest, makeDirectory_timeout) {
TimePoint::min());
EXPECT_EQ(Client::Status::TIMEOUT, result.status);
EXPECT_EQ("Client-specified timeout elapsed", result.error);
// set client ID to 0 so that the client doesn't try to close its session,
// wait the timeout there, and print a warning.
client.exactlyOnceRPCHelper.clientId = 0;
}

TEST_F(ClientClientImplTest, listDirectory_timeout) {
Expand Down Expand Up @@ -301,13 +377,17 @@ class KeepAliveThreadMain_cancel_Helper {


TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain_cancel) {
mockRPC->expect(OpCode::STATE_MACHINE_COMMAND,
fromString<Protocol::Client::StateMachineCommand::Response>(
"close_session { }"));
client.exactlyOnceRPCHelper.exit();
client.exactlyOnceRPCHelper.exiting = false;
mockRPC->expect(OpCode::STATE_MACHINE_COMMAND,
fromString<Protocol::Client::StateMachineCommand::Response>(
"tree { }"));
client.exactlyOnceRPCHelper.lastKeepAliveStart = TimePoint::min();
client.exactlyOnceRPCHelper.keepAliveIntervalMs = 200;
client.exactlyOnceRPCHelper.keepAliveInterval =
std::chrono::milliseconds(200);
KeepAliveThreadMain_cancel_Helper helper(client.exactlyOnceRPCHelper);
client.exactlyOnceRPCHelper.mutex.callback = std::ref(helper);
client.exactlyOnceRPCHelper.keepAliveThreadMain();
Expand Down
2 changes: 2 additions & 0 deletions Client/MockClientImpl.cc
Expand Up @@ -97,6 +97,8 @@ class TreeLeaderRPC : public LeaderRPCBase {
cresponse.mutable_open_session()->
set_client_id(1);
return Status::OK;
} else if (crequest.has_close_session()) {
return Status::OK;
}
}
PANIC("Unexpected request: %d %s",
Expand Down
25 changes: 24 additions & 1 deletion Protocol/Client.proto
Expand Up @@ -135,6 +135,26 @@ message OpenSession {
}
}

/**
 * CloseSession state machine command: Terminate a session with the cluster.
 * This cleans up any state that the session held; future RPCs on this session
 * will have errors.
 * \since
 *      This command was introduced in state machine version 2. State machines
 *      running version 1 will ignore this command; they only expire sessions
 *      based on timeouts.
 */
message CloseSession {
    message Request {
        /**
         * The ID assigned to the client as previously returned by OpenSession.
         */
        required uint64 client_id = 1;
    }
    /**
     * Empty: the command returns no data; applying it is the whole effect.
     */
    message Response {
    }
}

/**
* A server in a configuration. Used in the GetConfiguration and
* SetConfiguration RPCs.
Expand Down Expand Up @@ -395,15 +415,18 @@ message StateMachineCommand {
message Request {
// The following are mutually exclusive.
optional OpenSession.Request open_session = 1;
optional CloseSession.Request close_session = 4;
optional ReadWriteTree.Request tree = 2;
optional AdvanceStateMachineVersion.Request advance_version = 3;
}
/**
* This is what the state machine outputs for read-write commands from the log.
* This is what the state machine outputs for read-write commands from the
* log.
*/
message Response {
// The following are mutually exclusive.
optional OpenSession.Response open_session = 1;
optional CloseSession.Response close_session = 4;
optional ReadWriteTree.Response tree = 2;
optional AdvanceStateMachineVersion.Response advance_version = 3;
}
Expand Down
1 change: 1 addition & 0 deletions Protocol/ServerStats.proto
Expand Up @@ -144,6 +144,7 @@ message ServerStats {
optional uint32 max_supported_version = 11;
optional uint32 running_version = 12;
optional Tree tree = 13;
optional uint64 num_unknown_requests = 14;
};

/**
Expand Down
6 changes: 6 additions & 0 deletions RELEASES.md
Expand Up @@ -21,6 +21,12 @@ Improvements:
- Optimizes setting nextIndex on leaders by capping it to just past the
follower's last log index. This helps with followers that are new or have
fallen far behind.
- Clients now make a best effort attempt to close their sessions when they shut
down gracefully (issue #116). Before, client sessions were only ever expired
after a timeout. This state could accumulate quickly when running short-lived
clients in a tight loop. Enabling this change requires all servers to be
updated (so that the state machine is updated); new clients talking to old
clusters will issue a warning that they are unable to close their sessions.

Bug fixes (high severity):
- Fixes packaging up very large AppendEntries requests. Before, it was possible
Expand Down

0 comments on commit 88b9583

Please sign in to comment.