Can't PUBLISH when subscribed with RESP3 #2594

Closed
Tmpod opened this issue Jan 12, 2024 · 1 comment
Labels
status: good-first-issue An issue that can only be worked on by brand new contributors type: bug A general bug
Comments

Tmpod commented Jan 12, 2024

Bug Report

Current Behavior

Stack trace
io.lettuce.core.RedisException: Command PUBLISH not allowed while subscribed. Allowed commands are: [PSUBSCRIBE, QUIT, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PING]
	at io.lettuce.core.pubsub.PubSubEndpoint.rejectCommand(PubSubEndpoint.java:167)
	at io.lettuce.core.pubsub.PubSubEndpoint.write(PubSubEndpoint.java:131)
	at io.lettuce.core.RedisChannelHandler.dispatch(RedisChannelHandler.java:218)
	at io.lettuce.core.StatefulRedisConnectionImpl.dispatch(StatefulRedisConnectionImpl.java:163)
	at io.lettuce.core.RedisPublisher$RedisSubscription.dispatchCommand(RedisPublisher.java:395)
	at io.lettuce.core.RedisPublisher$CommandDispatch$1.dispatch(RedisPublisher.java:470)
	at io.lettuce.core.RedisPublisher$RedisSubscription.checkCommandDispatch(RedisPublisher.java:390)
	at io.lettuce.core.RedisPublisher$State$2.request(RedisPublisher.java:541)
	at io.lettuce.core.RedisPublisher$RedisSubscription.request(RedisPublisher.java:248)
	at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
	at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
	at reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77)
	at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
	at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
	at io.lettuce.core.RedisPublisher$State$1.subscribe(RedisPublisher.java:518)
	at io.lettuce.core.RedisPublisher$RedisSubscription.subscribe(RedisPublisher.java:231)
	at io.lettuce.core.RedisPublisher.subscribe(RedisPublisher.java:130)
	at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:63)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne(Await.kt:190)
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne$default(Await.kt:183)
	at kotlinx.coroutines.reactive.AwaitKt.awaitFirstOrNull(Await.kt:45)
	...
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

Input Code

// using kotlinx.coroutines (+reactive APIs)
val uri = RedisURI.Builder/*...*/.build()
val client = RedisClient.create(uri)
// Force RESP3 so the server accepts regular commands on a subscribed connection
client.options = ClientOptions.builder().protocolVersion(ProtocolVersion.RESP3).build()
val channel = client.connectPubSub().reactive()

channel.subscribe("...").awaitFirstOrNull()
// Throws RedisException: "Command PUBLISH not allowed while subscribed"
channel.publish("...", "...").awaitFirstOrNull()

Expected behavior/code

The message is published even though the client is subscribed to a channel already.

Environment

  • Lettuce version(s): 6.2.6.RELEASE
  • Redis version: 7.2.3

Possible Solution

Patch PubSubEndpoint#isAllowed to check for the client's protocol version.

Something like:

// Would require DefaultEndpoint#clientOptions be protected, instead of private.
private boolean isAllowed(RedisCommand<?, ?, ?> command) {
    return clientOptions.getProtocolVersion() == ProtocolVersion.RESP3 || ALLOWED_COMMANDS_SUBSCRIBED.contains(command.getType().name());
}
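
The rule behind this suggestion can be sketched without Lettuce on the classpath. The block below is a minimal, self-contained illustration only: `SubscribedCommandGate`, its `Protocol` enum, and the `subscribed` flag are hypothetical stand-ins, not Lettuce types, and the allow-list is copied from the exception message in the stack trace above rather than from the library source.

```java
import java.util.Set;

public class SubscribedCommandGate {

    // Hypothetical stand-in for Lettuce's ProtocolVersion enum.
    enum Protocol { RESP2, RESP3 }

    // Commands listed in the exception message from the stack trace above.
    static final Set<String> ALLOWED_WHILE_SUBSCRIBED =
            Set.of("PSUBSCRIBE", "QUIT", "PUNSUBSCRIBE", "SUBSCRIBE", "UNSUBSCRIBE", "PING");

    // RESP3 lifts the restriction; RESP2 keeps the allow-list once subscribed.
    static boolean isAllowed(Protocol protocol, boolean subscribed, String commandName) {
        if (!subscribed || protocol == Protocol.RESP3) {
            return true;
        }
        return ALLOWED_WHILE_SUBSCRIBED.contains(commandName);
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(Protocol.RESP2, true, "PUBLISH")); // false
        System.out.println(isAllowed(Protocol.RESP3, true, "PUBLISH")); // true
        System.out.println(isAllowed(Protocol.RESP2, true, "PING"));    // true
    }
}
```

Checking the protocol first keeps the RESP2 behavior unchanged, so only RESP3 connections see the relaxed rule.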

Additional context

https://redis.io/commands/subscribe

Once the client enters the subscribed state it is not supposed to issue any other commands, except for additional SUBSCRIBE, SSUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, SUNSUBSCRIBE, PUNSUBSCRIBE, PING, RESET and QUIT commands. However, if RESP3 is used (see HELLO) it is possible for a client to issue any commands while in subscribed state.

added emphasis

Tmpod commented Jan 12, 2024

Just noticed my suggested change would require changing DefaultEndpoint#clientOptions from private to protected. Unsure if that is alright within this project's design.

mp911de added the labels type: bug (A general bug) and status: good-first-issue (An issue that can only be worked on by brand new contributors) Jan 16, 2024
mp911de added this to the 6.3.2.RELEASE milestone Mar 13, 2024
mp911de pushed a commit that referenced this issue Mar 13, 2024
mp911de added a commit that referenced this issue Mar 13, 2024
Update tests to reflect new behavior. Use negotiated protocol version and fall back to the configured one if no negotiated version is available.

Original pull request: #2778
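
The fallback described in the commit message can be pictured roughly as follows. This is a sketch under assumptions, not the code from the referenced commits: `ProtocolResolution`, `effectiveProtocol`, and the `Protocol` enum are hypothetical names, and a `null` negotiated value stands for "no handshake result available yet".

```java
import java.util.Optional;

public class ProtocolResolution {

    // Hypothetical stand-in for Lettuce's ProtocolVersion enum.
    enum Protocol { RESP2, RESP3 }

    // Prefer what the handshake negotiated; otherwise use the configured version.
    static Protocol effectiveProtocol(Protocol negotiated, Protocol configured) {
        return Optional.ofNullable(negotiated).orElse(configured);
    }

    public static void main(String[] args) {
        System.out.println(effectiveProtocol(null, Protocol.RESP3));           // RESP3
        System.out.println(effectiveProtocol(Protocol.RESP2, Protocol.RESP3)); // RESP2
    }
}
```

Preferring the negotiated version matters when the configured and actual protocols differ: a connection that ends up on RESP2 should presumably keep the subscribed-state restriction even if RESP3 was requested.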
mp911de closed this as completed Mar 13, 2024