Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shard aware connections v2 #8

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,21 @@ patch<broker_ptr> calculate_changed_brokers(
return patch;
}

std::vector<ss::shard_id> virtual_nodes(model::node_id node) {
std::vector<ss::shard_id>
virtual_nodes(model::node_id self, model::node_id node) {
std::set<ss::shard_id> owner_shards;
for (ss::shard_id i = 0; i < ss::smp::count; ++i) {
auto shard = rpc::connection_cache::shard_for(i, node);
auto shard = rpc::connection_cache::shard_for(self, i, node);
owner_shards.insert(shard);
}
return std::vector<ss::shard_id>(owner_shards.begin(), owner_shards.end());
}

ss::future<> remove_broker_client(
ss::sharded<rpc::connection_cache>& clients, model::node_id id) {
auto shards = virtual_nodes(id);
model::node_id self,
ss::sharded<rpc::connection_cache>& clients,
model::node_id id) {
auto shards = virtual_nodes(self, id);
vlog(clusterlog.debug, "Removing {} TCP client from shards {}", id, shards);
return ss::do_with(
std::move(shards), [id, &clients](std::vector<ss::shard_id>& i) {
Expand Down Expand Up @@ -123,11 +126,12 @@ ss::future<> add_one_tcp_client(
}

ss::future<> update_broker_client(
model::node_id self,
ss::sharded<rpc::connection_cache>& clients,
model::node_id node,
unresolved_address addr,
config::tls_config tls_config) {
auto shards = virtual_nodes(node);
auto shards = virtual_nodes(self, node);
vlog(clusterlog.debug, "Adding {} TCP client on shards:{}", node, shards);
return ss::do_with(
std::move(shards),
Expand Down
13 changes: 8 additions & 5 deletions src/v/cluster/cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ std::vector<topic_result> create_topic_results(
const std::vector<model::topic_namespace>& topics, errc error_code);

ss::future<> update_broker_client(
model::node_id,
ss::sharded<rpc::connection_cache>&,
model::node_id node,
unresolved_address addr,
config::tls_config);
ss::future<>
remove_broker_client(ss::sharded<rpc::connection_cache>&, model::node_id);

ss::future<> remove_broker_client(
model::node_id, ss::sharded<rpc::connection_cache>&, model::node_id);

// clang-format off
template<typename Proto, typename Func>
Expand All @@ -56,16 +58,17 @@ CONCEPT(requires requires(Func&& f, Proto c) {
})
// clang-format on
auto with_client(
model::node_id self,
ss::sharded<rpc::connection_cache>& cache,
model::node_id id,
unresolved_address addr,
config::tls_config tls_config,
Func&& f) {
return update_broker_client(
cache, id, std::move(addr), std::move(tls_config))
.then([id, &cache, f = std::forward<Func>(f)]() mutable {
self, cache, id, std::move(addr), std::move(tls_config))
.then([id, self, &cache, f = std::forward<Func>(f)]() mutable {
return cache.local().with_node_client<Proto, Func>(
ss::this_shard_id(), id, std::forward<Func>(f));
self, ss::this_shard_id(), id, std::forward<Func>(f));
});
}

Expand Down
7 changes: 6 additions & 1 deletion src/v/cluster/members_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ ss::future<> members_manager::update_connections(patch<broker_ptr> diff) {
diff.deletions,
[this](broker_ptr removed) {
return remove_broker_client(
_connection_cache, removed->id());
_self.id(), _connection_cache, removed->id());
})
.then([this, &diff] {
return ss::do_for_each(diff.additions, [this](broker_ptr b) {
Expand All @@ -170,6 +170,7 @@ ss::future<> members_manager::update_connections(patch<broker_ptr> diff) {
return ss::make_ready_future<>();
}
return update_broker_client(
_self.id(),
_connection_cache,
b->id(),
b->rpc_address(),
Expand Down Expand Up @@ -201,6 +202,7 @@ ss::future<result<join_reply>> members_manager::dispatch_join_to_remote(
target.addr);

return with_client<controller_client_protocol>(
_self.id(),
_connection_cache,
target.id,
target.addr,
Expand Down Expand Up @@ -284,6 +286,7 @@ auto members_manager::dispatch_rpc_to_leader(Func&& f) {
}

return with_client<controller_client_protocol, Func>(
_self.id(),
_connection_cache,
*leader_id,
leader->rpc_address(),
Expand Down Expand Up @@ -379,6 +382,7 @@ members_manager::do_dispatch_configuration_update(
}

return with_client<controller_client_protocol>(
_self.id(),
_connection_cache,
target.id(),
target.rpc_address(),
Expand Down Expand Up @@ -479,6 +483,7 @@ members_manager::handle_configuration_update_request(

auto tout = ss::lowres_clock::now() + _join_timeout;
return with_client<controller_client_protocol>(
_self.id(),
_connection_cache,
*leader_id,
(*leader)->rpc_address(),
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/metadata_dissemination_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ metadata_dissemination_service::dispatch_get_metadata_update(
vlog(clusterlog.debug, "Requesting metadata update from node {}", id);
return _clients.local()
.with_node_client<metadata_dissemination_rpc_client_protocol>(
_self,
ss::this_shard_id(),
id,
[this](metadata_dissemination_rpc_client_protocol c) {
Expand Down Expand Up @@ -293,6 +294,7 @@ ss::future<> metadata_dissemination_service::dispatch_one_update(
model::node_id target_id, update_retry_meta& meta) {
return _clients.local()
.with_node_client<metadata_dissemination_rpc_client_protocol>(
_self,
ss::this_shard_id(),
target_id,
[this, &meta, target_id](
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ topics_frontend::dispatch_create_to_leader(
vlog(clusterlog.trace, "Dispatching create topics to {}", leader);
return _connections.local()
.with_node_client<cluster::controller_client_protocol>(
_self,
ss::this_shard_id(),
leader,
[topics, timeout](controller_client_protocol cp) mutable {
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ group_manager::group_manager(
ss::sharded<storage::api>& storage)
: _self(self)
, _disk_timeout(disk_timeout)
, _client(make_rpc_client_protocol(clients))
, _client(make_rpc_client_protocol(self, clients))
, _heartbeats(heartbeat_interval, _client, _self)
, _storage(storage.local()) {
setup_metrics();
Expand Down
18 changes: 11 additions & 7 deletions src/v/raft/kvelldb/kvserver.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "model/metadata.h"
#include "platform/stop_signal.h"
#include "raft/consensus.h"
#include "raft/consensus_client_protocol.h"
Expand Down Expand Up @@ -80,7 +81,8 @@ class simple_group_manager {
std::chrono::milliseconds raft_heartbeat_interval,
ss::sharded<rpc::connection_cache>& clients)
: _self(self)
, _consensus_client_protocol(raft::make_rpc_client_protocol(clients))
, _consensus_client_protocol(
raft::make_rpc_client_protocol(self, clients))
, _storage(
storage::kvstore_config(
1_MiB, 10ms, directory, storage::debug_sanitize_files::yes),
Expand Down Expand Up @@ -170,11 +172,13 @@ extract_peer(ss::sstring peer) {
}

static void initialize_connection_cache_in_thread(
ss::sharded<rpc::connection_cache>& cache, std::vector<ss::sstring> opts) {
model::node_id self,
ss::sharded<rpc::connection_cache>& cache,
std::vector<ss::sstring> opts) {
for (auto& i : opts) {
auto [node, cfg] = extract_peer(i);
for (ss::shard_id i = 0; i < ss::smp::count; ++i) {
auto shard = rpc::connection_cache::shard_for(i, node);
auto shard = rpc::connection_cache::shard_for(self, i, node);
ss::smp::submit_to(shard, [&cache, shard, n = node, config = cfg] {
return cache.local().emplace(
n,
Expand Down Expand Up @@ -268,21 +272,21 @@ int main(int args, char** argv, char** env) {
.get();
scfg.credentials = std::move(builder);
}
auto self_id = cfg["node-id"].as<int32_t>();
if (cfg.find("peers") != cfg.end()) {
initialize_connection_cache_in_thread(
model::node_id(self_id),
connection_cache,
cfg["peers"].as<std::vector<ss::sstring>>());
}
const ss::sstring workdir = fmt::format(
"{}/greetings-{}",
cfg["workdir"].as<ss::sstring>(),
cfg["node-id"].as<int32_t>());
"{}/greetings-{}", cfg["workdir"].as<ss::sstring>(), self_id);
vlog(kvelldblog.info, "Work directory:{}", workdir);

// initialize group_manager
group_manager
.start(
model::node_id(cfg["node-id"].as<int32_t>()),
model::node_id(self_id),
workdir,
std::chrono::milliseconds(
cfg["heartbeat-timeout-ms"].as<int32_t>()),
Expand Down
5 changes: 5 additions & 0 deletions src/v/raft/rpc_client_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace raft {
ss::future<result<vote_reply>> rpc_client_protocol::vote(
model::node_id n, vote_request&& r, rpc::client_opts opts) {
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
ss::this_shard_id(),
n,
[r = std::move(r),
Expand All @@ -24,6 +25,7 @@ ss::future<result<vote_reply>> rpc_client_protocol::vote(
ss::future<result<append_entries_reply>> rpc_client_protocol::append_entries(
model::node_id n, append_entries_request&& r, rpc::client_opts opts) {
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
ss::this_shard_id(),
n,
[r = std::move(r),
Expand All @@ -36,6 +38,7 @@ ss::future<result<append_entries_reply>> rpc_client_protocol::append_entries(
ss::future<result<heartbeat_reply>> rpc_client_protocol::heartbeat(
model::node_id n, heartbeat_request&& r, rpc::client_opts opts) {
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
ss::this_shard_id(),
n,
[r = std::move(r),
Expand All @@ -49,6 +52,7 @@ ss::future<result<install_snapshot_reply>>
rpc_client_protocol::install_snapshot(
model::node_id n, install_snapshot_request&& r, rpc::client_opts opts) {
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
ss::this_shard_id(),
n,
[r = std::move(r),
Expand All @@ -61,6 +65,7 @@ rpc_client_protocol::install_snapshot(
ss::future<result<timeout_now_reply>> rpc_client_protocol::timeout_now(
model::node_id n, timeout_now_request&& r, rpc::client_opts opts) {
return _connection_cache.local().with_node_client<raftgen_client_protocol>(
_self,
ss::this_shard_id(),
n,
[r = std::move(r),
Expand Down
13 changes: 8 additions & 5 deletions src/v/raft/rpc_client_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ namespace raft {
/// Raft client protocol implementation underlied by RPC connections cache
class rpc_client_protocol final : public consensus_client_protocol::impl {
public:
explicit rpc_client_protocol(ss::sharded<rpc::connection_cache>& cache)
: _connection_cache(cache) {}
explicit rpc_client_protocol(
model::node_id self, ss::sharded<rpc::connection_cache>& cache)
: _self(self)
, _connection_cache(cache) {}

ss::future<result<vote_reply>>
vote(model::node_id, vote_request&&, rpc::client_opts) final;
Expand All @@ -35,13 +37,14 @@ class rpc_client_protocol final : public consensus_client_protocol::impl {
timeout_now(model::node_id, timeout_now_request&&, rpc::client_opts) final;

private:
model::node_id _self;
ss::sharded<rpc::connection_cache>& _connection_cache;
};

inline consensus_client_protocol
make_rpc_client_protocol(ss::sharded<rpc::connection_cache>& clients) {
inline consensus_client_protocol make_rpc_client_protocol(
model::node_id self, ss::sharded<rpc::connection_cache>& clients) {
return raft::make_consensus_client_protocol<raft::rpc_client_protocol>(
clients);
self, clients);
}

} // namespace raft
20 changes: 11 additions & 9 deletions src/v/raft/tests/raft_group_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,23 @@ struct raft_node {
storage.local().log_mgr().manage(std::move(ntp_cfg)).get0());

// setup consensus
auto self_id = broker.id();
consensus = ss::make_lw_shared<raft::consensus>(
broker.id(),
self_id,
gr_id,
std::move(cfg),
std::move(jit),
*log,
seastar::default_priority_class(),
std::chrono::seconds(10),
raft::make_rpc_client_protocol(cache),
raft::make_rpc_client_protocol(self_id, cache),
[this](raft::leadership_status st) { leader_callback(st); },
storage.local());

// create connections to initial nodes
consensus->config().for_each_broker(
[this](const model::broker& broker) {
create_connection_to(broker);
[this, self_id](const model::broker& broker) {
create_connection_to(self_id, broker);
});
}

Expand Down Expand Up @@ -157,7 +158,7 @@ struct raft_node {
server.invoke_on_all(&rpc::server::start).get0();
hbeats = std::make_unique<raft::heartbeat_manager>(
heartbeat_interval,
raft::make_rpc_client_protocol(cache),
raft::make_rpc_client_protocol(broker.id(), cache),
broker.id());
hbeats->start().get0();
hbeats->register_group(consensus).get();
Expand Down Expand Up @@ -239,9 +240,10 @@ struct raft_node {

model::node_id id() { return broker.id(); }

void create_connection_to(const model::broker& broker) {
void
create_connection_to(model::node_id self, const model::broker& broker) {
for (ss::shard_id i = 0; i < ss::smp::count; ++i) {
auto sh = rpc::connection_cache::shard_for(i, broker.id());
auto sh = rpc::connection_cache::shard_for(self, i, broker.id());
cache
.invoke_on(
sh,
Expand Down Expand Up @@ -365,8 +367,8 @@ struct raft_group {
it->second.start();

for (auto& [_, n] : _members) {
n.create_connection_to(broker);
it->second.create_connection_to(n.broker);
n.create_connection_to(n.id(), broker);
it->second.create_connection_to(broker.id(), n.broker);
}

return broker;
Expand Down
11 changes: 8 additions & 3 deletions src/v/raft/tron/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class simple_group_manager {
std::chrono::milliseconds raft_heartbeat_interval,
ss::sharded<rpc::connection_cache>& clients)
: _self(self)
, _consensus_client_protocol(raft::make_rpc_client_protocol(clients))
, _consensus_client_protocol(
raft::make_rpc_client_protocol(self, clients))
, _storage(
storage::kvstore_config(
1_MiB, 10ms, directory, storage::debug_sanitize_files::yes),
Expand Down Expand Up @@ -161,11 +162,13 @@ extract_peer(ss::sstring peer) {
}

static void initialize_connection_cache_in_thread(
ss::sharded<rpc::connection_cache>& cache, std::vector<ss::sstring> opts) {
model::node_id self,
ss::sharded<rpc::connection_cache>& cache,
std::vector<ss::sstring> opts) {
for (auto& i : opts) {
auto [node, cfg] = extract_peer(i);
for (ss::shard_id i = 0; i < ss::smp::count; ++i) {
auto shard = rpc::connection_cache::shard_for(i, node);
auto shard = rpc::connection_cache::shard_for(self, i, node);
ss::smp::submit_to(shard, [&cache, shard, n = node, config = cfg] {
return cache.local().emplace(
n,
Expand Down Expand Up @@ -259,8 +262,10 @@ int main(int args, char** argv, char** env) {
.get();
scfg.credentials = std::move(builder);
}
auto self_id = cfg["node-id"].as<int32_t>();
if (cfg.find("peers") != cfg.end()) {
initialize_connection_cache_in_thread(
model::node_id(self_id),
connection_cache,
cfg["peers"].as<std::vector<ss::sstring>>());
}
Expand Down
6 changes: 6 additions & 0 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ void application::wire_up_services() {
construct_service(_quota_mgr).get();
// rpc
rpc::server_configuration rpc_cfg("internal_rpc");
/**
* Use port based load_balancing_algorithm to make connection shard
* assignment deterministic.
**/
rpc_cfg.load_balancing_algo
= ss::server_socket::load_balancing_algorithm::port;
rpc_cfg.max_service_memory_per_core = memory_groups::rpc_total_memory();
auto rpc_server_addr
= config::shard_local_cfg().rpc_server().resolve().get0();
Expand Down
Loading