RabbitListener's returning Mono<Void> doesn't ack the message #1006

Closed
johanhaleby opened this issue May 22, 2019 · 6 comments · Fixed by #1011

@johanhaleby

As discussed on Stack Overflow, a RabbitListener method returning Mono<Void> will not ack the message. For example:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

The reason is that spring-rabbit doesn't seem to handle empty streams.

artembilan self-assigned this May 29, 2019
artembilan modified the milestones: 2.2.M3, 2.1.7 May 29, 2019
artembilan added a commit to artembilan/spring-amqp that referenced this issue May 30, 2019
Fixes spring-projects#1006

When listener method returns `Mono<Void>`, the `success` callback is
not called from the Reactor because there is no value to handle.

* Move `basicAck()` to the `completeConsumer` which is called when `Mono`
is completed successfully with value or without

**Cherry-pick to 2.1.x**
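
The behaviour described in the commit message can be reproduced without Spring or Reactor. The following is a minimal sketch, not the spring-amqp code: it uses the JDK's `java.util.concurrent.Flow` API in place of Reactor, and the class and field names (`AckOnCompleteSketch`, `AckCounts`, `fromOnNext`, `fromOnComplete`) are hypothetical. A publisher that completes without emitting anything stands in for `Mono<Void>`. The counters show why an ack placed in the value callback is never reached for an empty stream, while one placed in the completion callback runs in both cases.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; none of these names come from spring-amqp or Reactor.
class AckOnCompleteSketch {

    /** Counts how often each candidate "ack site" would have fired for one stream. */
    static final class AckCounts {
        final AtomicInteger fromOnNext = new AtomicInteger();
        final AtomicInteger fromOnComplete = new AtomicInteger();
    }

    /** Publishes the given items (possibly none), completes, and records which callbacks ran. */
    static AckCounts consume(String... items) {
        AckCounts counts = new AckCounts();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                // Value callback: never invoked when the stream is empty.
                @Override public void onNext(String item) { counts.fromOnNext.incrementAndGet(); }
                @Override public void onError(Throwable t) { done.countDown(); }
                // Completion callback: invoked with or without a preceding value.
                @Override public void onComplete() {
                    counts.fromOnComplete.incrementAndGet();
                    done.countDown();
                }
            });
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() signals onComplete to the subscriber
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts;
    }

    public static void main(String[] args) {
        AckCounts empty = consume();            // analogue of Mono<Void>
        AckCounts single = consume("payload");  // analogue of a Mono with a value
        System.out.println("empty:  onNext=" + empty.fromOnNext + " onComplete=" + empty.fromOnComplete);
        System.out.println("single: onNext=" + single.fromOnNext + " onComplete=" + single.fromOnComplete);
    }
}
```

For the empty stream the value callback count stays at 0 while the completion count is 1, which is the case the fix addresses by moving `basicAck()` into the completion path.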
garyrussell pushed a commit that referenced this issue May 31, 2019
Fixes #1006

garyrussell pushed a commit that referenced this issue May 31, 2019
Fixes #1006

@jndietz

jndietz commented Aug 4, 2022

This still appears to be a problem.

    @RabbitListener(queues = "audit")
    Mono<Void> listen(CustomerActivityEventWrapper wrapper) {
        log.debug("Received message {}", wrapper.toString())
        strategies
                .find { it.handles(wrapper.payload) }
                .handleEvent(wrapper.payload)
                      // 👆 returns a Mono<Void>
    }

The API call happening within my strategy occurs, but I still see this in the console:

2022-08-04 12:15:20.056 DEBUG 21028 --- [ntContainer#0-2] com.somecompany.listener.EventListener          : Received message com.somecompany.model.rabbit.event.CustomerActivityEventWrapper@150f7163
2022-08-04 12:15:20.184  WARN 21028 --- [ntContainer#0-2] .a.r.l.a.MessagingMessageListenerAdapter : Container AcknowledgeMode must be MANUAL for a Mono<?> return type; otherwise the container will ack the message immediately
2022-08-04 12:15:20.669 DEBUG 21028 --- [ntContainer#0-2] o.s.w.r.f.client.ExchangeFunctions       : [386278] HTTP POST https:/some-api.somecompany.com/api/v1/customer-events
2022-08-04 12:15:22.303 DEBUG 21028 --- [ctor-http-nio-2] o.s.http.codec.json.Jackson2JsonEncoder  : [386278] Encoding [com.somecompany.model.jsite.event.JsiteAuditEvent@4aac9d27]
2022-08-04 12:15:22.408 DEBUG 21028 --- [ctor-http-nio-2] o.s.w.r.f.client.ExchangeFunctions       : [386278] [1d990f1a-1, L:/10.5.26.18:53692 - R:somecompany.somecompany.com/10.72.93.31:443] Response 201 CREATED
2022-08-04 12:15:22.418 ERROR 21028 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2022-08-04 12:15:22.774  INFO 21028 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@64f3ca6: tags=[[amq.ctag-Zwr_bRDdqroucKk0JDDOcQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@22c8ee48 Shared Rabbit Connection: SimpleConnection@7f93f4c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 53656], acknowledgeMode=AUTO local queue size=0

@artembilan
Member

@jndietz,

Sorry, what problem are you observing?
Your log doesn't show any errors.

@jndietz

jndietz commented Aug 4, 2022

@artembilan There is an ERROR line:

2022-08-04 12:15:22.418 ERROR 21028 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

My thought is that it isn't acking the message when it should be.

@artembilan
Member

unknown delivery tag 1,

I guess that's because you didn't follow the recommendation from the previous log:

Container AcknowledgeMode must be MANUAL for a Mono<?> return type; otherwise the container will ack the message immediately

@jndietz

jndietz commented Aug 4, 2022

I guess that's because you didn't follow the recommendation from the previous log:

Yep... you're right. I guess I mixed up how MANUAL is supposed to work with reactor. I assumed I was supposed to also perform the acknowledgement, but I guess that isn't the case. This worked well for me:

    @RabbitListener(queues = "audit", ackMode = "MANUAL") // 👈 no actual acknowledgement needed in the code below
    Mono<Void> listen(CustomerActivityEventWrapper wrapper) {
        log.debug("Received message {}", wrapper.toString())
        strategies
                .find { it.handles(wrapper.payload) }
                .handleEvent(wrapper.payload)
    }

Thanks @artembilan
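
If every listener in the application returns a `Mono`, the same acknowledge mode could be set once for the container instead of per listener. This is a sketch under assumptions not confirmed in the thread: that Spring Boot's RabbitMQ auto-configuration is in use, and that the default simple listener container is the one being configured (the log above shows a `SimpleMessageListenerContainer`).

```properties
# Assumption: Spring Boot auto-configures the listener container.
# Applies MANUAL acknowledge mode to all listeners on the simple container.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
```

Note that a container-wide setting also affects listeners that do not return a `Mono`; those would then be responsible for their own acknowledgements, so the per-listener `ackMode` attribute shown above is the narrower change.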

@garyrussell
Contributor

I suppose the error message emitted by the amqp-client might be a bit confusing:

reply-text=PRECONDITION_FAILED - unknown delivery tag 1

In this case, it means delivery tag 1 has already been acknowledged (and therefore the second ack used an "unknown" tag).
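
The double-ack explanation can be illustrated with a toy model. This is only a sketch: `DeliveryTagSketch` and its methods are hypothetical and are not part of the RabbitMQ client or broker. It tracks outstanding delivery tags in a set, so acknowledging the same tag a second time finds nothing to remove. In the thread's log, the real broker responded to that situation by closing the channel with `PRECONDITION_FAILED - unknown delivery tag 1`; the model simply returns `false`.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: NOT the RabbitMQ client or broker implementation.
class DeliveryTagSketch {
    private final Set<Long> unacked = new HashSet<>();
    private long nextTag = 1;

    /** Models a delivery on a channel: assigns the next tag and marks it outstanding. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Returns true if the tag was outstanding; false models "unknown delivery tag". */
    boolean ack(long tag) {
        return unacked.remove(tag);
    }

    public static void main(String[] args) {
        DeliveryTagSketch channel = new DeliveryTagSketch();
        long tag = channel.deliver();
        // First ack: stands in for the container acking immediately in AUTO mode.
        System.out.println("first ack accepted:  " + channel.ack(tag));
        // Second ack: stands in for the ack issued when the Mono completes.
        System.out.println("second ack accepted: " + channel.ack(tag));
    }
}
```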
