Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SPUBLISH #2757 #2800

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* @author Tugdual Grall
* @author dengliming
* @author Andrey Shlykov
* @author Manish Manghwani
*/
@SuppressWarnings("unchecked")
public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncCommands<K, V>, RedisHashAsyncCommands<K, V>,
Expand Down Expand Up @@ -1536,6 +1537,11 @@ public RedisFuture<Long> publish(K channel, V message) {
return dispatch(commandBuilder.publish(channel, message));
}

@Override
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
public RedisFuture<List<K>> pubsubChannels() {
return dispatch(commandBuilder.pubsubChannels());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
* @author Tugdual Grall
* @author dengliming
* @author Andrey Shlykov
* @author Manish Manghwani
* @since 4.0
*/
public abstract class AbstractRedisReactiveCommands<K, V>
Expand Down Expand Up @@ -1615,6 +1616,11 @@ public Mono<Long> publish(K channel, V message) {
return createMono(() -> commandBuilder.publish(channel, message));
}

@Override
public Mono<Long> spublish(K channel, V message) {
return createMono(() -> commandBuilder.spublish(channel, message));
}

@Override
public Flux<K> pubsubChannels() {
return createDissolvingFlux(commandBuilder::pubsubChannels);
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* @author dengliming
* @author Mikhael Sokolov
* @author Tihomir Mateev
* @author Manish Manghwani
*/
@SuppressWarnings({ "unchecked", "varargs" })
class RedisCommandBuilder<K, V> extends BaseRedisCommandBuilder<K, V> {
Expand Down Expand Up @@ -2094,6 +2095,13 @@ Command<K, V, List<K>> pubsubChannels(K pattern) {
return createCommand(PUBSUB, new KeyListOutput<>(codec), args);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
LettuceAssert.notNull(shardChannel, "Shard Channel " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, Long> pubsubNumpat() {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(NUMPAT);
return createCommand(PUBSUB, new IntegerOutput<>(codec), args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Manish Manghwani
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateAsyncApi
*/
Expand All @@ -43,6 +44,16 @@ public interface BaseRedisAsyncCommands<K, V> {
*/
RedisFuture<Long> publish(K channel, V message);

/**
* Post a message to a Shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are
* connected to the same node as the publishing client are included in the count.
*/
RedisFuture<Long> spublish(K shardChannel, V message);

/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Manish Manghwani
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateReactiveApi
*/
Expand All @@ -43,6 +44,16 @@ public interface BaseRedisReactiveCommands<K, V> {
*/
Mono<Long> publish(K channel, V message);

/**
* Post a message to a shard channel.
*
* @param shardChannel the channel type: key.
* @param message the message type: value.
* @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are
* connected to the same node as the publishing client are included in the count.
*/
Mono<Long> spublish(K shardChannel, V message);

/**
* Lists the currently *active channels*.
*
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Manish Manghwani
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateSyncApi
*/
Expand All @@ -42,6 +43,16 @@ public interface BaseRedisCommands<K, V> {
*/
Long publish(K channel, V message);

/**
* Post a message to a channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are
* connected to the same node as the publishing client are included in the count.
*/
Long spublish(K shardChannel, V message);

/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Manish Manghwani
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateSyncNodeSelectionClusterApi
*/
Expand All @@ -38,6 +39,16 @@ public interface BaseNodeSelectionCommands<K, V> {
*/
Executions<Long> publish(K channel, V message);

/**
* Post a message to a channel.
*
* @param shardChannel the channel type: key.
* @param message the message type: value.
* @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are
* connected to the same node as the publishing client are included in the count.
*/
Executions<Long> spublish(K shardChannel, V message);

/**
* Lists the currently *active channels*.
*
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @author Zhang Jessey
* @author dengliming
* @author Mikhael Sokolov
* @author Manish Manghwani
*/
public enum CommandType implements ProtocolKeyword {

Expand Down Expand Up @@ -70,7 +71,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB,
PSUBSCRIBE, PUBLISH, SPUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB,

// Sets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
*
* @author Mark Paluch
* @author Tihomir Mateev
* @author Manish Manghwani
* @since 4.2
*/
@SuppressWarnings("varargs")
Expand All @@ -53,6 +54,11 @@ Command<K, V, Long> publish(K channel, V message) {
return createCommand(PUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
CommandArgs<K, V> args = new PubSubCommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, List<K>> pubsubChannels(K pattern) {
CommandArgs<K, V> args = new PubSubCommandArgs<>(codec).add(CHANNELS).addKey(pattern);
return createCommand(PUBSUB, new KeyListOutput<>(codec), args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* @param <V> Value type.
* @author Will Glozer
* @author Mark Paluch
* @author Manish Manghwani
*/
public class RedisPubSubAsyncCommandsImpl<K, V> extends RedisAsyncCommandsImpl<K, V> implements RedisPubSubAsyncCommands<K, V> {

Expand Down Expand Up @@ -75,6 +76,11 @@ public RedisFuture<Long> publish(K channel, V message) {
return dispatch(commandBuilder.publish(channel, message));
}

@Override
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
public RedisFuture<List<K>> pubsubChannels(K channel) {
return dispatch(commandBuilder.pubsubChannels(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Manish Manghwani
* @since 5.0
*/
public class RedisPubSubReactiveCommandsImpl<K, V> extends RedisReactiveCommandsImpl<K, V>
Expand Down Expand Up @@ -133,6 +134,11 @@ public Mono<Long> publish(K channel, V message) {
return createMono(() -> commandBuilder.publish(channel, message));
}

@Override
public Mono<Long> spublish(K shardChannel, V message) {
return createMono(() -> commandBuilder.spublish(shardChannel, message));
}

@Override
public Flux<K> pubsubChannels(K channel) {
return createDissolvingFlux(() -> commandBuilder.pubsubChannels(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.Flow
* @param <K> Key type.
* @param <V> Value type.
* @author Mikhael Sokolov
* @author Manish Manghwani
* @since 6.0
* @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesApi
*/
Expand All @@ -43,6 +44,16 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
*/
suspend fun publish(channel: K, message: V): Long?

/**
* Post a message to a channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are
* connected to the same node as the publishing client are included in the count.
*/
suspend fun spublish(shardChannel: K, message: V): Long?

/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ import kotlinx.coroutines.reactive.awaitSingle
* @param <K> Key type.
* @param <V> Value type.
* @author Mikhael Sokolov
* @author Manish Manghwani
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: BaseRedisReactiveCommands<K, V>) : BaseRedisCoroutinesCommands<K, V> {

override suspend fun publish(channel: K, message: V): Long? = ops.publish(channel, message).awaitFirstOrNull()

override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull()

override suspend fun pubsubChannels(): List<K> = ops.pubsubChannels().asFlow().toList()

override suspend fun pubsubChannels(channel: K): List<K> = ops.pubsubChannels(channel).asFlow().toList()
Expand Down