Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 5, 2016
1 parent 1d0d87b commit c3c190e
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 48 deletions.
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/Redisson.java
Expand Up @@ -79,6 +79,7 @@
import org.redisson.liveobject.provider.DefaultCodecProvider;
import org.redisson.liveobject.provider.DefaultResolverProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.pubsub.SemaphorePubSub;

import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent;
Expand All @@ -100,6 +101,7 @@ public class Redisson implements RedissonClient {
protected final CodecProvider liveObjectDefaultCodecProvider = new DefaultCodecProvider();
protected final ResolverProvider liveObjectDefaultResolverProvider = new DefaultResolverProvider();
protected final Config config;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();

protected final UUID id = UUID.randomUUID();

Expand Down Expand Up @@ -504,7 +506,7 @@ public RBitSet getBitSet(String name) {

@Override
public RSemaphore getSemaphore(String name) {
return new RedissonSemaphore(commandExecutor, name, id);
return new RedissonSemaphore(commandExecutor, name, semaphorePubSub);
}

@Override
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/org/redisson/RedissonBlockingDeque.java
Expand Up @@ -26,6 +26,8 @@
import org.redisson.command.CommandAsyncExecutor;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.BlockingDeque}.
Expand All @@ -50,8 +52,19 @@ protected RedissonBlockingDeque(Codec codec, CommandAsyncExecutor commandExecuto
}

@Override
public Future<Boolean> putAsync(V e) {
return offerAsync(e);
public Future<Void> putAsync(V e) {
final Promise<Void> result = commandExecutor.getConnectionManager().newPromise();
offerAsync(e).addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
result.setSuccess(null);
}
});
return result;
}

/*
Expand All @@ -60,7 +73,7 @@ public Future<Boolean> putAsync(V e) {
*/
@Override
public void put(V e) throws InterruptedException {
offer(e);
get(putAsync(e));
}

@Override
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/org/redisson/RedissonBlockingQueue.java
Expand Up @@ -29,6 +29,8 @@
import org.redisson.connection.decoder.ListDrainToDecoder;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.BlockingQueue}.
Expand All @@ -50,8 +52,19 @@ protected RedissonBlockingQueue(Codec codec, CommandAsyncExecutor commandExecuto
}

@Override
public Future<Boolean> putAsync(V e) {
return offerAsync(e);
public Future<Void> putAsync(V e) {
final Promise<Void> result = commandExecutor.getConnectionManager().newPromise();
offerAsync(e).addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
result.setSuccess(null);
}
});
return result;
}

/*
Expand All @@ -60,7 +73,7 @@ public Future<Boolean> putAsync(V e) {
*/
@Override
public void put(V e) throws InterruptedException {
offer(e);
get(putAsync(e));
}

@Override
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/org/redisson/RedissonBloomFilter.java
Expand Up @@ -167,16 +167,6 @@ public boolean contains(T object) {
}
}

private byte[] encode(T object) {
byte[] state = null;
try {
state = codec.getValueEncoder().encode(object);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
return state;
}

private void addConfigCheck(int hashIterations, long size, CommandBatchService executorService) {
executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID,
"local size = redis.call('hget', KEYS[1], 'size');" +
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/redisson/RedissonObject.java
Expand Up @@ -15,6 +15,7 @@
*/
package org.redisson;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.redisson.api.RObject;
Expand Down Expand Up @@ -132,5 +133,13 @@ public Future<Boolean> isExistsAsync() {
public Codec getCodec() {
return codec;
}

protected byte[] encode(Object value) {
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

}
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonQueue.java
Expand Up @@ -43,7 +43,7 @@ protected RedissonQueue(Codec codec, CommandAsyncExecutor commandExecutor, Strin

@Override
public boolean offer(V e) {
return add(e);
return get(offerAsync(e));
}

@Override
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/org/redisson/RedissonScoredSortedSet.java
Expand Up @@ -330,14 +330,6 @@ public boolean retainAll(Collection<?> c) {
return get(retainAllAsync(c));
}

private byte[] encode(V value) {
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
if (c.isEmpty()) {
Expand Down
19 changes: 6 additions & 13 deletions src/main/java/org/redisson/RedissonSemaphore.java
Expand Up @@ -17,7 +17,6 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -45,20 +44,14 @@
*/
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {

final UUID id;

private static final SemaphorePubSub PUBSUB = new SemaphorePubSub();
private final SemaphorePubSub semaphorePubSub;

final CommandExecutor commandExecutor;

protected RedissonSemaphore(CommandExecutor commandExecutor, String name, UUID id) {
protected RedissonSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
}

private String getEntryName() {
return id + ":" + getName();
this.semaphorePubSub = semaphorePubSub;
}

String getChannelName() {
Expand Down Expand Up @@ -375,15 +368,15 @@ public void run() {


private RedissonLockEntry getEntry() {
return PUBSUB.getEntry(getEntryName());
return semaphorePubSub.getEntry(getName());
}

private Future<RedissonLockEntry> subscribe() {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
}

private void unsubscribe(Future<RedissonLockEntry> future) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
}

@Override
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/org/redisson/RedissonSetCache.java
Expand Up @@ -209,14 +209,6 @@ public Future<Boolean> addAsync(V value, long ttl, TimeUnit unit) {
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, objectState);
}

private byte[] encode(V value) {
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

@Override
public Future<Boolean> addAsync(V value) {
return addAsync(value, 92233720368547758L - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/api/RBlockingQueueAsync.java
Expand Up @@ -53,6 +53,6 @@ public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {

Future<V> takeAsync();

Future<Boolean> putAsync(V e);
Future<Void> putAsync(V e);

}

0 comments on commit c3c190e

Please sign in to comment.