Skip to content

Commit

Permalink
Fixed - RTopic.onSubscribe should be invoked after failover process
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Apr 21, 2017
1 parent 15ed7c8 commit 7453033
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 76 deletions.
Expand Up @@ -64,9 +64,9 @@ public interface ConnectionManager {


boolean isShuttingDown(); boolean isShuttingDown();


RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener); RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners);


RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore); RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);


ConnectionInitializer getConnectListener(); ConnectionInitializer getConnectListener();


Expand Down Expand Up @@ -106,9 +106,9 @@ public interface ConnectionManager {


PubSubConnectionEntry getPubSubEntry(String channelName); PubSubConnectionEntry getPubSubEntry(String channelName);


RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener); RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?>... listeners);


RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener, AsyncSemaphore semaphore); RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);


Codec unsubscribe(String channelName, AsyncSemaphore lock); Codec unsubscribe(String channelName, AsyncSemaphore lock);


Expand Down
Expand Up @@ -364,60 +364,53 @@ public PubSubConnectionEntry getPubSubEntry(String channelName) {
} }


@Override @Override
public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?> listener) { public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName); final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = newPromise(); final RPromise<PubSubConnectionEntry> result = newPromise();
lock.acquire(new Runnable() { lock.acquire(new Runnable() {
@Override @Override
public void run() { public void run() {
RFuture<PubSubConnectionEntry> future = psubscribe(channelName, codec, listener, lock); RFuture<PubSubConnectionEntry> future = psubscribe(channelName, codec, lock, listeners);
future.addListener(new TransferListener<PubSubConnectionEntry>(result)); future.addListener(new TransferListener<PubSubConnectionEntry>(result));
} }
}); });
return result; return result;
} }


public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) { public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = newPromise(); RPromise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE, semaphore); subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
return promise; return promise;
} }


public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener) { public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName); final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = newPromise(); final RPromise<PubSubConnectionEntry> result = newPromise();
lock.acquire(new Runnable() { lock.acquire(new Runnable() {
@Override @Override
public void run() { public void run() {
RFuture<PubSubConnectionEntry> future = subscribe(codec, channelName, listener, lock); RFuture<PubSubConnectionEntry> future = subscribe(codec, channelName, lock, listeners);
future.addListener(new TransferListener<PubSubConnectionEntry>(result)); future.addListener(new TransferListener<PubSubConnectionEntry>(result));
} }
}); });
return result; return result;
} }


public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) { public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = newPromise(); RPromise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore); subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise; return promise;
} }


public AsyncSemaphore getSemaphore(String channelName) { public AsyncSemaphore getSemaphore(String channelName) {
return locks[Math.abs(channelName.hashCode() % locks.length)]; return locks[Math.abs(channelName.hashCode() % locks.length)];
} }


private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, private void subscribe(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) { final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
if (connEntry != null) { if (connEntry != null) {
connEntry.addListener(channelName, listener); subscribe(channelName, promise, type, lock, connEntry, listeners);
connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(connEntry);
}
});
return; return;
} }


Expand All @@ -431,7 +424,7 @@ public void run() {


final PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) { if (freeEntry == null) {
connect(codec, channelName, listener, promise, type, lock); connect(codec, channelName, promise, type, lock, listeners);
return; return;
} }


Expand All @@ -445,14 +438,7 @@ public void run() {
freeEntry.release(); freeEntry.release();
freePubSubLock.release(); freePubSubLock.release();


oldEntry.addListener(channelName, listener); subscribe(channelName, promise, type, lock, oldEntry, listeners);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(oldEntry);
}
});
return; return;
} }


Expand All @@ -461,14 +447,7 @@ public void operationComplete(Future<Void> future) throws Exception {
} }
freePubSubLock.release(); freePubSubLock.release();


freeEntry.addListener(channelName, listener); subscribe(channelName, promise, type, lock, freeEntry, listeners);
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(freeEntry);
}
});


if (PubSubType.PSUBSCRIBE == type) { if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName); freeEntry.psubscribe(codec, channelName);
Expand All @@ -480,8 +459,23 @@ public void operationComplete(Future<Void> future) throws Exception {
}); });
} }


private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, private void subscribe(final String channelName, final RPromise<PubSubConnectionEntry> promise,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) { final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry,
final RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.addListener(channelName, listener);
}
connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(connEntry);
}
});
}

