Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] kafka/client: Fix for stall during client shutdown #16392

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/v/kafka/client/produce_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,30 @@ class produce_partition {
vassert(_in_flight, "handle_response requires a batch in flight");
_batcher.handle_response(std::move(res));
_in_flight = false;
if (_await_in_flight) {
_await_in_flight->set_value();
_await_in_flight = std::nullopt;
}
arm_consumer();
}

/// Returns a future that resolves once this partition has no batch in
/// flight.
///
/// If nothing is in flight the returned future is already ready. Otherwise a
/// promise is parked in _await_in_flight and its future is handed back; that
/// promise is fulfilled (and the slot cleared) when the response for the
/// in-flight batch is handled.
///
/// Only one waiter is supported at a time: calling this again while a
/// previous waiter is still pending trips the assertion below.
ss::future<> await_in_flight() {
    if (_in_flight) {
        vassert(!_await_in_flight, "Double call to await_in_flight()");
        // Build the promise directly inside the optional and hand out its
        // future to the caller.
        auto& waiter = _await_in_flight.emplace();
        return waiter.get_future();
    }
    // Nothing outstanding, so there is nothing to wait for.
    return ss::now();
}

/// Immediately forces a flush of the buffered records, if the consumer is
/// currently allowed to run.
///
/// When consumer_can_run() is false this is a no-op. Otherwise the pending
/// timer is cancelled and the result of do_consume() is handed straight to
/// the consumer callback.
ss::future<> maybe_drain() {
    if (!consumer_can_run()) {
        co_return;
    }
    // Cancel the timer first so the flush is driven from here and now rather
    // than from a later timer expiry.
    _timer.cancel();
    _consumer(co_await do_consume());
}

ss::future<> stop() {
_timer.set_callback([]() {});
co_await try_consume();
Expand Down Expand Up @@ -120,6 +141,7 @@ class produce_partition {
int32_t _size_bytes{};
bool _in_flight{};
ss::gate _gate;
std::optional<ss::promise<>> _await_in_flight;
};

} // namespace kafka::client
21 changes: 18 additions & 3 deletions src/v/kafka/client/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ ss::future<> producer::stop() {
/// triggered when the timeout below expires
_ingest_as.request_abort();

/// partition::stop() will invoke send(), it is a last chance best effort
/// attempt for the current queued records to be sent.
/// produce_partition::maybe_drain() will invoke send(); it is a last-chance,
/// best-effort attempt for the currently queued records to be sent.
co_await ssx::parallel_transform(
_partitions, [](partitions_t::value_type p) { return p.second->stop(); });
_partitions,
[](partitions_t::value_type p) { return p.second->maybe_drain(); });

/// send() is wrapped by a gate, and can be aborted with the internal
/// abort source (_as). This future triggers the abort source after the
Expand Down Expand Up @@ -105,6 +106,20 @@ ss::future<> producer::stop() {
co_await _gate.close();
exit.request_abort();
co_await std::move(abort);
vlog(kclog.debug, "Waiting for inflight state of false");
/// Wait until the produce_partition has no inflight records. That is
/// because if in_flight is true, maybe_drain() and stop() will actually not
/// call consume -> send(). This may have been the case when maybe_drain()
/// above was called.
co_await ssx::parallel_transform(
_partitions,
[](partitions_t::value_type p) { return p.second->await_in_flight(); });
vlog(kclog.debug, "Calling produce_partition::stop()");
/// At this point in time there are no inflight requests; for any data that
/// remains in the buffers, stop() will be guaranteed to call send(), which
/// will return error responses to the initial caller.
co_await ssx::parallel_transform(
_partitions, [](partitions_t::value_type p) { return p.second->stop(); });
vlog(kclog.debug, "Producer stopped");
}

Expand Down
Loading