From c5f3fa06a9281122fdc175ca12c49ec28a4a8d9c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 4 Oct 2018 15:59:10 +0300 Subject: [PATCH] rpc: connection-level resource isolation Currently, rpc allows a scheduling_group to be configured on a per-verb basis. This method has some drawbacks: - if different verbs in the same connection are executed under different groups, head-of-line blocking can cause a low-priority group to block a high-priority group by consuming all of the units in _resources_available - the same verb can be executed under only one group, making multi-tenancy harder. This patch adds a new method, which isolates an entire connection rather than a verb. The new method improves upon the old method by: - allowing the same verb to be executed under different groups (if called under different connections) - executing more server code under scheduling groups - preventing head-of-line blocking (requires more work to have per-group memory reservations) Eventually, we'll deprecate and remove the old method. Message-Id: <20181004125910.13502-1-avi@scylladb.com> --- doc/rpc.md | 14 +++++++ rpc/rpc.cc | 21 ++++++++++- rpc/rpc.hh | 20 +++++++++- tests/rpc_test.cc | 93 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 145 insertions(+), 3 deletions(-) diff --git a/doc/rpc.md b/doc/rpc.md index b92f8c8236e..f4bb1367c97 100644 --- a/doc/rpc.md +++ b/doc/rpc.md @@ -72,6 +72,20 @@ Actual negotiation looks like this: more messages in each direction. Stream connection should be explicitly closed by both client and server. Closing is done by sending special EOS frame (described below). + +#### Isolation + feature number: 4 + uint32_t isolation_cookie_len + uint8_t isolation_cookie[len] + + The `isolation_cookie` field is used by the server to select a + `seastar::scheduling_group` (or equivalent in another implementation) that + will run this connection. 
In the future it will also be used for rpc buffer + isolation, to prevent rpc traffic in one isolation group from starving another. + + The server does not directly assign meaning to values of `isolation_cookie`; + instead, the interpretation is left to user code. + ##### Compressed frame format uint32_t len uint8_t compressed_data[len] diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 053b175c096..8dabca6ac6f 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -628,6 +628,9 @@ namespace rpc { if (_options.stream_parent) { features[protocol_features::STREAM_PARENT] = serialize_connection_id(_options.stream_parent); } + if (!_options.isolation_cookie.empty()) { + features[protocol_features::ISOLATION] = _options.isolation_cookie; + } send_negotiation_frame(std::move(features)); @@ -751,6 +754,12 @@ namespace rpc { } break; } + case protocol_features::ISOLATION: { + auto&& isolation_cookie = e.second; + _isolation_config = _server._limits.isolate_connection(isolation_cookie); + ret.emplace(e); + break; + } default: // nothing to do ; @@ -832,6 +841,8 @@ namespace rpc { future<> server::connection::process() { return negotiate_protocol(_read_buf).then([this] () mutable { + auto sg = _isolation_config ? _isolation_config->sched_group : current_scheduling_group(); + return with_scheduling_group(sg, [this] { send_loop(); return do_until([this] { return _read_buf.eof() || _error; }, [this] () mutable { if (is_stream()) { @@ -848,7 +859,10 @@ namespace rpc { } auto h = _server._proto->get_handler(type); if (h) { - return with_scheduling_group(h->sg, std::ref(h->func), shared_from_this(), timeout, msg_id, std::move(data.value())); + // If the new method of per-connection scheduling group was used, honor it. + // Otherwise, use the old per-handler scheduling group. + auto sg = _isolation_config ? 
_isolation_config->sched_group : h->sg; + return with_scheduling_group(sg, std::ref(h->func), shared_from_this(), timeout, msg_id, std::move(data.value())); } else { return wait_for_resources(28, timeout).then([this, timeout, msg_id, type] (auto permit) { // send unknown_verb exception back @@ -870,6 +884,7 @@ namespace rpc { } }); }); + }); }).then_wrapped([this] (future<> f) { if (f.failed()) { log_exception(*this, sprint("server%s connection dropped", is_stream() ? " stream" : "").c_str(), f.get_exception()); @@ -981,6 +996,10 @@ namespace rpc { return fprint(os, "%d", domain._id); } + isolation_config default_isolate_connection(sstring isolation_cookie) { + return isolation_config{}; + } + } } diff --git a/rpc/rpc.hh b/rpc/rpc.hh index f7925968950..a08aae70799 100644 --- a/rpc/rpc.hh +++ b/rpc/rpc.hh @@ -59,6 +59,15 @@ struct SerializerConcept { static constexpr char rpc_magic[] = "SSTARRPC"; +/// Specifies resource isolation for a connection. +struct isolation_config { + /// Specifies a scheduling group under which the connection (and all its + /// verb handlers) will execute. + scheduling_group sched_group = current_scheduling_group(); +}; + +/// Default isolation configuration - run everything in the default scheduling group. +isolation_config default_isolate_connection(sstring isolation_cookie); /// \brief Resource limits for an RPC server /// @@ -75,6 +84,9 @@ struct resource_limits { size_t basic_request_size = 0; ///< Minimum request footprint in memory unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests + /// Configures isolation for a connection based on its isolation cookie. May throw, + /// in which case the connection will be terminated. 
+ std::function isolate_connection = default_isolate_connection; }; struct client_options { @@ -83,7 +95,10 @@ struct client_options { compressor::factory* compressor_factory = nullptr; bool send_timeout_data = true; connection_id stream_parent = invalid_connection_id; - + /// Configures how this connection is isolated from other connection on the same server. + /// + /// \see resource_limits::isolate_connection + sstring isolation_cookie; }; // RPC call that passes stream connection id as a parameter @@ -130,6 +145,7 @@ enum class protocol_features : uint32_t { TIMEOUT = 1, CONNECTION_ID = 2, STREAM_PARENT = 3, + ISOLATION = 4, }; // internal representation of feature data @@ -444,7 +460,7 @@ public: server& _server; client_info _info; connection_id _parent_id = invalid_connection_id; - + compat::optional _isolation_config; private: future<> negotiate_protocol(input_stream& in); future, uint64_t, int64_t, compat::optional> diff --git a/tests/rpc_test.cc b/tests/rpc_test.cc index 38c931d74e5..68444e6e003 100644 --- a/tests/rpc_test.cc +++ b/tests/rpc_test.cc @@ -28,6 +28,7 @@ #include "test-utils.hh" #include "core/thread.hh" #include "core/sleep.hh" +#include "util/defer.hh" using namespace seastar; @@ -451,3 +452,95 @@ SEASTAR_TEST_CASE(test_rpc_scheduling) { }); }); } + +SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) { + auto sg1 = create_scheduling_group("sg1", 100).get0(); + auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); }); + auto sg2 = create_scheduling_group("sg2", 100).get0(); + auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); }); + rpc::resource_limits limits; + limits.isolate_connection = [sg1, sg2] (sstring cookie) { + auto sg = current_scheduling_group(); + if (cookie == "sg1") { + sg = sg1; + } else if (cookie == "sg2") { + sg = sg2; + } + rpc::isolation_config cfg; + cfg.sched_group = sg; + return cfg; + }; + with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, 
test_rpc_proto::server& s, make_socket_fn make_socket) { + return async([&proto, make_socket, sg1, sg2] { + rpc::client_options co1; + co1.isolation_cookie = "sg1"; + test_rpc_proto::client c1(proto, co1, make_socket(), ipv4_addr()); + rpc::client_options co2; + co2.isolation_cookie = "sg2"; + test_rpc_proto::client c2(proto, co2, make_socket(), ipv4_addr()); + auto call = proto.register_handler(1, [sg1, sg2] (int which) mutable { + scheduling_group expected; + if (which == 1) { + expected = sg1; + } else if (which == 2) { + expected = sg2; + } + BOOST_REQUIRE(current_scheduling_group() == expected); + return make_ready_future<>(); + }); + call(c1, 1).get(); + call(c2, 2).get(); + c1.stop().get(); + c2.stop().get(); + }); + }).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) { + auto sg1 = create_scheduling_group("sg1", 100).get0(); + auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); }); + auto sg2 = create_scheduling_group("sg2", 100).get0(); + auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); }); + rpc::resource_limits limits; + limits.isolate_connection = [sg1, sg2] (sstring cookie) { + auto sg = current_scheduling_group(); + if (cookie == "sg1") { + sg = sg1; + } else if (cookie == "sg2") { + sg = sg2; + } + rpc::isolation_config cfg; + cfg.sched_group = sg; + return cfg; + }; + with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { + return async([&proto, make_socket, sg1, sg2] { + rpc::client_options co1; + co1.isolation_cookie = "sg1"; + test_rpc_proto::client c1(proto, co1, make_socket(), ipv4_addr()); + rpc::client_options co2; + co2.isolation_cookie = "sg2"; + test_rpc_proto::client c2(proto, co2, make_socket(), ipv4_addr()); + // An old client, that doesn't have an isolation cookie + rpc::client_options co3; + test_rpc_proto::client c3(proto, co3, make_socket(), ipv4_addr()); + // A server that uses sg1 if the 
client is old + auto call = proto.register_handler(1, sg1, [sg1, sg2] (int which) mutable { + scheduling_group expected; + if (which == 1) { + expected = sg1; + } else if (which == 2) { + expected = sg2; + } + BOOST_REQUIRE(current_scheduling_group() == expected); + return make_ready_future<>(); + }); + call(c1, 1).get(); + call(c2, 2).get(); + call(c3, 1).get(); + c1.stop().get(); + c2.stop().get(); + c3.stop().get(); + }); + }).get(); +}