private void connect(final Codec codec, final String channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
final int slot = calcSlot(channelName); final int slot = calcSlot(channelName);
RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot); RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() { connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
Expand All @@ -505,29 +499,15 @@ public void operationComplete(Future<RedisPubSubConnection> future) throws Excep
releaseSubscribeConnection(slot, entry); releaseSubscribeConnection(slot, entry);


freePubSubLock.release(); freePubSubLock.release();


oldEntry.addListener(channelName, listener); subscribe(channelName, promise, type, lock, oldEntry, listeners);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(oldEntry);
}
});
return; return;
} }


freePubSubConnections.add(entry); freePubSubConnections.add(entry);
freePubSubLock.release(); freePubSubLock.release();


entry.addListener(channelName, listener); subscribe(channelName, promise, type, lock, entry, listeners);
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
lock.release();
promise.trySuccess(entry);
}
});


if (PubSubType.PSUBSCRIBE == type) { if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName); entry.psubscribe(codec, channelName);
Expand Down
Expand Up @@ -211,21 +211,15 @@ public void operationComplete(Future<Codec> future) throws Exception {


private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners, private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) { final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null); RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() { subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {


@Override @Override
public void operationComplete(Future<PubSubConnectionEntry> future) public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception { throws Exception {
if (!future.isSuccess()) { if (future.isSuccess()) {
subscribe(channelName, listeners, subscribeCodec); log.debug("resubscribed listeners of '{}' channel to {}", channelName, future.getNow().getConnection().getRedisClient());
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
} }
log.debug("resubscribed listeners of '{}' channel to {}", channelName, newEntry.getConnection().getRedisClient());
} }
}); });
} }
Expand Down
Expand Up @@ -26,7 +26,6 @@


import org.redisson.PubSubMessageListener; import org.redisson.PubSubMessageListener;
import org.redisson.PubSubPatternMessageListener; import org.redisson.PubSubPatternMessageListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
Expand Down
Expand Up @@ -98,7 +98,7 @@ public void run() {
} }


RedisPubSubListener<Object> listener = createListener(channelName, value); RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore); connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
} }
}; };
semaphore.acquire(listener); semaphore.acquire(listener);
Expand Down
92 changes: 85 additions & 7 deletions redisson/src/test/java/org/redisson/RedissonTopicTest.java
Expand Up @@ -6,22 +6,23 @@
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess; import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RSet; import org.redisson.api.RSet;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
Expand Down Expand Up @@ -463,7 +464,7 @@ public void testListenerRemove() throws InterruptedException {
} }


@Test @Test
public void testReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { public void testReattach() throws Exception {
RedisProcess runner = new RedisRunner() RedisProcess runner = new RedisRunner()
.nosave() .nosave()
.randomDir() .randomDir()
Expand All @@ -475,14 +476,24 @@ public void testReattach() throws InterruptedException, IOException, ExecutionEx
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);


final AtomicBoolean executed = new AtomicBoolean(); final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();


RTopic<Integer> topic = redisson.getTopic("topic"); RTopic<Integer> topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {

@Override
public void onUnsubscribe(String channel) {
}

@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(new MessageListener<Integer>() { topic.addListener(new MessageListener<Integer>() {
@Override @Override
public void onMessage(String channel, Integer msg) { public void onMessage(String channel, Integer msg) {
if (msg == 1) { executed.set(true);
executed.set(true);
}
} }
}); });


Expand All @@ -498,10 +509,77 @@ public void onMessage(String channel, Integer msg) {


redisson.getTopic("topic").publish(1); redisson.getTopic("topic").publish(1);


await().atMost(5, TimeUnit.SECONDS).untilTrue(executed); await().atMost(2, TimeUnit.SECONDS).untilTrue(executed);
await().atMost(2, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);


redisson.shutdown(); redisson.shutdown();
runner.stop(); runner.stop();
} }

@Test
public void testReattachInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();


ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();

Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);

final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger();

RTopic<Integer> topic = redisson.getTopic("topic");
topic.addListener(new StatusListener() {

@Override
public void onUnsubscribe(String channel) {
}

@Override
public void onSubscribe(String channel) {
subscriptions.incrementAndGet();
}
});
topic.addListener(new MessageListener<Integer>() {
@Override
public void onMessage(String channel, Integer msg) {
executed.set(true);
}
});

process.getNodes().stream().filter(x -> Arrays.asList(slave1.getPort(), slave2.getPort(), slave3.getPort()).contains(x.getRedisServerPort()))
.forEach(x -> {
try {
x.stop();
Thread.sleep(18000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});

Thread.sleep(15000);

redisson.getTopic("topic").publish(1);

await().atMost(75, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
Assert.assertTrue(executed.get());

redisson.shutdown();
process.shutdown();
}



} }

0 comments on commit 7453033

Please sign in to comment.