From 88b9583662b87b449f930bb906c10f40c7a3a26d Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Mon, 15 Jun 2015 16:59:27 -0700 Subject: [PATCH] Add mechanism for clients to close their sessions cleanly on shutdown From the release notes: Clients now make a best effort attempt to close their sessions when they shut down gracefully (issue #116). Before, client sessions were only ever expired after a timeout. This state could accumulate quickly when running short-lived clients in a tight loop. Enabling this change requires all servers to be updated (so that the state machine is updated); new clients talking to old clusters will issue a warning that they are unable to close their sessions. Close #116: client library should close session on exit --- Client/ClientImpl.cc | 46 +++++++++++++++++-- Client/ClientImpl.h | 8 +++- Client/ClientImplTest.cc | 88 +++++++++++++++++++++++++++++++++-- Client/MockClientImpl.cc | 2 + Protocol/Client.proto | 25 +++++++++- Protocol/ServerStats.proto | 1 + RELEASES.md | 6 +++ Server/StateMachine.cc | 57 +++++++++++++++-------- Server/StateMachine.h | 23 +++++++++- Server/StateMachineTest.cc | 94 ++++++++++++++++++++++++++++++++------ include/LogCabin/Client.h | 8 ++++ 11 files changed, 310 insertions(+), 48 deletions(-) diff --git a/Client/ClientImpl.cc b/Client/ClientImpl.cc index 1932ad98..2c1b27e3 100644 --- a/Client/ClientImpl.cc +++ b/Client/ClientImpl.cc @@ -221,7 +221,13 @@ ClientImpl::ExactlyOnceRPCHelper::ExactlyOnceRPCHelper(ClientImpl* client) , exiting(false) , lastKeepAliveStart(TimePoint::min()) // TODO(ongaro): set dynamically based on cluster configuration - , keepAliveIntervalMs(60 * 1000) + , keepAliveInterval(std::chrono::milliseconds(60 * 1000)) + , sessionCloseTimeout(std::chrono::milliseconds( + client->config.read( + "sessionCloseTimeoutMilliseconds", + client->config.read( + "tcpConnectTimeoutMilliseconds", + 1000)))) , keepAliveCall() , keepAliveThread() { @@ -240,6 +246,37 @@ 
ClientImpl::ExactlyOnceRPCHelper::exit() keepAliveCV.notify_all(); if (keepAliveCall) keepAliveCall->cancel(); + if (clientId > 0) { + Protocol::Client::StateMachineCommand::Request request; + Protocol::Client::StateMachineCommand::Response response; + request.mutable_close_session()->set_client_id(clientId); + LeaderRPC::Status status = client->leaderRPC->call( + OpCode::STATE_MACHINE_COMMAND, + request, + response, + ClientImpl::Clock::now() + sessionCloseTimeout); + switch (status) { + case LeaderRPC::Status::OK: + break; + case LeaderRPC::Status::TIMEOUT: + using Core::StringUtil::toString; + WARNING("Could not definitively close client session %lu " + "within timeout (%s). It may remain open until it " + "expires.", + clientId, + toString(sessionCloseTimeout).c_str()); + break; + case LeaderRPC::Status::INVALID_REQUEST: + WARNING("The server and/or replicated state machine " + "doesn't support the CloseSession command or " + "claims the request is malformed. This client's " + "session (%lu) will remain open until it expires. 
" + "Consider upgrading your servers (this command " + "was introduced in state machine version 2).", + clientId); + break; + } + } } if (keepAliveThread.joinable()) keepAliveThread.join(); @@ -324,9 +361,8 @@ ClientImpl::ExactlyOnceRPCHelper::keepAliveThreadMain() std::unique_lock lockGuard(mutex); while (!exiting) { TimePoint nextKeepAlive; - if (keepAliveIntervalMs > 0) { - nextKeepAlive = (lastKeepAliveStart + - std::chrono::milliseconds(keepAliveIntervalMs)); + if (keepAliveInterval.count() > 0) { + nextKeepAlive = lastKeepAliveStart + keepAliveInterval; } else { nextKeepAlive = TimePoint::max(); } @@ -408,10 +444,10 @@ ClientImpl::ClientImpl(const std::map& options) ClientImpl::~ClientImpl() { + exactlyOnceRPCHelper.exit(); eventLoop.exit(); if (eventLoopThread.joinable()) eventLoopThread.join(); - exactlyOnceRPCHelper.exit(); } void diff --git a/Client/ClientImpl.h b/Client/ClientImpl.h index 1186343c..650ef5aa 100644 --- a/Client/ClientImpl.h +++ b/Client/ClientImpl.h @@ -286,9 +286,13 @@ class ClientImpl { TimePoint lastKeepAliveStart; /** * How often session keep-alive requests are sent during periods of - * inactivity, in milliseconds. + * inactivity. */ - uint64_t keepAliveIntervalMs; + std::chrono::milliseconds keepAliveInterval; + /** + * How long to wait for the CloseSession RPC before giving up. + */ + std::chrono::milliseconds sessionCloseTimeout; /** * If set, this is an ongoing keep-alive RPC. This call is canceled to diff --git a/Client/ClientImplTest.cc b/Client/ClientImplTest.cc index c2708520..efcf3773 100644 --- a/Client/ClientImplTest.cc +++ b/Client/ClientImplTest.cc @@ -20,10 +20,12 @@ #include "Client/LeaderRPCMock.h" #include "Core/ProtoBuf.h" #include "Core/StringUtil.h" +#include "Core/Time.h" #include "Protocol/Common.h" #include "RPC/Server.h" #include "RPC/ServiceMock.h" #include "build/Protocol/Client.pb.h" +#include "include/LogCabin/Debug.h" // Most of the tests for ClientImpl are in ClientTest.cc. 
@@ -32,6 +34,7 @@ namespace { using Core::ProtoBuf::fromString; typedef Client::ClientImpl::TimePoint TimePoint; +using std::chrono::milliseconds; class ClientClientImplExactlyOnceTest : public ::testing::Test { public: @@ -52,6 +55,15 @@ class ClientClientImplExactlyOnceTest : public ::testing::Test { rpcInfo1 = client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max()); rpcInfo2 = client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max()); } + + ~ClientClientImplExactlyOnceTest() + { + mockRPC->expect(OpCode::STATE_MACHINE_COMMAND, + fromString( + "close_session { }")); + } + + Client::ClientImpl client; Client::LeaderRPCMock* mockRPC; RPCInfo rpcInfo1; @@ -63,6 +75,35 @@ class ClientClientImplExactlyOnceTest : public ::testing::Test { operator=(const ClientClientImplExactlyOnceTest&) = delete; }; +TEST_F(ClientClientImplExactlyOnceTest, exit_readonly) { + Client::ClientImpl client2; + Client::LeaderRPCMock* mockRPC2 = new Client::LeaderRPCMock(); + client2.leaderRPC = std::unique_ptr(mockRPC2); + EXPECT_EQ(0U, mockRPC2->requestLog.size()); +} + +TEST_F(ClientClientImplExactlyOnceTest, exit_normal) { + mockRPC->expect(OpCode::STATE_MACHINE_COMMAND, + fromString( + "close_session { }")); + client.exactlyOnceRPCHelper.exit(); + mockRPC->popRequest(); + EXPECT_EQ("close_session { client_id: 3 }", *mockRPC->popRequest()); +} + +TEST_F(ClientClientImplExactlyOnceTest, exit_timeout) { + // expect warnings + LogCabin::Core::Debug::setLogPolicy({ + {"Client/ClientImpl.cc", "ERROR"} + }); + client.exactlyOnceRPCHelper.sessionCloseTimeout = + std::chrono::milliseconds(-1); + client.exactlyOnceRPCHelper.exit(); + EXPECT_EQ(1U, mockRPC->requestLog.size()); +} + +// exit with a server that doesn't understand CloseSession is tested as +// ClientClientImplServiceMockTest::exactlyOnceRPCInfo_exit_invalidRequest. 
TEST_F(ClientClientImplExactlyOnceTest, getRPCInfo) { EXPECT_EQ((std::set{1, 2}), @@ -92,6 +133,10 @@ TEST_F(ClientClientImplExactlyOnceTest, getRPCInfo_timeout) { rpcInfo2 = client2.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max()); EXPECT_EQ(4U, client2.exactlyOnceRPCHelper.clientId); EXPECT_EQ(4U, rpcInfo2.client_id()); + + mockRPC2->expect(OpCode::STATE_MACHINE_COMMAND, + fromString( + "close_session { }")); } TEST_F(ClientClientImplExactlyOnceTest, doneWithRPC) { @@ -125,21 +170,21 @@ TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain_TimingSensitive) { " error: 'err' " "}")); } - client.exactlyOnceRPCHelper.keepAliveIntervalMs = 2; + client.exactlyOnceRPCHelper.keepAliveInterval = milliseconds(2); client.exactlyOnceRPCHelper.keepAliveCV.notify_all(); // in 2ms, 4ms, 6ms, 8ms, 10ms usleep(11000); EXPECT_EQ(6U, mockRPC->requestLog.size()) << disclaimer; // Disable heartbeat. - client.exactlyOnceRPCHelper.keepAliveIntervalMs = 0; + client.exactlyOnceRPCHelper.keepAliveInterval = milliseconds(0); client.exactlyOnceRPCHelper.keepAliveCV.notify_all(); usleep(3000); EXPECT_EQ(6U, mockRPC->requestLog.size()) << disclaimer; // Now enable but "make a request" ourselves to prevent heartbeat. 
client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max()); - client.exactlyOnceRPCHelper.keepAliveIntervalMs = 10; + client.exactlyOnceRPCHelper.keepAliveInterval = milliseconds(10); client.exactlyOnceRPCHelper.keepAliveCV.notify_all(); usleep(7500); client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max()); @@ -183,6 +228,34 @@ class ClientClientImplServiceMockTest : public ClientClientImplTest { std::unique_ptr server; }; +TEST_F(ClientClientImplServiceMockTest, exactlyOnceRPCInfo_exit_invalidRequest) +{ + Protocol::Client::StateMachineCommand::Request request1; + Protocol::Client::StateMachineCommand::Response response1; + request1.mutable_open_session(); + response1.mutable_open_session()->set_client_id(3); + service->reply(Protocol::Client::OpCode::STATE_MACHINE_COMMAND, + request1, response1); + client.exactlyOnceRPCHelper.getRPCInfo(TimePoint::max()); + + Protocol::Client::StateMachineCommand::Request request2; + request2.mutable_close_session()->set_client_id(3); + service->rejectInvalidRequest( + Protocol::Client::OpCode::STATE_MACHINE_COMMAND, + request2); + // expect warning + LogCabin::Core::Debug::setLogPolicy({ + {"Client/ClientImpl.cc", "ERROR"} + }); + client.exactlyOnceRPCHelper.exit(); + + LogCabin::Core::Debug::setLogPolicy({ + {"", "WARNING"} + }); + // prevent destructor from calling CloseSession again + client.exactlyOnceRPCHelper.clientId = 0; +} + TEST_F(ClientClientImplServiceMockTest, getServerInfo) { Protocol::Client::GetServerInfo::Request request; Protocol::Client::GetServerInfo::Response response; @@ -265,6 +338,9 @@ TEST_F(ClientClientImplTest, makeDirectory_timeout) { TimePoint::min()); EXPECT_EQ(Client::Status::TIMEOUT, result.status); EXPECT_EQ("Client-specified timeout elapsed", result.error); + // set client ID to 0 so that the client doesn't try to close its session, + // wait the timeout there, and print a warning. 
+ client.exactlyOnceRPCHelper.clientId = 0; } TEST_F(ClientClientImplTest, listDirectory_timeout) { @@ -301,13 +377,17 @@ class KeepAliveThreadMain_cancel_Helper { TEST_F(ClientClientImplExactlyOnceTest, keepAliveThreadMain_cancel) { + mockRPC->expect(OpCode::STATE_MACHINE_COMMAND, + fromString( + "close_session { }")); client.exactlyOnceRPCHelper.exit(); client.exactlyOnceRPCHelper.exiting = false; mockRPC->expect(OpCode::STATE_MACHINE_COMMAND, fromString( "tree { }")); client.exactlyOnceRPCHelper.lastKeepAliveStart = TimePoint::min(); - client.exactlyOnceRPCHelper.keepAliveIntervalMs = 200; + client.exactlyOnceRPCHelper.keepAliveInterval = + std::chrono::milliseconds(200); KeepAliveThreadMain_cancel_Helper helper(client.exactlyOnceRPCHelper); client.exactlyOnceRPCHelper.mutex.callback = std::ref(helper); client.exactlyOnceRPCHelper.keepAliveThreadMain(); diff --git a/Client/MockClientImpl.cc b/Client/MockClientImpl.cc index 87a79ed1..fc623b01 100644 --- a/Client/MockClientImpl.cc +++ b/Client/MockClientImpl.cc @@ -97,6 +97,8 @@ class TreeLeaderRPC : public LeaderRPCBase { cresponse.mutable_open_session()-> set_client_id(1); return Status::OK; + } else if (crequest.has_close_session()) { + return Status::OK; } } PANIC("Unexpected request: %d %s", diff --git a/Protocol/Client.proto b/Protocol/Client.proto index de5b8726..9b191d35 100644 --- a/Protocol/Client.proto +++ b/Protocol/Client.proto @@ -135,6 +135,26 @@ message OpenSession { } } +/** + * CloseSession state machine command: Terminate a session with the cluster. + * This cleans up any state that the session held; future RPCs on this session + * will have errors. + * \since + * This command was introduced in state machine version 2. State machines + * running version 1 will ignore this command; they only expire sessions + * based on timeouts. + */ +message CloseSession { + message Request { + /** + * The ID assigned to the client as previously returned by OpenSession. 
+ */ + required uint64 client_id = 1; + } + message Response { + } +} + /** * A server in a configuration. Used in the GetConfiguration and * SetConfiguration RPCs. @@ -395,15 +415,18 @@ message StateMachineCommand { message Request { // The following are mutually exclusive. optional OpenSession.Request open_session = 1; + optional CloseSession.Request close_session = 4; optional ReadWriteTree.Request tree = 2; optional AdvanceStateMachineVersion.Request advance_version = 3; } /** - * This is what the state machine outputs for read-write commands from the log. + * This is what the state machine outputs for read-write commands from the + * log. */ message Response { // The following are mutually exclusive. optional OpenSession.Response open_session = 1; + optional CloseSession.Response close_session = 4; optional ReadWriteTree.Response tree = 2; optional AdvanceStateMachineVersion.Response advance_version = 3; } diff --git a/Protocol/ServerStats.proto b/Protocol/ServerStats.proto index 32bf5978..c144a8f6 100644 --- a/Protocol/ServerStats.proto +++ b/Protocol/ServerStats.proto @@ -144,6 +144,7 @@ message ServerStats { optional uint32 max_supported_version = 11; optional uint32 running_version = 12; optional Tree tree = 13; + optional uint64 num_unknown_requests = 14; }; /** diff --git a/RELEASES.md b/RELEASES.md index c28c3e6b..3c14b969 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -21,6 +21,12 @@ Improvements: - Optimizes setting nextIndex on leaders by capping it to just past the follower's last log index. This helps with followers that are new or have fallen far behind. +- Clients now make a best effort attempt to close their sessions when they shut + down gracefully (issue #116). Before, client sessions were only ever expired + after a timeout. This state could accumulate quickly when running short-lived + clients in a tight loop. 
Enabling this change requires all servers to be + updated (so that the state machine is updated); new clients talking to old + clusters will issue a warning that they are unable to close their sessions. Bug fixes (high severity): - Fixes packaging up very large AppendEntries requests. Before, it was possible diff --git a/Server/StateMachine.cc b/Server/StateMachine.cc index 1a243e32..22228c5f 100644 --- a/Server/StateMachine.cc +++ b/Server/StateMachine.cc @@ -70,6 +70,7 @@ StateMachine::StateMachine(std::shared_ptr consensus, , childPid(0) , lastIndex(0) , lastUnknownRequestMessage(TimePoint::min()) + , numUnknownRequests(0) , numUnknownRequestsSinceLastMessage(0) , numSnapshotsAttempted(0) , numSnapshotsFailed(0) @@ -121,7 +122,7 @@ StateMachine::query(const Query::Request& request, *response.mutable_tree()); return true; } - warnUnknownRequest(request); + warnUnknownRequest(request, "does not understand the given request"); return false; } @@ -135,6 +136,7 @@ StateMachine::updateServerStats(Protocol::ServerStats& serverStats) const smStats.set_snapshotting(childPid != 0); smStats.set_last_applied(lastIndex); smStats.set_num_sessions(sessions.size()); + smStats.set_num_unknown_requests(numUnknownRequests); smStats.set_num_snapshots_attempted(numSnapshotsAttempted); smStats.set_num_snapshots_failed(numSnapshotsFailed); smStats.set_num_redundant_advance_version_entries( @@ -172,6 +174,7 @@ StateMachine::waitForResponse(uint64_t logIndex, // was applied using getVersion(logIndex), then reply and return true/false // based on that. Existing commands have been around since version 1, so we // skip this check for now. 
+ uint16_t versionThen = getVersion(logIndex); if (command.has_tree()) { const PC::ExactlyOnceRPCInfo& rpcInfo = command.tree().exactly_once(); @@ -201,9 +204,12 @@ StateMachine::waitForResponse(uint64_t logIndex, response.mutable_open_session()-> set_client_id(logIndex); return true; + } else if (versionThen >= 2 && command.has_close_session()) { + response.mutable_close_session(); // no fields to set + return true; } else if (command.has_advance_version()) { response.mutable_advance_version()-> - set_running_version(getVersion(logIndex)); + set_running_version(versionThen); return true; } // don't warnUnknownRequest here, since we already did so in apply() @@ -221,6 +227,7 @@ StateMachine::apply(const RaftConsensus::Entry& entry) PANIC("Failed to parse protobuf for entry %lu", entry.index); } + uint16_t runningVersion = getVersion(entry.index - 1); if (command.has_tree()) { PC::ExactlyOnceRPCInfo rpcInfo = command.tree().exactly_once(); auto it = sessions.find(rpcInfo.client_id()); @@ -251,42 +258,49 @@ StateMachine::apply(const RaftConsensus::Entry& entry) uint64_t clientId = entry.index; Session& session = sessions.insert({clientId, {}}).first->second; session.lastModified = entry.clusterTime; + } else if (command.has_close_session()) { + if (runningVersion >= 2) { + sessions.erase(command.close_session().client_id()); + } else { + // Command is ignored in version < 2. + warnUnknownRequest(command, "may not process the given request, " + "which was introduced in version 2"); + } } else if (command.has_advance_version()) { uint16_t requested = Core::Util::downCast( command.advance_version(). 
requested_version()); - uint16_t running = getVersion(entry.index - 1); - if (requested < running) { + if (requested < runningVersion) { WARNING("Rejecting downgrade of state machine version " "(running version %u but command at log index %lu wants " "to switch to version %u)", - running, + runningVersion, entry.index, requested); ++numRejectedAdvanceVersionEntries; - } else if (requested > running) { + } else if (requested > runningVersion) { if (requested > MAX_SUPPORTED_VERSION) { PANIC("Cannot upgrade state machine to version %u (from %u) " "because this code only supports up to version %u", requested, - running, + runningVersion, MAX_SUPPORTED_VERSION); } else { - // someday, maybe we'll get here - // versionHistory.insert(...) - PANIC("state machine version > 1 not supported"); + NOTICE("Upgrading state machine to version %u (from %u)", + requested, + runningVersion); + versionHistory.insert({entry.index, requested}); } ++numSuccessfulAdvanceVersionEntries; - } else { // requested == running + } else { // requested == runningVersion // nothing to do // If this stat is high, see note in RaftConsensus.cc. ++numRedundantAdvanceVersionEntries; } ++numTotalAdvanceVersionEntries; } else { // unknown command - // This could be because the state machine hasn't been upgraded yet to - // handle the command. These are (deterministically) ignored by all - // state machines running the current version. - warnUnknownRequest(command); + // This is (deterministically) ignored by all state machines running + // the current version. 
+ warnUnknownRequest(command, "does not understand the given request"); } } @@ -688,22 +702,25 @@ StateMachine::takeSnapshot(uint64_t lastIncludedIndex, void StateMachine::warnUnknownRequest( - const google::protobuf::Message& request) const + const google::protobuf::Message& request, + const char* reason) const { + ++numUnknownRequests; TimePoint now = Clock::now(); if (lastUnknownRequestMessage + unknownRequestMessageBackoff < now) { lastUnknownRequestMessage = now; if (numUnknownRequestsSinceLastMessage > 0) { - WARNING("This version of the state machine (%u) does not " - "understand the given request (and %lu similar warnings " + WARNING("This version of the state machine (%u) %s " + "(and %lu similar warnings " "were suppressed since the last message): %s", getVersion(~0UL), + reason, numUnknownRequestsSinceLastMessage, Core::ProtoBuf::dumpString(request).c_str()); } else { - WARNING("This version of the state machine (%u) does not " - "understand the given request: %s", + WARNING("This version of the state machine (%u) %s: %s", getVersion(~0UL), + reason, Core::ProtoBuf::dumpString(request).c_str()); } numUnknownRequestsSinceLastMessage = 0; diff --git a/Server/StateMachine.h b/Server/StateMachine.h index 17eee1cc..6d5fa18d 100644 --- a/Server/StateMachine.h +++ b/Server/StateMachine.h @@ -38,6 +38,11 @@ class RaftConsensus; /** * Interprets and executes operations that have been committed into the Raft * log. + * + * Version history: + * - Version 1 of the State Machine shipped with LogCabin v1.0.0. + * - Version 2 added the CloseSession command, which clients can use when they + * gracefully shut down. */ class StateMachine { public: @@ -54,7 +59,7 @@ class StateMachine { * This state machine code can behave like all versions between * MIN_SUPPORTED_VERSION and MAX_SUPPORTED_VERSION, inclusive. 
*/ - MAX_SUPPORTED_VERSION = 1, + MAX_SUPPORTED_VERSION = 2, }; @@ -200,8 +205,15 @@ class StateMachine { * Called to log a debug message if appropriate when the state machine * encounters a query or command that is not understood by the current * running version. + * \param request + * Problematic command/query. + * \param reason + * Explains why 'request' is problematic. Should complete the sentence + * "This version of the state machine (%u) " + reason, and it should + * not contain end punctuation. */ - void warnUnknownRequest(const google::protobuf::Message& request) const; + void warnUnknownRequest(const google::protobuf::Message& request, + const char* reason) const; /** * Consensus module from which this state machine pulls commands and @@ -304,6 +316,13 @@ class StateMachine { */ mutable TimePoint lastUnknownRequestMessage; + /** + * Total number of commands/queries that this state machine either did not + * understand or could not process because they were introduced in a newer + * version. + */ + mutable uint64_t numUnknownRequests; + /** * The number of debug messages suppressed by warnUnknownRequest() since * lastUnknownRequestMessage. Used to prevent spamming the debug log. 
diff --git a/Server/StateMachineTest.cc b/Server/StateMachineTest.cc index 4e9218a2..b25f688b 100644 --- a/Server/StateMachineTest.cc +++ b/Server/StateMachineTest.cc @@ -194,6 +194,21 @@ TEST_F(ServerStateMachineTest, waitForResponse_openSession) response); } +TEST_F(ServerStateMachineTest, waitForResponse_closeSession) +{ + stateMachine->lastIndex = 3; + StateMachine::Command::Request request; + request.mutable_close_session()->set_client_id(3); + StateMachine::Command::Response response; + stateMachine->versionHistory.insert({3, 2}); + EXPECT_FALSE(stateMachine->waitForResponse(2, request, response)); + EXPECT_FALSE(response.has_close_session()); + EXPECT_TRUE(stateMachine->waitForResponse(3, request, response)); + EXPECT_EQ("close_session { " + "}", + response); +} + TEST_F(ServerStateMachineTest, waitForResponse_advanceVersion) { StateMachine::Command::Request request; @@ -302,34 +317,85 @@ TEST_F(ServerStateMachineTest, apply_openSession) EXPECT_EQ(0U, session.responses.size()); } +TEST_F(ServerStateMachineTest, apply_closeSession) +{ + stateMachine->sessions.insert({2, {}}); + stateMachine->sessions.insert({3, {}}); + stateMachine->sessions.insert({4, {}}); + StateMachine::Command::Request command; + command.mutable_close_session()->set_client_id(3); + + RaftConsensus::Entry entry; + entry.index = 6; + entry.type = RaftConsensus::Entry::DATA; + entry.command = serialize(command); + entry.clusterTime = 2; + + + // first apply will have no effect (only warning) because state machine + // version 1 does not support the CloseSession command + Core::Debug::setLogPolicy({ + {"Server/StateMachine.cc", "ERROR"}, + {"", "WARNING"}, + }); + stateMachine->versionHistory.insert({4, 1}); + stateMachine->apply(entry); + ASSERT_EQ((std::vector{2, 3, 4}), + Core::STLUtil::sorted( + Core::STLUtil::getKeys(stateMachine->sessions))); + Core::Debug::setLogPolicy({ + {"", "WARNING"}, + }); + + // second apply will work + stateMachine->versionHistory.insert({5, 2}); + 
stateMachine->apply(entry); + ASSERT_EQ((std::vector{2, 4}), + Core::STLUtil::sorted( + Core::STLUtil::getKeys(stateMachine->sessions))); + + // third apply will have no effect since the session was already closed + stateMachine->apply(entry); + ASSERT_EQ((std::vector{2, 4}), + Core::STLUtil::sorted( + Core::STLUtil::getKeys(stateMachine->sessions))); +} + + TEST_F(ServerStateMachineTest, apply_advanceVersion) { - StateMachine::Command::Request command = - Core::ProtoBuf::fromString( - "advance_version: { " - " requested_version: 1 " - "}"); RaftConsensus::Entry entry; entry.index = 6; entry.type = RaftConsensus::Entry::DATA; entry.clusterTime = 2; + // stay at version 1 + StateMachine::Command::Request command; + command.mutable_advance_version()->set_requested_version(1); entry.command = serialize(command); stateMachine->apply(entry); stateMachine->apply(entry); stateMachine->apply(entry); // should silently succeed + EXPECT_EQ(1U, stateMachine->getVersion(10000)); - command = Core::ProtoBuf::fromString( - "advance_version: { " - " requested_version: 1 " - "}"); + // up to version 2 + command.mutable_advance_version()->set_requested_version(2); + entry.command = serialize(command); + stateMachine->apply(entry); + EXPECT_EQ(2U, stateMachine->getVersion(10000)); + + // downgrade to version 1 should give warning + command.mutable_advance_version()->set_requested_version(1); entry.command = serialize(command); Core::Debug::setLogPolicy({ {"Server/StateMachine.cc", "ERROR"}, {"", "WARNING"}, }); - stateMachine->apply(entry); // expect warning - EXPECT_EQ(1U, stateMachine->getVersion(10000)); + stateMachine->apply(entry); + Core::Debug::setLogPolicy({ + {"", "WARNING"}, + }); + EXPECT_EQ(2U, stateMachine->getVersion(10000)); } TEST_F(ServerStateMachineTest, apply_unknown) @@ -539,12 +605,12 @@ TEST_F(ServerStateMachineTest, loadSnapshot_unknownFormatVersion) TEST_F(ServerStateMachineTest, loadVersionHistory_unknownVersion) { - stateMachine->versionHistory.insert({1, 
2}); + stateMachine->versionHistory.insert({1, 3}); SnapshotStateMachine::Header header; stateMachine->serializeVersionHistory(header); EXPECT_DEATH(stateMachine->loadVersionHistory(header), - "State machine version read from snapshot was 2, but this " - "code only supports 1 through 1"); + "State machine version read from snapshot was 3, but this " + "code only supports 1 through 2"); } diff --git a/include/LogCabin/Client.h b/include/LogCabin/Client.h index 390f665b..3a05ebda 100644 --- a/include/LogCabin/Client.h +++ b/include/LogCabin/Client.h @@ -586,6 +586,14 @@ class Cluster { * - clusterUUID (see sample.conf) * - tcpHeartbeatTimeoutMilliseconds (see sample.conf) * - tcpConnectTimeoutMilliseconds (see sample.conf) + * - sessionCloseTimeoutMilliseconds: + * This Cluster object opens a session with LogCabin before issuing + * any read-write commands to the replicated state machine. When this + * Cluster object is destroyed, it will attempt to close its session + * gracefully. This timeout controls the number of milliseconds that + * the client will wait until giving up on the close session RPC. It + * defaults to tcpConnectTimeoutMilliseconds, since they should be on + * the same order of magnitude. */ typedef std::map Options;