How to use Redis pub/sub with Quarkus and ReactiveRedisClient? #21108

Closed
petarminchev opened this issue Oct 31, 2021 · 16 comments · Fixed by #26268
Labels
area/redis kind/question Further information is requested
Milestone

Comments

@petarminchev

I am trying to use Redis pub/sub with Quarkus, but I cannot find any documentation about it. I managed to use the Quarkus Redis extension with Redis Streams (xadd and xread), but I cannot make Redis pub/sub work. I am using ReactiveRedisClient, and when I try to subscribe to a Redis channel in a Quarkus test I receive the following error:

Error received: io.vertx.core.impl.NoStackTraceThrowable: PubSub command in connection-less mode not allowed

I found that the error is thrown in the BaseRedisClient class of the Vert.x Redis client:

@Override
public Future<@Nullable Response> send(Request command) {
  final Promise<Response> promise = vertx.promise();

  if (command.command().isPubSub()) {
    // mixing pubSub cannot be used on a one-shot operation
    promise.fail("PubSub command in connection-less mode not allowed");
    return promise.future();
  }
  // ... (rest of the method not shown)
}

In general, I think Quarkus would benefit from documenting how to use Redis pub/sub, because other people also seem to be confused about how exactly to use it.

Thanks in advance for any help!

@quarkus-bot

quarkus-bot bot commented Oct 31, 2021

/cc @cescoffier, @gsmet, @machi1990

@geoand geoand added the kind/question Further information is requested label Nov 1, 2021
@machi1990
Member

Error received: io.vertx.core.impl.NoStackTraceThrowable: PubSub command in connection-less mode not allowed

The error means you are trying to use the same client to publish and subscribe, which is not allowed by the upstream Redis client. /cc @pmlopes

The issue has been reported in the past (I cannot find the reference to it), and our recommendation has been to create two separate clients using the multiple Redis clients feature. One client performs the subscription and the other publishes.

quarkus.redis.hosts=redis://localhost:6379
quarkus.redis.publish.hosts=redis://localhost:6379

@Inject
ReactiveRedisClient defaultClient; // you can use the default client for subscription

@Inject
@RedisClientName("publish")
ReactiveRedisClient publishingClient; // you can use this client for publishing

@petarminchev
Author

petarminchev commented Nov 2, 2021

@machi1990 Thank you for the answer, but I am not sure this is the case. I have almost no code inside my test: it is just a single subscribe call and nothing more. No one is publishing and no other action is going on. Could you try such a test on your side with the latest Quarkus? I am using Quarkus Dev Services in the test.

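import java.util.List;

import javax.inject.Inject; // Quarkus 2.x namespace

import org.junit.jupiter.api.Test;

import io.quarkus.redis.client.reactive.ReactiveRedisClient;
import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class RedisTest {

    @Inject
    ReactiveRedisClient reactiveRedisClient;

    @Test
    public void redisTest() {
        // The only call in the test: subscribe to two channels and log the outcome.
        reactiveRedisClient.subscribe(List.of("CHANNEL1", "CHANNEL2"))
            .subscribe().with(
                response -> System.out.println("Subscribe Response: " + response),
                error -> System.out.println("Error received: " + error));
    }
}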

@petarminchev
Author

petarminchev commented Nov 2, 2021

I also noticed this issue in the Vert.x GitHub repository: vert-x3/vertx-redis-client#273. I have no idea what connection-less mode is, but it seems that subscribe is not allowed in it: only publish/pubsub, but not subscribe. If someone could shed some light on this, it would be great.

@machi1990
Member

Right, I didn't realize you were referring to the subscribe command. I initially thought you were using the pubsub command. My mistake, sorry.

I also noticed this issue in the Vert.x GitHub repository: vert-x3/vertx-redis-client#273. I have no idea what connection-less mode is, but it seems that subscribe is not allowed in it: only publish/pubsub, but not subscribe. If someone could shed some light on this, it would be great.

@pmlopes would you have an idea?

@DarioFR

DarioFR commented Nov 21, 2021

Hello. I'm currently facing the same issue which prevents even a simple application with a publisher and subscriber from properly working. Any news about a possible solution/workaround?

Thanks.

@petarminchev
Author

petarminchev commented Nov 23, 2021

Today I had some time to dig deeper and found the issue.

Basically, the Vert.x Redis client doesn't support pub/sub in pooled-connection mode.

https://vertx.io/docs/vertx-redis-client/java/#_connection_pooling
Pooling is not compatible with SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE or PUNSUBSCRIBE because these commands will modify the way the connection operates and the connection cannot be reused.

https://vertx.io/docs/vertx-redis-client/java/#_pubsub_mode

Since all the Quarkus producers in the RedisClientsProducer class use BaseRedisClient, whose connection manager is backed by a connection pool, ReactiveRedisClient does not support pub/sub.
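
To make the pooling restriction concrete, here is a toy model in plain Java. It does not use any Vert.x or Quarkus classes: ToyPool and ToyConnection are hypothetical names, and the rejection logic is only assumed to mirror the isPubSub() check quoted earlier in this thread. The point is that a pooled client borrows a connection for one command and hands it straight back, which cannot work once a command switches the connection into subscriber mode.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;

// Toy model only: hypothetical classes, not the Vert.x or Quarkus API.
final class PooledPubSubDemo {

    // Commands that change how a connection operates.
    static final Set<String> PUBSUB_COMMANDS =
            Set.of("SUBSCRIBE", "UNSUBSCRIBE", "PSUBSCRIBE", "PUNSUBSCRIBE");

    /** Stand-in for one connection; it remembers whether it entered subscriber mode. */
    static final class ToyConnection {
        private boolean subscribed;

        String send(String command) {
            if (command.equals("SUBSCRIBE") || command.equals("PSUBSCRIBE")) {
                subscribed = true; // from now on this connection receives pushed messages
            }
            return "OK " + command;
        }

        boolean isSubscribed() {
            return subscribed;
        }
    }

    /** Stand-in for a pooled client: borrow a connection, send one command, return it. */
    static final class ToyPool {
        private final Deque<ToyConnection> idle = new ArrayDeque<>();

        ToyPool(int size) {
            for (int i = 0; i < size; i++) {
                idle.push(new ToyConnection());
            }
        }

        String send(String command) {
            if (PUBSUB_COMMANDS.contains(command)) {
                // A subscribed connection could not be reused by the next caller.
                throw new IllegalStateException("PubSub command in connection-less mode not allowed");
            }
            ToyConnection conn = idle.pop();
            try {
                return conn.send(command);
            } finally {
                idle.push(conn); // one-shot: the connection goes straight back to the pool
            }
        }

        /** Dedicated connection owned by the caller, so pub/sub commands are fine on it. */
        ToyConnection connect() {
            return new ToyConnection();
        }
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(2);
        System.out.println(pool.send("PUBLISH"));
        try {
            pool.send("SUBSCRIBE");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        ToyConnection dedicated = pool.connect();
        System.out.println(dedicated.send("SUBSCRIBE"));
    }
}
```

In this model, publishing through the pool works, subscribing through the pool is rejected, and subscribing on a connection obtained with connect() succeeds, which is the same split as in the workaround later in this comment.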

In my opinion, Quarkus should provide a boolean configuration flag to switch between connection-pool mode and single-connection mode for the Vert.x Redis client. You can see both modes in the RedisAPIImpl class of Vert.x:

public RedisAPIImpl(RedisConnection connection) {
  this.connection = connection;
  this.redis = null;
}

public RedisAPIImpl(Redis redis) {
  this.connection = null;
  this.redis = redis;
}

// ... and later, when a request is sent:
if (redis != null) {
  // operating in pooled mode
  redis.send(req, promise);
} else if (connection != null) {
  // operating on connection mode
  connection.send(req, promise);
}

In my opinion, Quarkus needs a config flag that selects which of the two constructors to use. Currently Quarkus only uses the second constructor, which goes through the Vert.x connection pool, and Vert.x doesn't support pub/sub in pooled mode.

If Quarkus doesn't plan to fix this, then it should at least be mentioned in the documentation.

The current workaround is to use the Vert.x Redis client directly and bypass the Quarkus injection. I managed to make it work with Mutiny by manually instantiating the Quarkus Redis Mutiny wrappers, but in my opinion it is much simpler to use what Vert.x provides for pub/sub directly. Otherwise we need to subscribe to the event bus, etc.

Here is a simple example that uses only Vert.x:

import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.Request;

// `vertx` is an existing io.vertx.core.Vertx instance.
Redis redisClient = Redis.createClient(vertx, "redis://localhost:1234");
redisClient
    .connect()
    .onSuccess(conn -> {
        System.out.println("Successfully connected = " + conn + " " + Thread.currentThread().getName());

        // subscribe(conn);

        // Register the message handler on this dedicated connection before subscribing.
        conn.handler(message -> {
            // do whatever you need to do with your message
            System.out.println("Message = " + message + " " + Thread.currentThread().getName());
        });

        conn.send(Request.cmd(Command.SUBSCRIBE).arg("CHANNEL1").arg("CHANNEL2"))
            .onSuccess(res -> {
                System.out.println("Subscribed");

                publish(conn);
            });
    })
    .onFailure(ex -> {
        System.out.println("Failed to connect " + ex.getMessage());
        ex.printStackTrace();
    });

private void publish(RedisConnection conn) {
    conn.send(Request.cmd(Command.PUBLISH).arg("CHANNEL1").arg("Hello World!"))
        .onSuccess(res -> System.out.println("Published Hello World, res = " + res));

    conn.send(Request.cmd(Command.PUBLISH).arg("CHANNEL2").arg("How are you!"))
        .onSuccess(res -> System.out.println("Published How are you!, res = " + res));
}

To conclude: I think Quarkus should take some action. At the very least it should mention this in the documentation, and maybe provide that boolean config flag so the injected client can be configured NOT to use pooling.

And here is the workaround that uses the Mutiny wrappers after manually getting the Vert.x connection from the callback, so that pooling is bypassed:

private void subscribe(RedisConnection connection) {
    // Wrap the dedicated (non-pooled) connection, then wrap that in the Mutiny API.
    RedisAPI vertxRedisAPI = RedisAPI.api(connection);
    io.vertx.mutiny.redis.client.RedisAPI redisAPI = new io.vertx.mutiny.redis.client.RedisAPI(vertxRedisAPI);

    redisAPI.subscribe(List.of("CHANNEL1", "CHANNEL2"))
        .subscribe().with(
            response -> { System.out.println("Subscribed"); publish(redisAPI); },
            error -> System.out.println("Error received: " + error));
}

private void publish(io.vertx.mutiny.redis.client.RedisAPI redisAPI) {
    redisAPI.publish("CHANNEL1", "message1").subscribe().with(
        response -> System.out.println("Published 1st Message Response: " + response),
        error -> System.out.println("Error received: " + error));

    redisAPI.publish("CHANNEL2", "message1").subscribe().with(
        response -> System.out.println("Published 2nd Message Response: " + response),
        error -> System.out.println("Error received: " + error));
}

// `eventBus` is a Vert.x EventBus field of the enclosing class.
// Messages are consumed from the event bus at the address "io.vertx.redis." + channel name.
public void initializeConsumer() {
    eventBus.<JsonObject>consumer("io.vertx.redis.CHANNEL1", message -> {
        System.out.println("Consumed event = " + message.body());
    });

    eventBus.<JsonObject>consumer("io.vertx.redis.CHANNEL2", message -> {
        System.out.println("Consumed event = " + message.body());
    });
}

@petarminchev
Author

petarminchev commented Nov 24, 2021

I also forgot to say: do NOT publish and consume on the same connection as in the examples above. That was only for a test. Open separate connections instead.

@tamht298

I had the same issue. Did you solve it?

@petarminchev
Author

I suppose you should manage the connection manually with Redis.createClient, as in the example I gave above. The clients injected by Quarkus use connection pooling, which is not compatible with pub/sub.

@tamht298

Oh, thanks so much. Your workaround works properly, but I hope the Quarkus team can improve this feature.

@machi1990
Member

machi1990 commented Dec 23, 2021

To conclude - I think Quarkus should take some action - at least mention that in the doc and maybe provide that boolean flag config to be able to configure the injected client NOT to use pooling.

Thanks @digitalsamba @tamht298 for the investigation and workaround. Agreed, this should be better documented or made to work transparently, without requiring extra work from the consumers of the extension, as exposing yet another flag might confuse them.

Pooling is not compatible with SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE or PUNSUBSCRIBE because these commands will modify the way the connection operates and the connection cannot be reused

@pmlopes I've just become aware of that. Is this something new in one of the recent versions of the vertx-redis client?
I am thinking that the Quarkus extension could handle it for the user by opening a new connection for those commands each time the corresponding method is called, and closing the connection afterwards. Is that the best way to go about it? What's your advice here?
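
One possible shape of that idea, as a plain-Java sketch. Everything here is hypothetical (ToyClient, ToyConnection and their methods are made-up names, not the Quarkus or Vert.x API), and it only assumes that "closing the connection afterwards" means closing it once the caller is done with the subscription:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Design sketch only: hypothetical classes, not the Quarkus or Vert.x API.
final class DedicatedSubscriptionSketch {

    /** Toy connection dedicated to a single subscription. */
    static final class ToyConnection implements AutoCloseable {
        private final String channel;
        private final Consumer<String> onMessage;
        private boolean open = true;

        ToyConnection(String channel, Consumer<String> onMessage) {
            this.channel = channel;
            this.onMessage = onMessage;
        }

        /** Simulates the server pushing a message on the subscribed channel. */
        void push(String message) {
            if (open) {
                onMessage.accept(channel + ": " + message);
            }
        }

        boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false; // closing the connection ends the subscription
        }
    }

    /** Toy client: each subscribe call opens its own connection, outside any pool. */
    static final class ToyClient {
        ToyConnection subscribe(String channel, Consumer<String> onMessage) {
            return new ToyConnection(channel, onMessage);
        }
    }

    public static void main(String[] args) {
        ToyClient client = new ToyClient();
        List<String> received = new ArrayList<>();

        ToyConnection subscription = client.subscribe("CHANNEL1", received::add);
        try (subscription) {
            subscription.push("hello");
        }
        subscription.push("ignored"); // dropped: the connection is already closed

        System.out.println(received); // prints [CHANNEL1: hello]
    }
}
```

With this shape the user never sees a pooling flag: the subscription handle owns its connection, and closing the handle releases it.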

@vundyalaavinash

@machi1990 is there an elegant way to handle this? I am also facing the same issue.
As @digitalsamba pointed out, can we please make it config-driven?

@cescoffier
Member

This will be handled in #24750

@zyc

zyc commented May 29, 2022

Thanks for that!

@cescoffier
Member

Fixed in #26268
