Skip to content

Commit

Permalink
Merge pull request #16392 from vbotbuildovich/backport-pr-16315-v23.3…
Browse files Browse the repository at this point in the history
….x-145

[v23.3.x] kafka/client: Fix for stall during client shutdown
  • Loading branch information
Rob Blafford authored Feb 1, 2024
2 parents 24c3e6b + 3823027 commit a37302c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
22 changes: 22 additions & 0 deletions src/v/kafka/client/produce_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,30 @@ class produce_partition {
vassert(_in_flight, "handle_response requires a batch in flight");
_batcher.handle_response(std::move(res));
_in_flight = false;
if (_await_in_flight) {
_await_in_flight->set_value();
_await_in_flight = std::nullopt;
}
arm_consumer();
}

ss::future<> await_in_flight() {
if (!_in_flight) {
return ss::now();
}
vassert(!_await_in_flight, "Double call to await_in_flight()");
_await_in_flight = ss::promise<>();
return _await_in_flight->get_future();
}

ss::future<> maybe_drain() {
/// Immediately force flush of buffer
if (consumer_can_run()) {
_timer.cancel();
_consumer(co_await do_consume());
}
}

ss::future<> stop() {
_timer.set_callback([]() {});
co_await try_consume();
Expand Down Expand Up @@ -120,6 +141,7 @@ class produce_partition {
int32_t _size_bytes{};
bool _in_flight{};
ss::gate _gate;
std::optional<ss::promise<>> _await_in_flight;
};

} // namespace kafka::client
21 changes: 18 additions & 3 deletions src/v/kafka/client/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ ss::future<> producer::stop() {
/// triggered when the timeout below expires
_ingest_as.request_abort();

/// partition::stop() will invoke send(), it is a last chance best effort
/// attempt for the current queued records to be sent.
/// produce_partition::drain() will invoke send(), it is a last chance best
/// effort attempt for the current queued records to be sent.
co_await ssx::parallel_transform(
_partitions, [](partitions_t::value_type p) { return p.second->stop(); });
_partitions,
[](partitions_t::value_type p) { return p.second->maybe_drain(); });

/// send() is wrapped by a gate, and can be aborted with the internal
/// abort source (_as). This future triggers the abort source after the
Expand Down Expand Up @@ -105,6 +106,20 @@ ss::future<> producer::stop() {
co_await _gate.close();
exit.request_abort();
co_await std::move(abort);
vlog(kclog.debug, "Waiting for inflight state of false");
/// Wait until the produce_partition has no inflight records. That is
/// because if in_flight is true, drain() and stop() will actually not call
/// consume -> send(). This may have been the case when maybe_drain() above
/// was called.
co_await ssx::parallel_transform(
_partitions,
[](partitions_t::value_type p) { return p.second->await_in_flight(); });
vlog(kclog.debug, "Calling produce_partition::stop()");
/// At this point in time there are no inflight requests, for any data that
/// remains in the buffers stop() will be guaranteed to call send() which
/// will return error responses to the initial caller
co_await ssx::parallel_transform(
_partitions, [](partitions_t::value_type p) { return p.second->stop(); });
vlog(kclog.debug, "Producer stopped");
}

Expand Down

0 comments on commit a37302c

Please sign in to comment.