Skip to content

Commit

Permalink
RBucketReactive, RHyperLogLogReactive, RListReactive implemented. #210
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 26, 2015
1 parent 48ed3e9 commit 4b56c24
Show file tree
Hide file tree
Showing 45 changed files with 2,075 additions and 342 deletions.
70 changes: 70 additions & 0 deletions src/main/java/org/redisson/CommandAsyncExecutor.java
@@ -0,0 +1,70 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;

import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;

import io.netty.util.concurrent.Future;

/**
*
* @author Nikita Koksharov
*
*/
public interface CommandAsyncExecutor {

ConnectionManager getConnectionManager();

<V> V get(Future<V> future);

<T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);

<T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);

<T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);

<R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);

<T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

<T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

<T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

<T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

<T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);

<T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);

<T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params);

<T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params);

<T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params);

<T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params);

<T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params);


}
Expand Up @@ -61,13 +61,13 @@
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class CommandExecutorService implements CommandExecutor { public class CommandAsyncService implements CommandAsyncExecutor {


final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());


final ConnectionManager connectionManager; final ConnectionManager connectionManager;


public CommandExecutorService(ConnectionManager connectionManager) { public CommandAsyncService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
} }


Expand All @@ -76,6 +76,24 @@ public ConnectionManager getConnectionManager() {
return connectionManager; return connectionManager;
} }


@Override
public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}

@Override
public <T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot, client), null, codec, command, params, mainPromise, 0);
return mainPromise;
}

@Override
public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) { public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
final Promise<Collection<R>> mainPromise = connectionManager.newPromise(); final Promise<Collection<R>> mainPromise = connectionManager.newPromise();
Promise<R> promise = new DefaultPromise<R>() { Promise<R> promise = new DefaultPromise<R>() {
Expand Down Expand Up @@ -110,6 +128,7 @@ public Promise<R> setFailure(Throwable cause) {
return mainPromise; return mainPromise;
} }


@Override
public <T, R> Future<R> readRandomAsync(final RedisCommand<T> command, final Object ... params) { public <T, R> Future<R> readRandomAsync(final RedisCommand<T> command, final Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise(); final Promise<R> mainPromise = connectionManager.newPromise();
final List<ClusterSlotRange> slots = new ArrayList<ClusterSlotRange>(connectionManager.getEntries().keySet()); final List<ClusterSlotRange> slots = new ArrayList<ClusterSlotRange>(connectionManager.getEntries().keySet());
Expand Down Expand Up @@ -145,10 +164,12 @@ public void operationComplete(Future<R> future) throws Exception {
async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, 0); async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, 0);
} }


@Override
public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) { public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
return writeAllAsync(command, null, params); return writeAllAsync(command, null, params);
} }


@Override
public <R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) { public <R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
return allAsync(false, command, callback, params); return allAsync(false, command, callback, params);
} }
Expand Down Expand Up @@ -184,156 +205,53 @@ public Promise<T> setFailure(Throwable cause) {
return mainPromise; return mainPromise;
} }


public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}

protected <V> RedisException convertException(Future<V> future) { protected <V> RedisException convertException(Future<V> future) {
return future.cause() instanceof RedisException ? return future.cause() instanceof RedisException ?
(RedisException) future.cause() : (RedisException) future.cause() :
new RedisException("Unexpected exception while processing command", future.cause()); new RedisException("Unexpected exception while processing command", future.cause());
} }


public <T, R> R read(String key, RedisCommand<T> command, Object ... params) { @Override
return read(key, connectionManager.getCodec(), command, params);
}

public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(key, codec, command, params);
return get(res);
}

public <T, R> R read(InetSocketAddress client, String key, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(client, key, connectionManager.getCodec(), command, params);
return get(res);
}

public <T, R> R read(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(client, key, codec, command, params);
return get(res);
}


public <T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot, client), null, codec, command, params, mainPromise, 0);
return mainPromise;
}

public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot), null, codec, command, params, mainPromise, 0); async(true, new NodeSource(slot), null, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }


public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) { @Override
Future<R> res = writeAsync(slot, codec, command, params);
return get(res);
}

public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0); async(false, new NodeSource(slot), null, codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }


@Override
public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
return readAsync(key, connectionManager.getCodec(), command, params); return readAsync(key, connectionManager.getCodec(), command, params);
} }


public <R> R write(String key, Codec codec, SyncOperation<R> operation) { @Override
int slot = connectionManager.calcSlot(key);
return async(false, codec, new NodeSource(slot), operation, 0);
}

public <R> R read(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return async(true, codec, new NodeSource(slot), operation, 0);
}

private <R> R async(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
throw new IllegalStateException("Redisson is shutdown");
}

try {
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, null);
}
connectionFuture.syncUninterruptibly();

RedisConnection connection = connectionFuture.getNow();

try {
return operation.execute(codec, connection);
} catch (RedisMovedException e) {
return async(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.MOVED), operation, attempt);
} catch (RedisAskException e) {
return async(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.ASK), operation, attempt);
} catch (RedisLoadingException e) {
return async(readOnlyMode, codec, source, operation, attempt);
} catch (RedisTimeoutException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
attempt++;
return async(readOnlyMode, codec, source, operation, attempt);
} finally {
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(source, connection);
} else {
connectionManager.releaseWrite(source, connection);
}
}
} catch (RedisException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
try {
Thread.sleep(connectionManager.getConfig().getRetryInterval());
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
attempt++;
return async(readOnlyMode, codec, source, operation, attempt);
}
}

public <T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(true, key, connectionManager.getCodec(), evalCommandType, script, keys, params); return evalAsync(true, key, connectionManager.getCodec(), evalCommandType, script, keys, params);
} }


@Override
public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(true, key, codec, evalCommandType, script, keys, params); return evalAsync(true, key, codec, evalCommandType, script, keys, params);
} }


public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { @Override
return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}

public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Future<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}

public <T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(false, key, connectionManager.getCodec(), evalCommandType, script, keys, params); return evalAsync(false, key, connectionManager.getCodec(), evalCommandType, script, keys, params);
} }


@Override
public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(false, key, codec, evalCommandType, script, keys, params); return evalAsync(false, key, codec, evalCommandType, script, keys, params);
} }


@Override
public <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
return evalAllAsync(false, command, callback, script, keys, params); return evalAllAsync(false, command, callback, script, keys, params);
} }
Expand Down Expand Up @@ -382,29 +300,12 @@ private <T, R> Future<R> evalAsync(boolean readOnlyMode, String key, Codec codec
return mainPromise; return mainPromise;
} }


public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { @Override
return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}

public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Future<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}

public <T, R> R write(String key, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(key, command, params);
return get(res);
}

public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
return writeAsync(key, connectionManager.getCodec(), command, params); return writeAsync(key, connectionManager.getCodec(), command, params);
} }


public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) { @Override
Future<R> res = writeAsync(key, codec, command, params);
return get(res);
}

public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
Expand Down

0 comments on commit 4b56c24

Please sign in to comment.