Duplicate Message Consumption in Spring-Pulsar Consumer (There is nothing wrong with the official Pulsar client.) #1231

@Programmer-yyds

Description

During testing, Spring-Pulsar’s ack handling was found to behave differently from the native Pulsar Client: with Spring-Pulsar, messages are consumed more than once.

Comparison logic is as follows:

  1. Consumer ack timeout is set to 8 seconds (ackTimeout = 8s / ackTimeoutMillis = 8000).
  2. Business logic simulates a 5-second block, so the ack should, in theory, complete within the timeout period.
  3. Expected behavior: Business execution time < timeout → message should be properly acked, no redelivery should occur.
  4. Actual result:
    • With native Pulsar Client → works as expected, message is acked properly, no duplicate consumption.
    • With Spring-Pulsar (@PulsarListener, ackMode = RECORD) → even when business processing time < timeout, duplicate consumption still occurs, indicating that Spring-Pulsar’s ack logic might not be taking effect in time.
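
The expectation in step 3 holds only if each message's timeout clock starts when its own processing starts. The following is a hypothetical timing model (plain Java, not Pulsar or Spring-Pulsar code; `AckTimeoutModel` and `lateAcks` are made-up names) that shows what the same 8 s / 5 s numbers would give if several messages were handed to the application at the same moment and then processed one after another. Whether either client actually behaves this way is an assumption, not something confirmed in this report.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical timing model; not part of Pulsar or Spring-Pulsar. */
final class AckTimeoutModel {

    /**
     * Returns the zero-based indexes of messages whose ack would land after the ack timeout,
     * ASSUMING all batchSize messages start their timeout clock at time 0
     * and are processed sequentially, each taking processingMillis.
     */
    static List<Integer> lateAcks(long ackTimeoutMillis, long processingMillis, int batchSize) {
        List<Integer> late = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            // Message i is acked once i + 1 processing slots have elapsed.
            long ackedAtMillis = (i + 1) * processingMillis;
            if (ackedAtMillis > ackTimeoutMillis) {
                late.add(i);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // One message at a time: ack at 5000 ms, within the 8000 ms timeout.
        System.out.println(lateAcks(8000, 5000, 1)); // prints []
        // Three messages handed over together: acks at 5000, 10000 and 15000 ms.
        System.out.println(lateAcks(8000, 5000, 3)); // prints [1, 2]
    }
}
```

Under this assumed model, only the first message of a group is acked in time, which would look like duplicate consumption even though each individual processing step takes less than the timeout.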

Environment Information

  • Pulsar Version: apachepulsar/pulsar:3.3.0 (standalone mode)
  • Spring Boot Version: 3.4.7
  • Spring-Pulsar Version: 3.4.7
  • pulsar-client-all Version: 3.3.7 (the version pulled in as a dependency of Spring-Pulsar, not declared separately)
  • JDK Version: 21
  • Deployment Environment: Local (pulsar://localhost:6650)

Attachments:

pulsar-consume4.zip

A complete demo has been uploaded (including pom.xml, Producer, Native Consumer, Spring-Pulsar Consumer, and log files) that can be run directly to reproduce the issue.

Project Structure

├── log
│   ├── PulsarConsume.log             # Native Pulsar Client consumer log
│   └── SpringPulsarConsume.log       # Spring-Pulsar consumer log
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── org
│   │   │       └── pulsar
│   │   │           └── consume
│   │   │               ├── PulsarConsumeApplication.java
│   │   │               ├── original
│   │   │               │   └── PulsarConsume.java          # Native Pulsar Client consumer
│   │   │               └── spring
│   │   │                   └── SpringPulsarConsume.java    # Spring-Pulsar consumer
│   │   └── resources
│   │       └── application.yml
│   └── test
│       └── java
│           └── org
│               └── pulsar
│                   └── consume
│                       └── PulsarConsumeApplicationTest.java # Producer

Reproduction Code

Native Pulsar Client (Works Fine, No Duplicate Consumption)

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("persistent://public/pulsar/mys-test")
    .subscriptionName("my-shared-sub")
    .subscriptionType(SubscriptionType.Shared)
    .ackTimeout(8, TimeUnit.SECONDS)
    .receiverQueueSize(10)
    .subscribe();

while (true) {
    Message<String> msg = consumer.receive();
    try {
        log.info("Message received: {}", msg.getValue());
        Thread.sleep(5000); // Simulate business processing
        consumer.acknowledge(msg); // Ack prevents duplicate consumption
    } catch (Exception e) {
        consumer.negativeAcknowledge(msg);
    }
}

Spring-Pulsar Consumer (Duplicate Consumption Occurs)

@PulsarListener(
    topics = "mys-test",
    ackMode = AckMode.RECORD,
    subscriptionType = SubscriptionType.Shared,
    schemaType = SchemaType.STRING,
    properties = {"ackTimeoutMillis=8000", "receiverQueueSize=10"}
)
public void consumeString2(Message<String> message) throws InterruptedException {
    log.info("{} message: {} Thread:{}", message.getTopicName(), message.getValue(), Thread.currentThread().getName());
    if (messageSet.contains(message.getValue())) {
        log.info("Repeated consumption, TopicName: {} message: {}", message.getTopicName(), message.getValue());
    }
    messageSet.add(message.getValue());
    TimeUnit.SECONDS.sleep(5); // Simulate business processing
}
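
The listener above relies on a `messageSet` whose declaration is not shown. A minimal sketch of one way to do the duplicate check follows, assuming the listener may run on more than one thread; `DuplicateTracker` and `isDuplicate` are hypothetical names, not part of the attached demo or of Spring-Pulsar.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper; the report does not show how messageSet is declared. */
final class DuplicateTracker {

    // Concurrent set so that several listener threads can record keys safely.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Records the key and returns true if it had already been recorded. */
    boolean isDuplicate(String key) {
        // Set.add returns false when the key is already present,
        // so the check and the insert happen as one atomic step.
        return !seen.add(key);
    }
}
```

In the listener this could be called as `tracker.isDuplicate(message.getValue())`. Keying on `message.getMessageId().toString()` instead of the payload would separate a redelivery of the same message from two distinct messages that happen to carry the same value.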