Skip to content

Commit

Permalink
1. Fixed - RTopic.addListener sometimes worked asynchronous
Browse files Browse the repository at this point in the history
2. Fixed - ClastCastException occurred if multi-type PubSub channels were used with single connection
3. Fixed - PubSub status message decoding
4. Fixed - PubSub subscription may stuck in some cases #530
  • Loading branch information
Nikita committed Jun 19, 2016
1 parent eeec753 commit bb9a016
Show file tree
Hide file tree
Showing 19 changed files with 641 additions and 255 deletions.
10 changes: 5 additions & 5 deletions src/main/java/org/redisson/PubSubMessageListener.java
Expand Up @@ -26,7 +26,7 @@
* @param <K>
* @param <V>
*/
public class PubSubMessageListener<V> implements RedisPubSubListener<V> {
public class PubSubMessageListener<V> implements RedisPubSubListener<Object> {

private final MessageListener<V> listener;
private final String name;
Expand Down Expand Up @@ -67,18 +67,18 @@ public boolean equals(Object obj) {
}

@Override
public void onMessage(String channel, V message) {
public void onMessage(String channel, Object message) {
// could be subscribed to multiple channels
if (name.equals(channel)) {
listener.onMessage(channel, message);
listener.onMessage(channel, (V)message);
}
}

@Override
public void onPatternMessage(String pattern, String channel, V message) {
public void onPatternMessage(String pattern, String channel, Object message) {
// could be subscribed to multiple channels
if (name.equals(pattern)) {
listener.onMessage(channel, message);
listener.onMessage(channel, (V)message);
}
}

Expand Down
26 changes: 11 additions & 15 deletions src/main/java/org/redisson/RedissonPatternTopic.java
Expand Up @@ -53,7 +53,7 @@ protected RedissonPatternTopic(Codec codec, CommandExecutor commandExecutor, Str

@Override
public int addListener(PatternStatusListener listener) {
return addListener(new PubSubPatternStatusListener(listener, name));
return addListener(new PubSubPatternStatusListener<Object>(listener, name));
};

@Override
Expand All @@ -62,18 +62,10 @@ public int addListener(PatternMessageListener<M> listener) {
return addListener(pubSubListener);
}

private int addListener(RedisPubSubListener<M> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec);
private int addListener(RedisPubSubListener<?> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
future.syncUninterruptibly();
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
return System.identityHashCode(pubSubListener);
}
}
// entry is inactive trying add again
return addListener(pubSubListener);
return System.identityHashCode(pubSubListener);
}

@Override
Expand All @@ -82,17 +74,21 @@ public void removeListener(int listenerId) {
if (entry == null) {
return;
}
synchronized (entry) {

entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (entry.getListeners(name).isEmpty()) {
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name);
}
return;
}
} finally {
entry.unlock();
}

// entry is inactive trying add again
// listener has been re-attached
removeListener(listenerId);
}

Expand Down
10 changes: 7 additions & 3 deletions src/main/java/org/redisson/RedissonTopic.java
Expand Up @@ -68,7 +68,7 @@ public Future<Long> publishAsync(M message) {

@Override
public int addListener(StatusListener listener) {
return addListener(new PubSubStatusListener(listener, name));
return addListener(new PubSubStatusListener<Object>(listener, name));
};

@Override
Expand All @@ -77,7 +77,7 @@ public int addListener(MessageListener<M> listener) {
return addListener(pubSubListener);
}

private int addListener(RedisPubSubListener<M> pubSubListener) {
private int addListener(RedisPubSubListener<?> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.syncUninterruptibly();
return System.identityHashCode(pubSubListener);
Expand All @@ -89,14 +89,18 @@ public void removeListener(int listenerId) {
if (entry == null) {
return;
}
synchronized (entry) {

entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name);
}
return;
}
} finally {
entry.unlock();
}

// listener has been re-attached
Expand Down
Expand Up @@ -17,19 +17,19 @@

import org.redisson.client.protocol.pubsub.PubSubType;

public class BaseRedisPubSubListener<V> implements RedisPubSubListener<V> {
public class BaseRedisPubSubListener implements RedisPubSubListener<Object> {

@Override
public boolean onStatus(PubSubType type, String channel) {
return false;
}

@Override
public void onMessage(String channel, V message) {
public void onMessage(String channel, Object message) {
}

@Override
public void onPatternMessage(String pattern, String channel, V message) {
public void onPatternMessage(String pattern, String channel, Object message) {
}

}
47 changes: 47 additions & 0 deletions src/main/java/org/redisson/client/SubscribeListener.java
@@ -0,0 +1,47 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;

import org.redisson.client.protocol.pubsub.PubSubType;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;

public class SubscribeListener extends BaseRedisPubSubListener {

Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
String name;
PubSubType type;

public SubscribeListener(String name, PubSubType type) {
super();
this.name = name;
this.type = type;
}

public boolean onStatus(PubSubType type, String channel) {
if (name.equals(channel) && this.type.equals(type)) {
promise.trySuccess(null);
}
return true;
}

public Future<Void> getSuccessFuture() {
return promise;
}

}
21 changes: 13 additions & 8 deletions src/main/java/org/redisson/client/handler/CommandDecoder.java
Expand Up @@ -71,10 +71,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {

// It is not needed to use concurrent map because responses are coming consecutive
private final Map<String, MultiDecoder<Object>> pubSubMessageDecoders = new HashMap<String, MultiDecoder<Object>>();
private final Map<String, CommandData<Object, Object>> pubSubChannels = PlatformDependent.newConcurrentHashMap();
private final Map<PubSubKey, CommandData<Object, Object>> pubSubChannels = PlatformDependent.newConcurrentHashMap();

public void addPubSubCommand(String channel, CommandData<Object, Object> data) {
pubSubChannels.put(channel, data);
String operation = data.getCommand().getName().toLowerCase();
pubSubChannels.put(new PubSubKey(channel, operation), data);
}

@Override
Expand Down Expand Up @@ -313,13 +314,15 @@ private void handleMultiResult(CommandData<Object, Object> data, List<Object> pa
Channel channel, Object result) {
if (result instanceof PubSubStatusMessage) {
String channelName = ((PubSubStatusMessage) result).getChannel();
CommandData<Object, Object> d = pubSubChannels.get(channelName);
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = pubSubChannels.get(key);
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
pubSubChannels.remove(channelName);
pubSubChannels.remove(key);
pubSubMessageDecoders.put(channelName, d.getMessageDecoder());
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
pubSubChannels.remove(channelName);
pubSubChannels.remove(key);
pubSubMessageDecoders.remove(channelName);
}
}
Expand Down Expand Up @@ -353,9 +356,11 @@ private void handleResult(CommandData<Object, Object> data, List<Object> parts,

private MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts, Channel channel) {
if (data == null) {
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(parts.get(0))) {
String channelName = (String) parts.get(1);
CommandData<Object, Object> commandData = pubSubChannels.get(channelName);
String command = parts.get(0).toString();
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) {
String channelName = parts.get(1).toString();
PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = pubSubChannels.get(key);
if (commandData == null) {
return null;
}
Expand Down
68 changes: 68 additions & 0 deletions src/main/java/org/redisson/client/handler/PubSubKey.java
@@ -0,0 +1,68 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.handler;

public class PubSubKey {

private final String channel;
private final String operation;

public PubSubKey(String channel, String operation) {
super();
this.channel = channel;
this.operation = operation;
}

public String getChannel() {
return channel;
}

public String getOperation() {
return operation;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((channel == null) ? 0 : channel.hashCode());
result = prime * result + ((operation == null) ? 0 : operation.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PubSubKey other = (PubSubKey) obj;
if (channel == null) {
if (other.channel != null)
return false;
} else if (!channel.equals(other.channel))
return false;
if (operation == null) {
if (other.operation != null)
return false;
} else if (!operation.equals(other.operation))
return false;
return true;
}

}
Expand Up @@ -15,12 +15,12 @@
*/
package org.redisson.client.protocol.pubsub;

public class PubSubMessage<V> implements Message {
public class PubSubMessage implements Message {

private final String channel;
private final V value;
private final Object value;

public PubSubMessage(String channel, V value) {
public PubSubMessage(String channel, Object value) {
super();
this.channel = channel;
this.value = value;
Expand All @@ -30,7 +30,7 @@ public String getChannel() {
return channel;
}

public V getValue() {
public Object getValue() {
return value;
}

Expand Down
Expand Up @@ -39,8 +39,8 @@ public Object decode(ByteBuf buf, State state) throws IOException {
}

@Override
public PubSubMessage<Object> decode(List<Object> parts, State state) {
return new PubSubMessage<Object>(parts.get(1).toString(), parts.get(2));
public PubSubMessage decode(List<Object> parts, State state) {
return new PubSubMessage(parts.get(1).toString(), parts.get(2));
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/redisson/connection/ConnectionManager.java
Expand Up @@ -91,9 +91,9 @@ public interface ConnectionManager {

PubSubConnectionEntry getPubSubEntry(String channelName);

Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener);

Future<Codec> unsubscribe(String channelName);
Codec unsubscribe(String channelName);

Codec punsubscribe(String channelName);

Expand Down

0 comments on commit bb9a016

Please sign in to comment.