Skip to content

Commit

Permalink
Redisson Reference support for batch, reactive and reactive batch
Browse files Browse the repository at this point in the history
  • Loading branch information
jackygurui committed Sep 6, 2016
1 parent 2476bca commit 09be99e
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 45 deletions.
12 changes: 10 additions & 2 deletions redisson/src/main/java/org/redisson/Redisson.java
Expand Up @@ -159,7 +159,11 @@ public static RedissonReactiveClient createReactive() {
* @return Redisson instance
*/
public static RedissonReactiveClient createReactive(Config config) {
return new RedissonReactive(config);
RedissonReactive react = new RedissonReactive(config);
if (config.isRedissonReferenceEnabled()) {
react.enableRedissonReferenceSupport();
}
return react;
}

@Override
Expand Down Expand Up @@ -489,7 +493,11 @@ public RKeys getKeys() {

@Override
public RBatch createBatch() {
return new RedissonBatch(evictionScheduler, connectionManager);
RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager);
if (config.isRedissonReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
}

@Override
Expand Down
5 changes: 4 additions & 1 deletion redisson/src/main/java/org/redisson/RedissonBatch.java
Expand Up @@ -56,7 +56,7 @@ public class RedissonBatch implements RBatch {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;

public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
protected RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
}
Expand Down Expand Up @@ -281,5 +281,8 @@ public <K, V> RMultimapCacheAsync<K, V> getListMultimapCache(String name, Codec
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
}

protected void enableRedissonReferenceSupport(Redisson redisson) {
this.executorService.enableRedissonReferenceSupport(redisson);
}

}
19 changes: 17 additions & 2 deletions redisson/src/main/java/org/redisson/RedissonReactive.java
Expand Up @@ -45,6 +45,7 @@
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CodecProvider;
import org.redisson.command.CommandReactiveService;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
Expand Down Expand Up @@ -82,14 +83,16 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final CommandReactiveService commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;

protected final CodecProvider codecProvider;

protected RedissonReactive(Config config) {
this.config = config;
Config configCopy = new Config(config);

connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandReactiveService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
codecProvider = config.getCodecProvider();
}


Expand Down Expand Up @@ -259,7 +262,11 @@ public RScriptReactive getScript() {

@Override
public RBatchReactive createBatch() {
return new RedissonBatchReactive(evictionScheduler, connectionManager);
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager);
if (config.isRedissonReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
}

@Override
Expand All @@ -272,6 +279,11 @@ public Config getConfig() {
return config;
}

@Override
public CodecProvider getCodecProvider() {
return codecProvider;
}

@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
Expand Down Expand Up @@ -300,5 +312,8 @@ public boolean isShuttingDown() {
return connectionManager.isShuttingDown();
}

protected void enableRedissonReferenceSupport() {
this.commandExecutor.enableRedissonReferenceSupport(this);
}
}

37 changes: 29 additions & 8 deletions redisson/src/main/java/org/redisson/RedissonReference.java
Expand Up @@ -17,6 +17,7 @@

import org.redisson.client.codec.Codec;
import org.redisson.api.RObject;
import org.redisson.api.RObjectReactive;
import org.redisson.api.annotation.REntity;

/**
Expand All @@ -37,12 +38,14 @@ public RedissonReference(Class type, String keyName) {
}

public RedissonReference(Class type, String keyName, Codec codec) {
if (!type.isAnnotationPresent(REntity.class) && !RObject.class.isAssignableFrom(type)) {
throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject");
if (!type.isAnnotationPresent(REntity.class) && !RObject.class.isAssignableFrom(type) && !RObjectReactive.class.isAssignableFrom(type)) {
throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive");
}
this.type = type.getName();
this.type = RObjectReactive.class.isAssignableFrom(type)
? type.getName().substring(0, type.getName().length() - "Reactive".length()).replaceFirst(".reactive", "")
: type.getName();
this.keyName = keyName;
this.codec = codec != null ? codec.getClass().getCanonicalName() : null;
this.codec = codec != null ? codec.getClass().getName() : null;
}

public boolean isDefaultCodec() {
Expand All @@ -60,21 +63,39 @@ public Class<?> getType() throws Exception {
return Class.forName(type);
}

/**
* @return the type
* @throws java.lang.Exception - which could be:
* LinkageError - if the linkage fails
* ExceptionInInitializerError - if the initialization provoked by this method fails
* ClassNotFoundException - if the class cannot be located
*/
public Class<?> getReactiveType() throws Exception {
return Class.forName(type.replaceFirst("org.redisson", "org.redisson.reactive") + "Reactive");//live object is not supported in reactive client
}

/**
* @return type name in string
*/
public String getTypeName() {
return type;
}

/**
* @return type name in string
*/
public String getReactiveTypeName() {
return type + "Reactive";
}

/**
* @param type the type to set
*/
public void setType(Class<?> type) {
if (!type.isAnnotationPresent(REntity.class) && !RObject.class.isAssignableFrom(type)) {
throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject");
if (!type.isAnnotationPresent(REntity.class) && (!RObject.class.isAssignableFrom(type) || !RObjectReactive.class.isAssignableFrom(type))) {
throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive");
}
this.type = type.getCanonicalName();
this.type = type.getName();
}

/**
Expand Down Expand Up @@ -115,7 +136,7 @@ public String getCodecName() {
* @param codec the codec to set
*/
public void setCodecType(Class<? extends Codec> codec) {
this.codec = codec.getCanonicalName();
this.codec = codec.getName();
}

}
4 changes: 3 additions & 1 deletion redisson/src/main/java/org/redisson/api/RObjectReactive.java
Expand Up @@ -16,6 +16,7 @@
package org.redisson.api;

import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;

/**
* Base interface for all Redisson objects
Expand All @@ -26,7 +27,8 @@
public interface RObjectReactive {

String getName();


Codec getCodec();
/**
* Transfer a object from a source Redis instance to a destination Redis instance
* in mode
Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.config.Config;

/**
Expand Down Expand Up @@ -356,7 +357,14 @@ public interface RedissonReactiveClient {
* @return Config object
*/
Config getConfig();


/**
* Returns the CodecProvider instance
*
* @return CodecProvider
*/
CodecProvider getCodecProvider();

/**
* Get Redis nodes group for server operations
*
Expand Down
Expand Up @@ -28,8 +28,8 @@
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;

import io.netty.util.concurrent.Future;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;

/**
*
Expand All @@ -40,8 +40,12 @@ public interface CommandAsyncExecutor {

ConnectionManager getConnectionManager();

CommandAsyncExecutor enableRedissonReferenceSupport(Redisson redisson);

CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson);

CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive);

boolean isRedissonReferenceSupportEnabled();

<V> RedisException convertException(RFuture<V> RFuture);

boolean await(RFuture<?> RFuture, long timeout, TimeUnit timeoutUnit) throws InterruptedException;
Expand Down
Expand Up @@ -61,8 +61,11 @@
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import org.redisson.Redisson;
import org.redisson.RedissonReactive;
import org.redisson.RedissonReference;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.liveobject.misc.RedissonObjectFactory;

/**
Expand All @@ -75,7 +78,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);

final ConnectionManager connectionManager;
private Redisson redisson;
protected RedissonClient redisson;
protected RedissonReactiveClient redissonReactive;

public CommandAsyncService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
Expand All @@ -87,13 +91,28 @@ public ConnectionManager getConnectionManager() {
}

@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(Redisson redisson) {
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson) {
if (redisson != null) {
this.redisson = redisson;
this.redissonReactive = null;
}
return this;
}

@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
if (redissonReactive != null) {
this.redissonReactive = redissonReactive;
this.redisson = null;
}
return this;
}

@Override
public boolean isRedissonReferenceSupportEnabled() {
return redisson != null || redissonReactive != null;
}

@Override
public <V> V get(RFuture<V> future) {
final CountDownLatch l = new CountDownLatch(1);
Expand Down Expand Up @@ -450,9 +469,11 @@ protected <V, R> void async(final boolean readOnlyMode, final NodeSource source,
}

final AsyncDetails<V, R> details = AsyncDetails.acquire();
if (redisson != null) {
if (isRedissonReferenceSupportEnabled()) {
for (int i = 0; i < params.length; i++) {
RedissonReference reference = RedissonObjectFactory.toReference(redisson, params[i]);
RedissonReference reference = redisson != null
? RedissonObjectFactory.toReference(redisson, params[i])
: RedissonObjectFactory.toReference(redissonReactive, params[i]);
params[i] = reference == null ? params[i] : reference;
}
}
Expand Down Expand Up @@ -730,9 +751,25 @@ private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetai
}
((RedisClientResult)res).setRedisClient(addr);
}
if (redisson != null && res instanceof RedissonReference) {

if (isRedissonReferenceSupportEnabled() && res instanceof List) {
List r = (List) res;
for (int i = 0; i < r.size(); i++) {
if (r.get(i) instanceof RedissonReference) {
try {
r.set(i ,(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) r.get(i))));
} catch (Exception exception) {//skip and carry on to next one.
}
}
}
details.getMainPromise().trySuccess(res);
} else if (isRedissonReferenceSupportEnabled() && res instanceof RedissonReference) {
try {
details.getMainPromise().trySuccess(RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) res));
details.getMainPromise().trySuccess(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) res)
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) res));
} catch (Exception exception) {
details.getMainPromise().trySuccess(res);//fallback
}
Expand Down
Expand Up @@ -50,8 +50,9 @@
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import org.redisson.RedissonReference;
import org.redisson.liveobject.misc.RedissonObjectFactory;

public class CommandBatchService extends CommandReactiveService {

Expand Down Expand Up @@ -109,7 +110,14 @@ protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}

if (isRedissonReferenceSupportEnabled()) {
for (int i = 0; i < params.length; i++) {
RedissonReference reference = redisson != null
? RedissonObjectFactory.toReference(redisson, params[i])
: RedissonObjectFactory.toReference(redissonReactive, params[i]);
params[i] = reference == null ? params[i] : reference;
}
}
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codec, command, params, index.incrementAndGet());
entry.getCommands().add(commandData);
}
Expand Down Expand Up @@ -171,7 +179,14 @@ public void operationComplete(Future<Void> future) throws Exception {
Collections.sort(entries);
List<Object> result = new ArrayList<Object>(entries.size());
for (BatchCommandData<?, ?> commandEntry : entries) {
result.add(commandEntry.getPromise().getNow());
Object entryResult = commandEntry.getPromise().getNow();
if (isRedissonReferenceSupportEnabled() && entryResult instanceof RedissonReference) {
result.add(redisson != null
? RedissonObjectFactory.<Object>fromReference(redisson, (RedissonReference) entryResult)
: RedissonObjectFactory.<Object>fromReference(redissonReactive, (RedissonReference) entryResult));
} else {
result.add(commandEntry.getPromise().getNow());
}
}
promise.setSuccess(result);
commands = null;
Expand Down

0 comments on commit 09be99e

Please sign in to comment.