Skip to content

Commit

Permalink
OneShot listener handling fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 24, 2015
1 parent c13edde commit 7c9e7c4
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 25 deletions.
Expand Up @@ -80,7 +80,8 @@ public void onPatternMessage(String pattern, String channel, V message) {
} }


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
return false;
} }


} }
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/RedissonCountDownLatch.java
Expand Up @@ -88,10 +88,12 @@ public void onMessage(String channel, Integer message) {
} }


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) { if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true); value.getPromise().setSuccess(true);
return true;
} }
return false;
} }


}; };
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -129,10 +129,12 @@ public void onMessage(String channel, Integer message) {
} }


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) { if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true); value.getPromise().setSuccess(true);
return true;
} }
return false;
} }


}; };
Expand Down
Expand Up @@ -20,7 +20,8 @@
public class BaseRedisPubSubListener<V> implements RedisPubSubListener<V> { public class BaseRedisPubSubListener<V> implements RedisPubSubListener<V> {


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
return false;
} }


@Override @Override
Expand Down
Expand Up @@ -17,21 +17,24 @@


import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type; import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;


public class OnceRedisPubSubListener<V> implements RedisPubSubListener<V> { public class OneShotPubSubListener<V> implements RedisPubSubListener<V> {


private RedisPubSubConnection connection; private RedisPubSubConnection connection;
private RedisPubSubListener<V> listener; private RedisPubSubListener<V> listener;


public OnceRedisPubSubListener(RedisPubSubConnection connection, RedisPubSubListener<V> listener) { public OneShotPubSubListener(RedisPubSubConnection connection, RedisPubSubListener<V> listener) {
super(); super();
this.connection = connection; this.connection = connection;
this.listener = listener; this.listener = listener;
} }


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
listener.onStatus(type, channel); if (listener.onStatus(type, channel)) {
connection.removeListener(this); connection.removeListener(this);
return true;
}
return false;
} }


@Override @Override
Expand Down
Expand Up @@ -44,7 +44,7 @@ public void addListener(RedisPubSubListener listener) {
} }


public void addOneShotListener(RedisPubSubListener listener) { public void addOneShotListener(RedisPubSubListener listener) {
listeners.add(new OnceRedisPubSubListener<Object>(this, listener)); listeners.add(new OneShotPubSubListener<Object>(this, listener));
} }


public void removeListener(RedisPubSubListener<?> listener) { public void removeListener(RedisPubSubListener<?> listener) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/client/RedisPubSubListener.java
Expand Up @@ -19,7 +19,7 @@


public interface RedisPubSubListener<V> { public interface RedisPubSubListener<V> {


void onStatus(PubSubStatusMessage.Type type, String channel); boolean onStatus(PubSubStatusMessage.Type type, String channel);


void onMessage(String channel, V message); void onMessage(String channel, V message);


Expand Down
2 changes: 0 additions & 2 deletions src/main/java/org/redisson/connection/ConnectionManager.java
Expand Up @@ -23,14 +23,12 @@
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;


import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;


Expand Down
Expand Up @@ -337,14 +337,16 @@ public void unsubscribe(final String channelName) {
entry.unsubscribe(channelName, new BaseRedisPubSubListener() { entry.unsubscribe(channelName, new BaseRedisPubSubListener() {


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
if (type == Type.UNSUBSCRIBE && channel.equals(channelName)) { if (type == Type.UNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) { synchronized (entry) {
if (entry.tryClose()) { if (entry.tryClose()) {
returnSubscribeConnection(-1, entry); returnSubscribeConnection(-1, entry);
} }
} }
return true;
} }
return false;
} }


}); });
Expand All @@ -360,14 +362,16 @@ public void punsubscribe(final String channelName) {
entry.punsubscribe(channelName, new BaseRedisPubSubListener() { entry.punsubscribe(channelName, new BaseRedisPubSubListener() {


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
if (type == Type.PUNSUBSCRIBE && channel.equals(channelName)) { if (type == Type.PUNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) { synchronized (entry) {
if (entry.tryClose()) { if (entry.tryClose()) {
returnSubscribeConnection(-1, entry); returnSubscribeConnection(-1, entry);
} }
} }
return true;
} }
return false;
} }


}); });
Expand Down
Expand Up @@ -137,7 +137,7 @@ public void subscribe(Codec codec, RedisPubSubListener listener, String channel)
public void unsubscribe(final String channel, RedisPubSubListener listener) { public void unsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() { conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override @Override
public void onStatus(Type type, String ch) { public boolean onStatus(Type type, String ch) {
if (type == Type.UNSUBSCRIBE && channel.equals(ch)) { if (type == Type.UNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel); Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) { if (listeners != null) {
Expand All @@ -146,7 +146,9 @@ public void onStatus(Type type, String ch) {
} }
} }
subscribedChannelsAmount.release(); subscribedChannelsAmount.release();
return true;
} }
return false;
} }
}); });
conn.addOneShotListener(listener); conn.addOneShotListener(listener);
Expand All @@ -156,7 +158,7 @@ public void onStatus(Type type, String ch) {
public void punsubscribe(final String channel, RedisPubSubListener listener) { public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() { conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override @Override
public void onStatus(Type type, String ch) { public boolean onStatus(Type type, String ch) {
if (type == Type.PUNSUBSCRIBE && channel.equals(ch)) { if (type == Type.PUNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel); Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) { if (listeners != null) {
Expand All @@ -165,7 +167,9 @@ public void onStatus(Type type, String ch) {
} }
} }
subscribedChannelsAmount.release(); subscribedChannelsAmount.release();
return true;
} }
return false;
} }
}); });
conn.addOneShotListener(listener); conn.addOneShotListener(listener);
Expand Down
Expand Up @@ -33,14 +33,10 @@
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec; import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type; import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

public class SentinelConnectionManager extends MasterSlaveConnectionManager { public class SentinelConnectionManager extends MasterSlaveConnectionManager {


private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -133,10 +129,11 @@ public void onPatternMessage(String pattern, String channel, String message) {
} }


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
if (type == Type.SUBSCRIBE) { if (type == Type.SUBSCRIBE) {
log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
} }
return true;
} }
}); });


Expand Down
5 changes: 2 additions & 3 deletions src/test/java/org/redisson/RedisClientTest.java
Expand Up @@ -21,10 +21,8 @@
import org.redisson.client.protocol.LongCodec; import org.redisson.client.protocol.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec; import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type; import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;


import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;


public class RedisClientTest { public class RedisClientTest {
Expand All @@ -37,10 +35,11 @@ public void testSubscribe() throws InterruptedException {
pubSubConnection.addListener(new RedisPubSubListener<Object>() { pubSubConnection.addListener(new RedisPubSubListener<Object>() {


@Override @Override
public void onStatus(Type type, String channel) { public boolean onStatus(Type type, String channel) {
Assert.assertEquals(Type.SUBSCRIBE, type); Assert.assertEquals(Type.SUBSCRIBE, type);
Assert.assertTrue(Arrays.asList("test1", "test2").contains(channel)); Assert.assertTrue(Arrays.asList("test1", "test2").contains(channel));
latch.countDown(); latch.countDown();
return true;
} }


@Override @Override
Expand Down

0 comments on commit 7c9e7c4

Please sign in to comment.