
AMQP Shutdown causes RMQ messages to lose order. (Single Active Consumer and Exclusive consumers) #8668

Closed
stikku opened this issue Jul 7, 2023 · 8 comments · Fixed by #8673


@stikku

stikku commented Jul 7, 2023

In what version(s) of Spring Integration are you seeing this issue?

Tested on: Spring Boot 3.1.1 and Spring Integration 6.1.1
Also tested on: Spring Boot 2.7.4 and Spring Integration 5.5.15

Same behaviour on both versions.

Describe the bug

RabbitMQ provides Single Active Consumer and exclusive consumers to maintain message order when multiple consumers are attached to a single queue. Both preserve order by allowing only one consumer to process messages at any point in time.
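As a rough illustration of those semantics, here is a toy in-memory model in plain Java (not RabbitMQ or Spring code; the class and method names are hypothetical): consumers register in order, only the first registered consumer receives deliveries, and cancelling it promotes the next one. It says nothing about messages already delivered to a consumer that is later cancelled, which is where the problem described below comes in.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (not RabbitMQ code): consumers register in order, only the first
// registered consumer is "active", and cancelling it promotes the next in line.
public class SingleActiveConsumerModel {

    private final Deque<String> messages = new ArrayDeque<>();
    private final List<String> consumers = new ArrayList<>();

    public void publish(String message) {
        messages.addLast(message);
    }

    public void register(String consumerTag) {
        consumers.add(consumerTag);
    }

    public void cancel(String consumerTag) {
        consumers.remove(consumerTag); // remove(Object): drops this consumer from the line
    }

    /** The consumer that would currently receive deliveries, or null if there is none. */
    public String activeConsumer() {
        return consumers.isEmpty() ? null : consumers.get(0);
    }

    /** Hands up to {@code prefetch} messages from the head of the queue to the active consumer. */
    public List<String> deliver(int prefetch) {
        List<String> batch = new ArrayList<>();
        while (activeConsumer() != null && batch.size() < prefetch && !messages.isEmpty()) {
            batch.add(messages.pollFirst());
        }
        return batch;
    }

    public static void main(String[] args) {
        SingleActiveConsumerModel queue = new SingleActiveConsumerModel();
        for (int i = 1; i <= 20; i++) {
            queue.publish("M" + i);
        }
        queue.register("A");
        queue.register("B");
        System.out.println(queue.activeConsumer() + " gets " + queue.deliver(5));
        queue.cancel("A"); // in this model, B is promoted as soon as A is cancelled
        System.out.println(queue.activeConsumer() + " gets " + queue.deliver(5));
    }
}
```

Running main prints `A gets [M1, M2, M3, M4, M5]` followed by `B gets [M6, M7, M8, M9, M10]`.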

Spring Integration AMQP causes messages to be processed in parallel (and out of order) during shutdown of one instance.

Given:

  • Consumer A (active consumer)
  • Consumer B (waiting consumer)
  • 20 Messages in RMQ (M1 to M20)
  • prefetch = 5 (the bug occurs with any prefetch value)
  • Consumer acknowledgement enabled.

Consumer A "prefetches" M1 to M5 locally.
While processing M1, Consumer A is instructed to shut down gracefully (kill -INT <process_id> or similar). Consumer A's consumer (not its channel) is unregistered from RabbitMQ, but it continues to process M1 to M5. At the same time, Consumer B is set to active, prefetches M6 to M10, and processes them. As a result, M1 to M5 and M6 to M10 are processed in parallel.

Note: if Consumer A fails to process M1 to M5, they are requeued in RabbitMQ and consumed by Consumer B only AFTER M6 to M10 have been processed.
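The effect on ordering can be sketched with a deterministic, single-threaded Java model. It is a simplification under stated assumptions (all names are hypothetical): each consumer finishes one message per tick, Consumer B is promoted and fetches its prefetch in the same tick that A's consumer is cancelled, A keeps draining its prefetched messages, and messages A fails on return to the head of the queue once A has worked through its buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the shutdown scenario (not Spring or RabbitMQ code).
public class ShutdownOrderingModel {

    /** Returns the order in which messages are successfully completed by A and B. */
    public static List<String> completionOrder(int total, int prefetch, boolean aFails) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 1; i <= total; i++) {
            queue.addLast("M" + i);
        }
        Deque<String> aBuffer = take(queue, prefetch); // prefetched by A before the shutdown
        Deque<String> bBuffer = take(queue, prefetch); // B is promoted when A's consumer is cancelled
        List<String> failedByA = new ArrayList<>();
        List<String> completed = new ArrayList<>();

        while (!aBuffer.isEmpty() || !bBuffer.isEmpty() || !queue.isEmpty()) {
            if (bBuffer.isEmpty()) {
                bBuffer = take(queue, prefetch);       // B fetches its next batch
            }
            if (!aBuffer.isEmpty()) {                  // A keeps draining after the cancel
                String message = aBuffer.pollFirst();
                if (aFails) {
                    failedByA.add(message);
                } else {
                    completed.add(message);
                }
                if (aBuffer.isEmpty()) {
                    // A's channel closes: failed (unacked) messages go back to the head, in order
                    for (int i = failedByA.size() - 1; i >= 0; i--) {
                        queue.addFirst(failedByA.get(i));
                    }
                }
            }
            if (!bBuffer.isEmpty()) {                  // B works in parallel in the same tick
                completed.add(bBuffer.pollFirst());
            }
        }
        return completed;
    }

    private static Deque<String> take(Deque<String> queue, int n) {
        Deque<String> batch = new ArrayDeque<>();
        while (batch.size() < n && !queue.isEmpty()) {
            batch.addLast(queue.pollFirst());
        }
        return batch;
    }

    public static void main(String[] args) {
        System.out.println("A drains: " + completionOrder(20, 5, false));
        System.out.println("A fails:  " + completionOrder(20, 5, true));
    }
}
```

With 20 messages and a prefetch of 5, the draining case completes M1, M6, M2, M7, ..., M5, M10 before M11 to M20, and the failing case completes M6 to M10, then M1 to M5, then M11 to M20, matching the two outcomes described above.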

I've attempted to replicate this bug using the RabbitMQ client directly and using Spring AMQP (without Spring Integration), but was unable to.

To Reproduce

See also the provided GitHub sample.

  • Register two consumers (in two different processes) on the same queue.
  • Block consumption of messages on Consumer A (using a single-thread breakpoint or Thread.sleep).
    • Note: Some IDEs disable breakpoints if the process is instructed to stop from within the IDE 😭
  • Publish a number of messages to the queue (more than the prefetch count).
  • Signal Consumer A to shut down gracefully.
  • Watch Consumer B start consuming new messages while Consumer A is still processing its own.

Expected behavior

When using exclusive consumers or Single Active Consumer, multiple consumers should never be processing messages at the same time.
On shutdown of the active consumer's application, the waiting consumer should not become active until the previous consumer has processed all of its unacked messages.
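One way to check this expectation mechanically is to have each listener log when it starts and finishes a message and then scan the combined log. The sketch below assumes a hypothetical log format: consumer name, publish sequence number, and start and end timestamps from a clock that is comparable across both processes (for example System.currentTimeMillis() on a single host). It reports cross-consumer overlaps and whether messages were started in publish order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical log-based checker; the record fields and log format are assumptions.
public class ExclusiveProcessingCheck {

    /** One message handled by one consumer over the half-open interval [start, end). */
    public record Processing(String consumer, int sequence, long start, long end) {}

    /** Describes every pair of records from different consumers whose intervals overlap. */
    public static List<String> overlaps(List<Processing> log) {
        List<Processing> sorted = new ArrayList<>(log);
        sorted.sort(Comparator.comparingLong(Processing::start));
        List<String> problems = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            Processing a = sorted.get(i);
            for (int j = i + 1; j < sorted.size(); j++) {
                Processing b = sorted.get(j);
                if (b.start() >= a.end()) {
                    break; // sorted by start, so no later record can overlap a
                }
                if (!a.consumer().equals(b.consumer())) {
                    problems.add("M" + a.sequence() + "@" + a.consumer()
                            + " overlaps M" + b.sequence() + "@" + b.consumer());
                }
            }
        }
        return problems;
    }

    /** True if messages were started in publish (sequence) order. */
    public static boolean startedInOrder(List<Processing> log) {
        List<Processing> sorted = new ArrayList<>(log);
        sorted.sort(Comparator.comparingLong(Processing::start));
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i).sequence() < sorted.get(i - 1).sequence()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Processing> log = List.of(
                new Processing("A", 1, 0, 10),
                new Processing("B", 6, 5, 8),
                new Processing("A", 2, 10, 20));
        System.out.println(overlaps(log));
        System.out.println(startedInOrder(log));
    }
}
```

For that example log, in which A handles M1 over [0, 10) and M2 over [10, 20) while B handles M6 over [5, 8), overlaps reports `M1@A overlaps M6@B` and startedInOrder returns false.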

Sample

A sample is provided here. Because it needs multiple instances, its README.md explains how to use it.

This was also tested using the Spring AMQP library (here) and the RabbitMQ client (here) directly; the bug could not be reproduced with either.

@stikku stikku added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Jul 7, 2023
@artembilan
Member

Can you tell us, please, what makes you think the problem belongs to Spring Integration rather than to Spring AMQP, or even to the AMQP Java client directly?

Thanks

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Jul 7, 2023
@stikku
Author

stikku commented Jul 8, 2023

Thank you for the question, @artembilan. I've also written POCs using the RabbitMQ client directly and using Spring AMQP. In all POCs I haven't modified the shutdown behaviour (I tried to keep them as simple as possible). This ordering bug only occurred when using spring-integration-amqp.

RabbitMQ client POC can be found here
Spring AMQP POC can be found here

Both of these maintain message order.

@artembilan
Member

I don't see args.put("x-queue-type", "quorum"); in plain Spring AMQP or RabbitMQ samples.
Plus, it would be great if the Spring Integration sample were simplified the same way as the others.
Right now there is some extra stuff which might be unrelated but still adds noise to the review.

My thought was that you do some thread shifting when you process a message downstream, but no: it looks like everything is done on the consumer thread.
The Spring Integration sample should be very close to the Spring AMQP one, since the channel adapter is fully based on a listener container from Spring AMQP.
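To compare the samples like for like, the queue arguments could be kept in one place and reused by all three. This plain-Java sketch only builds the argument map; the helper name is hypothetical, while `x-queue-type` and `x-single-active-consumer` are the standard RabbitMQ argument keys. With the RabbitMQ Java client the map would be passed as the `arguments` parameter of `Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)`; note that quorum queues have to be declared as durable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the optional queue arguments in one place.
public class QueueArgs {

    public static Map<String, Object> arguments(boolean quorum, boolean singleActiveConsumer) {
        Map<String, Object> queueArgs = new HashMap<>();
        if (quorum) {
            queueArgs.put("x-queue-type", "quorum");          // the argument quoted above
        }
        if (singleActiveConsumer) {
            queueArgs.put("x-single-active-consumer", true);  // broker delivers to one consumer at a time
        }
        return queueArgs;
    }

    public static void main(String[] args) {
        // Each sample would declare its queue with the same map (HashMap order is unspecified).
        System.out.println(arguments(true, true));
    }
}
```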

@garyrussell
Contributor

garyrussell commented Jul 10, 2023

Your Spring Integration sample is using a much older (unsupported) version of Spring AMQP (2.3.16).

https://spring.io/projects/spring-amqp#support

Your Spring AMQP sample is using 3.0.5.

But I am seeing the same result with 2.4.13 (and the client used by 3.0.5).

            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-amqp</artifactId>
                <version>2.4.13</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.4.13</version>
            </dependency>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.16.0</version>
            </dependency>

If I downgrade your Spring AMQP sample to Boot 2.7.13, I see the same behavior there.

We need to figure out what changed in 3.0.x (I don't recall any changes that would affect this).

@garyrussell
Contributor

Correction: the Spring AMQP sample is working correctly for me with Boot 2.7.13 and 3.1.1; however, the integration sample fails with both.

Weird.

@garyrussell
Copy link
Contributor

I cannot explain why the Spring AMQP sample works as expected; on the console, I see the second consumer is marked as active (and the first goes away) while it continues to drain the queue, but the second does not actually consume anything until the first app terminates.

I would have expected it to fail the same way as the integration sample (since it's using the same code underneath) but, clearly, there is a difference.

I think we need to add an option to the container (in Spring AMQP) to requeue any unprocessed messages after a cancel is received.
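The difference between the current stop behaviour and the proposed option can be sketched as a toy model (plain Java, hypothetical names, not the Spring AMQP implementation). It assumes that messages handed back to the broker return to the head of the queue in their original order, and it glosses over the window in which the in-flight message is still being processed; it only shows where the prefetched messages end up. As far as we know, the option from spring-projects/spring-amqp#2482 surfaced as a `forceStop` property on the Spring AMQP listener containers; check the Spring AMQP reference for the exact name and the versions that include it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of two stop strategies (not the Spring AMQP implementation).
public class StopStrategyModel {

    /**
     * Stops a consumer that is processing {@code inFlight} and holds {@code prefetched}
     * locally. Returns what the stopping consumer still processes itself; anything it
     * does not process is put back at the head of {@code queue}, in order.
     */
    public static List<String> stop(String inFlight, List<String> prefetched,
                                    Deque<String> queue, boolean requeuePrefetched) {
        if (!requeuePrefetched) {
            // Behaviour described in the issue: drain everything already prefetched
            List<String> processed = new ArrayList<>();
            processed.add(inFlight);
            processed.addAll(prefetched);
            return processed;
        }
        // Proposed option: finish the current message, hand the rest back to the broker
        for (int i = prefetched.size() - 1; i >= 0; i--) {
            queue.addFirst(prefetched.get(i));
        }
        return List.of(inFlight);
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("M6", "M7", "M8", "M9", "M10"));
        List<String> byStoppingConsumer = stop("M1", List.of("M2", "M3", "M4", "M5"), queue, true);
        System.out.println("stopping consumer finishes: " + byStoppingConsumer); // [M1]
        System.out.println("queue for next consumer: " + queue); // [M2, M3, M4, M5, M6, M7, M8, M9, M10]
    }
}
```

With requeueing, the next active consumer sees M2 to M5 ahead of M6, so the prefetched messages keep their place in line instead of being processed alongside M6 to M10.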

@garyrussell
Contributor

spring-projects/spring-amqp#2482

garyrussell added a commit to garyrussell/spring-integration that referenced this issue Jul 12, 2023
@artembilan artembilan added this to the 6.2.0-M1 milestone Jul 12, 2023
artembilan pushed a commit that referenced this issue Jul 12, 2023
Resolves #8668

**cherry-pick to all supported branches**
artembilan pushed a commit that referenced this issue Jul 12, 2023
Resolves #8668

**cherry-pick to all supported branches**

(cherry picked from commit fc3c8d2)
artembilan pushed a commit that referenced this issue Jul 12, 2023
Resolves #8668

**cherry-pick to all supported branches**

(cherry picked from commit fc3c8d2)

# Conflicts:
#	src/reference/asciidoc/amqp.adoc
artembilan pushed a commit that referenced this issue Jul 12, 2023
Resolves #8668

**cherry-pick to all supported branches**

(cherry picked from commit fc3c8d2)

# Conflicts:
#	src/reference/asciidoc/amqp.adoc
garyrussell added a commit that referenced this issue Jul 13, 2023
Boot is no longer releasing 2.6.x with latest dependencies.
@lukebakken

lukebakken commented Jul 19, 2023

Thank you @garyrussell!!!

If you're interested, this issue was originally reported here - https://stackoverflow.com/q/76563078/1466825
