diff --git a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java index 27b78ca2230..becf61f5d24 100644 --- a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java @@ -76,13 +76,21 @@ private long optimalNumOfBits(long n, double p) { } return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } + + private long[] hash(Object object) { + ByteBuf state = encode(object); + try { + long hash1 = LongHashFunction.xx().hashBytes(state.internalNioBuffer(state.readerIndex(), state.readableBytes())); + long hash2 = LongHashFunction.farmUo().hashBytes(state.internalNioBuffer(state.readerIndex(), state.readableBytes())); + return new long[] {hash1, hash2}; + } finally { + state.release(); + } + } @Override public boolean add(T object) { - ByteBuf state = encode(object); - long hash1 = LongHashFunction.xx().hashBytes(state.internalNioBuffer(state.readerIndex(), state.readableBytes())); - long hash2 = LongHashFunction.farmUo().hashBytes(state.internalNioBuffer(state.readerIndex(), state.readableBytes())); - state.release(); + long[] hashes = hash(object); while (true) { if (size == 0) { @@ -92,7 +100,7 @@ public boolean add(T object) { int hashIterations = this.hashIterations; long size = this.size; - long[] indexes = hash(hash1, hash2, hashIterations, size); + long[] indexes = hash(hashes[0], hashes[1], hashIterations, size); CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); addConfigCheck(hashIterations, size, executorService); @@ -132,10 +140,7 @@ private long[] hash(long hash1, long hash2, int iterations, long size) { @Override public boolean contains(T object) { - ByteBuf state = encode(object); - long hash1 = LongHashFunction.xx().hashBytes(state.internalNioBuffer(state.readerIndex(), state.readableBytes())); - long hash2 = LongHashFunction.farmUo().hashBytes(state.internalNioBuffer(state.readerIndex(), state.readableBytes())); - state.release(); + long[] hashes = hash(object); while (true) { if (size == 0) { @@ -145,7 +150,7 @@ public boolean contains(T object) { int hashIterations = this.hashIterations; long size = this.size; - long[] indexes = hash(hash1, hash2, hashIterations, size); + long[] indexes = hash(hashes[0], hashes[1], hashIterations, size); CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); addConfigCheck(hashIterations, size, executorService); diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index b5c0242c035..c61eebae368 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.io.IOException; import java.util.Collections; import java.util.List; @@ -27,8 +28,15 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.PubSubConnectionEntry; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonObjectFactory; +import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.AsyncSemaphore; +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + /** * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. * @@ -42,11 +50,11 @@ public class RedissonTopic implements RTopic { private final String name; private final Codec codec; - protected RedissonTopic(CommandAsyncExecutor commandExecutor, String name) { + public RedissonTopic(CommandAsyncExecutor commandExecutor, String name) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } - protected RedissonTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + public RedissonTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.commandExecutor = commandExecutor; this.name = name; this.codec = codec; @@ -63,9 +71,24 @@ public long publish(M message) { @Override public RFuture publishAsync(M message) { - return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, message); + return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, encode(message)); } + protected ByteBuf encode(Object value) { + if (commandExecutor.isRedissonReferenceSupportEnabled()) { + RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); + if (reference != null) { + value = reference; + } + } + + try { + return codec.getValueEncoder().encode(value); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + @Override public int addListener(StatusListener listener) { return addListener(new PubSubStatusListener(listener, name)); @@ -82,6 +105,23 @@ private int addListener(RedisPubSubListener pubSubListener) { commandExecutor.syncSubscription(future); return System.identityHashCode(pubSubListener); } + + public RFuture addListenerAsync(final RedisPubSubListener pubSubListener) { + RFuture future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener); + RPromise result = new RedissonPromise(); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(System.identityHashCode(pubSubListener)); + } + }); + return result; + } @Override public void removeAllListeners() { diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 083fe33e5fd..6e50d598b73 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -302,7 +302,7 @@ public interface RedisCommands { RedisStrictCommand MOVE = new RedisStrictCommand("MOVE", new BooleanReplayConvertor()); RedisStrictCommand MIGRATE = new RedisStrictCommand("MIGRATE", new VoidReplayConvertor()); - RedisStrictCommand PUBLISH = new RedisStrictCommand("PUBLISH", 2); + RedisStrictCommand PUBLISH = new RedisStrictCommand("PUBLISH"); RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", new PubSubStatusDecoder()); RedisCommand UNSUBSCRIBE = new RedisCommand("UNSUBSCRIBE", new PubSubStatusDecoder()); diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index 7051d55a649..c43aeaa1780 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -17,7 +17,6 @@ import org.redisson.api.RFuture; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ClientConnectionsEntry; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java index c2528998e42..90048e3c4f2 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java @@ -21,20 +21,16 @@ import org.reactivestreams.Publisher; import org.redisson.PubSubMessageListener; import org.redisson.PubSubStatusListener; +import org.redisson.RedissonTopic; import org.redisson.api.RFuture; +import org.redisson.api.RTopic; import org.redisson.api.RTopicReactive; import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.StatusListener; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; -import org.redisson.connection.PubSubConnectionEntry; -import org.redisson.misc.RPromise; -import org.redisson.pubsub.AsyncSemaphore; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import reactor.fn.Supplier; /** @@ -46,18 +42,18 @@ */ public class RedissonTopicReactive implements RTopicReactive { + private final RTopic topic; private final CommandReactiveExecutor commandExecutor; private final String name; - private final Codec codec; public RedissonTopicReactive(CommandReactiveExecutor commandExecutor, String name) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } public RedissonTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + this.topic = new RedissonTopic(codec, commandExecutor, name); this.commandExecutor = commandExecutor; this.name = name; - this.codec = codec; } @Override @@ -66,8 +62,13 @@ public List getChannelNames() { } @Override - public Publisher publish(M message) { - return commandExecutor.writeReactive(name, codec, RedisCommands.PUBLISH, name, message); + public Publisher publish(final M message) { + return commandExecutor.reactive(new Supplier>() { + @Override + public RFuture get() { + return topic.publishAsync(message); + } + }); } @Override @@ -82,23 +83,10 @@ public Publisher addListener(MessageListener listener) { } private Publisher addListener(final RedisPubSubListener pubSubListener) { - return new NettyFuturePublisher(new Supplier>() { + return commandExecutor.reactive(new Supplier>() { @Override public RFuture get() { - final RPromise promise = commandExecutor.getConnectionManager().newPromise(); - RFuture future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - promise.trySuccess(System.identityHashCode(pubSubListener)); - } - }); - return promise; + return ((RedissonTopic) topic).addListenerAsync(pubSubListener); } }); } @@ -106,21 +94,7 @@ public void operationComplete(Future future) throws Excep @Override public void removeListener(int listenerId) { - AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); - - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); - if (entry == null) { - semaphore.release(); - return; - } - - entry.removeListener(name, listenerId); - if (!entry.hasListeners(name)) { - commandExecutor.getConnectionManager().unsubscribe(name, semaphore); - } else { - semaphore.release(); - } + topic.removeListener(listenerId); }