Make exiting clients abort an ongoing keepalive RPC
This required refactoring LeaderRPC::call() significantly so that the
RPC it uses is exposed for canceling.

Fix #71: client keepalive shouldn't prevent client programs from exiting
ongardie committed Dec 9, 2014
1 parent 12ee2af commit e3b2a0d
Showing 9 changed files with 387 additions and 77 deletions.
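
Before the diff itself, here is a minimal, self-contained sketch (not LogCabin code) of the pattern this commit introduces: the in-flight keep-alive RPC is kept in a member so that exit() can cancel it rather than wait for it to finish. CancelableCall and KeepAliveLoop below are illustrative stand-ins for LeaderRPCBase::Call and ClientImpl::ExactlyOnceRPCHelper; the plumbing uses only standard-library primitives.

// Minimal sketch of the cancelable keep-alive pattern (assumed names,
// standard-library plumbing; not LogCabin's actual classes).
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

// Stand-in for LeaderRPCBase::Call: wait() blocks until a reply is delivered
// or cancel() is invoked from another thread.
class CancelableCall {
  public:
    void cancel() {
        std::unique_lock<std::mutex> lock(mutex);
        canceled = true;
        cv.notify_all();
    }
    void deliverReply() {
        std::unique_lock<std::mutex> lock(mutex);
        replied = true;
        cv.notify_all();
    }
    // Returns true if a reply arrived, false if the call was canceled.
    bool wait() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this] { return replied || canceled; });
        return replied && !canceled;
    }
  private:
    std::mutex mutex;
    std::condition_variable cv;
    bool replied = false;
    bool canceled = false;
};

// Keep-alive loop analogous to ExactlyOnceRPCHelper: the helper's mutex is
// released while blocked in wait() so that exit() can acquire it and cancel.
class KeepAliveLoop {
  public:
    void run() {
        std::unique_lock<std::mutex> lock(mutex);
        while (!exiting) {
            call.reset(new CancelableCall());
            lock.unlock();              // let exit() in while we block
            call->wait();               // no reply ever arrives in this sketch
            lock.lock();
            call.reset();
        }
    }
    void exit() {
        std::unique_lock<std::mutex> lock(mutex);
        exiting = true;
        if (call)
            call->cancel();             // interrupt the in-flight keep-alive
    }
  private:
    std::mutex mutex;
    bool exiting = false;
    std::unique_ptr<CancelableCall> call;
};

int main() {
    KeepAliveLoop loop;
    std::thread thread([&loop] { loop.run(); });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    loop.exit();   // before this commit, exiting had to wait out the RPC
    thread.join();
    std::cout << "exited promptly" << std::endl;
    return 0;
}

The key point, mirrored in the diff below, is that the blocking wait happens with the helper's mutex released, so a concurrent exit() can take the lock and cancel the outstanding call.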
86 changes: 53 additions & 33 deletions Client/ClientImpl.cc
@@ -1,4 +1,5 @@
/* Copyright (c) 2012-2014 Stanford University
* Copyright (c) 2014 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -106,6 +107,7 @@ ClientImpl::ExactlyOnceRPCHelper::ExactlyOnceRPCHelper(ClientImpl* client)
, lastKeepAliveStart(TimePoint::min())
// TODO(ongaro): set dynamically based on cluster configuration
, keepAliveIntervalMs(60 * 1000)
, keepAliveCall()
, keepAliveThread()
{
}
@@ -118,10 +120,11 @@ void
ClientImpl::ExactlyOnceRPCHelper::exit()
{
{
std::unique_lock<std::mutex> lockGuard(mutex);
std::unique_lock<Core::Mutex> lockGuard(mutex);
exiting = true;
keepAliveCV.notify_all();
// TODO(ongaro): would be better if we could cancel keep-alive calls
if (keepAliveCall)
keepAliveCall->cancel();
}
if (keepAliveThread.joinable())
keepAliveThread.join();
@@ -130,7 +133,22 @@ ClientImpl::ExactlyOnceRPCHelper::exit()
Protocol::Client::ExactlyOnceRPCInfo
ClientImpl::ExactlyOnceRPCHelper::getRPCInfo()
{
std::unique_lock<std::mutex> lockGuard(mutex);
std::unique_lock<Core::Mutex> lockGuard(mutex);
return getRPCInfo(lockGuard);
}

void
ClientImpl::ExactlyOnceRPCHelper::doneWithRPC(
const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo)
{
std::unique_lock<Core::Mutex> lockGuard(mutex);
doneWithRPC(rpcInfo, lockGuard);
}

Protocol::Client::ExactlyOnceRPCInfo
ClientImpl::ExactlyOnceRPCHelper::getRPCInfo(
std::unique_lock<Core::Mutex>& lockGuard)
{
Protocol::Client::ExactlyOnceRPCInfo rpcInfo;
if (client == NULL) {
// Filling in rpcInfo is disabled for some unit tests, since it's
@@ -162,19 +180,17 @@ ClientImpl::ExactlyOnceRPCHelper::getRPCInfo()

void
ClientImpl::ExactlyOnceRPCHelper::doneWithRPC(
const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo)
const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo,
std::unique_lock<Core::Mutex>& lockGuard)
{
std::unique_lock<std::mutex> lockGuard(mutex);
outstandingRPCNumbers.erase(rpcInfo.rpc_number());
}

void
ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain()
{
std::unique_lock<std::mutex> lockGuard(mutex);
while (true) {
if (exiting)
return;
std::unique_lock<Core::Mutex> lockGuard(mutex);
while (!exiting) {
TimePoint nextKeepAlive;
if (keepAliveIntervalMs > 0) {
nextKeepAlive = (lastKeepAliveStart +
@@ -183,9 +199,34 @@ ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain()
nextKeepAlive = TimePoint::max();
}
if (Clock::now() > nextKeepAlive) {
// release lock to avoid deadlock
Core::MutexUnlock<std::mutex> unlockGuard(lockGuard);
client->keepAlive(); // will set nextKeepAlive
Protocol::Client::ReadWriteTree::Request request;
*request.mutable_exactly_once() = getRPCInfo(lockGuard);
setCondition(request,
{"keepalive",
"this is just a no-op to keep the client's session active; "
"the condition is expected to fail"});
request.mutable_write()->set_path("keepalive");
request.mutable_write()->set_contents("you shouldn't see this!");
Protocol::Client::ReadWriteTree::Response response;
keepAliveCall = client->leaderRPC->makeCall();
keepAliveCall->start(OpCode::READ_WRITE_TREE, request);
bool ok;
{
// release lock to allow concurrent cancellation
Core::MutexUnlock<Core::Mutex> unlockGuard(lockGuard);
ok = keepAliveCall->wait(response);
}
keepAliveCall.reset();
if (!ok)
continue;
doneWithRPC(request.exactly_once(), lockGuard);
if (response.status() !=
Protocol::Client::Status::CONDITION_NOT_MET) {
WARNING("Keep-alive write should have failed its condition. "
"Unexpected status was %d: %s",
response.status(),
response.error().c_str());
}
continue;
}
keepAliveCV.wait_until(lockGuard, nextKeepAlive);
@@ -463,27 +504,6 @@ ClientImpl::removeFile(const std::string& path,
return Result();
}

void
ClientImpl::keepAlive()
{
Protocol::Client::ReadWriteTree::Request request;
*request.mutable_exactly_once() = exactlyOnceRPCHelper.getRPCInfo();
setCondition(request,
{"keepalive",
"this is just a no-op to keep the client's session active; "
"the condition is expected to fail"});
request.mutable_write()->set_path("keepalive");
request.mutable_write()->set_contents("you shouldn't see this!");
Protocol::Client::ReadWriteTree::Response response;
leaderRPC->call(OpCode::READ_WRITE_TREE, request, response);
exactlyOnceRPCHelper.doneWithRPC(request.exactly_once());
if (response.status() != Protocol::Client::Status::CONDITION_NOT_MET) {
WARNING("Keep-alive write should have failed its condition. "
"Unexpected status was: %s",
response.error().c_str());
}
}

uint32_t
ClientImpl::negotiateRPCVersion()
{
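The new keepAliveThreadMain() above relies on an RAII unlock guard (Core::MutexUnlock) to drop the helper's mutex around the blocking wait. A rough equivalent over std::unique_lock, assuming the real Core/Mutex.h differs in its details, looks like this:

// Assumed re-implementation of an "unlock guard" in the spirit of
// Core::MutexUnlock: unlocks on construction, re-locks on destruction,
// so a blocking call inside its scope runs without holding the mutex.
#include <mutex>

template<typename Mutex>
class UnlockGuard {
  public:
    explicit UnlockGuard(std::unique_lock<Mutex>& lock)
        : lock(lock)
    {
        lock.unlock();
    }
    ~UnlockGuard()
    {
        lock.lock();   // re-acquire before the caller continues
    }
    UnlockGuard(const UnlockGuard&) = delete;
    UnlockGuard& operator=(const UnlockGuard&) = delete;
  private:
    std::unique_lock<Mutex>& lock;
};

Inside keepAliveThreadMain(), this is what lets exit() acquire the same mutex and call keepAliveCall->cancel() while the keep-alive thread is blocked in wait().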
28 changes: 21 additions & 7 deletions Client/ClientImpl.h
@@ -1,4 +1,5 @@
/* Copyright (c) 2012-2014 Stanford University
* Copyright (c) 2014 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -14,13 +15,13 @@
*/

#include <memory>
#include <mutex>
#include <set>
#include <string>

#include "include/LogCabin/Client.h"
#include "Client/LeaderRPC.h"
#include "Core/ConditionVariable.h"
#include "Core/Mutex.h"
#include "Core/Time.h"

#ifndef LOGCABIN_CLIENT_CLIENTIMPL_H
@@ -120,11 +121,6 @@ class ClientImpl {

protected:

/**
* Make no-op request to the cluster to keep the client's session active.
*/
void keepAlive();

/**
* Asks the cluster leader for the range of supported RPC protocol
* versions, and select the best one. This is used to make sure the client
@@ -181,6 +177,17 @@ class ClientImpl {
void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&);

private:

/**
* Internal version of getRPCInfo() to avoid deadlock with self.
*/
Protocol::Client::ExactlyOnceRPCInfo getRPCInfo(
std::unique_lock<Core::Mutex>& lockGuard);
/**
* Internal version of doneWithRPC() to avoid deadlock with self.
*/
void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&,
std::unique_lock<Core::Mutex>& lockGuard);
/**
* Main function for keep-alive thread. Periodically makes
* requests to the cluster to keep the client's session active.
@@ -204,7 +211,7 @@ class ClientImpl {
/**
* Protects all the members of this class.
*/
mutable std::mutex mutex;
mutable Core::Mutex mutex;
/**
* The numbers of the RPCs for which this client is still awaiting a
* response.
@@ -239,6 +246,13 @@ class ClientImpl {
* inactivity, in milliseconds.
*/
uint64_t keepAliveIntervalMs;

/**
* If set, this is an ongoing keep-alive RPC. This call is canceled to
* interrupt #keepAliveThread when exiting.
*/
std::unique_ptr<LeaderRPCBase::Call> keepAliveCall;

/**
* Runs keepAliveThreadMain().
* Since this thread would be unexpected/wasteful for clients that only
35 changes: 35 additions & 0 deletions Client/ClientImplTest.cc
@@ -122,7 +122,42 @@ TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain) {
EXPECT_EQ(6U, mockRPC->requestLog.size()) << disclaimer;
usleep(6000);
EXPECT_EQ(7U, mockRPC->requestLog.size()) << disclaimer;
}

class KeepAliveThreadMain_cancel_Helper {
  public:
explicit KeepAliveThreadMain_cancel_Helper(
Client::ClientImpl::ExactlyOnceRPCHelper& exactlyOnceRPCHelper)
: exactlyOnceRPCHelper(exactlyOnceRPCHelper)
, iter(0)
{
}
void operator()() {
++iter;
if (iter == 2) {
EXPECT_TRUE(exactlyOnceRPCHelper.keepAliveCall.get() != NULL);
exactlyOnceRPCHelper.keepAliveCall->cancel();
exactlyOnceRPCHelper.exiting = true;
}
}
Client::ClientImpl::ExactlyOnceRPCHelper& exactlyOnceRPCHelper;
uint64_t iter;
};


TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain_cancel) {
client.exactlyOnceRPCHelper.exit();
client.exactlyOnceRPCHelper.exiting = false;
mockRPC->expect(OpCode::READ_WRITE_TREE,
fromString<Protocol::Client::ReadWriteTree::Response>(
""));
client.exactlyOnceRPCHelper.lastKeepAliveStart =
Client::ClientImpl::ExactlyOnceRPCHelper::TimePoint::min();
client.exactlyOnceRPCHelper.keepAliveIntervalMs = 200;
KeepAliveThreadMain_cancel_Helper helper(client.exactlyOnceRPCHelper);
client.exactlyOnceRPCHelper.mutex.callback = std::ref(helper);
client.exactlyOnceRPCHelper.keepAliveThreadMain();
client.exactlyOnceRPCHelper.mutex.callback = std::function<void()>();
EXPECT_EQ(4U, helper.iter);
}

using Client::Result;
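The keepAliveThreadMain_cancel test above drives cancellation deterministically through mutex.callback, a testing hook on Core::Mutex that (judging from its use here) runs a supplied function each time the mutex is acquired. A simplified, assumed version of such an instrumented mutex, with details that may differ from the real Core::Mutex:

// Assumed sketch of a test-instrumented mutex with a per-acquisition hook,
// loosely modeled on how the test above uses Core::Mutex::callback.
#include <functional>
#include <mutex>

class InstrumentedMutex {
  public:
    void lock() {
        m.lock();
        if (callback)
            callback();            // runs with the mutex held
    }
    bool try_lock() {
        if (!m.try_lock())
            return false;
        if (callback)
            callback();
        return true;
    }
    void unlock() {
        m.unlock();
    }
    // Tests assign this to inject actions (e.g., canceling the keep-alive
    // call) at precise lock-acquisition points; empty in production.
    std::function<void()> callback;
  private:
    std::mutex m;
};

With the hook firing on every acquisition, the test can count iterations of the keep-alive loop and cancel the outstanding call on exactly the second one, without sleeping or racing against the RPC.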
105 changes: 72 additions & 33 deletions Client/LeaderRPC.cc
@@ -26,6 +26,68 @@
namespace LogCabin {
namespace Client {

//// class LeaderRPC::Call ////

LeaderRPC::Call::Call(LeaderRPC& leaderRPC)
: leaderRPC(leaderRPC)
, cachedSession()
, rpc()
{
}

LeaderRPC::Call::~Call()
{
}

void
LeaderRPC::Call::start(OpCode opCode, const google::protobuf::Message& request)
{
{ // Save a reference to the leaderSession
std::unique_lock<std::mutex> lockGuard(leaderRPC.mutex);
cachedSession = leaderRPC.leaderSession;
}
rpc = RPC::ClientRPC(cachedSession,
Protocol::Common::ServiceId::CLIENT_SERVICE,
1,
opCode,
request);
}

void
LeaderRPC::Call::cancel()
{
rpc.cancel();
cachedSession.reset();
}

bool
LeaderRPC::Call::wait(google::protobuf::Message& response)
{
typedef RPC::ClientRPC::Status Status;
Protocol::Client::Error serviceSpecificError;
Status status = rpc.waitForReply(&response, &serviceSpecificError);

// Decode the response
switch (status) {
case Status::OK:
return true;
case Status::SERVICE_SPECIFIC_ERROR:
leaderRPC.handleServiceSpecificError(cachedSession,
serviceSpecificError);
return false;
case Status::RPC_FAILED:
// If the session is broken, get a new one and try again.
leaderRPC.connectRandom(cachedSession);
return false;
case Status::RPC_CANCELED:
return false;
}
PANIC("Unexpected RPC status");
}


//// class LeaderRPC ////

LeaderRPC::LeaderRPC(const RPC::Address& hosts)
: windowCount(5)
, windowNanos(1000 * 1000 * 100)
@@ -55,43 +117,20 @@ LeaderRPC::call(OpCode opCode,
const google::protobuf::Message& request,
google::protobuf::Message& response)
{
typedef RPC::ClientRPC::Status Status;

while (true) {
// Save a reference to the leaderSession
std::shared_ptr<RPC::ClientSession> cachedSession;
{
std::unique_lock<std::mutex> lockGuard(mutex);
cachedSession = leaderSession;
}

// Execute the RPC
RPC::ClientRPC rpc(cachedSession,
Protocol::Common::ServiceId::CLIENT_SERVICE,
1,
opCode,
request);
Protocol::Client::Error serviceSpecificError;
Status status = rpc.waitForReply(&response, &serviceSpecificError);

// Decode the response
switch (status) {
case Status::OK:
return;
case Status::SERVICE_SPECIFIC_ERROR:
handleServiceSpecificError(cachedSession,
serviceSpecificError);
break;
case Status::RPC_FAILED:
// If the session is broken, get a new one and try again.
connectRandom(cachedSession);
break;
case Status::RPC_CANCELED:
PANIC("RPC unexpectedly canceled");
}
Call c(*this);
c.start(opCode, request);
if (c.wait(response))
return;
}
}

std::unique_ptr<LeaderRPCBase::Call>
LeaderRPC::makeCall()
{
return std::unique_ptr<LeaderRPCBase::Call>(new LeaderRPC::Call(*this));
}

void
LeaderRPC::handleServiceSpecificError(
std::shared_ptr<RPC::ClientSession> cachedSession,