
transport: load balancing module refactor #612

Merged: 32 commits merged into main on Mar 17, 2023

Conversation

@havaker (Contributor) commented Dec 4, 2022

Description

This is the third attempt to refactor this module;)

Replica set calculation was moved to a separate module (transport::locator).

Replica set calculation is done by a module originally written by @cvybhu - transport::locator::replication_info. The ReplicationData struct lives in that module and provides a set of functions for calculating SimpleStrategy & NetworkTopologyStrategy replica lists.

To make load balancing fast, replica sets have to be precomputed. This is done by another of @cvybhu's modules - transport::locator::precomputed_replicas. precomputed_replicas::PrecomputedReplicas is a struct that precomputes replica lists for the given strategies and provides O(1) access to the desired replica slices.

locator::ReplicaLocator combines the functionality of the previously mentioned modules and provides a unified API for getting replica sets, whether precomputed or calculated on the fly. Representing replica sets with a custom ReplicaSet type allowed creating an API that supports optionally limiting replicas to a single data center (and allocation-free creation, access & iteration for precomputed replica sets).
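For illustration, a rough sketch of how such a locator API could be consumed (it assumes the crate's types are in scope; apart from the ReplicaLocator and ReplicaSet names taken from this description, the method name, parameters and Debug formatting below are assumptions, not the exact API):

fn print_local_replicas(locator: &ReplicaLocator, token: Token, strategy: &Strategy) {
    // Optionally restrict the replica set to a single data center; for strategies
    // with precomputed replicas, this lookup is expected to be allocation-free.
    let replicas: ReplicaSet<'_> = locator.replicas_for_token(token, strategy, Some("dc1"));
    for node in replicas {
        println!("replica: {:?}", node);
    }
}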

Things left for a follow-up:

  • A way to configure which strategies need replica set precomputation

LoadBalancingPolicy interface was changed.

plan was split into pick and fallback methods. This allows better optimization of the most common case, where only one node from the load balancing plan is needed. Changes required in the query execution code were minimized by providing a lazy chaining iterator, transport::load_balancing::plan::Plan. This iterator's first element is the node returned by the LoadBalancingPolicy::pick function; the next items come from the LoadBalancingPolicy::fallback iterator. The fallback method is called lazily - only when the second or a later element of the Plan iterator is needed.
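To illustrate the laziness, here is a self-contained toy model of the chaining idea (not the driver's actual Plan type: nodes are plain &str and duplicate filtering is omitted):

struct Plan<'a, F, I>
where
    F: FnOnce() -> I,
    I: Iterator<Item = &'a str>,
{
    picked: Option<&'a str>,  // result of pick, yielded first
    make_fallback: Option<F>, // invoked lazily, only if more nodes are needed
    fallback: Option<I>,
}

impl<'a, F, I> Iterator for Plan<'a, F, I>
where
    F: FnOnce() -> I,
    I: Iterator<Item = &'a str>,
{
    type Item = &'a str;

    fn next(&mut self) -> Option<&'a str> {
        if let Some(node) = self.picked.take() {
            return Some(node); // most queries stop here; no fallback is ever built
        }
        if self.fallback.is_none() {
            // Build the fallback iterator only on the first request for a second node.
            let make = self.make_fallback.take()?;
            self.fallback = Some(make());
        }
        self.fallback.as_mut()?.next()
    }
}

fn main() {
    // Taking just the first element never constructs the fallback iterator.
    let plan = Plan {
        picked: Some("node-1"),
        make_fallback: Some(|| ["node-2", "node-3"].into_iter()),
        fallback: None,
    };
    assert_eq!(plan.take(1).collect::<Vec<_>>(), vec!["node-1"]);
}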

The following methods were added:

fn on_query_success(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>);
fn on_query_error(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>, error: &QueryError);

The motivation for adding them was to make it possible to contain the latency-aware logic entirely inside a load balancing policy.
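A rough sketch of what these callbacks enable (types are simplified: node addresses as strings instead of NodeRef, no QueryInfo/QueryError, and the averaging weight is arbitrary) - a policy could keep a per-node latency estimate entirely internally:

use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;

#[derive(Default)]
struct LatencyTracker {
    // Exponentially weighted moving average of observed latency per node.
    averages: Mutex<HashMap<String, Duration>>,
}

impl LatencyTracker {
    fn on_query_success(&self, node: &str, latency: Duration) {
        let mut averages = self.averages.lock().unwrap();
        let avg = averages.entry(node.to_owned()).or_insert(latency);
        // Blend the new sample into the running average.
        *avg = (*avg * 7 + latency) / 8;
    }

    fn on_query_error(&self, node: &str, latency: Duration) {
        // A failed query still carries timing information; record it as well.
        self.on_query_success(node, latency);
    }
}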

Default policy was created.

The default policy supports token and data center awareness. It also has logic for data center failover.
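A configuration sketch (the builder method names below are assumptions based on this description and the commit messages, not a guaranteed API):

let policy = DefaultPolicy::builder()
    .prefer_datacenter("dc1".to_string()) // data center awareness
    .token_aware(true)                    // route to replicas when the token is known
    .permit_dc_failover(true)             // allow falling back to remote data centers
    .build();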

Things left for a follow-up:

  • LWT routing support

Latency-aware policy was merged into the default one.

Mechanisms used only by the latency-aware policy were removed (e.g. TimestampedAverage living in Node).

Pre-review checklist

  • I have split my patch into logically separate commits.
  • All commit messages clearly explain what they change and why.
  • I added relevant tests for new features and bug fixes.
  • All commits compile, pass static checks and pass tests.
  • PR description sums up the changes and reasons why they should be introduced.
  • I added appropriate Fixes: annotations to PR description.

Fixes: #408

@havaker requested review from cvybhu and piodul and removed the request for cvybhu on December 5, 2022 15:06
@wprzytula (Collaborator) commented:

I'm not familiar with the token ring, replica computation etc., but apart from that, this approach looks good.

pub fn new() -> Self {
    Self {}
}

impl LoadBalancingPolicy for DefaultPolicy {
    fn pick<'a>(&'a self, query: &'a QueryInfo, cluster: &'a ClusterData) -> NodeRef<'a> {
Review comment (Collaborator):

I've got a feeling that pick and fallback methods follow a similar pattern (control flow-wise) and we could try to reduce the complexity by deduplicating them. Basically, you could think of pick as a function that returns the first element of fallback without having to allocate memory for the FallbackPlan. Of course, if we implemented pick such that it calls fallback then the performance benefit would be gone - however, what if we removed the need to allocate the plan with a callback?

You could implement a function do_with_plan (didn't think too much about the name):

// FnOnce won't be sufficient to express the callback's type, because you don't
// have access to the iterator's type - you will most likely have to use a trait
trait PlanExtractor<'a> {
    type Output;
    fn extract<I>(self, plan: I) -> Self::Output
    where
        I: Iterator<Item = NodeRef<'a>> + Send + Sync + 'a;
}

fn do_with_plan<'a, E>(
    &'a self,
    query: &'a QueryInfo,
    cluster: &'a ClusterData,
    extractor: E,
) -> E::Output
where
    E: PlanExtractor<'a>,
{
    /* snip */

    let plan = maybe_replicas
        .chain(robined_local_nodes)
        .chain(maybe_remote_nodes)
        .unique();

    extractor.extract(plan)
}

I didn't really scrutinize the pick and fallback methods so I'm not sure what the performance impact of this approach will be. If computing the plan requires significantly more computations than the current implementation of pick (apart from avoiding the allocation), then I guess we can stay with the current approach.

It would be great if we could compare the performance of both approaches. Perhaps a benchmark would be in order? After all, one of the reasons for the refactor was to improve the performance.
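As a concrete sketch of this suggestion (using the trait as written above, with pick simplified to return an Option):

struct First;

impl<'a> PlanExtractor<'a> for First {
    type Output = Option<NodeRef<'a>>;

    // Consumes only the first element of the plan, so nothing is collected or allocated.
    fn extract<I>(self, mut plan: I) -> Option<NodeRef<'a>>
    where
        I: Iterator<Item = NodeRef<'a>> + Send + Sync + 'a,
    {
        plan.next()
    }
}

// pick would then delegate to the shared plan construction:
// fn pick<'a>(&'a self, query: &'a QueryInfo, cluster: &'a ClusterData) -> Option<NodeRef<'a>> {
//     self.do_with_plan(query, cluster, First)
// }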

@cvybhu (Contributor) left a comment:

Had an initial look, generally looks nice although I didn't dig into the juicy details yet.

@cvybhu (Contributor) left a comment:

Looks nice.
We got rid of the problematic policy chaining and the code is much clearer.

I like how simple planning has become; in my implementation I had a bunch of convoluted iterators that were a pain to write and read. This implementation is much more elegant.

The direction seems good, but there are still a few things that need to be solved before proceeding, like LWT routing, latency awareness and proper UP/DOWN event handling.


// Get a list of all local alive nodes, and apply a round robin to it
let local_nodes = self.preffered_node_set(cluster);
let robined_local_nodes = Self::round_robin_nodes(local_nodes, Self::is_alive);
Review comment (Contributor):

Round robin has the problem of overloads in case of node failure.
If the usual round robin order is A->B->C and A fails, B will take over all of A's requests. It would be better to try nodes in random order, then in case of A's failure the load that used to be handled by A will be shared equally among B and C.
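A minimal sketch of the suggested randomization (assuming the rand crate as a dependency; the node type is simplified to &str):

use rand::seq::SliceRandom;

// Instead of a fixed rotation where B always directly follows A (and therefore
// absorbs all of A's traffic when A is down), shuffle the candidates per query
// so that A's former load is spread evenly over the remaining nodes.
fn randomized_order<'a>(nodes: &[&'a str]) -> Vec<&'a str> {
    let mut shuffled = nodes.to_vec();
    shuffled.shuffle(&mut rand::thread_rng());
    shuffled
}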

Review comment (Collaborator):

As we have no quick idea how to mitigate this yet, let's put this problem off until a follow-up.

@wprzytula (Collaborator) commented:

I'm sad to see that we didn't really take advantage of grouping all default features in one Default Policy. Namely, we still pass the load balancing policy in an Arc, which imposes some overhead. Instead, I would propose:

struct LoadBalancing(Or);

enum Or {
    Default(DefaultLoadBalancingPolicy),
    Custom(Arc<dyn LoadBalancingPolicy>)
}

impl LoadBalancingPolicy for LoadBalancing {
   ... 
}
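A sketch of how the wrapper's delegation could look (only the pick method shown, reusing the names from the snippet above and the pick signature quoted earlier in this thread; this is an illustration, not the proposed implementation itself):

impl LoadBalancingPolicy for LoadBalancing {
    fn pick<'a>(&'a self, query: &'a QueryInfo, cluster: &'a ClusterData) -> NodeRef<'a> {
        match &self.0 {
            // Static dispatch for the built-in default policy...
            Or::Default(default) => default.pick(query, cluster),
            // ...dynamic dispatch only for user-provided policies.
            Or::Custom(custom) => custom.pick(query, cluster),
        }
    }
}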

@havaker (Author) commented Jan 19, 2023

Rebased on top of main (using @wprzytula's pull request havaker#8)

@havaker (Author) commented Jan 19, 2023

Removed scylla/src/transport/load_balancing/latency_aware.rs in the commit "transport: load_balancing: remove policies other than the default one".

@havaker (Author) commented Jan 22, 2023

Rebased on top of main, merged havaker#6 (with additional NodeRef import fixes)

@havaker (Author) commented Feb 10, 2023

I'm sad to see that we didn't really take advantage of grouping all default features in one Default Policy. Namely, we still pass the load balancing policy in an Arc, which imposes some overhead. Instead, I would propose:

struct LoadBalancing(Or);

enum Or {
    Default(DefaultLoadBalancingPolicy),
    Custom(Arc<dyn LoadBalancingPolicy>)
}

impl LoadBalancingPolicy for LoadBalancing {
   ... 
}

I don't think that the overhead you mentioned is noticeable.

@havaker (Author) commented Feb 13, 2023

v1:

  • Rebased on top of main
  • Prepared a commit series that is suitable for review

@havaker marked this pull request as ready for review on February 13, 2023 15:24
@cvybhu (Contributor) commented Feb 13, 2023

clippy check is failing. It looks like 9bc33b0 removed a clone() and we started doing slice.into_iter().

@piodul (Collaborator) commented Feb 14, 2023

Please update the PR description. It mentions that some things still need to be done - the PR was marked as ready so I guess those things are ready?

@havaker (Author) commented Feb 14, 2023

v2:

  • applied clippy suggestions
  • added doc comments to the new LoadBalancingPolicy interface

havaker and others added 13 commits March 17, 2023 13:25
`LoadBalancingPolicy::plan` was split into `pick` and `fallback` methods.
This allows better optimization of the most common case, where only one
node from the load balancing plan is needed.
Changes required in the query execution code were minimized by providing
a lazy chaining iterator, `transport::load_balancing::plan::Plan`. This
iterator's first element is the node returned by the
`LoadBalancingPolicy::pick` function; the next items come from the
`LoadBalancingPolicy::fallback` iterator. The fallback method is called
lazily - only when the second or a later element of the `Plan` iterator is needed.
Implemented token and datacenter awareness.
Added a builder for the default policy (so that adding new parameters to
the default policy won't break the API).

Default policy prefers to return nodes in the following order:
- Alive local replicas (if token is available & token awareness is
  enabled)
- Alive remote replicas (if datacenter failover is permitted & possible
  due to consistency constraints)
- Alive local nodes
- Alive remote nodes (if datacenter failover is permitted & possible due
  to consistency constraints)
- Enabled down nodes

If no preferred datacenter is specified, all nodes are treated as local
ones.

The `DefaultPolicy::pick` method does not allocate if the replica lists
for the given strategy were precomputed.

Co-authored-by: Wojciech Przytuła <wojciech.przytula@scylladb.com>
Two methods were added to the LoadBalancingPolicy trait:
fn on_query_success(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>);
fn on_query_failure(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>, error: &QueryError);

Their addition allows implementing a latency-aware policy.
This commit prepares for a cleaner introduction of latency awareness into
DefaultPolicy.
The experimental latency awareness module is added to DefaultPolicy.
Its behaviour is based on the previous LatencyAwarePolicy
implementation.

Notes on the performance of operations involving the pick predicate:
The pick predicate is boxed inside DefaultPolicy.
Fallback is performed efficiently - the predicate is only borrowed for
the eager computation of the fallback iterator.
@havaker (Author) commented Mar 17, 2023

v12:

  • reworded docs
  • rebased on top of main (to make clippy happy)

@cvybhu (Contributor) commented Mar 17, 2023

The new version of load balancing precomputes replica sets for each vnode, which could turn out to be quite a bit of work, so I measured (using lbbench) how many resources the precomputation consumes compared to the previous version, which didn't precompute:

Cluster: 3 datacenters, 8 nodes in each one

A few SimpleStrategy keyspaces, RF <= 8:

Before: 1.8MB of memory used, 420µs to compute ClusterData
After: 2.5MB of memory used, 5ms to compute ClusterData

A few big SimpleStrategy keyspaces, RF ~= 20:

Before: 1.9MB of memory used, 558.229µs to compute ClusterData
After: 3.7MB of memory used, 14ms to compute ClusterData

A few NetworkTopologyStrategy keyspaces, RF <= 8:

Before: 1.9MB of memory used, 454.11µs to compute ClusterData
After: 6.3MB of memory used, 25ms to compute ClusterData

All of the above:

Before: 2MB of memory used, 354.14µs to compute ClusterData
After: 7.8MB of memory used, 37ms to compute ClusterData


We use a bit more memory and compute power, but it's within reasonable limits.


One positive thing stemming from the new precomputation is that the number of allocations per request got reduced:
Before:

Inserts:
----------
allocs/req:                15.00
reallocs/req:               8.00
frees/req:                 15.00
bytes allocated/req:     2458.05
bytes reallocated/req:    269.06
bytes freed/req:         2456.80
(allocated - freed)/req:      1.25
----------
Sending 100000 selects, hold tight ..........
----------
Selects:
----------
allocs/req:                48.00
reallocs/req:               8.00
frees/req:                 48.00
bytes allocated/req:     5266.07
bytes reallocated/req:    209.00
bytes freed/req:         5266.00
(allocated - freed)/req:      0.07

After:

Inserts:
----------
allocs/req:                 6.01
reallocs/req:               6.00
frees/req:                  6.00
bytes allocated/req:      381.80
bytes reallocated/req:    173.05
bytes freed/req:          380.62
(allocated - freed)/req:      1.18
----------
Sending 100000 selects, hold tight ..........
----------
Selects:
----------
allocs/req:                39.00
reallocs/req:               6.00
frees/req:                 39.00
bytes allocated/req:     3190.15
bytes reallocated/req:    113.01
bytes freed/req:         3190.04
(allocated - freed)/req:      0.11
----------

Here are the exact results:
lb-results.zip

@havaker requested a review from cvybhu on March 17, 2023 14:21
@cvybhu (Contributor) left a comment:

LGTM 🚀

@mykaul (Contributor) commented Mar 17, 2023

Nice work!

Ten0 added a commit to Ten0/scylla-rust-driver that referenced this pull request Jun 3, 2023
Resolves scylladb#468

This is a follow-up on scylladb#508 and scylladb#658:
- To minimize CPU usage related to network operations when inserting a very large number of rows, it is beneficial to batch.
- To batch in the most efficient manner, these batches have to be shard-aware. Since scylladb#508, `batch` will pick the shard of the first statement to send the query to. However, it is left to the user to compose the batches in such a way that the target shard is the same for all the elements of the batch.
- This was made *possible* by scylladb#658, but it was still very boilerplate-ish. I was waiting for scylladb#612 to be merged (amazing work btw! 😃) to implement a more direct and factored API (as that would use it).
- This new ~`Session::first_shard_for_statement(self, &PreparedStatement, &SerializedValues) -> Option<(Node, Option<Shard>)>` makes shard-aware batching easy for users, by providing access to the first node and shard of the query plan.