
Make max client connections configurable and refactor rpc::connection_cache #12906

Merged

ballard26 merged 10 commits into redpanda-data:dev on Sep 12, 2023

Conversation

@ballard26 (Contributor) commented on Aug 21, 2023:

The first change this PR makes is to make the number of client connections to each broker user configurable.

The second change this PR makes is to use a stateful connection allocation method that ensures the following:

  • Each shard should have an equal number of client connections.
  • Each client connection should have an equal number of shards using it.
  • On each node there should be max_connections to every other node in the cluster.
  • If a shard wants a connection for a (node, shard) and the current shard already has a connection for that (node, shard), it is selected (from @travisdowns; a rough sketch of this lookup is included after the list).
  • For any broker A, connections to broker A, when aggregated by shard ID across every broker in the cluster, should have an equal count per shard ID (from Shard aware connections v2 #8).
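As a rough illustration of the "prefer a local connection" rule above (hypothetical, simplified code; not the PR's actual API), the per-node bookkeeping could let a calling shard reuse its own connection when it has one and otherwise hop to an owning shard:

```cpp
#include <optional>
#include <unordered_map>
#include <unordered_set>

// Hypothetical, simplified illustration only; names do not match the PR.
using shard_id = unsigned;
using node_id = int;

struct connection_map {
    // Peer node -> shards that hold a client connection to that node.
    std::unordered_map<node_id, std::unordered_set<shard_id>> owners;

    // Pick the shard whose connection the calling shard should use.
    std::optional<shard_id> shard_for(node_id n, shard_id current) const {
        auto it = owners.find(n);
        if (it == owners.end() || it->second.empty()) {
            return std::nullopt;        // no connection to this node yet
        }
        if (it->second.contains(current)) {
            return current;             // prefer the local connection
        }
        return *it->second.begin();     // otherwise use a remote owner shard
    }
};
```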

Fixes #12912

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.2.x
  • v23.1.x
  • v22.3.x

Release Notes

Features

  • Adds an rpc_client_connections_per_shard cluster property that makes the number of clients a broker opens to a given peer user configurable.

@StephanDollberg (Member) left a comment:

Could you maybe comment on why you switched away from your original hash-ring approach to a simple shuffle-based one?

};

explicit backoff_policy(std::unique_ptr<impl> i)
: _impl(std::move(i)) {}

backoff_policy(backoff_policy&&) = default;

backoff_policy(const backoff_policy& o) {
Member:

Rule of 3/5 something something? I have forgotten when things get implicitly constructed these days, but it might be cleaner to just default/delete.

@ballard26 (Contributor, author):

Gotcha, will add a copy/move assignment operator.
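For context, a minimal sketch of the full set of special members the reviewer is alluding to (illustrative names and a clone-based copy, not the PR's actual code):

```cpp
#include <memory>

// Rule of five sketch: once a copy constructor exists, declare (default or
// define) all five special members explicitly. Illustrative only.
class backoff_policy {
public:
    struct impl {
        virtual ~impl() = default;
        virtual std::unique_ptr<impl> clone() const = 0;
    };

    explicit backoff_policy(std::unique_ptr<impl> i)
      : _impl(std::move(i)) {}

    backoff_policy(backoff_policy&&) = default;
    backoff_policy& operator=(backoff_policy&&) = default;

    backoff_policy(const backoff_policy& o)
      : _impl(o._impl ? o._impl->clone() : nullptr) {}
    backoff_policy& operator=(const backoff_policy& o) {
        if (this != &o) {
            _impl = o._impl ? o._impl->clone() : nullptr;
        }
        return *this;
    }

    ~backoff_policy() = default;

private:
    std::unique_ptr<impl> _impl;
};
```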

ss::future<>
remove_connection_location(ss::shard_id dest_shard, model::node_id node) {
return container().invoke_on(dest_shard, [node](auto& cache) {
auto conn_loc = cache._connection_map.find(node);
Member:

I think the find is redundant. You can just call erase, which will be a no-op if not found.

@ballard26 (Contributor, author):

Nice find, switching to just erase.
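The simplified form would look roughly like this (borrowing the snippet above; surrounding details elided):

```cpp
ss::future<>
remove_connection_location(ss::shard_id dest_shard, model::node_id node) {
    return container().invoke_on(dest_shard, [node](auto& cache) {
        // erase() is a no-op when the key is absent, so the preceding
        // find() can be dropped entirely.
        cache._connection_map.erase(node);
    });
}
```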

if (!_connection_map.contains(n)) {
return {};
}
return {_connection_map.at(n)};
Member:

Use find for this pattern? It avoids an extra hash-table lookup.

@ballard26 (Contributor, author):

Right you are, switching over to find.
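A sketch of the single-lookup version (the function name and value type are illustrative, following the snippet above):

```cpp
std::optional<ss::shard_id> find_shard(model::node_id n) const {
    // One find() instead of contains() followed by at(): a single hash lookup.
    if (auto it = _connection_map.find(n); it != _connection_map.end()) {
        return it->second;
    }
    return std::nullopt;
}
```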

@ballard26 (Contributor, author) commented:

> Could you maybe comment on why you switched away from your original hash-ring approach to a simple shuffle-based one?

A couple of reasons:

  • Even with the hash-ring approach I couldn't ensure that the number of connections per shard was equal. With a simple shuffle it's easily proven that connections per shard will be equal by construction (a rough sketch of the idea follows below).
  • Using a stateful connection allocation strategy allows for a lot of potential future enhancements. For example, in clusters with a lot of nodes it's possible for a given shard to only communicate with 2-3 of them. In those cases we may want the connection cache to allocate connections to those 2-3 nodes on that shard.
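A rough sketch of that construction (hypothetical, reduced to allocating the connections for a single peer node): shuffle the shard IDs once, then deal connections out in order, so per-shard counts differ by at most one.

```cpp
#include <algorithm>
#include <numeric>
#include <random>
#include <vector>

// Assign `connections_per_node` connections for one peer across
// `shard_count` shards. Counts per shard are equal (or differ by at most
// one) by construction. Illustrative only.
std::vector<unsigned>
allocate_connection_shards(unsigned shard_count, unsigned connections_per_node) {
    std::vector<unsigned> shards(shard_count);
    std::iota(shards.begin(), shards.end(), 0u);

    std::mt19937 rng{std::random_device{}()};
    std::shuffle(shards.begin(), shards.end(), rng);

    std::vector<unsigned> assignment;
    assignment.reserve(connections_per_node);
    for (unsigned c = 0; c < connections_per_node; ++c) {
        assignment.push_back(shards[c % shard_count]);
    }
    return assignment;
}
```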


absl::flat_hash_map<model::node_id, absl::flat_hash_set<ss::shard_id>>
_node_to_shards;
std::vector<std::pair<ss::shard_id, size_t>> _connections_per_shard;
Member:

Using a struct would be nice here as well for readability, as it avoids the std::get stuff.
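Something along these lines (field names are illustrative only):

```cpp
// A named struct in place of std::pair<ss::shard_id, size_t> lets call sites
// read e.shard / e.connection_count instead of std::get<0>(e) / std::get<1>(e).
struct shard_connection_count {
    ss::shard_id shard;
    size_t connection_count;
};

std::vector<shard_connection_count> _connections_per_shard;
```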

@ballard26 changed the title from "Draft: Make max client connections configurable and refactor rpc::connection_cache" to "Make max client connections configurable and refactor rpc::connection_cache" on Aug 30, 2023
@ballard26 marked this pull request as ready for review on August 30, 2023 at 16:01
@ballard26 merged commit 1ad588d into redpanda-data:dev on Sep 12, 2023
25 checks passed
@dotnwat (Member) left a comment:

cool stuff

_connections.erase(connection);
}

auto cert_creds = co_await maybe_build_reloadable_certificate_credentials(
@StephanDollberg (Member), Sep 15, 2023:

If we yield here then _connections no longer contains an entry for this connection, which other fibers might rely on. Is that a problem?

E.g.: connection_set::get straight out does:

    transport_ptr get(model::node_id n) const {
        return _connections.find(n)->second;
    }
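One possible way to harden that lookup (a sketch only, not the PR's eventual fix) would be to surface a missing entry to the caller rather than dereferencing a potentially-end iterator:

```cpp
transport_ptr get(model::node_id n) const {
    auto it = _connections.find(n);
    if (it == _connections.end()) {
        // The entry can be missing while another fiber rebuilds the
        // connection across a scheduling point; let the caller decide.
        return nullptr;
    }
    return it->second;
}
```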

}
auto holder = _gate.hold();

auto& alloc_strat = _coordinator_state->alloc_strat;
@StephanDollberg (Member), Sep 15, 2023:

Potential access to _coordinator_state->alloc_strat without owning the mutex?
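A sketch of the guarded access the reviewer is suggesting (assuming the coordinator state carries a mutex-like member, here hypothetically named `mtx` with a `get_units()` style API):

```cpp
auto holder = _gate.hold();

// Hypothetical: acquire the coordinator mutex before touching alloc_strat,
// and keep `units` alive for as long as the strategy is used.
auto units = co_await _coordinator_state->mtx.get_units();
auto& alloc_strat = _coordinator_state->alloc_strat;
// ... use alloc_strat while `units` is held ...
```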

@StephanDollberg (Member) commented:

I tried fixing the things I pointed out above, as per https://github.com/redpanda-data/redpanda/compare/stephan/connection-cache-fixes?expand=1, but it still seems to crash, so that's probably not the root cause.

Maybe it can still serve as inspiration.

@dotnwat (Member) commented on Sep 15, 2023:

I don't yet have any specific feedback, but generally the read_iobuf_exactly code that is reading from the input stream looks pretty solid to me. However, the input stream is (1) passed as a reference and (2) the resulting iobuf may contain temporary_buffers (via iobuf::fragment) that are shared with temporary_buffers owned by the input_stream. None of that is inherently a problem, but it does widen a bit the scope of potential issues I might look for.

rockwotj added a commit to rockwotj/redpanda that referenced this pull request Sep 19, 2023
…impl"

This reverts commit 1ad588d, reversing
changes made to 8088aeb.
Successfully merging this pull request may close these issues.

Improve connection distribution in clusters to reduce latency and reactor utilization