Skip to content

Commit

Permalink
Fixed exception during channel re-subscription. #87
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 8, 2014
1 parent 57dc4fc commit f80a679
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
23 changes: 12 additions & 11 deletions src/main/java/org/redisson/RedissonCountDownLatch.java
Expand Up @@ -49,7 +49,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private static final Integer newCountMessage = 1;

private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = new ConcurrentHashMap<String, RedissonCountDownLatchEntry>();

private final UUID id;

RedissonCountDownLatch(ConnectionManager connectionManager, String name, UUID id) {
Expand All @@ -74,12 +74,13 @@ private Future<Boolean> subscribe() {
}
return oldPromise;
}

RedisPubSubAdapter<Integer> listener = new RedisPubSubAdapter<Integer>() {

@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
if (getChannelName().equals(channel)
&& !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}
Expand Down Expand Up @@ -112,7 +113,7 @@ private void release() {
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.release();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (newEntry.isFree()
if (newEntry.isFree()
&& ENTRIES.remove(getEntryName(), newEntry)) {
Future future = connectionManager.unsubscribe(getChannelName());
future.awaitUninterruptibly();
Expand All @@ -121,7 +122,7 @@ private void release() {
}
}
}

private Promise<Boolean> aquire() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
Expand All @@ -141,7 +142,7 @@ public void await() throws InterruptedException {
Future<Boolean> promise = subscribe();
try {
promise.await();

while (getCountInner() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
Expand All @@ -162,7 +163,7 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {
if (!promise.await(time, unit)) {
return false;
}

time = unit.toMillis(time);
while (getCountInner() > 0) {
if (time <= 0) {
Expand All @@ -178,7 +179,7 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {
long elapsed = System.currentTimeMillis() - current;
time = time - elapsed;
}

return true;
} finally {
release();
Expand Down Expand Up @@ -213,7 +214,7 @@ public Void execute(RedisConnection<Object, Object> conn) {
private String getEntryName() {
return id + getName();
}

private String getChannelName() {
return groupName + getName();
}
Expand All @@ -230,7 +231,7 @@ protected Future<Number> execute(RedisAsyncConnection<Object, Number> async) {
return async.get(getName());
}
});

if (val == null) {
return 0;
}
Expand All @@ -256,7 +257,7 @@ public Boolean execute(RedisConnection<Object, Object> conn) {
}
});
}

@Override
public void delete() {
connectionManager.write(new SyncOperation<Object, Void>() {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -178,7 +178,8 @@ private Future<Boolean> subscribe() {

@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
if (getChannelName().equals(channel)
&& !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/org/redisson/RedissonLockTest.java
Expand Up @@ -33,15 +33,15 @@ public void after() {
public void testExpire() throws InterruptedException {
RLock lock = redisson.getLock("lock");
lock.lock(2, TimeUnit.SECONDS);

final long startTime = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
public void run() {
RLock lock1 = redisson.getLock("lock");
lock1.lock();
long spendTime = System.currentTimeMillis() - startTime;
Assert.assertTrue(spendTime < 2005);
Assert.assertTrue(spendTime < 2010);
lock1.unlock();
latch.countDown();
};
Expand All @@ -51,7 +51,7 @@ public void run() {

lock.unlock();
}

@Test
public void testGetHoldCount() {
RLock lock = redisson.getLock("lock");
Expand Down

0 comments on commit f80a679

Please sign in to comment.