Skip to content

Commit

Permalink
Publish/subscribe refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 1, 2015
1 parent b02d901 commit 316ee32
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 113 deletions.
25 changes: 13 additions & 12 deletions src/main/java/org/redisson/RedissonCountDownLatch.java
Expand Up @@ -42,8 +42,8 @@
*/
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

private static final Integer zeroCountMessage = 0;
private static final Integer newCountMessage = 1;
private static final Long zeroCountMessage = 0L;
private static final Long newCountMessage = 1L;

private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();

Expand Down Expand Up @@ -72,18 +72,17 @@ private Future<RedissonCountDownLatchEntry> subscribe() {
return oldValue.getPromise();
}

RedisPubSubListener<Integer> listener = createListener(value);

commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
RedisPubSubListener<Long> listener = createListener(value);
commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener);
return newPromise;
}
}

private RedisPubSubListener<Integer> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
private RedisPubSubListener<Long> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {

@Override
public void onMessage(String channel, Integer message) {
public void onMessage(String channel, Long message) {
if (!getChannelName().equals(channel)) {
return;
}
Expand Down Expand Up @@ -175,7 +174,7 @@ public void countDown() {
return;
}

Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" +
Expand Down Expand Up @@ -208,7 +207,7 @@ private long getCountInner() {

@Override
public boolean trySetCount(long count) {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', ARGV[3], ARGV[1]); "
Expand All @@ -222,8 +221,10 @@ public boolean trySetCount(long count) {

@Override
public Future<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end",
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('del', KEYS[1]) == 1 then "
+ "redis.call('publish', ARGV[2], ARGV[1]); "
+ "return true else return false end",
Collections.<Object>singletonList(getName()), newCountMessage, getChannelName());
}

Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -25,11 +25,14 @@
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.RLock;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;

Expand Down Expand Up @@ -116,7 +119,7 @@ public boolean onStatus(PubSubType type, String channel) {

};

commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
commandExecutor.getConnectionManager().subscribe(commandExecutor.getConnectionManager().getCodec(), getChannelName(), listener);
return newPromise;
}
}
Expand Down
14 changes: 3 additions & 11 deletions src/main/java/org/redisson/RedissonTopic.java
Expand Up @@ -77,17 +77,9 @@ public int addListener(MessageListener<M> listener) {
}

private int addListener(RedisPubSubListener<M> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(name, codec);
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.syncUninterruptibly();
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
return pubSubListener.hashCode();
}
}
// entry is inactive trying add again
return addListener(pubSubListener);
return pubSubListener.hashCode();
}

@Override
Expand All @@ -106,7 +98,7 @@ public void removeListener(int listenerId) {
}
}

// entry is inactive trying add again
// listener has been re-attached
removeListener(listenerId);
}

Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/redisson/client/RedisPubSubConnection.java
Expand Up @@ -32,6 +32,9 @@
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;

public class RedisPubSubConnection extends RedisConnection {
Expand Down Expand Up @@ -102,8 +105,8 @@ public void punsubscribe(String ... channel) {
}
}

private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
return channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
}

public Map<String, Codec> getChannels() {
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/redisson/client/protocol/RedisCommands.java
Expand Up @@ -110,7 +110,7 @@ public interface RedisCommands {
RedisCommand<Void> LSET = new RedisCommand<Void>("LSET", new VoidReplayConvertor(), 3);
RedisCommand<Object> LPOP = new RedisCommand<Object>("LPOP");
RedisCommand<Boolean> LREM_SINGLE = new RedisCommand<Boolean>("LREM", new BooleanReplayConvertor(), 3);
RedisCommand<Long> LREM = new RedisCommand<Long>("LREM", 3);
RedisStrictCommand<Long> LREM = new RedisStrictCommand<Long>("LREM", 3);
RedisCommand<Object> LINDEX = new RedisCommand<Object>("LINDEX");
RedisCommand<Object> LINSERT = new RedisCommand<Object>("LINSERT", 3, ValueType.OBJECTS);
RedisStrictCommand<Integer> LLEN_INT = new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor());
Expand All @@ -128,11 +128,11 @@ public interface RedisCommands {
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());

RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor(), 2);
RedisCommand<Long> PFCOUNT = new RedisCommand<Long>("PFCOUNT");
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");
RedisStrictCommand<Void> PFMERGE = new RedisStrictCommand<Void>("PFMERGE", new VoidReplayConvertor());

RedisCommand<Long> RPOP = new RedisCommand<Long>("RPOP");
RedisCommand<Long> LPUSH = new RedisCommand<Long>("LPUSH", 2);
RedisStrictCommand<Long> RPOP = new RedisStrictCommand<Long>("RPOP");
RedisStrictCommand<Long> LPUSH = new RedisStrictCommand<Long>("LPUSH", 2);
RedisCommand<List<Object>> LRANGE = new RedisCommand<List<Object>>("LRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<Long> RPUSH = new RedisCommand<Long>("RPUSH", 2, ValueType.OBJECTS);
RedisCommand<Boolean> RPUSH_BOOLEAN = new RedisCommand<Boolean>("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS);
Expand Down Expand Up @@ -194,7 +194,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> MOVE = new RedisStrictCommand<Boolean>("MOVE", new BooleanReplayConvertor());
RedisStrictCommand<Void> MIGRATE = new RedisStrictCommand<Void>("MIGRATE", new VoidReplayConvertor());

RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 2);
RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH", 2);

RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> UNSUBSCRIBE = new RedisCommand<Object>("UNSUBSCRIBE", new PubSubStatusDecoder());
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/org/redisson/codec/JsonJacksonCodec.java
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;

import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
Expand All @@ -44,11 +45,13 @@
*/
public class JsonJacksonCodec implements Codec {

private final ObjectMapper mapObjectMapper = initObjectMapper();
public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec();

protected ObjectMapper initObjectMapper() {
return new ObjectMapper();
}
private final ObjectMapper mapObjectMapper = initObjectMapper();

protected ObjectMapper initObjectMapper() {
return new ObjectMapper();
}

private final Encoder encoder = new Encoder() {
@Override
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/org/redisson/connection/ConnectionManager.java
Expand Up @@ -45,6 +45,8 @@
*/
public interface ConnectionManager {

Promise<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener listener);

ConnectionListener getConnectListener();

IdleConnectionWatcher getConnectionWatcher();
Expand Down Expand Up @@ -89,12 +91,8 @@ <T> FutureListener<T> createReleaseWriteListener(NodeSource source,

PubSubConnectionEntry getPubSubEntry(String channelName);

Future<PubSubConnectionEntry> subscribe(String channelName, Codec codec);

Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec);

<V> void subscribe(RedisPubSubListener<V> listener, String channelName);

Codec unsubscribe(String channelName);

Codec punsubscribe(String channelName);
Expand Down

0 comments on commit 316ee32

Please sign in to comment.