Merge pull request #15589 from oleiman/vbotbuildovich/backport-15438-…v23.2.x-912

[v23.2.x] kafka/client: Configuration for consume min and max bytes
piyushredpanda committed Dec 14, 2023
2 parents c2eef6b + deb2c63 commit 7f69de2
Showing 7 changed files with 28 additions and 10 deletions.
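The change threads two client-level knobs through the fetch path: consumer_request_min_bytes (new, default 1, lower bound 0) and consumer_request_max_bytes (now bounded, default 1 MiB). For context, the standalone sketch below (illustrative names such as fetch_limits and should_respond, not code from this repository) shows the standard Kafka fetch semantics these values feed into: a broker answers a fetch once at least min_bytes of data is ready or the wait deadline expires, and it caps the reply at roughly max_bytes.

// Standalone sketch of Kafka fetch semantics; names are illustrative, not Redpanda code.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>

struct fetch_limits {
    int32_t min_bytes;                  // consumer_request_min_bytes (default 1)
    int32_t max_bytes;                  // consumer_request_max_bytes (default 1 MiB)
    std::chrono::milliseconds max_wait; // how long the broker may hold the fetch
};

// Would the broker respond now, given the bytes buffered and the time waited?
bool should_respond(
  const fetch_limits& l, int32_t bytes_ready, std::chrono::milliseconds waited) {
    return bytes_ready >= l.min_bytes || waited >= l.max_wait;
}

// How much of the buffered data actually goes back to the client?
int32_t bytes_returned(const fetch_limits& l, int32_t bytes_ready) {
    return std::min(bytes_ready, l.max_bytes);
}

int main() {
    fetch_limits l{
      .min_bytes = 1, .max_bytes = 1 << 20, .max_wait = std::chrono::milliseconds{100}};
    std::cout << should_respond(l, 0, std::chrono::milliseconds{10}) << '\n'; // 0: keep waiting
    std::cout << bytes_returned(l, 5 << 20) << '\n';                          // 1048576: capped
}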
16 changes: 11 additions & 5 deletions src/v/kafka/client/client.cc
@@ -143,9 +143,14 @@ ss::future<> client::update_metadata(wait_or_start::tag) {
 }
 
 ss::future<> client::apply(metadata_response res) {
-    co_await _brokers.apply(std::move(res.data.brokers));
-    co_await _topic_cache.apply(std::move(res.data.topics));
-    _controller = res.data.controller_id;
+    try {
+        co_await _brokers.apply(std::move(res.data.brokers));
+        co_await _topic_cache.apply(std::move(res.data.topics));
+        _controller = res.data.controller_id;
+    } catch (const std::exception& ex) {
+        vlog(kclog.debug, "{}Failed to apply metadata request: {}", *this, ex);
+        throw;
+    }
 }
 
 ss::future<> client::mitigate_error(std::exception_ptr ex) {
@@ -375,9 +380,10 @@ ss::future<fetch_response> client::fetch_partition(
   model::offset offset,
   int32_t max_bytes,
   std::chrono::milliseconds timeout) {
+    const auto min_bytes = _config.consumer_request_min_bytes();
     auto build_request =
-      [offset, max_bytes, timeout](model::topic_partition& tp) {
-          return make_fetch_request(tp, offset, max_bytes, timeout);
+      [offset, min_bytes, max_bytes, timeout](model::topic_partition& tp) {
+          return make_fetch_request(tp, offset, min_bytes, max_bytes, timeout);
       };
 
     return ss::do_with(
2 changes: 1 addition & 1 deletion src/v/kafka/client/client_fetch_batch_reader.cc
@@ -50,7 +50,7 @@ class client_fetcher final : public model::record_batch_reader::impl {
         auto res = co_await _client.fetch_partition(
           _tp,
           _next_offset,
-          1_MiB,
+          _client.config().consumer_request_max_bytes,
           std::chrono::duration_cast<std::chrono::milliseconds>(
             t - model::timeout_clock::now()));
         vlog(
10 changes: 9 additions & 1 deletion src/v/kafka/client/configuration.cc
@@ -66,12 +66,20 @@ configuration::configuration()
       "Interval (in milliseconds) for consumer request timeout",
       {},
       100ms)
+  , consumer_request_min_bytes(
+      *this,
+      "consumer_request_min_bytes",
+      "Min bytes to fetch per request",
+      {},
+      1,
+      {.min = 0})
   , consumer_request_max_bytes(
       *this,
       "consumer_request_max_bytes",
       "Max bytes to fetch per request",
       {},
-      1_MiB)
+      1_MiB,
+      {.min = 0})
   , consumer_session_timeout(
       *this,
       "consumer_session_timeout_ms",
4 changes: 3 additions & 1 deletion src/v/kafka/client/configuration.h
@@ -10,6 +10,7 @@
  */
 
 #pragma once
+#include "config/bounded_property.h"
 #include "config/config_store.h"
 #include "config/tls_config.h"
 
@@ -34,7 +35,8 @@ struct configuration final : public config::config_store {
     config::property<int32_t> produce_batch_size_bytes;
     config::property<std::chrono::milliseconds> produce_batch_delay;
     config::property<std::chrono::milliseconds> consumer_request_timeout;
-    config::property<int32_t> consumer_request_max_bytes;
+    config::bounded_property<int32_t> consumer_request_min_bytes;
+    config::bounded_property<int32_t> consumer_request_max_bytes;
     config::property<std::chrono::milliseconds> consumer_session_timeout;
     config::property<std::chrono::milliseconds> consumer_rebalance_timeout;
     config::property<std::chrono::milliseconds> consumer_heartbeat_interval;
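Both new members are declared as config::bounded_property with {.min = 0}. As a rough mental model only, the hypothetical, simplified stand-in below shows what attaching numeric bounds to a setting can look like; it is not Redpanda's config library, which has its own validation behavior, and it requires C++20 for the designated initializers.

// Hypothetical, simplified stand-in for a bounded numeric setting (not Redpanda's
// config::bounded_property). Out-of-range values are clamped here for illustration.
#include <cstdint>
#include <iostream>
#include <optional>

template<typename T>
struct numeric_bounds {
    std::optional<T> min; // values below this are pulled up
    std::optional<T> max; // values above this are pulled down

    T clamp(T v) const {
        if (min && v < *min) { v = *min; }
        if (max && v > *max) { v = *max; }
        return v;
    }
};

template<typename T>
class bounded_setting {
public:
    bounded_setting(T def, numeric_bounds<T> bounds)
      : _value(bounds.clamp(def))
      , _bounds(bounds) {}

    void set(T v) { _value = _bounds.clamp(v); }
    T operator()() const { return _value; }

private:
    T _value;
    numeric_bounds<T> _bounds;
};

int main() {
    // Mirrors the new options: min bytes defaults to 1, max bytes to 1 MiB,
    // both declared with a lower bound of 0.
    bounded_setting<int32_t> min_bytes{1, {.min = 0}};
    bounded_setting<int32_t> max_bytes{1 << 20, {.min = 0}};

    min_bytes.set(-5); // out of range: clamped back to the lower bound
    std::cout << min_bytes() << " " << max_bytes() << '\n'; // prints "0 1048576"
}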
2 changes: 1 addition & 1 deletion src/v/kafka/client/consumer.cc
@@ -443,7 +443,7 @@ ss::future<fetch_response> consumer::fetch(
       .data = {
         .replica_id = consumer_replica_id,
         .max_wait_ms = timeout,
-        .min_bytes = 1,
+        .min_bytes = _config.consumer_request_min_bytes,
         .max_bytes = max_bytes.value_or(
           _config.consumer_request_max_bytes),
         .isolation_level = model::isolation_level::
3 changes: 2 additions & 1 deletion src/v/kafka/client/fetcher.cc
@@ -24,6 +24,7 @@ namespace kafka::client {
 fetch_request make_fetch_request(
   const model::topic_partition& tp,
   model::offset offset,
+  int32_t min_bytes,
   int32_t max_bytes,
   std::chrono::milliseconds timeout) {
     std::vector<fetch_request::partition> partitions;
@@ -41,7 +42,7 @@ fetch_request make_fetch_request(
       .data = {
         .replica_id{consumer_replica_id},
         .max_wait_ms{timeout},
-        .min_bytes = 0,
+        .min_bytes = min_bytes,
         .max_bytes = max_bytes,
         .isolation_level = model::isolation_level::read_uncommitted,
         .topics{std::move(topics)}}};
1 change: 1 addition & 0 deletions src/v/kafka/client/fetcher.h
@@ -18,6 +18,7 @@ namespace kafka::client {
 fetch_request make_fetch_request(
   const model::topic_partition& tp,
   model::offset offset,
+  int32_t min_bytes,
   int32_t max_bytes,
   std::chrono::milliseconds timeout);
 
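Putting the pieces together, min_bytes now flows from configuration through client::fetch_partition and make_fetch_request into the request's min_bytes field, where it was previously hard-coded (0 in fetcher.cc, 1 in consumer.cc). The compilable mirror below uses simplified, hypothetical types rather than the kafka/client headers, purely to show that data flow.

// Compilable mirror of the plumbing (hypothetical simplified types, not the
// kafka/client headers): configuration -> fetch_partition -> make_fetch_request
// -> the request's min_bytes field.
#include <chrono>
#include <cstdint>
#include <iostream>

struct client_config {
    int32_t consumer_request_min_bytes = 1;
    int32_t consumer_request_max_bytes = 1 << 20;
};

struct fetch_request {
    int32_t min_bytes;
    int32_t max_bytes;
    std::chrono::milliseconds max_wait_ms;
};

// Stands in for make_fetch_request(tp, offset, min_bytes, max_bytes, timeout):
// the caller now supplies min_bytes instead of a hard-coded constant.
fetch_request make_fetch_request(
  int32_t min_bytes, int32_t max_bytes, std::chrono::milliseconds timeout) {
    return {.min_bytes = min_bytes, .max_bytes = max_bytes, .max_wait_ms = timeout};
}

// Stands in for client::fetch_partition: min_bytes is read from configuration
// and forwarded alongside the caller-provided max_bytes.
fetch_request fetch_partition(
  const client_config& cfg, int32_t max_bytes, std::chrono::milliseconds timeout) {
    const auto min_bytes = cfg.consumer_request_min_bytes;
    return make_fetch_request(min_bytes, max_bytes, timeout);
}

int main() {
    client_config cfg;
    auto req = fetch_partition(
      cfg, cfg.consumer_request_max_bytes, std::chrono::milliseconds{100});
    std::cout << req.min_bytes << " " << req.max_bytes << '\n'; // prints "1 1048576"
}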
