
Improve throughput throttling #16441

Merged

BenPope merged 6 commits into redpanda-data:dev from throughput_shared_token_bucket on Feb 29, 2024

Conversation

BenPope
Member

@BenPope BenPope commented Feb 2, 2024

When `kafka_throughput_throttling_v2` is enabled (`true` by default), use an `ss::internal::shared_token_bucket` for requests across all shards, instead of balancing quota between shards.
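Conceptually (as discussed in the review threads below), the v2 path keeps one bucket with two atomic rovers that every shard grabs from, and turns any deficiency into a throttle delay. Here's a minimal, self-contained sketch of that idea — illustrative only, not the Seastar `ss::internal::shared_token_bucket` implementation; all names and numbers are made up:

    // Minimal sketch (not the Seastar API): one bucket shared by all shards,
    // built from two atomic "rovers" -- head (tokens replenished) and tail
    // (tokens grabbed). Every shard grabs unconditionally; any deficiency of
    // tail over head becomes a client throttle delay.
    #include <algorithm>
    #include <atomic>
    #include <chrono>
    #include <cstdint>
    #include <iostream>

    class simple_shared_token_bucket {
    public:
        simple_shared_token_bucket(uint64_t rate_bytes_per_sec, uint64_t limit_bytes)
          : _rate(rate_bytes_per_sec)
          , _limit(limit_bytes)
          , _head(limit_bytes) // start with a full budget
          , _last(std::chrono::steady_clock::now().time_since_epoch().count()) {}

        // Record a request of `size` bytes and return the new tail position.
        uint64_t grab(uint64_t size) {
            return _tail.fetch_add(size, std::memory_order_relaxed) + size;
        }

        // Add tokens for the elapsed time; cap the head so that at most
        // `_limit` bytes of unused budget can accumulate (bounded bursts).
        void replenish(std::chrono::steady_clock::time_point now) {
            auto prev = _last.exchange(
              now.time_since_epoch().count(), std::memory_order_relaxed);
            auto elapsed_ns = now.time_since_epoch().count() - prev;
            if (elapsed_ns <= 0) {
                return;
            }
            auto tokens = static_cast<uint64_t>(
              static_cast<double>(_rate) * static_cast<double>(elapsed_ns) / 1e9);
            auto tail = _tail.load(std::memory_order_relaxed);
            auto head = _head.load(std::memory_order_relaxed);
            auto new_head = std::min(head + tokens, tail + _limit);
            if (new_head > head) {
                // A plain store is enough for a sketch; the real class is
                // careful about concurrent replenishers.
                _head.store(new_head, std::memory_order_relaxed);
            }
        }

        // How far the given tail position runs ahead of the replenished head.
        uint64_t deficiency(uint64_t tail) const {
            auto head = _head.load(std::memory_order_relaxed);
            return tail > head ? tail - head : 0;
        }

        // Time the client must be throttled for the deficiency to drain.
        std::chrono::milliseconds duration_for(uint64_t deficiency_bytes) const {
            return std::chrono::milliseconds(deficiency_bytes * 1000 / _rate);
        }

    private:
        const uint64_t _rate;           // bytes per second
        const uint64_t _limit;          // burst allowance, in bytes
        std::atomic<uint64_t> _head;    // tokens made available so far
        std::atomic<uint64_t> _tail{0}; // tokens requested so far
        std::atomic<int64_t> _last;     // last replenish, ns since epoch
    };

    int main() {
        using namespace std::chrono;
        // 100 MiB/s limit with a 10 MiB burst allowance (illustrative numbers).
        simple_shared_token_bucket bucket(100 << 20, 10 << 20);

        // What each shard does per request: replenish, grab, and translate
        // any deficiency into a throttle delay returned to the client.
        bucket.replenish(steady_clock::now());
        auto tail = bucket.grab(32 << 20); // a 32 MiB burst from one client
        auto delay = bucket.duration_for(bucket.deficiency(tail));
        std::cout << "throttle for " << delay.count() << " ms\n"; // ~220 ms
    }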

Fixes https://github.com/redpanda-data/core-internal/issues/929
Fixes https://github.com/redpanda-data/core-internal/issues/1021
Fixes https://github.com/redpanda-data/core-internal/issues/1022
Fixes https://github.com/redpanda-data/core-internal/issues/1023
MaybeFixes: #8809

Cluster configuration - probably temporary

| config | description | default |
| --- | --- | --- |
| `kafka_throughput_throttling_v2` | Use throughput throttling based on a shared token bucket instead of balancing quota between shards | `true` |
| `kafka_throughput_replenish_threshold` | Threshold for refilling the token bucket. Will be clamped between 1 and rate. | 1 |

Metrics (prefix `vectorized_kafka_quotas_`)

| name | type | description | new? | version |
| --- | --- | --- | --- | --- |
| `balancer_runs` | counter | Number of times throughput quota balancer has been run | no | 1 |
| `quota_effective` | counter | Currently effective quota, in bytes/s | no | 1 |
| `traffic_intake` | counter | Amount of Kafka traffic received from the clients that is taken into processing, in bytes | no | 1, 2 |
| `traffic_egress` | counter | Amount of Kafka traffic published to the clients that was taken into processing, in bytes | yes | 1, 2 |
| `throttle_time` | histogram | Throttle time histogram (in seconds) | yes | 1, 2 |

Testing Tier 5

Aggregated Pub Latency (ms)

| config | avg | 50% | 95% | 99% | 99.9% | 99.99% | Max |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Nightly (no limit) | 3.741 | 3.393 | 6.437 | 9.801 | 19.332 | 37.823 | 81.581 |
| Nightly (limits) | 32.776 | 3.478 | 111.498 | 815.123 | 1996.375 | 3639.119 | 6641.311 |
| v2 off | 20.987 | 3.568 | 12.894 | 588.399 | 2042.519 | 3357.151 | 5120.831 |
| v2 on | 4.052 | 3.540 | 7.287 | 12.381 | 36.850 | 58.363 | 99.782 |

Artefacts

There's a (local) build on DockerHub
See also: https://github.com/redpanda-data/cloudv2/pull/12488

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.3.x
  • v23.2.x
  • v23.1.x

Release Notes

Improvements

  • Node-wide throughput throttling is now fair and responsive.

@BenPope BenPope self-assigned this Feb 2, 2024
@BenPope BenPope force-pushed the throughput_shared_token_bucket branch from 9be68fd to c5327d4 Compare February 2, 2024 02:00
@BenPope BenPope force-pushed the throughput_shared_token_bucket branch from c5327d4 to 22e66c1 Compare February 21, 2024 21:25
@BenPope BenPope marked this pull request as ready for review February 21, 2024 21:31
@vbotbuildovich
Collaborator

@dotnwat dotnwat self-requested a review February 22, 2024 00:25
@emaxerrno
Contributor

well.... this is dope!

@c4milo for uptaking this into cloud. cc: @timyim-rp

src/v/ssx/sharded_ptr.h (review thread outdated, resolved)
@StephanDollberg
Member

What's the difference between "v2 off" and "nightly (no limit)"? I would expect latency to be fine with "v2 off". It seems like (v1) throttling was still applied there.

Further the "v2 on" seems quite a bit worse than "nightly (no limit)". How stable were those numbers? Usually the later runs are quite a bit worse as the topis stay around so later runs get worse if the topics are not deleted manually.

@BenPope
Member Author

BenPope commented Feb 22, 2024

> What's the difference between "v2 off" and "nightly (no limit)"?

I think "Nightly (no limit)" is with the limits set to nullopt (I hadn't changed any configuration).

> I would expect latency to be fine with "v2 off". It seems like (v1) throttling was still applied there.

Yes, "v2 off" was with limits applied, with v1 throttling, so it should be compared to "Nightly (limits)".

Further the "v2 on" seems quite a bit worse than "nightly (no limit)". How stable were those numbers? Usually the later runs are quite a bit worse as the topis stay around so later runs get worse if the topics are not deleted manually.

I didn't run it lots of times, just once with the default 10 mins or so. Runs were sequential with restarts in between, and no topic cleanup. Actually the order of the runs was: "Nightly (no limits)", "Nightly (limits)", "v2 on", "v2 off"; I shuffled the table for readability.

I was thinking it's kind of in the noise; I can tell you that no requests or responses were throttled with v2 on.

if (_use_throttling_v2()) {
    if (_node_quota.in) {
        _node_quota.in->replenish(now);
        _node_quota.in->grab(request_size);
Member

So given how the rovers in the shared_token_bucket work and given that we unconditionally grab from the bucket I think we can end up in a situation where:

  • A large burst causes tail >> head
  • Incoming rate then equals replenish rate / limit
  • All requests continue to get dropped as tail >= head is a continuous state

Am I missing something? Is that the intended behaviour?

Member

I guess assuming everyone fully respects the throttle time it will work fine.

Member Author

@BenPope BenPope Feb 23, 2024

Yeah, if a client doesn't respect the throttle time, it's enforced (on the second read), so as long as the set of outstanding requests doesn't overflow a uint64_t, it should all work.

/// reset() must be called on the home shard.
/// The pointer is stable until the fiber yields.
template<typename T>
class sharded_ptr {
Member

Can you explain this struct a bit more, I don't fully understand the usecase over a plain pointer.

Member Author

The shared_token_bucket is const, due to a const limit (this probably makes the atomic operations manageable by limiting the number of atomic variables).

So I hold it by pointer. When the throughput limit is updated, I need to reseat that pointer with a new bucket that has a new limit. That must be done on the home core, but even with an atomic pointer, I can't guarantee that the pointed to bucket is stable between scheduling points.

We make multiple accesses to the token bucket, e.g.:

tb.duration_for(tb.deficiency(tb.grab(0)))

So I need the pointer to be stable at least between scheduling points.

That's what sharded_ptr is for; it keeps the original alive, updates the local copy on other shards, then takes ownership of the new pointer and destroys the old one.
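For illustration, here's roughly how that pattern is used — a hedged sketch with hypothetical names (`quota_state`, `throughput_bucket`, `on_limit_change`, `delay_for`); it depends on Seastar plus the PR's `ssx::sharded_ptr`, so it isn't standalone-compilable:

    // Hypothetical usage sketch of the pattern described above. Only the
    // home shard reseats the pointer; readers on any shard dereference
    // their per-shard copy with no scheduling point in between, so the
    // pointee stays alive for the whole expression.
    #include <memory>

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>

    #include "ssx/sharded_ptr.h"

    struct quota_state {
        ssx::sharded_ptr<throughput_bucket> bucket; // throughput_bucket is hypothetical

        // Home shard only: publish a bucket built with the new (const) limit.
        ss::future<> on_limit_change(uint64_t rate) {
            co_await bucket.reset(std::make_unique<throughput_bucket>(rate));
        }

        // Any shard, per request: no yield between the three calls.
        auto delay_for(uint64_t request_size) {
            auto& tb = *bucket;
            return tb.duration_for(tb.deficiency(tb.grab(request_size)));
        }
    };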

Contributor

so this is only applicable when reset is called? Since this is the only time you'd need to update the pointer? I'd assume this would be done if someone modifies a configuration variable to update the threshold?

Member Author

Exactly.

Member

It would be nice to see a bit more detail on how it is to be used safely. For example, unlike most sharded services, some methods are safe to call concurrently from other shards even before start() (which happens implicitly) is called, right?

How about stop(): what are the semantics of concurrent access on other shards while stop() is called on the owner? Concurrent stop()s on the owner? Etc.

If possible we should try to enforce those rules, as you've done with the owning shard assert, since these are things that are easy to mis-use and then they seem to work until one day ... boom.

Member Author

@BenPope BenPope Feb 28, 2024

> AFAICT std::shared_ptr<T> instances never escape the interface (e.g. dereference operators proxy to the underlying pointer), so I'm not entirely sure why _state is a vector of std::shared_ptr<T> instances. That is, we'd pay an atomic increment cost to copy the instances into _state, but further accesses via the dereference operators aren't bumping the reference count. am i missing something here?

It made the implementation a lot cleaner.

But the intention was to allow a shared_ptr or weak_ptr to be held, although it's currently a bit YAGNI. Would making local public suffice, or would another name be more appropriate? operator() is available.

Member

> Would making local public suffice, or would another name be more appropriate? operator() is available.

suffice for what?

Member Author

@BenPope BenPope Feb 28, 2024

A name that can be used to bind a shared or weak pointer:

    sharded_ptr<int> sp;
    co_await sp.reset(std::make_unique<int>(42));
    std::shared_ptr<int> shared = sp.local();

Member

I don't have a preference. I was only making the observation that if the shared pointer is not escaping the interface, then it didn't seem to serve a purpose to have one per core.

Member

@dotnwat - you need the vector of std::shared_ptr (or some other approach) for thread-safety, even if they don't escape the interface. With a single pointer, shard 1 can be accessing the shared pointer while shard 0 (the owning shard) is resetting it: this is not safe (e.g., the object could be destroyed after the pointer was read but before subsequent use, a UAF).

So the vector of shared pointers is to allow each shard to own a copy and update it in the usual message passing way.
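A minimal sketch of that per-shard-copy scheme (assumed class and member names, Seastar-style message passing; the PR's real `ssx::sharded_ptr` additionally serializes `reset()` with a mutex and asserts the home shard):

    // Each shard owns its own std::shared_ptr<T> copy; the home shard
    // publishes a new value by message passing to every shard, so a
    // reader's local copy is never mutated from another core.
    #include <memory>
    #include <vector>

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/smp.hh>

    template<typename T>
    class naive_sharded_ptr {
    public:
        naive_sharded_ptr()
          : _state(seastar::smp::count) {}

        // Home shard only: install `value` on every shard in turn.
        seastar::future<> reset(std::shared_ptr<T> value) {
            for (seastar::shard_id s = 0; s < seastar::smp::count; ++s) {
                co_await seastar::smp::submit_to(
                  s, [this, s, value] { _state[s] = value; });
            }
        }

        // Any shard: copy the local shared_ptr before yielding, so a
        // concurrent reset() cannot destroy the object mid-use.
        std::shared_ptr<T> local() const {
            return _state[seastar::this_shard_id()];
        }

    private:
        std::vector<std::shared_ptr<T>> _state;
    };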

@BenPope BenPope force-pushed the throughput_shared_token_bucket branch from 22e66c1 to 600f598 Compare February 25, 2024 00:46
@BenPope
Member Author

BenPope commented Feb 25, 2024

Changes in force-push

  • Better description for sharded_ptr
  • Initialise shared_token_bucket without add_replenish_iffset.
  • Add throttling histogram test
  • Improve testing by having a warmup batch that exhausts the quota.
  • Improve testing by running for 2s for each client, since requests are only force-throttled on the second read.

@BenPope
Member Author

BenPope commented Feb 26, 2024

/cdt

co_await _state
  .invoke_on_others([up](auto& state) noexcept { state = up; })
  .finally([this, u{std::move(u)}]() mutable noexcept {
      std::default_delete<T>{}(_state.local());
Contributor

what effect does this have? is it the same as doing _state.local().release() ?

Member Author

_state.local() is a raw pointer, so it's basically delete _state.local(); I'm using default_delete since that's what unique_ptr uses.

Contributor

I believe local() is just the raw pointer type T*. The u.release() returns the raw pointer held by the std::unique_ptr and tells std::unique_ptr to not clean up the memory when destructed (ownership transfer).

Member Author

All changed

}
co_return co_await _state.start(up).finally(
  [this, u{std::move(u)}]() mutable noexcept {
      if (u.get() == _state.local()) {
Contributor

when would this not be the case?

Member Author

In case start fails for some reason, possibly a failed allocation or something like that.

Member

Wouldn't you want a similar condition in the finally for the other case below (where we don't do the init)?

Overall this might be simpler to reason about if this init path just null-init'd the pointers on all the other shards, and then fell through to the update logic below: fewer cases to think about. It's only a one-time inefficiency (per sharded_ptr).

Member Author

Changed: no raw pointers or funky memory management.

return _state.local_is_initialized() && _state.local() != nullptr;
}

ss::future<> reset(std::unique_ptr<T> u = nullptr) {
Contributor

would we want a mutex here? I could see strange things possibly occurring if this method is called again when a previous invocation hasn't completely finished executing yet

Member Author

The handling of the pointers looks safe to me, but there is a potential race between _state.local_is_initialized() and _state.start(). Maybe I'll split out a start method.

Member Author

Added a mutex and rewrote it.


Contributor

@michael-redpanda michael-redpanda left a comment

Looks awesome :)

"reset must be called on home shard: ",
_shard);
}
ss::shard_id _shard;
Contributor

question: would it be helpful to have a getter for this?

Member Author

I'll add one.

Member Author

Done

co_await _state
  .invoke_on_others([up](auto& state) noexcept { state = up; })
  .finally([this, u{std::move(u)}]() mutable noexcept {
      std::default_delete<T>{}(_state.local());
void(u.release());
}
});
}
Member

Overall I think this method has several races, but I guess it depends on the exact semantics of the sharded_ptr.

E.g., I would expect that when we are not doing any further reset() calls on the pointer, all shards eventually converge on the same value for the pointer, right? I don't think that's going to be the case now since when the owning shard does some concurrent resets, different shards may get a different final value depending on what the last writer was on that shard. This re-ordering could happen on the owning shard, if it suspends part way through start() or invoke_on_others, in the shard<->shard queues (if reordering is allowed there) or on the other shards since task execution order is not guaranteed.

I think there are other races along these lines (all related to concurrent resets).

You could do something fancy with a generation counter and ignore old updates, but a big fat mutex around reset should solve it? I think the mutex could also replace the gate: just take the mutex in stop as well.

Member Author

Yeah, mutex works for me:

    ss::future<> reset(std::unique_ptr<T> u = nullptr) {
        assert_shard();
        auto mu{co_await _mutex.get_units()};
        auto up{u.get()};

        std::unique_ptr<T> l{
          !_state.local_is_initialized() ? nullptr : _state.local()};
        co_await (
          !_state.local_is_initialized()
            ? _state.start(up)
            : _state.invoke_on_all([up](T*& state) noexcept { state = up; }))
          .finally([this, l{std::move(l)}, u{std::move(u)}]() mutable noexcept {
              if (_state.local() == u.get()) {
                  void(u.release());
              } else if (_state.local() == l.get()) {
                  void(l.release());
              }
          });
    }

    ss::future<> stop() {
        co_await reset();
        co_await _mutex.with([this] { _mutex.broken(); });
        co_await _state.stop();
    }

@BenPope
Member Author

BenPope commented Feb 27, 2024

Changes in force-push

  1. sharded_ptr
    1. Implement over a std::vector<std::shared_ptr<T>> and a mutex
    2. Add shard_id accessor
    3. Add more testing
    4. Improve documentation
  2. Remove no longer required changes to application.cc

Member

@dotnwat dotnwat left a comment

haven't made it through the entire PR, but left two questions on the sharded_ptr commit.

sharded_ptr(sharded_ptr&& other) noexcept = delete;
sharded_ptr(sharded_ptr const&) noexcept = delete;
sharded_ptr& operator=(sharded_ptr const&) = delete;
sharded_ptr& operator=(sharded_ptr&&) = delete;
Member

did you intend to make the move assignment noexcept (and not the copy constructor)?

Member Author

@BenPope BenPope Feb 28, 2024

> did you intend to make the move assignment noexcept (and not the copy constructor)?

That was not intentional. Do you have an opinion on not disallowing copy and move? I'm tempted to leave them default, or at least leave move default.

Member Author

Made it movable and fixed up the destructor and noexcept.

/// reset() must be called on the home shard.
/// The pointer is stable until the fiber yields.
template<typename T>
class sharded_ptr {
Member

AFAICT std::shared_ptr<T> instances never escape the interface (e.g. dereference operators proxy to the underlying pointer), so I'm not entirely sure why _state is a vector of std::shared_ptr<T> instances. That is, we'd pay an atomic increment cost to copy the instances into _state, but further accesses via the dereference operators aren't bumping the reference count. am i missing something here?

Member

@StephanDollberg StephanDollberg left a comment

Looks good to me now minus the reference comment.

I guess you have decided against adding an interface to sharded_ptr which returns a copy of the local std::shared_ptr given we don't really need it yet?

src/v/kafka/server/snc_quota_manager.h (review thread outdated, resolved)
src/v/ssx/sharded_ptr.h (review thread outdated, resolved)
@BenPope BenPope force-pushed the throughput_shared_token_bucket branch from 06334ae to af9317b Compare February 28, 2024 11:33
ballard26 and others added 6 commits February 28, 2024 12:16
Historical data for this test has shown the p75 end-to-end latency to
always be around 5 to 6ms. This commit changes the validator to allow
for p75 latency of 6ms or less to reduce the number of false positives
in the nightly regression tests.
Signed-off-by: Ben Pope <ben@redpanda.com>
Refactor only, no functional change.

Signed-off-by: Ben Pope <ben@redpanda.com>
New metrics:
* `traffic_egress` - mirrors `traffic_intake`
* `throttle_time` - Histogram of throttle time requested

Signed-off-by: Ben Pope <ben@redpanda.com>
Signed-off-by: Ben Pope <ben@redpanda.com>
When `kafka_throughput_throttling_v2` is enabled (true by default),
use an `ss::internal::shared_token_bucket` for requests across all
shards, instead of balancing quota between shards.

Signed-off-by: Ben Pope <ben@redpanda.com>
@BenPope BenPope force-pushed the throughput_shared_token_bucket branch from af9317b to 6ddfb32 Compare February 28, 2024 12:17
@parametrize(driver_idx="ACK_ALL_GROUP_LINGER_1MS_IDEM_MAX_IN_FLIGHT",
workload_idx="RELEASE_CERT_SMOKE_LOAD_625k")
def test_perf(self, driver_idx, workload_idx):
@parametrize(driver_idx="ACK_ALL_GROUP_LINGER_1MS_IDEM_MAX_IN_FLIGHT")
Member

Something went wrong rebasing here I assume? These seem like unrelated changes.

Member Author

@BenPope BenPope Feb 28, 2024

Yep, should have used --keep-base because I can't count to 6.

@BenPope
Member Author

BenPope commented Feb 28, 2024

Changes in force-pushes

  • Make sharded_ptr movable and add tests.
  • Make the buckets a reference in snc_quota_manager again.

@BenPope
Member Author

BenPope commented Feb 28, 2024

CDT had a single failure: #15893

Contributor

@michael-redpanda michael-redpanda left a comment

🔥 lgtm

@BenPope BenPope merged commit bdd7e92 into redpanda-data:dev Feb 29, 2024
17 checks passed
@vbotbuildovich
Collaborator

/backport v23.3.x

@vbotbuildovich
Collaborator

Failed to create a backport PR to v23.3.x branch. I tried:

git remote add upstream https://github.com/redpanda-data/redpanda.git
git fetch --all
git checkout -b backport-pr-16441-v23.3.x-611 remotes/upstream/v23.3.x
git cherry-pick -x f5a8aa13acbd5b0838e5acdcf34188f42b11cbcb bd424798c49f43916b08eb3d62d5617b8abd5607 20ca50ab871fbe7e1b009fd4687221596bfd9ba7 679804c8f07c7a983a158d36d37a8680628cda3d 55d8952d9dbe457e3da577eb608146d4c560ec59 6ddfb321cabb74d5b54357a0fec58b6c5d6e47fe

Workflow run logs.

BenPope added a commit to BenPope/redpanda that referenced this pull request Apr 16, 2024
v2 was enabled by default in:
redpanda-data#16441

So running the balancer doesn't do anything other
than waste cpu cycles.

Signed-off-by: Ben Pope <ben@redpanda.com>
BenPope added a commit to BenPope/redpanda that referenced this pull request Apr 16, 2024
v2 was enabled by default in:
redpanda-data#16441

Running the balancer doesn't do anything other
than waste cpu cycles.

Signed-off-by: Ben Pope <ben@redpanda.com>
vbotbuildovich pushed a commit to vbotbuildovich/redpanda that referenced this pull request Apr 22, 2024
v2 was enabled by default in:
redpanda-data#16441

Running the balancer doesn't do anything other
than waste cpu cycles.

Signed-off-by: Ben Pope <ben@redpanda.com>
(cherry picked from commit e0ba45e)
9 participants