Skip to content

Commit

Permalink
RCountDownLatch and RLock refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 8, 2015
1 parent d84b986 commit 2436dcc
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 204 deletions.
13 changes: 13 additions & 0 deletions src/main/java/org/redisson/PubSubEntry.java
@@ -0,0 +1,13 @@
package org.redisson;

import io.netty.util.concurrent.Promise;

public interface PubSubEntry<E> {

void aquire();

int release();

Promise<E> getPromise();

}
122 changes: 31 additions & 91 deletions src/main/java/org/redisson/RedissonCountDownLatch.java
Expand Up @@ -15,22 +15,17 @@
*/
package org.redisson;

import java.util.Collections;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RCountDownLatch;
import org.redisson.pubsub.CountDownLatchPubSub;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;

/**
* Distributed alternative to the {@link java.util.concurrent.CountDownLatch}
Expand All @@ -43,10 +38,10 @@
*/
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

private static final Long zeroCountMessage = 0L;
private static final Long newCountMessage = 1L;
public static final Long zeroCountMessage = 0L;
public static final Long newCountMessage = 1L;

private static final ConcurrentMap<String, RedissonCountDownLatchEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();
private static final CountDownLatchPubSub PUBSUB = new CountDownLatchPubSub();

private final UUID id;

Expand All @@ -55,90 +50,23 @@ protected RedissonCountDownLatch(CommandAsyncExecutor commandExecutor, String na
this.id = id;
}

private Future<RedissonCountDownLatchEntry> subscribe() {
synchronized (ENTRIES) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.aquire();
return entry.getPromise();
}

Promise<RedissonCountDownLatchEntry> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();

RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}

RedisPubSubListener<Long> listener = createListener(value);
commandExecutor.getConnectionManager().subscribe(LongCodec.INSTANCE, getChannelName(), listener);
return newPromise;
}
}

private RedisPubSubListener<Long> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {

@Override
public void onMessage(String channel, Long message) {
if (!getChannelName().equals(channel)) {
return;
}
if (message.equals(zeroCountMessage)) {
value.getLatch().open();
}
if (message.equals(newCountMessage)) {
value.getLatch().close();
}
}

@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName())
&& type == PubSubType.SUBSCRIBE) {
value.getPromise().trySuccess(value);
return true;
}
return false;
}

};
return listener;
}

private void unsubscribe(RedissonCountDownLatchEntry entry) {
synchronized (ENTRIES) {
if (entry.release() == 0) {
// just an assertion
boolean removed = ENTRIES.remove(getEntryName()) == entry;
if (removed) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
}
}

public void await() throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
promise.await();

while (getCountInner() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await();
}
}
} finally {
unsubscribe(promise.getNow());
unsubscribe(promise);
}
}


@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
Future<RedissonCountDownLatchEntry> promise = subscribe();
Expand All @@ -154,7 +82,7 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {
}
long current = System.currentTimeMillis();
// waiting for open state
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await(time, TimeUnit.MILLISECONDS);
}
Expand All @@ -165,18 +93,30 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {

return true;
} finally {
unsubscribe(promise.getNow());
unsubscribe(promise);
}
}

private RedissonCountDownLatchEntry getEntry() {
return PUBSUB.getEntry(getEntryName());
}

private Future<RedissonCountDownLatchEntry> subscribe() {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}

private void unsubscribe(Future<RedissonCountDownLatchEntry> future) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}

@Override
public void countDown() {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;" +
"return true",
Collections.<Object>singletonList(getName()), zeroCountMessage, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
get(f);
}

Expand All @@ -185,7 +125,7 @@ private String getEntryName() {
}

private String getChannelName() {
return "redisson_countdownlatch_{" + getName() + "}";
return "redisson_countdownlatch__channel__{" + getName() + "}";
}

@Override
Expand All @@ -204,28 +144,28 @@ private long getCountInner() {

@Override
public boolean trySetCount(long count) {
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', ARGV[3], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "else "
+ "return false "
+ "end",
Collections.<Object>singletonList(getName()), newCountMessage, count, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
return get(f);
}

@Override
public Future<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_R1,
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 then "
+ "redis.call('publish', ARGV[2], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return true "
+ "else "
+ "return false "
+ "end",
Collections.<Object>singletonList(getName()), newCountMessage, getChannelName());
Arrays.<Object>asList(getName(), getChannelName()), newCountMessage);
}

}
Expand Up @@ -19,7 +19,7 @@

import io.netty.util.concurrent.Promise;

public class RedissonCountDownLatchEntry {
public class RedissonCountDownLatchEntry implements PubSubEntry<RedissonCountDownLatchEntry> {

private int counter;

Expand Down

0 comments on commit 2436dcc

Please sign in to comment.