Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 17, 2016
1 parent 0e7dba0 commit 5f0801d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 21 deletions.
Expand Up @@ -38,7 +38,6 @@
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -164,18 +163,18 @@ public void operationComplete(ChannelFuture future) throws Exception {
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners);
}

for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners);
}
}

private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners) {
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners) {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
if (listeners.isEmpty()) {
return;
Expand All @@ -192,7 +191,7 @@ public void operationComplete(Future<PubSubConnectionEntry> future)
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
Expand All @@ -201,7 +200,7 @@ public void operationComplete(Future<PubSubConnectionEntry> future)
}

private void reattachPatternPubSubListeners(final String channelName,
final Collection<RedisPubSubListener> listeners) {
final Collection<RedisPubSubListener<?>> listeners) {
Codec subscribeCodec = connectionManager.punsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null);
Expand All @@ -215,7 +214,7 @@ public void operationComplete(Future<PubSubConnectionEntry> future)
}

PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
Expand Down
Expand Up @@ -40,7 +40,7 @@ public enum Status {ACTIVE, INACTIVE}
private final RedisPubSubConnection conn;

private final ConcurrentMap<String, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<String, SubscribeListener>();
private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>();
private final ConcurrentMap<String, Queue<RedisPubSubListener<?>>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener<?>>>();

public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
Expand All @@ -52,8 +52,8 @@ public boolean hasListeners(String channelName) {
return channelListeners.containsKey(channelName);
}

public Collection<RedisPubSubListener> getListeners(String channelName) {
Collection<RedisPubSubListener> result = channelListeners.get(channelName);
public Collection<RedisPubSubListener<?>> getListeners(String channelName) {
Collection<RedisPubSubListener<?>> result = channelListeners.get(channelName);
if (result == null) {
return Collections.emptyList();
}
Expand All @@ -65,10 +65,10 @@ public void addListener(String channelName, RedisPubSubListener<?> listener) {
return;
}

Queue<RedisPubSubListener> queue = channelListeners.get(channelName);
Queue<RedisPubSubListener<?>> queue = channelListeners.get(channelName);
if (queue == null) {
queue = new ConcurrentLinkedQueue<RedisPubSubListener>();
Queue<RedisPubSubListener> oldQueue = channelListeners.putIfAbsent(channelName, queue);
queue = new ConcurrentLinkedQueue<RedisPubSubListener<?>>();
Queue<RedisPubSubListener<?>> oldQueue = channelListeners.putIfAbsent(channelName, queue);
if (oldQueue != null) {
queue = oldQueue;
}
Expand All @@ -92,8 +92,8 @@ public void addListener(String channelName, RedisPubSubListener<?> listener) {

// TODO optimize
public boolean removeListener(String channelName, int listenerId) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channelName);
for (RedisPubSubListener listener : listeners) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);
for (RedisPubSubListener<?> listener : listeners) {
if (System.identityHashCode(listener) == listenerId) {
removeListener(channelName, listener);
return true;
Expand All @@ -102,8 +102,8 @@ public boolean removeListener(String channelName, int listenerId) {
return false;
}

private void removeListener(String channelName, RedisPubSubListener listener) {
Queue<RedisPubSubListener> queue = channelListeners.get(channelName);
private void removeListener(String channelName, RedisPubSubListener<?> listener) {
Queue<RedisPubSubListener<?>> queue = channelListeners.get(channelName);
synchronized (queue) {
if (queue.remove(listener) && queue.isEmpty()) {
channelListeners.remove(channelName);
Expand Down Expand Up @@ -156,7 +156,7 @@ public Future<Void> getSubscribeFuture(String channel, PubSubType type) {
return listener.getSuccessFuture();
}

public void unsubscribe(final String channel, final RedisPubSubListener listener) {
public void unsubscribe(final String channel, final RedisPubSubListener<?> listener) {
conn.addListener(new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String ch) {
Expand All @@ -179,18 +179,18 @@ private void removeListeners(String channel) {
conn.removeDisconnectListener(channel);
SubscribeListener s = subscribeChannelListeners.remove(channel);
conn.removeListener(s);
Queue<RedisPubSubListener> queue = channelListeners.get(channel);
Queue<RedisPubSubListener<?>> queue = channelListeners.get(channel);
if (queue != null) {
synchronized (queue) {
channelListeners.remove(channel);
}
for (RedisPubSubListener listener : queue) {
for (RedisPubSubListener<?> listener : queue) {
conn.removeListener(listener);
}
}
}

public void punsubscribe(final String channel, final RedisPubSubListener listener) {
public void punsubscribe(final String channel, final RedisPubSubListener<?> listener) {
conn.addListener(new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String ch) {
Expand Down

0 comments on commit 5f0801d

Please sign in to comment.