Track channel and pattern subscriptions to avoid unintended unsubscribe in ReactiveRedisMessageListenerContainer #2386

Closed
vpavic opened this issue Aug 23, 2022 · 4 comments
Assignees
Labels
type: enhancement A general enhancement

Comments

@vpavic

vpavic commented Aug 23, 2022

This is something I noticed shortly after opening #2229 but failed to report it back then.

On a project I worked on at the time, we wanted to reduce the number of Redis topic subscriptions by employing channel multiplexing, similar to how the imperative RedisMessageListenerContainer is typically used. However, our attempt to use ReactiveRedisMessageListenerContainer in a similar fashion failed due to an issue/limitation I'll try to describe here.

We created a ReactiveRedisMessageListenerContainer bean and used it to create message streams that would be exposed over HTTP endpoints. We noticed that when multiple clients subscribe to the same topic, the stream breaks (or, better said, stalls) for all subscribers the moment the first client cancels its connection (and thus, apparently, the underlying stream as well). When some client connects again, the stream resumes for all the clients that kept their connection open.

The issue can be reproduced using the sample-webflux project from the webflux-channel-multiplexing branch in this repo. After starting the sample, follow the instructions in the project's readme to subscribe multiple clients and then unsubscribe some.

I acknowledge that this might be an issue with how we used ReactiveRedisMessageListenerContainer (as none of us were reactive experts), but to us it appeared that message streams returned by ReactiveRedisMessageListenerContainer should be shareable in order to achieve a true channel multiplexing capability.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Aug 23, 2022
@mp911de
Member

mp911de commented Aug 24, 2022

The issue here is that each subscription creates a Redis subscription that is unsubscribed when cancelling the stream:

1661324939.061675 [0 127.0.0.1:50528] "SUBSCRIBE" "sample:topics:fo"
1661324940.871447 [0 127.0.0.1:50528] "SUBSCRIBE" "sample:topics:fo"
1661324942.432204 [0 127.0.0.1:50528] "UNSUBSCRIBE" "sample:topics:fo"
1661324942.432933 [0 127.0.0.1:50528] "PING"
1661324944.344752 [0 127.0.0.1:50516] "PUBLISH" "sample:topics:fo" "{\"id\":\"dee80611-b068-4899-84ee-4ff452ca87c4\",\"type\":\"sample\"}" <--- This is never consumed.

In contrast to RedisMessageListenerContainer, we have simplified the setup to avoid lazy subscriptions/unsubscriptions. If you have two subscribers to the same channel/pattern, the first unsubscribe cancels the subscription for the other one because of how Redis works.
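The trace above can be reasoned about with a small toy model. This is only a sketch under the assumption that a Redis connection keeps its subscribed channels as a set, so a repeated SUBSCRIBE adds nothing and a single UNSUBSCRIBE removes the channel entirely. The class and method names are hypothetical and are not part of Spring Data Redis or any Redis driver.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model of one connection's Pub/Sub state; not a real Redis client API. */
class ConnectionSubscriptionModel {

    // Assumption: subscriptions are tracked per connection as a set, so repeats collapse.
    private final Set<String> channels = new LinkedHashSet<>();

    /** Models SUBSCRIBE: a second subscribe to the same channel changes nothing. */
    void subscribe(String channel) {
        channels.add(channel);
    }

    /** Models UNSUBSCRIBE: removes the channel no matter how often it was subscribed. */
    void unsubscribe(String channel) {
        channels.remove(channel);
    }

    /** Returns true if a message published to the channel would reach this connection. */
    boolean receives(String channel) {
        return channels.contains(channel);
    }

    public static void main(String[] args) {
        ConnectionSubscriptionModel connection = new ConnectionSubscriptionModel();
        connection.subscribe("sample:topics:fo");   // first client
        connection.subscribe("sample:topics:fo");   // second client, same connection
        connection.unsubscribe("sample:topics:fo"); // first client cancels
        System.out.println(connection.receives("sample:topics:fo")); // prints false
    }
}
```

In this model the second client is still attached locally, but the connection no longer receives anything for the channel, which matches the PUBLISH in the trace that is never consumed.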

To achieve a similar usage pattern, you would need to share subscriptions within your controller and store the processors that handle multiplexing in, e.g., a Map<Topic, Flux<Message<…>>>.
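As a rough illustration of the sharing idea, the sketch below keeps one upstream subscription per topic in a map and fans each message out to all local listeners. It uses only the JDK so that it stays self-contained; in a Reactor application the map values would more likely be shared Flux instances, as suggested above. SharedTopicRegistry, openUpstream, listen, and dispatch are hypothetical names, and the upstream is deliberately never closed here, so tearing it down would need extra bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical fan-out registry: one upstream subscription per topic, many local listeners. */
class SharedTopicRegistry {

    // Stand-in for whatever opens the real Redis subscription; invoked once per topic.
    private final Consumer<String> openUpstream;
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    SharedTopicRegistry(Consumer<String> openUpstream) {
        this.openUpstream = openUpstream;
    }

    /** Adds a local listener and returns a handle that removes only that listener. */
    Runnable listen(String topic, Consumer<String> listener) {
        listeners.computeIfAbsent(topic, t -> {
            openUpstream.accept(t); // first listener for this topic: open upstream once
            return new CopyOnWriteArrayList<>();
        }).add(listener);
        return () -> {
            List<Consumer<String>> current = listeners.get(topic);
            if (current != null) {
                current.remove(listener); // the upstream subscription stays open
            }
        };
    }

    /** Called when the upstream delivers a message; forwards it to every local listener. */
    void dispatch(String topic, String message) {
        listeners.getOrDefault(topic, List.of()).forEach(l -> l.accept(message));
    }

    public static void main(String[] args) {
        List<String> opened = new ArrayList<>();
        SharedTopicRegistry registry = new SharedTopicRegistry(opened::add);
        Runnable cancelFirst = registry.listen("aa", m -> System.out.println("client 1: " + m));
        registry.listen("aa", m -> System.out.println("client 2: " + m));
        cancelFirst.run();                 // client 1 disconnects
        registry.dispatch("aa", "hello");  // client 2 still receives the message
        System.out.println(opened);        // upstream was opened once: [aa]
    }
}
```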

It would be possible to rewrite ReactiveRedisMessageListenerContainer to handle multiple subscribers to the same topic, but that would require a bigger rewrite.

@mp911de mp911de added the for: team-attention An issue we need to discuss as a team to make progress label Aug 24, 2022
@vpavic
Author

vpavic commented Aug 24, 2022

Thanks for the quick feedback Mark.

> The issue here is that each subscription creates a Redis subscription that is unsubscribed when cancelling the stream:

Hmm, but doesn't that mean that the number of subscribers returned by ReactiveRedisOperations#convertAndSend is flawed, as it always returns 1 with things set up this way? To clarify, I execute watch -n 1 "curl -s -X POST http://localhost:8080/topics/aa" in one terminal to create a constant stream of events (and observe the number of subscribers), and then use curl -N http://localhost:8080/topics/aa in several other terminals to simulate multiple clients subscribing to the same topic.

> To achieve a similar usage pattern, you would need to share subscriptions within your controller and store the processors that handle multiplexing in, e.g., a Map<Topic, Flux<Message<…>>>.

I suspected something like that, but then ReactiveRedisMessageListenerContainer isn't really a complete channel multiplexing solution (and can be especially surprising to those coming from RedisMessageListenerContainer). These statements from javadoc also look a bit misleading:

* operations. Using reactive infrastructure allows usage of a single connection due to channel multiplexing.
* <p>
* This class is thread-safe and allows subscription by multiple concurrent threads.

@mp911de
Member

mp911de commented Aug 24, 2022

The number of subscribers is computed from all subscribers. Typically you would have a count > 1 when using multiple connections.

"Channel multiplexing" here means subscribing to multiple channels/patterns over a single connection, not subscribing to the same channel multiple times. We should improve our documentation wording to clarify that aspect.

@gregturn gregturn added the status: pending-design-work Needs design work before any code can be developed label Sep 26, 2022
@mp911de mp911de self-assigned this Oct 17, 2022
@mp911de
Member

mp911de commented Nov 29, 2022

We revisited this topic. Reactive messaging is associated with demand/back pressure, and we don't want to create a place where we cause congestion or growing queues because of shared subscriptions.

We can (and should) add protection against unintended unsubscriptions. As things stand, if two subscribers share the same channel and one of them unsubscribes, the other is unsubscribed as well; tracking channel and pattern subscriptions would prevent that.
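One way to picture such protection is a per-target subscriber count that only allows the UNSUBSCRIBE once the last local subscriber is gone. The sketch below is an assumption about how this could look, not the container's actual implementation; SubscriptionTracker, register, and release are hypothetical names, and channels and patterns would each need their own tracker instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical subscriber counter keyed by channel (or pattern) name. */
class SubscriptionTracker {

    private final Map<String, Integer> counts = new ConcurrentHashMap<>();

    /** Returns true only for the first subscriber, i.e. when SUBSCRIBE should be sent. */
    boolean register(String target) {
        return counts.merge(target, 1, Integer::sum) == 1;
    }

    /** Returns true only when the last subscriber leaves, i.e. when UNSUBSCRIBE is safe. */
    boolean release(String target) {
        boolean[] last = {false};
        counts.computeIfPresent(target, (t, n) -> {
            if (n > 1) {
                return n - 1; // others are still listening: keep the Redis subscription
            }
            last[0] = true;
            return null;      // count reached zero: drop the entry
        });
        return last[0];
    }

    public static void main(String[] args) {
        SubscriptionTracker channels = new SubscriptionTracker();
        System.out.println(channels.register("sample:topics:fo")); // true: send SUBSCRIBE
        System.out.println(channels.register("sample:topics:fo")); // false: already subscribed
        System.out.println(channels.release("sample:topics:fo"));  // false: one subscriber remains
        System.out.println(channels.release("sample:topics:fo"));  // true: send UNSUBSCRIBE
    }
}
```

Note that this only decides when to talk to Redis. It does not share or buffer messages between subscribers, which is consistent with the concern about congestion from shared subscriptions.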

@mp911de mp911de added the type: enhancement label and removed the status: waiting-for-triage, status: pending-design-work, and for: team-attention labels Nov 29, 2022
@mp911de mp911de changed the title from "Message streams returned by ReactiveRedisMessageListenerContainer cannot be shared" to "Track channel and pattern subscriptions to avoid unintended unsubscribe in ReactiveRedisMessageListenerContainer" Nov 29, 2022
christophstrobl pushed a commit that referenced this issue Jan 13, 2023
…emplate.listenToLater(…).

We now properly dispose the container at the right time and not as a result of the Mono<…> completion.

Closes: #2386
Original Pull Request: #2467
@christophstrobl christophstrobl added this to the 3.0.1 (2022.0.1) milestone Jan 13, 2023