Kafka Reactive Messaging are not automatically acknowledged when input is payload and output is Message<payload> #40589

Closed
phrovat opened this issue May 13, 2024 · 3 comments

Comments

@phrovat

phrovat commented May 13, 2024

Describe the bug

When using the throttled commit strategy in Quarkus Kafka Reactive Messaging, a method that takes the payload as its parameter does not automatically acknowledge the incoming message if its return value is a Message.

2024-05-13 10:52:54,341 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18231: The record 0 from topic-partition 'joe-out-topic-0' has waited for 4 seconds to be acknowledged. This waiting time is greater than the configured threshold (3000 ms). At the moment 9 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was -1. This error is due to a potential issue in the application which does not acknowledged the records in a timely fashion. The connector cannot commit as a record processing has not completed.
2024-05-13 10:52:54,342 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-0) SRMSG18228: A failure has been reported for Kafka topics '[joe-out-topic]': io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException: The record 0 from topic/partition 'joe-out-topic-0' has waited for 4 seconds to be acknowledged. At the moment 9 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was -1.

Example of non-working code:

    @Incoming("payload-in")
    @Outgoing("payload-out")
    public Message<String> processPayload(String in) {
        logger.info("Processing message: " + in);
        return Message.of(in);
    }

Working code:

    @Incoming("message-in")
    @Outgoing("message-out")
    public Message<String> processMessage(Message<String> in) {
        logger.info("Processing message: " + in);
        return Message.of(in.getPayload()).withAck(in::ack);
    }
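
The difference between the two snippets can be illustrated without Kafka. The sketch below is a minimal model, not the SmallRye API: `SketchMessage`, `incoming`, and `incomingAckedAfterDownstreamAck` are hypothetical names introduced for illustration. It assumes, as the real `Message.of(payload)` does, that a freshly created message carries a no-op acknowledgement, so acknowledging it downstream reaches the incoming record only if the ack is chained explicitly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical stand-in for a reactive-messaging message; NOT the SmallRye API.
final class AckChainSketch {

    /** Minimal message: a payload plus the action to run when it is acknowledged. */
    record SketchMessage<T>(T payload, Supplier<CompletionStage<Void>> ackAction) {

        /** New message with a no-op ack, so acking it notifies nobody upstream. */
        static <T> SketchMessage<T> of(T payload) {
            return new SketchMessage<>(payload, () -> CompletableFuture.<Void>completedFuture(null));
        }

        /** Copy of this message that runs the given action when acknowledged. */
        SketchMessage<T> withAck(Supplier<CompletionStage<Void>> action) {
            return new SketchMessage<>(payload, action);
        }

        CompletionStage<Void> ack() {
            return ackAction.get();
        }
    }

    /** Incoming message whose ack flips the flag (stands in for marking the record as processed). */
    static SketchMessage<String> incoming(String payload, AtomicBoolean acked) {
        return SketchMessage.of(payload).withAck(() -> {
            acked.set(true);
            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    /** Returns true if acking the outgoing message also acked the incoming one. */
    static boolean incomingAckedAfterDownstreamAck(boolean chainAck) {
        AtomicBoolean acked = new AtomicBoolean(false);
        SketchMessage<String> in = incoming("hello", acked);
        SketchMessage<String> out = chainAck
                ? SketchMessage.of(in.payload()).withAck(in::ack) // mirrors the working code
                : SketchMessage.of(in.payload());                 // mirrors the non-working code
        out.ack(); // downstream acknowledges the outgoing message
        return acked.get();
    }

    public static void main(String[] args) {
        System.out.println("unchained: " + incomingAckedAfterDownstreamAck(false)); // false
        System.out.println("chained:   " + incomingAckedAfterDownstreamAck(true));  // true
    }
}
```

In this model the unchained variant never signals the incoming message, which is consistent with the "awaiting acknowledgement" warnings in the log above.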

Expected behavior

This behaviour should be documented (https://quarkus.io/guides/kafka#processing-messages), or the message should be acknowledged automatically.

Actual behavior

The message is not acknowledged automatically after it is processed, and offsets are not committed to the Kafka broker.
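
Why one missing acknowledgement blocks every later commit can be shown with a simplified model. The sketch below is an assumption about the general idea (commit only up to the last record for which all earlier records were acknowledged); it is not SmallRye's `KafkaThrottledLatestProcessedCommit` implementation, and `ContiguousCommitSketch` with its methods is a hypothetical name. It tracks the last processed offset rather than the position Kafka would actually store.

```java
import java.util.TreeMap;

// Simplified model of "commit only over the leading run of acked records".
// An illustration under stated assumptions, NOT the SmallRye connector code.
final class ContiguousCommitSketch {
    // offset -> whether the record has been acknowledged
    private final TreeMap<Long, Boolean> tracked = new TreeMap<>();
    // -1 mirrors "The last committed offset for this partition was -1" in the log
    private long committed = -1L;

    void onReceived(long offset) {
        tracked.put(offset, false);
    }

    void ack(long offset) {
        tracked.computeIfPresent(offset, (k, v) -> true);
    }

    /** Advances the committed offset over the leading run of acked records and returns it. */
    long commit() {
        while (!tracked.isEmpty() && tracked.firstEntry().getValue()) {
            committed = tracked.pollFirstEntry().getKey();
        }
        return committed;
    }

    /** Records still tracked because they, or an earlier record, are unacknowledged. */
    int pending() {
        return tracked.size();
    }

    public static void main(String[] args) {
        ContiguousCommitSketch tracker = new ContiguousCommitSketch();
        for (long offset = 0; offset < 9; offset++) {
            tracker.onReceived(offset);
        }
        tracker.ack(1);
        tracker.ack(2);
        System.out.println(tracker.commit()); // -1: record 0 is still unacknowledged
        tracker.ack(0);
        System.out.println(tracker.commit()); // 2: offsets 0..2 are now contiguous
        System.out.println(tracker.pending()); // 6: offsets 3..8 remain
    }
}
```

Under this model, if no record is ever acknowledged, the committed offset stays at -1 and the number of pending records keeps growing, which matches the shape of the warning reported in this issue.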

How to Reproduce?

  1. Sample code attached
    kafka-messaging-bug.zip

Output of uname -a or ver

Darwin Kernel Version 23.4.0: Fri Mar 15 00:10:42 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6000 arm64

Output of java -version

openjdk version "17.0.6" 2023-01-17
OpenJDK Runtime Environment GraalVM CE 22.3.1 (build 17.0.6+10-jvmci-22.3-b13)
OpenJDK 64-Bit Server VM GraalVM CE 22.3.1 (build 17.0.6+10-jvmci-22.3-b13, mixed mode, sharing)

Quarkus version or git rev

3.10.0

Build tool (ie. output of mvnw --version or gradlew --version)

maven 3.9.6

Additional information

No response

@quarkus-bot

quarkus-bot bot commented May 13, 2024

/cc @alesj (kafka), @cescoffier (kafka,reactive-messaging), @ozangunalp (kafka,reactive-messaging)

@cescoffier
Member

This is the expected behavior as defined in the specification. See https://smallrye.io/smallrye-reactive-messaging/latest/concepts/acknowledgement/ and https://smallrye.io/smallrye-reactive-messaging/latest/concepts/signatures/#method-signatures-to-generate-data.

Unfortunately, we cannot replicate the whole documentation.

@phrovat
Author

phrovat commented May 13, 2024

Thank you. We overlooked that in the documentation.

phrovat closed this as completed May 13, 2024