From e3b2a0d97b42b831e483cfb1c975a72dd64e48a8 Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Mon, 8 Dec 2014 17:53:19 -0800 Subject: [PATCH] Make exiting clients abort an ongoing keepalive RPC This required refactoring LeaderRPC::call() significantly so that the RPC it uses is exposed for canceling. Fix #71: client keepalive shouldn't prevent client programs from exiting --- Client/ClientImpl.cc | 86 ++++++++++++++++++++------------ Client/ClientImpl.h | 28 ++++++++--- Client/ClientImplTest.cc | 35 +++++++++++++ Client/LeaderRPC.cc | 105 +++++++++++++++++++++++++++------------ Client/LeaderRPC.h | 78 +++++++++++++++++++++++++++++ Client/LeaderRPCMock.cc | 47 ++++++++++++++++-- Client/LeaderRPCMock.h | 16 ++++++ Client/LeaderRPCTest.cc | 34 +++++++++++++ Client/MockClientImpl.cc | 35 +++++++++++++ 9 files changed, 387 insertions(+), 77 deletions(-) diff --git a/Client/ClientImpl.cc b/Client/ClientImpl.cc index 9d9e1bed..e9fcdff6 100644 --- a/Client/ClientImpl.cc +++ b/Client/ClientImpl.cc @@ -1,4 +1,5 @@ /* Copyright (c) 2012-2014 Stanford University + * Copyright (c) 2014 Diego Ongaro * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -106,6 +107,7 @@ ClientImpl::ExactlyOnceRPCHelper::ExactlyOnceRPCHelper(ClientImpl* client) , lastKeepAliveStart(TimePoint::min()) // TODO(ongaro): set dynamically based on cluster configuration , keepAliveIntervalMs(60 * 1000) + , keepAliveCall() , keepAliveThread() { } @@ -118,10 +120,11 @@ void ClientImpl::ExactlyOnceRPCHelper::exit() { { - std::unique_lock lockGuard(mutex); + std::unique_lock lockGuard(mutex); exiting = true; keepAliveCV.notify_all(); - // TODO(ongaro): would be better if we could cancel keep-alive calls + if (keepAliveCall) + keepAliveCall->cancel(); } if (keepAliveThread.joinable()) keepAliveThread.join(); @@ -130,7 +133,22 @@ ClientImpl::ExactlyOnceRPCHelper::exit() Protocol::Client::ExactlyOnceRPCInfo 
ClientImpl::ExactlyOnceRPCHelper::getRPCInfo() { - std::unique_lock lockGuard(mutex); + std::unique_lock lockGuard(mutex); + return getRPCInfo(lockGuard); +} + +void +ClientImpl::ExactlyOnceRPCHelper::doneWithRPC( + const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo) +{ + std::unique_lock lockGuard(mutex); + doneWithRPC(rpcInfo, lockGuard); +} + +Protocol::Client::ExactlyOnceRPCInfo +ClientImpl::ExactlyOnceRPCHelper::getRPCInfo( + std::unique_lock& lockGuard) +{ Protocol::Client::ExactlyOnceRPCInfo rpcInfo; if (client == NULL) { // Filling in rpcInfo is disabled for some unit tests, since it's @@ -162,19 +180,17 @@ ClientImpl::ExactlyOnceRPCHelper::getRPCInfo() void ClientImpl::ExactlyOnceRPCHelper::doneWithRPC( - const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo) + const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo, + std::unique_lock& lockGuard) { - std::unique_lock lockGuard(mutex); outstandingRPCNumbers.erase(rpcInfo.rpc_number()); } void ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain() { - std::unique_lock lockGuard(mutex); - while (true) { - if (exiting) - return; + std::unique_lock lockGuard(mutex); + while (!exiting) { TimePoint nextKeepAlive; if (keepAliveIntervalMs > 0) { nextKeepAlive = (lastKeepAliveStart + @@ -183,9 +199,34 @@ ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain() nextKeepAlive = TimePoint::max(); } if (Clock::now() > nextKeepAlive) { - // release lock to avoid deadlock - Core::MutexUnlock unlockGuard(lockGuard); - client->keepAlive(); // will set nextKeepAlive + Protocol::Client::ReadWriteTree::Request request; + *request.mutable_exactly_once() = getRPCInfo(lockGuard); + setCondition(request, + {"keepalive", + "this is just a no-op to keep the client's session active; " + "the condition is expected to fail"}); + request.mutable_write()->set_path("keepalive"); + request.mutable_write()->set_contents("you shouldn't see this!"); + Protocol::Client::ReadWriteTree::Response response; + keepAliveCall = 
client->leaderRPC->makeCall(); + keepAliveCall->start(OpCode::READ_WRITE_TREE, request); + bool ok; + { + // release lock to allow concurrent cancellation + Core::MutexUnlock unlockGuard(lockGuard); + ok = keepAliveCall->wait(response); + } + keepAliveCall.reset(); + if (!ok) + continue; + doneWithRPC(request.exactly_once(), lockGuard); + if (response.status() != + Protocol::Client::Status::CONDITION_NOT_MET) { + WARNING("Keep-alive write should have failed its condition. " + "Unexpected status was %d: %s", + response.status(), + response.error().c_str()); + } continue; } keepAliveCV.wait_until(lockGuard, nextKeepAlive); @@ -463,27 +504,6 @@ ClientImpl::removeFile(const std::string& path, return Result(); } -void -ClientImpl::keepAlive() -{ - Protocol::Client::ReadWriteTree::Request request; - *request.mutable_exactly_once() = exactlyOnceRPCHelper.getRPCInfo(); - setCondition(request, - {"keepalive", - "this is just a no-op to keep the client's session active; " - "the condition is expected to fail"}); - request.mutable_write()->set_path("keepalive"); - request.mutable_write()->set_contents("you shouldn't see this!"); - Protocol::Client::ReadWriteTree::Response response; - leaderRPC->call(OpCode::READ_WRITE_TREE, request, response); - exactlyOnceRPCHelper.doneWithRPC(request.exactly_once()); - if (response.status() != Protocol::Client::Status::CONDITION_NOT_MET) { - WARNING("Keep-alive write should have failed its condition. 
" - "Unexpected status was: %s", - response.error().c_str()); - } -} - uint32_t ClientImpl::negotiateRPCVersion() { diff --git a/Client/ClientImpl.h b/Client/ClientImpl.h index 08380299..3e86bc3f 100644 --- a/Client/ClientImpl.h +++ b/Client/ClientImpl.h @@ -1,4 +1,5 @@ /* Copyright (c) 2012-2014 Stanford University + * Copyright (c) 2014 Diego Ongaro * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -14,13 +15,13 @@ */ #include -#include #include #include #include "include/LogCabin/Client.h" #include "Client/LeaderRPC.h" #include "Core/ConditionVariable.h" +#include "Core/Mutex.h" #include "Core/Time.h" #ifndef LOGCABIN_CLIENT_CLIENTIMPL_H @@ -120,11 +121,6 @@ class ClientImpl { protected: - /** - * Make no-op request to the cluster to keep the client's session active. - */ - void keepAlive(); - /** * Asks the cluster leader for the range of supported RPC protocol * versions, and select the best one. This is used to make sure the client @@ -181,6 +177,17 @@ class ClientImpl { void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&); private: + + /** + * Internal version of getRPCInfo() to avoid deadlock with self. + */ + Protocol::Client::ExactlyOnceRPCInfo getRPCInfo( + std::unique_lock& lockGuard); + /** + * Internal version of doneWithRPC() to avoid deadlock with self. + */ + void doneWithRPC(const Protocol::Client::ExactlyOnceRPCInfo&, + std::unique_lock& lockGuard); /** * Main function for keep-alive thread. Periodically makes * requests to the cluster to keep the client's session active. @@ -204,7 +211,7 @@ class ClientImpl { /** * Protects all the members of this class. */ - mutable std::mutex mutex; + mutable Core::Mutex mutex; /** * The numbers of the RPCs for which this client is still awaiting a * response. @@ -239,6 +246,13 @@ class ClientImpl { * inactivity, in milliseconds. 
*/ uint64_t keepAliveIntervalMs; + + /** + * If set, this is an ongoing keep-alive RPC. This call is canceled to + * interrupt #keepAliveThread when exiting. + */ + std::unique_ptr keepAliveCall; + /** * Runs keepAliveThreadMain(). * Since this thread would be unexpected/wasteful for clients that only diff --git a/Client/ClientImplTest.cc b/Client/ClientImplTest.cc index f9a34419..308db7a7 100644 --- a/Client/ClientImplTest.cc +++ b/Client/ClientImplTest.cc @@ -122,7 +122,42 @@ TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain) { EXPECT_EQ(6U, mockRPC->requestLog.size()) << disclaimer; usleep(6000); EXPECT_EQ(7U, mockRPC->requestLog.size()) << disclaimer; +} + +class KeepAliveThreadMain_cancel_Helper { + explicit KeepAliveThreadMain_cancel_Helper( + Client::ClientImpl::ExactlyOnceRPCHelper& exactlyOnceRPCHelper) + : exactlyOnceRPCHelper(exactlyOnceRPCHelper) + , iter(0) + { + } + void operator()() { + ++iter; + if (iter == 2) { + EXPECT_TRUE(exactlyOnceRPCHelper.keepAliveCall.get() != NULL); + exactlyOnceRPCHelper.keepAliveCall->cancel(); + exactlyOnceRPCHelper.exiting = true; + } + } + Client::ClientImpl::ExactlyOnceRPCHelper& exactlyOnceRPCHelper; + uint64_t iter; +}; + +TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain_cancel) { + client.exactlyOnceRPCHelper.exit(); + client.exactlyOnceRPCHelper.exiting = false; + mockRPC->expect(OpCode::READ_WRITE_TREE, + fromString( + "")); + client.exactlyOnceRPCHelper.lastKeepAliveStart = + Client::ClientImpl::ExactlyOnceRPCHelper::TimePoint::min(); + client.exactlyOnceRPCHelper.keepAliveIntervalMs = 200; + KeepAliveThreadMain_cancel_Helper helper(client.exactlyOnceRPCHelper); + client.exactlyOnceRPCHelper.mutex.callback = std::ref(helper); + client.exactlyOnceRPCHelper.keepAliveThreadMain(); + client.exactlyOnceRPCHelper.mutex.callback = std::function(); + EXPECT_EQ(4U, helper.iter); } using Client::Result; diff --git a/Client/LeaderRPC.cc b/Client/LeaderRPC.cc index 8bcac876..3b426462 100644 --- 
a/Client/LeaderRPC.cc +++ b/Client/LeaderRPC.cc @@ -26,6 +26,68 @@ namespace LogCabin { namespace Client { +//// class LeaderRPC::Call //// + +LeaderRPC::Call::Call(LeaderRPC& leaderRPC) + : leaderRPC(leaderRPC) + , cachedSession() + , rpc() +{ +} + +LeaderRPC::Call::~Call() +{ +} + +void +LeaderRPC::Call::start(OpCode opCode, const google::protobuf::Message& request) +{ + { // Save a reference to the leaderSession + std::unique_lock lockGuard(leaderRPC.mutex); + cachedSession = leaderRPC.leaderSession; + } + rpc = RPC::ClientRPC(cachedSession, + Protocol::Common::ServiceId::CLIENT_SERVICE, + 1, + opCode, + request); +} + +void +LeaderRPC::Call::cancel() +{ + rpc.cancel(); + cachedSession.reset(); +} + +bool +LeaderRPC::Call::wait(google::protobuf::Message& response) +{ + typedef RPC::ClientRPC::Status Status; + Protocol::Client::Error serviceSpecificError; + Status status = rpc.waitForReply(&response, &serviceSpecificError); + + // Decode the response + switch (status) { + case Status::OK: + return true; + case Status::SERVICE_SPECIFIC_ERROR: + leaderRPC.handleServiceSpecificError(cachedSession, + serviceSpecificError); + return false; + case Status::RPC_FAILED: + // If the session is broken, get a new one and try again. 
+ leaderRPC.connectRandom(cachedSession); + return false; + case Status::RPC_CANCELED: + return false; + } + PANIC("Unexpected RPC status"); +} + + +//// class LeaderRPC //// + LeaderRPC::LeaderRPC(const RPC::Address& hosts) : windowCount(5) , windowNanos(1000 * 1000 * 100) @@ -55,43 +117,20 @@ LeaderRPC::call(OpCode opCode, const google::protobuf::Message& request, google::protobuf::Message& response) { - typedef RPC::ClientRPC::Status Status; - while (true) { - // Save a reference to the leaderSession - std::shared_ptr cachedSession; - { - std::unique_lock lockGuard(mutex); - cachedSession = leaderSession; - } - - // Execute the RPC - RPC::ClientRPC rpc(cachedSession, - Protocol::Common::ServiceId::CLIENT_SERVICE, - 1, - opCode, - request); - Protocol::Client::Error serviceSpecificError; - Status status = rpc.waitForReply(&response, &serviceSpecificError); - - // Decode the response - switch (status) { - case Status::OK: - return; - case Status::SERVICE_SPECIFIC_ERROR: - handleServiceSpecificError(cachedSession, - serviceSpecificError); - break; - case Status::RPC_FAILED: - // If the session is broken, get a new one and try again. 
- connectRandom(cachedSession); - break; - case Status::RPC_CANCELED: - PANIC("RPC unexpectedly canceled"); - } + Call c(*this); + c.start(opCode, request); + if (c.wait(response)) + return; } } +std::unique_ptr +LeaderRPC::makeCall() +{ + return std::unique_ptr(new LeaderRPC::Call(*this)); +} + void LeaderRPC::handleServiceSpecificError( std::shared_ptr cachedSession, diff --git a/Client/LeaderRPC.h b/Client/LeaderRPC.h index 00916aa2..54ff25fe 100644 --- a/Client/LeaderRPC.h +++ b/Client/LeaderRPC.h @@ -23,6 +23,7 @@ #include "build/Protocol/Client.pb.h" #include "Event/Loop.h" #include "RPC/Address.h" +#include "RPC/ClientRPC.h" #ifndef LOGCABIN_CLIENT_LEADERRPC_H #define LOGCABIN_CLIENT_LEADERRPC_H @@ -74,11 +75,62 @@ class LeaderRPCBase { const google::protobuf::Message& request, google::protobuf::Message& response) = 0; + /** + * An asynchronous version of call(). This allows multiple RPCs to be + * executed concurrently, or canceling an RPC that is running on a separate + * thread. + */ + class Call { + public: + /** + * Constructor. + */ + Call() {} + /** + * Destructor. + */ + virtual ~Call() {} + /** + * Invoke the RPC. + * \param opCode + * RPC operation code. The caller must guarantee that this is a + * valid opCode. (If the server rejects it, this will PANIC.) + * \param request + * The parameters for the operation. The caller must guarantee + * that this is a well-formed request. (If the server rejects it, + * this will PANIC.) + */ + virtual void start(OpCode opCode, + const google::protobuf::Message& request) = 0; + /** + * Cancel the RPC. This may only be called after start(), but it may + * be called safely from a separate thread. + */ + virtual void cancel() = 0; + /** + * Wait for the RPC to complete. + * \param[out] response + * If successful, the response to the operation will be filled in + * here. + * \return + * True if the RPC completed successfully, false otherwise. 
If + * this returns false, it is the caller's responsibility to start + * over to achieve the same at-most-once semantics as #call(). + */ + virtual bool wait(google::protobuf::Message& response) = 0; + }; + + /** + * Return a new Call object. + */ + virtual std::unique_ptr makeCall() = 0; + // LeaderRPCBase is not copyable LeaderRPCBase(const LeaderRPCBase&) = delete; LeaderRPCBase& operator=(const LeaderRPCBase&) = delete; }; + /** * This is the implementation of LeaderRPCBase that uses the RPC system. * (The other implementation, LeaderRPCMock, is only used for testing.) @@ -97,11 +149,37 @@ class LeaderRPC : public LeaderRPCBase { /// Destructor. ~LeaderRPC(); + /// See LeaderRPCBase::call(). void call(OpCode opCode, const google::protobuf::Message& request, google::protobuf::Message& response); + + /// See LeaderRPCBase::makeCall(). + std::unique_ptr makeCall(); + private: + /// See LeaderRPCBase::Call. + class Call : public LeaderRPCBase::Call { + public: + explicit Call(LeaderRPC& leaderRPC); + ~Call(); + void start(OpCode opCode, const google::protobuf::Message& request); + void cancel(); + bool wait(google::protobuf::Message& response); + LeaderRPC& leaderRPC; + /** + * Copy of leaderSession when the RPC was started (might have changed + * since). + */ + std::shared_ptr cachedSession; + /** + * RPC object which may be canceled. + */ + RPC::ClientRPC rpc; + }; + + /** * A helper for call() that decodes errors thrown by the service.
*/ diff --git a/Client/LeaderRPCMock.cc b/Client/LeaderRPCMock.cc index 90b02308..f8e5af7a 100644 --- a/Client/LeaderRPCMock.cc +++ b/Client/LeaderRPCMock.cc @@ -1,4 +1,5 @@ /* Copyright (c) 2012 Stanford University + * Copyright (c) 2014 Diego Ongaro * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -53,18 +54,56 @@ void LeaderRPCMock::call(OpCode opCode, const google::protobuf::Message& request, google::protobuf::Message& response) +{ + Call c(*this); + c.start(opCode, request); + c.wait(response); +} + +LeaderRPCMock::Call::Call(LeaderRPCMock& leaderRPC) + : leaderRPC(leaderRPC) + , canceled(false) +{ +} + +void +LeaderRPCMock::Call::start(OpCode opCode, + const google::protobuf::Message& request) { MessagePtr requestCopy(request.New()); requestCopy->CopyFrom(request); - requestLog.push_back({opCode, std::move(requestCopy)}); - ASSERT_LT(0U, responseQueue.size()) + leaderRPC.requestLog.push_back({opCode, std::move(requestCopy)}); + ASSERT_LT(0U, leaderRPC.responseQueue.size()) << "The client sent an unexpected RPC:\n" << request.GetTypeName() << ":\n" << Core::ProtoBuf::dumpString(request); - auto& opCodeMsgPair = responseQueue.front(); + auto& opCodeMsgPair = leaderRPC.responseQueue.front(); EXPECT_EQ(opCode, opCodeMsgPair.first); +} + +void +LeaderRPCMock::Call::cancel() +{ + canceled = true; +} + +bool +LeaderRPCMock::Call::wait(google::protobuf::Message& response) +{ + if (canceled) { + leaderRPC.responseQueue.pop(); + return false; + } + auto& opCodeMsgPair = leaderRPC.responseQueue.front(); response.CopyFrom(*opCodeMsgPair.second); - responseQueue.pop(); + leaderRPC.responseQueue.pop(); + return true; +} + +std::unique_ptr +LeaderRPCMock::makeCall() +{ + return std::unique_ptr(new Call(*this)); } } // namespace LogCabin::Client diff --git a/Client/LeaderRPCMock.h b/Client/LeaderRPCMock.h index 105c0822..ba28ae70 100644 --- a/Client/LeaderRPCMock.h +++ 
b/Client/LeaderRPCMock.h @@ -1,4 +1,5 @@ /* Copyright (c) 2012 Stanford University + * Copyright (c) 2014 Diego Ongaro * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -57,7 +58,22 @@ class LeaderRPCMock : public LeaderRPCBase { void call(OpCode opCode, const google::protobuf::Message& request, google::protobuf::Message& response); + + /// See LeaderRPCBase::makeCall. + std::unique_ptr makeCall(); + private: + /// See LeaderRPCBase::Call. + class Call : public LeaderRPCBase::Call { + public: + explicit Call(LeaderRPCMock& leaderRPC); + void start(OpCode opCode, const google::protobuf::Message& request); + void cancel(); + bool wait(google::protobuf::Message& response); + LeaderRPCMock& leaderRPC; + bool canceled; + }; + /** * A queue of requests that have come in from call(). */ diff --git a/Client/LeaderRPCTest.cc b/Client/LeaderRPCTest.cc index c61a890b..0b71b056 100644 --- a/Client/LeaderRPCTest.cc +++ b/Client/LeaderRPCTest.cc @@ -1,4 +1,5 @@ /* Copyright (c) 2012 Stanford University + * Copyright (c) 2014 Diego Ongaro * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -77,6 +78,39 @@ class ClientLeaderRPCTest : public ::testing::Test { Protocol::Client::ReadOnlyTree::Response expResponse; }; +TEST_F(ClientLeaderRPCTest, CallOK) { + init(); + service->reply(OpCode::READ_ONLY_TREE, request, expResponse); + std::unique_ptr call = leaderRPC->makeCall(); + call->start(OpCode::READ_ONLY_TREE, request); + EXPECT_TRUE(call->wait(response)); + EXPECT_EQ(expResponse, response); +} + +TEST_F(ClientLeaderRPCTest, CallCanceled) { + init(); + std::unique_ptr call = leaderRPC->makeCall(); + call->start(OpCode::READ_ONLY_TREE, request); + call->cancel(); + EXPECT_FALSE(call->wait(response)); + EXPECT_FALSE(call->wait(response)); + call->cancel(); + 
EXPECT_FALSE(call->wait(response)); +} + +TEST_F(ClientLeaderRPCTest, CallRPCFailed) { + init(); + service->closeSession(OpCode::READ_ONLY_TREE, request); + service->reply(OpCode::READ_ONLY_TREE, request, expResponse); + std::unique_ptr call = leaderRPC->makeCall(); + call->start(OpCode::READ_ONLY_TREE, request); + EXPECT_FALSE(call->wait(response)); + call->start(OpCode::READ_ONLY_TREE, request); + EXPECT_TRUE(call->wait(response)); + EXPECT_EQ(expResponse, response); +} + + // constructor and destructor tested adequately in tests for call() TEST_F(ClientLeaderRPCTest, callOK) { diff --git a/Client/MockClientImpl.cc b/Client/MockClientImpl.cc index 1a51e1c3..aca4cc98 100644 --- a/Client/MockClientImpl.cc +++ b/Client/MockClientImpl.cc @@ -1,4 +1,5 @@ /* Copyright (c) 2012 Stanford University + * Copyright (c) 2014 Diego Ongaro * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -59,6 +60,40 @@ class TreeLeaderRPC : public LeaderRPCBase { Core::ProtoBuf::dumpString(request).c_str()); } } + + class Call : public LeaderRPCBase::Call { + public: + explicit Call(TreeLeaderRPC& leaderRPC) + : leaderRPC(leaderRPC) + , opCode() + , request() + { + } + void start(OpCode _opCode, const google::protobuf::Message& _request) { + opCode = _opCode; + request.reset(_request.New()); + request->CopyFrom(_request); + } + void cancel() { + // no op + } + bool wait(google::protobuf::Message& response) { + leaderRPC.call(opCode, *request, response); + return true; + } + TreeLeaderRPC& leaderRPC; + OpCode opCode; + std::unique_ptr request; + + // Call is not copyable. + Call(const Call&) = delete; + Call& operator=(const Call&) = delete; + }; + + std::unique_ptr makeCall() { + return std::unique_ptr(new Call(*this)); + } + std::mutex mutex; ///< prevents concurrent access to 'tree' LogCabin::Tree::Tree tree; };