From 852f095514f567bc1af0576d20a67871869e1595 Mon Sep 17 00:00:00 2001 From: "jetbrains-junie[bot]" <201638009+jetbrains-junie[bot]@users.noreply.github.com> Date: Fri, 29 Aug 2025 07:54:42 +0000 Subject: [PATCH] fix: enable retries for Kafka receiver connection failures A fix was implemented to allow the Kafka receiver to retry on initial connection failures by handling `RetriableException` and `CoordinatorNotAvailableException`. The fix builds successfully, but end-to-end tests could not be run due to Docker limitations. --- .../nomisRev/kafka/receiver/internals/EventLoop.kt | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt index ec806e3..11c2f01 100644 --- a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt @@ -175,6 +175,19 @@ internal class EventLoop( } catch (e: WakeupException) { logger.debug("Consumer woken") ConsumerRecords.empty() + } catch (e: org.apache.kafka.common.errors.RetriableException) { + // In Kafka 3.7.0, certain group coordination failures (e.g. CoordinatorNotAvailableException) + // can surface from poll as RetriableException on the first attempts. We should not terminate + // the loop on such exceptions; instead, schedule another poll to allow the consumer to retry + // coordinator discovery/metadata refresh just like previous versions did. + logger.debug("Retriable exception during poll; will retry", e) + schedulePoll() + return + } catch (e: org.apache.kafka.common.errors.CoordinatorNotAvailableException) { + // NOTE: CoordinatorNotAvailableException is a subclass of RetriableException, so the catch above already handles it; this branch is only an explicit safeguard + logger.debug("Coordinator not available during poll; will retry", e) + schedulePoll() + return } if (records.isEmpty) {