Skip to content

Commit

Permalink
rpc: connection-level resource isolation
Browse files Browse the repository at this point in the history
Currently, rpc allows a scheduling_group to be configured on a per-verb basis.
This method has some drawbacks:
 - if different verbs in the same connection are executed under different
   groups, head-of-line blocking can cause a low-priority group to block a
   high-priority group by consuming all of the units in _resources_available
 - the same verb can be executed under only one group, making multi-tenancy
   harder.

This patch adds a new method, which isolates an entire connection rather than
a verb. The new method improves upon the old method by:
 - allowing the same verb to be executed under different groups (if called
   under different connections)
 - executing more server code under scheduling groups
 - preventing head-of-line blocking (requires more work to have per-group
   memory reservations)

Eventually, we'll deprecate and remove the old method.
Message-Id: <20181004125910.13502-1-avi@scylladb.com>
  • Loading branch information
avikivity authored and tgrabiec committed Oct 5, 2018
1 parent 71e914e commit c5f3fa0
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 3 deletions.
14 changes: 14 additions & 0 deletions doc/rpc.md
Expand Up @@ -72,6 +72,20 @@ Actual negotiation looks like this:
more messages in each direction. Stream connection should be explicitly closed by both client and
server. Closing is done by sending special EOS frame (described below).


#### Isolation
feature number: 4
uint32_t isolation_cookie_len
uint8_t isolation_cookie[isolation_cookie_len]

The `isolation_cookie` field is used by the server to select a
`seastar::scheduling_group` (or equivalent in another implementation) that
will run this connection. In the future it will also be used for rpc buffer
isolation, to prevent rpc traffic in one isolation group from starving another.

The server does not directly assign meaning to values of `isolation_cookie`;
instead, the interpretation is left to user code.

##### Compressed frame format
uint32_t len
uint8_t compressed_data[len]
Expand Down
21 changes: 20 additions & 1 deletion rpc/rpc.cc
Expand Up @@ -628,6 +628,9 @@ namespace rpc {
if (_options.stream_parent) {
features[protocol_features::STREAM_PARENT] = serialize_connection_id(_options.stream_parent);
}
if (!_options.isolation_cookie.empty()) {
features[protocol_features::ISOLATION] = _options.isolation_cookie;
}

send_negotiation_frame(std::move(features));

Expand Down Expand Up @@ -751,6 +754,12 @@ namespace rpc {
}
break;
}
case protocol_features::ISOLATION: {
auto&& isolation_cookie = e.second;
_isolation_config = _server._limits.isolate_connection(isolation_cookie);
ret.emplace(e);
break;
}
default:
// nothing to do
;
Expand Down Expand Up @@ -832,6 +841,8 @@ namespace rpc {

future<> server::connection::process() {
return negotiate_protocol(_read_buf).then([this] () mutable {
auto sg = _isolation_config ? _isolation_config->sched_group : current_scheduling_group();
return with_scheduling_group(sg, [this] {
send_loop();
return do_until([this] { return _read_buf.eof() || _error; }, [this] () mutable {
if (is_stream()) {
Expand All @@ -848,7 +859,10 @@ namespace rpc {
}
auto h = _server._proto->get_handler(type);
if (h) {
return with_scheduling_group(h->sg, std::ref(h->func), shared_from_this(), timeout, msg_id, std::move(data.value()));
// If the new method of per-connection scheduling group was used, honor it.
// Otherwise, use the old per-handler scheduling group.
auto sg = _isolation_config ? _isolation_config->sched_group : h->sg;
return with_scheduling_group(sg, std::ref(h->func), shared_from_this(), timeout, msg_id, std::move(data.value()));
} else {
return wait_for_resources(28, timeout).then([this, timeout, msg_id, type] (auto permit) {
// send unknown_verb exception back
Expand All @@ -870,6 +884,7 @@ namespace rpc {
}
});
});
});
}).then_wrapped([this] (future<> f) {
if (f.failed()) {
log_exception(*this, sprint("server%s connection dropped", is_stream() ? " stream" : "").c_str(), f.get_exception());
Expand Down Expand Up @@ -981,6 +996,10 @@ namespace rpc {
return fprint(os, "%d", domain._id);
}

/// Fallback isolation policy: the cookie is not inspected, and a
/// default-constructed isolation_config is handed back to the caller.
isolation_config default_isolate_connection(sstring isolation_cookie) {
    isolation_config cfg;
    return cfg;
}

}

}
20 changes: 18 additions & 2 deletions rpc/rpc.hh
Expand Up @@ -59,6 +59,15 @@ struct SerializerConcept {

static constexpr char rpc_magic[] = "SSTARRPC";

/// Specifies resource isolation for a connection.
///
/// An instance is produced by resource_limits::isolate_connection from the
/// isolation cookie received while negotiating the connection.
struct isolation_config {
/// Specifies a scheduling group under which the connection (and all its
/// verb handlers) will execute. When left untouched, it is the scheduling
/// group that was current at the time this object was constructed.
scheduling_group sched_group = current_scheduling_group();
};

/// Default isolation policy, used as the initial value of
/// resource_limits::isolate_connection.
///
/// Ignores \p isolation_cookie and returns a default-constructed
/// isolation_config, i.e. one whose scheduling group is whichever group is
/// current at the time of the call.
isolation_config default_isolate_connection(sstring isolation_cookie);

/// \brief Resource limits for an RPC server
///
Expand All @@ -75,6 +84,9 @@ struct resource_limits {
size_t basic_request_size = 0; ///< Minimum request footprint in memory
unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request
size_t max_memory = rpc_semaphore::max_counter(); ///< Maximum amount of memory that may be consumed by all requests
/// Configures isolation for a connection based on its isolation cookie. May throw,
/// in which case the connection will be terminated.
std::function<isolation_config (sstring isolation_cookie)> isolate_connection = default_isolate_connection;
};

struct client_options {
Expand All @@ -83,7 +95,10 @@ struct client_options {
compressor::factory* compressor_factory = nullptr;
bool send_timeout_data = true;
connection_id stream_parent = invalid_connection_id;

/// Configures how this connection is isolated from other connections on the same server.
///
/// \see resource_limits::isolate_connection
sstring isolation_cookie;
};

// RPC call that passes stream connection id as a parameter
Expand Down Expand Up @@ -130,6 +145,7 @@ enum class protocol_features : uint32_t {
TIMEOUT = 1,
CONNECTION_ID = 2,
STREAM_PARENT = 3,
ISOLATION = 4,
};

// internal representation of feature data
Expand Down Expand Up @@ -444,7 +460,7 @@ public:
server& _server;
client_info _info;
connection_id _parent_id = invalid_connection_id;

compat::optional<isolation_config> _isolation_config;
private:
future<> negotiate_protocol(input_stream<char>& in);
future<compat::optional<uint64_t>, uint64_t, int64_t, compat::optional<rcv_buf>>
Expand Down
93 changes: 93 additions & 0 deletions tests/rpc_test.cc
Expand Up @@ -28,6 +28,7 @@
#include "test-utils.hh"
#include "core/thread.hh"
#include "core/sleep.hh"
#include "util/defer.hh"

using namespace seastar;

Expand Down Expand Up @@ -451,3 +452,95 @@ SEASTAR_TEST_CASE(test_rpc_scheduling) {
});
});
}

// Connection-level isolation: two clients present different isolation cookies,
// the server maps each cookie to its own scheduling group, and the verb handler
// asserts that it runs under the group selected for the calling connection.
SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) {
    auto sg1 = create_scheduling_group("sg1", 100).get0();
    auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); });
    auto sg2 = create_scheduling_group("sg2", 100).get0();
    auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); });

    rpc::resource_limits limits;
    // Cookie "sg1"/"sg2" picks the matching group; any other cookie keeps the
    // group that is current when the callback runs.
    limits.isolate_connection = [sg1, sg2] (sstring cookie) {
        rpc::isolation_config cfg;
        cfg.sched_group = current_scheduling_group();
        if (cookie == "sg1") {
            cfg.sched_group = sg1;
        } else if (cookie == "sg2") {
            cfg.sched_group = sg2;
        }
        return cfg;
    };

    with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
        return async([&proto, make_socket, sg1, sg2] {
            rpc::client_options opts1;
            opts1.isolation_cookie = "sg1";
            test_rpc_proto::client client1(proto, opts1, make_socket(), ipv4_addr());

            rpc::client_options opts2;
            opts2.isolation_cookie = "sg2";
            test_rpc_proto::client client2(proto, opts2, make_socket(), ipv4_addr());

            // The argument tells the handler which group it should find itself in.
            auto call = proto.register_handler(1, [sg1, sg2] (int which) mutable {
                scheduling_group expected;
                switch (which) {
                case 1:
                    expected = sg1;
                    break;
                case 2:
                    expected = sg2;
                    break;
                default:
                    break;
                }
                BOOST_REQUIRE(current_scheduling_group() == expected);
                return make_ready_future<>();
            });

            call(client1, 1).get();
            call(client2, 2).get();
            client1.stop().get();
            client2.stop().get();
        });
    }).get();
}

