[v23.1.x] Limit memory used while fetching many partitions #11302
Conversation
`numeric_bounds<T>` implied that its argument is an integral numeric type by requiring the `%` operation on it. This change renames `numeric_bounds` to `numeric_integral_bounds` to emphasize that, and introduces a new `numeric_bounds` that does not support alignment or odd/even checks, and thus works with floating point types too. `bounded_property` can now accept an arbitrary bounds struct that conforms to the `detail::bounds<>` concept. For compatibility, it defaults to `numeric_integral_bounds`, so no code change is necessary. (cherry picked from commit 1db2edc)
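As a rough sketch of the split (the names, members, and the exact shape of the `detail::bounds<>` concept here are illustrative, not the actual Redpanda definitions), only the integral variant needs `%`, while the generic variant only needs ordering:

```cpp
#include <concepts>
#include <optional>

// Illustrative only: a generic bounds type that needs nothing beyond
// ordering, so it also works for floating point property types.
template<std::totally_ordered T>
struct numeric_bounds {
    std::optional<T> min;
    std::optional<T> max;

    bool validate(const T& v) const {
        return (!min || v >= *min) && (!max || v <= *max);
    }
};

// Illustrative only: the integral variant additionally supports alignment
// (and odd/even) checks, which require operator% and hence an integral T.
template<std::integral T>
struct numeric_integral_bounds : numeric_bounds<T> {
    std::optional<T> align;

    bool validate(const T& v) const {
        return numeric_bounds<T>::validate(v)
               && (!align || v % *align == 0);
    }
};
```

`bounded_property` would then take the bounds type as a template parameter defaulted to `numeric_integral_bounds`, which is what keeps existing property declarations unchanged.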
(cherry picked from commit ee5e069)
While limiting the number of partitions in the fetch response by `kafka_max_bytes_per_fetch`, also consider the fetch plan's `bytes_left`, which is based on the fetch request's max_bytes and on the `fetch_max_bytes` property. (cherry picked from commit eb8a915)
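A minimal sketch of that combination, assuming both limits are plain byte counts (the function and parameter names are made up for illustration):

```cpp
#include <algorithm>
#include <cstddef>

// Sketch: the remaining per-fetch budget is bounded by both the
// kafka_max_bytes_per_fetch limit and the plan's bytes_left, which itself
// derives from the request's max_bytes / fetch_max_bytes.
size_t partition_fetch_budget(size_t max_bytes_per_fetch, size_t plan_bytes_left) {
    return std::min(max_bytes_per_fetch, plan_bytes_left);
}
```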
Functions down the fetch code path will need access to local `kafka::server` instance members, such as memory semaphores. (cherry picked from commit d891efe)
fix uninitialized max_service_memory_per_core, also disable metrics (cherry picked from commit 58a9de0)
The Kafka server now stores a per-shard memory semaphore that limits memory usage by the fetch request handler. The semaphore count is configured based on the `kafka_memory_share_for_fetch` property and the kafka RPC service memory size. The `vectorized_kafka_rpc_fetch_avail_mem_bytes` metric is added to monitor the semaphore level. A sharded `server` accessor in `request_context` provides access to the local shard instance of the new semaphore, as well as to the local instance of the `net::memory` semaphore. (cherry picked from commit 7b38601)
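A rough sketch of how such a per-shard count could be derived (names are illustrative; per the later commit, the actual arithmetic lives in the `kafka::server` constructor):

```cpp
#include <cstddef>

// Sketch: size the per-shard fetch memory semaphore as a configurable share
// (kafka_memory_share_for_fetch) of the memory available to the kafka RPC
// service on this shard.
size_t fetch_semaphore_count(
  size_t kafka_rpc_memory_bytes, double memory_share_for_fetch) {
    return static_cast<size_t>(
      static_cast<double>(kafka_rpc_memory_bytes) * memory_share_for_fetch);
}
```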
(cherry picked from commit c1d77cd)
While concurrently fetching from ntps, consult the memory semaphores on whether there is enough memory available to perform the fetch. Both the general kafka memory semaphore and the new kafka fetch memory semaphore are used. For the former, the amount already consumed from it by the request memory estimator is taken into account. Since the batch size is not known ahead of time, it is estimated at 1 MiB. The first partition in the list is fetched regardless of the semaphore levels, to satisfy the requirement that at least a single partition from the fetch request must advance. The number of units held is adjusted to the actual size used as soon as it is known. The acquired memory semaphore units are held by `read_result` until it is destroyed at the end of fetch request processing. When `read_result` is destroyed in the connection shard, the semaphore units are returned on the shard where they were acquired. If the request's max_bytes is more than either semaphore holds, max_bytes is reduced to the memory actually available, also considering the minimum batch size. (cherry picked from commit 2474ef6)
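The admission logic can be summarized with the following sketch. It deliberately ignores the actual semaphore unit bookkeeping (acquiring, resizing, and carrying units inside `read_result`) and only shows how a per-partition byte budget could be decided; all names and the exact clamping are assumptions, not the real implementation:

```cpp
#include <algorithm>
#include <cstddef>

// The batch size is unknown before reading, so it is estimated at 1 MiB.
constexpr size_t batch_size_estimate = 1024 * 1024;

// Sketch: given the current levels of the general kafka memory semaphore and
// the fetch memory semaphore, return how many bytes this partition read may
// use, or 0 to skip the partition for now. The first partition of the request
// (obligatory_batch_read) always proceeds so the fetch is guaranteed to advance.
size_t adjust_partition_max_bytes(
  size_t kafka_memory_available,
  size_t fetch_memory_available,
  size_t requested_max_bytes,
  bool obligatory_batch_read) {
    size_t available = std::min(kafka_memory_available, fetch_memory_available);
    if (!obligatory_batch_read && available < batch_size_estimate) {
        return 0; // not even one estimated batch fits; skip for now
    }
    // Reduce max_bytes to the memory actually available, but never below the
    // minimum (estimated) batch size, and never above what was requested.
    return std::min(requested_max_bytes, std::max(available, batch_size_estimate));
}
```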
In kafka_server_rpfixture, an extra `kafka::server` is created using a barely initialized `server_configuration` instance. Garbage in `max_service_memory_per_core` now causes issues because of the new arithmetic done with it in the `kafka::server` ctor. (cherry picked from commit 623e613)
Test the algorithm that decides whether a fetch request can proceed for an ntp based on the resources available. Move the existing testing-only symbols into the `testing` namespace. (cherry picked from commit 950abc7)
RAM is increased to 512M because redpanda was failing with 256M for unrelated reasons. Test with different values of `kafka_memory_share_for_fetch`. (cherry picked from commit 06a38b9)
This is missing, which makes sense: d3d9276
```cpp
/**
 * Create a fetch plan with the simple fetch planner.
 *
 * Exposed for testing/benchmarking only.
 */
kafka::fetch_plan make_simple_fetch_plan(op_context& octx);
```
```cpp
read_result::memory_units_t reserve_memory_units(
  ssx::semaphore& memory_sem,
  ssx::semaphore& memory_fetch_sem,
  const size_t max_bytes,
  const bool obligatory_batch_read);
```
I think this is accidentally nabbed from 47892b7
```diff
@@ -146,14 +147,11 @@ read_result::memory_units_t::~memory_units_t() noexcept {
     if (shard == ss::this_shard_id()) {
         return;
     }
-    auto f = ss::smp::submit_to(
+    ssx::background = ss::smp::submit_to(
```
This change isn't in the original PR, but it's a new Seastar feature, and this change is fine.
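For context, the pattern in question is returning semaphore units on the shard where they were acquired without awaiting the cross-shard hop in the destructor. A simplified sketch of that idea (not the actual `memory_units_t`; assigning to `ssx::background` appears to simply park the fire-and-forget future instead of binding it to a local variable):

```cpp
#include <seastar/core/semaphore.hh>
#include <seastar/core/smp.hh>

#include <utility>

// Simplified sketch, not the real type: semaphore units must be returned on
// the shard that owns the semaphore, so the destructor ships them back there.
struct memory_units_sketch {
    seastar::semaphore_units<> units;
    seastar::shard_id shard = seastar::this_shard_id();

    ~memory_units_sketch() noexcept {
        if (shard == seastar::this_shard_id()) {
            return; // same shard: units are released by their own destructor
        }
        // Fire-and-forget: release the units on their owning shard. The diff
        // above parks this future via `ssx::background = ...` instead of
        // holding it in a local variable.
        (void)seastar::smp::submit_to(
          shard, [u = std::move(units)]() mutable { u.return_all(); });
    }
};
```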
Replaced by #11858
Review comments addressed in https://github.com//pull/11858
Backport of PR #10905
Fixes #11262