Skip to content

Commit

Permalink
Merge pull request #14907 from ballard26/connection-count-bp-n
Browse files Browse the repository at this point in the history
Allow for `rpc_client_connections_per_peer` to be configured in v23.2.x
  • Loading branch information
ballard26 committed Nov 20, 2023
2 parents 083386d + 55784b8 commit c9d2968
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 40 deletions.
11 changes: 6 additions & 5 deletions src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "config/configuration.h"
#include "raft/errc.h"
#include "rpc/backoff_policy.h"
#include "rpc/connection_cache.h"
#include "rpc/types.h"
#include "storage/kvstore.h"
#include "vlog.h"
Expand Down Expand Up @@ -96,11 +97,11 @@ ss::future<> move_persistent_stm_state(

namespace cluster {

std::vector<ss::shard_id>
virtual_nodes(model::node_id self, model::node_id node) {
std::vector<ss::shard_id> virtual_nodes(
const rpc::connection_cache& c, model::node_id self, model::node_id node) {
std::set<ss::shard_id> owner_shards;
for (ss::shard_id i = 0; i < ss::smp::count; ++i) {
auto shard = rpc::connection_cache::shard_for(self, i, node);
auto shard = c.shard_for(self, i, node);
owner_shards.insert(shard);
}
return std::vector<ss::shard_id>(owner_shards.begin(), owner_shards.end());
Expand All @@ -110,7 +111,7 @@ ss::future<> remove_broker_client(
model::node_id self,
ss::sharded<rpc::connection_cache>& clients,
model::node_id id) {
auto shards = virtual_nodes(self, id);
auto shards = virtual_nodes(clients.local(), self, id);
vlog(clusterlog.debug, "Removing {} TCP client from shards {}", id, shards);
return ss::do_with(
std::move(shards), [id, &clients](std::vector<ss::shard_id>& i) {
Expand Down Expand Up @@ -193,7 +194,7 @@ ss::future<> update_broker_client(
model::node_id node,
net::unresolved_address addr,
config::tls_config tls_config) {
auto shards = virtual_nodes(self, node);
auto shards = virtual_nodes(clients.local(), self, node);
vlog(clusterlog.debug, "Adding {} TCP client on shards:{}", node, shards);
return ss::do_with(
std::move(shards),
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/node_status_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ ss::future<result<node_status>> node_status_backend::send_node_status_request(
ss::future<ss::shard_id> node_status_backend::maybe_create_client(
model::node_id target, net::unresolved_address address) {
auto source_shard = connection_source_shard(target);
auto target_shard = rpc::connection_cache::shard_for(
auto target_shard = _node_connection_cache.local().shard_for(
_self, source_shard, target);

co_await add_one_tcp_client(
Expand Down
8 changes: 8 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ configuration::configuration()
{.example = "65536"},
std::nullopt,
{.min = 32_KiB, .align = 4_KiB})
, rpc_client_connections_per_peer(
*this,
"rpc_client_connections_per_peer",
"The maximum number of connections a broker will open to each of its "
"peers",
{.example = "8"},
8,
{.min = 2})
, enable_coproc(*this, "enable_coproc")
, coproc_max_inflight_bytes(*this, "coproc_max_inflight_bytes")
, coproc_max_ingest_bytes(*this, "coproc_max_ingest_bytes")
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct configuration final : public config_store {
bounded_property<std::optional<int>> rpc_server_listen_backlog;
bounded_property<std::optional<int>> rpc_server_tcp_recv_buf;
bounded_property<std::optional<int>> rpc_server_tcp_send_buf;
bounded_property<size_t> rpc_client_connections_per_peer;
// Coproc
deprecated_property enable_coproc;
deprecated_property coproc_max_inflight_bytes;
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/tests/raft_group_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ struct raft_node {
void
create_connection_to(model::node_id self, const model::broker& broker) {
for (ss::shard_id i = 0; i < ss::smp::count; ++i) {
auto sh = rpc::connection_cache::shard_for(self, i, broker.id());
auto sh = cache.local().shard_for(self, i, broker.id());
cache
.invoke_on(
sh,
Expand Down
6 changes: 5 additions & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,11 @@ void application::wire_up_redpanda_services(

// cluster
syschecks::systemd_message("Initializing connection cache").get();
construct_service(_connection_cache, std::ref(_as)).get();
construct_service(
_connection_cache,
config::shard_local_cfg().rpc_client_connections_per_peer(),
std::ref(_as))
.get();
syschecks::systemd_message("Building shard-lookup tables").get();
construct_service(shard_table).get();

Expand Down
30 changes: 29 additions & 1 deletion src/v/rpc/connection_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,18 @@ namespace rpc {
connection_cache::connection_cache(
ss::sharded<ss::abort_source>& as,
std::optional<connection_cache_label> label)
: _label(std::move(label)) {
: _max_connections(8)
, _label(std::move(label)) {
_as_subscription = as.local().subscribe(
[this]() mutable noexcept { shutdown(); });
}

/// Builds a connection cache with an explicit value for `_max_connections`.
///
/// \param max_connections value stored in `_max_connections`
/// \param as              sharded abort source; the shard-local instance is
///                        subscribed to so that `shutdown()` is invoked when
///                        it fires
/// \param label           optional label stored in `_label`
connection_cache::connection_cache(
  size_t max_connections,
  ss::sharded<ss::abort_source>& as,
  std::optional<connection_cache_label> label)
  : _max_connections(max_connections)
  , _label(std::move(label)) {
    // Callback handed to the local abort source: it triggers shutdown().
    auto on_abort = [this]() mutable noexcept { shutdown(); };
    _as_subscription = as.local().subscribe(std::move(on_abort));
}
Expand Down Expand Up @@ -99,4 +110,21 @@ ss::future<> connection_cache::stop() {
return _gate.close();
}

/// Selects the shard used for the connection from `src_shard` on node `self`
/// to node `n`.
///
/// When `total_shards` does not exceed `_max_connections` the source shard is
/// returned unchanged. Otherwise the source shard is first reduced to one of
/// `_max_connections` buckets via jump_consistent_hash, that bucket is mixed
/// with the hashes of both node ids, and the combined value is passed to
/// jump_consistent_hash over `total_shards`.
ss::shard_id connection_cache::shard_for(
  model::node_id self,
  ss::shard_id src_shard,
  model::node_id n,
  ss::shard_id total_shards) const {
    const bool fits_within_limit = total_shards <= _max_connections;
    if (fits_within_limit) {
        return src_shard;
    }

    const auto bucket = jump_consistent_hash(src_shard, _max_connections);
    const auto peer_hash = std::hash<model::node_id>{}(n);
    const auto self_hash = std::hash<model::node_id>{}(self);

    // NOLINTNEXTLINE
    size_t seed = 805306457;
    // The combine order (bucket, peer, self) is part of the mapping; keep it.
    boost::hash_combine(seed, bucket);
    boost::hash_combine(seed, peer_hash);
    // mixing in our own node id shifts the jump_consistent_hash assignment
    boost::hash_combine(seed, self_hash);
    return jump_consistent_hash(seed, total_shards);
}
} // namespace rpc
40 changes: 9 additions & 31 deletions src/v/rpc/connection_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/

#pragma once
#include "config/property.h"
#include "hashing/jump_consistent_hash.h"
#include "model/metadata.h"
#include "outcome.h"
Expand All @@ -34,16 +35,21 @@ class connection_cache final
using underlying = std::unordered_map<model::node_id, transport_ptr>;
using iterator = typename underlying::iterator;

static inline ss::shard_id shard_for(
ss::shard_id shard_for(
model::node_id self,
ss::shard_id src,
model::node_id node,
ss::shard_id max_shards = ss::smp::count);
ss::shard_id max_shards = ss::smp::count) const;

explicit connection_cache(
ss::sharded<ss::abort_source>&,
std::optional<connection_cache_label> label = std::nullopt);

explicit connection_cache(
size_t,
ss::sharded<ss::abort_source>&,
std::optional<connection_cache_label> label = std::nullopt);

bool contains(model::node_id n) const {
return _cache.find(n) != _cache.end();
}
Expand Down Expand Up @@ -157,6 +163,7 @@ class connection_cache final
}

private:
size_t _max_connections;
std::optional<connection_cache_label> _label;
mutex _mutex; // to add/remove nodes
underlying _cache;
Expand All @@ -165,33 +172,4 @@ class connection_cache final
ss::optimized_optional<ss::abort_source::subscription> _as_subscription;
bool _shutting_down = false;
};
/// Selects the shard used for the connection from `src_shard` on node `self`
/// to node `n`.
///
/// \param self         id of the local node; mixed into the hash so that the
///                     assignment is shifted per node
/// \param src_shard    shard the connection is requested from
/// \param n            id of the peer node
/// \param total_shards number of shards to distribute over
/// \return `src_shard` unchanged when `total_shards` does not exceed the
///         number of virtual nodes (8); otherwise the result of
///         jump_consistent_hash over `total_shards`.
inline ss::shard_id connection_cache::shard_for(
  model::node_id self,
  ss::shard_id src_shard,
  model::node_id n,
  ss::shard_id total_shards) {
    static const constexpr size_t vnodes = 8;
    // Decide on the caller-supplied shard count, not the global
    // ss::smp::count, so an explicitly passed total_shards is honoured.
    // With the default argument the two are the same value.
    if (total_shards <= vnodes) {
        return src_shard;
    }
    /// make deterministic - choose 1 prime to mix node_id with
    /// https://planetmath.org/goodhashtableprimes
    static const constexpr std::array<size_t, vnodes> universe{
      {12582917,
       25165843,
       50331653,
       100663319,
       201326611,
       402653189,
       805306457,
       1610612741}};

    // The source shard picks one of the `vnodes` primes as the hash seed.
    // NOLINTNEXTLINE
    size_t h = universe[jump_consistent_hash(src_shard, vnodes)];
    boost::hash_combine(h, std::hash<model::node_id>{}(n));
    boost::hash_combine(h, std::hash<model::node_id>{}(self));
    // use self node id to shift jump_consistent_hash_assignment
    return jump_consistent_hash(h, total_shards);
}

} // namespace rpc

0 comments on commit c9d2968

Please sign in to comment.