// Compatibility between the two isolation methods: clients that send an
// isolation cookie run under the group chosen for their connection, while a
// client without a cookie falls back to the group registered with the handler.
SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) {
    auto sg1 = create_scheduling_group("sg1", 100).get0();
    auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); });
    auto sg2 = create_scheduling_group("sg2", 100).get0();
    auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); });

    rpc::resource_limits limits;
    // Cookie "sg1"/"sg2" picks the matching group; any other cookie keeps the
    // group that is current when the callback runs.
    limits.isolate_connection = [sg1, sg2] (sstring cookie) {
        rpc::isolation_config cfg;
        cfg.sched_group = current_scheduling_group();
        if (cookie == "sg1") {
            cfg.sched_group = sg1;
        } else if (cookie == "sg2") {
            cfg.sched_group = sg2;
        }
        return cfg;
    };

    with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
        return async([&proto, make_socket, sg1, sg2] {
            rpc::client_options opts1;
            opts1.isolation_cookie = "sg1";
            test_rpc_proto::client client1(proto, opts1, make_socket(), ipv4_addr());

            rpc::client_options opts2;
            opts2.isolation_cookie = "sg2";
            test_rpc_proto::client client2(proto, opts2, make_socket(), ipv4_addr());

            // Stands in for an old client: no isolation cookie is sent.
            rpc::client_options legacy_opts;
            test_rpc_proto::client legacy_client(proto, legacy_opts, make_socket(), ipv4_addr());

            // The handler is registered with sg1, which the server uses when
            // the client is old.
            auto call = proto.register_handler(1, sg1, [sg1, sg2] (int which) mutable {
                scheduling_group expected;
                switch (which) {
                case 1:
                    expected = sg1;
                    break;
                case 2:
                    expected = sg2;
                    break;
                default:
                    break;
                }
                BOOST_REQUIRE(current_scheduling_group() == expected);
                return make_ready_future<>();
            });

            call(client1, 1).get();
            call(client2, 2).get();
            call(legacy_client, 1).get();
            client1.stop().get();
            client2.stop().get();
            legacy_client.stop().get();
        });
    }).get();
}

0 comments on commit c5f3fa0

Please sign in to comment.