Skip to content

Commit

Permalink
RedissonBatch implemented. #42
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 22, 2015
1 parent 10e83ba commit a5780e4
Show file tree
Hide file tree
Showing 12 changed files with 506 additions and 9 deletions.
209 changes: 209 additions & 0 deletions src/main/java/org/redisson/CommandBatchExecutorService.java
@@ -0,0 +1,209 @@
package org.redisson;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;

public class CommandBatchExecutorService extends CommandExecutorService {

public static class Entry {

Queue<CommandData<?, ?>> commands = PlatformDependent.newMpscQueue();

volatile boolean readOnlyMode = true;

public Queue<CommandData<?, ?>> getCommands() {
return commands;
}

public void setReadOnlyMode(boolean readOnlyMode) {
this.readOnlyMode = readOnlyMode;
}

public boolean isReadOnlyMode() {
return readOnlyMode;
}

}

private final ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();

public CommandBatchExecutorService(ConnectionManager connectionManager) {
super(connectionManager);
}

@Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
Entry entry = commands.get(slot);
if (entry == null) {
entry = new Entry();
Entry oldEntry = commands.putIfAbsent(slot, entry);
if (oldEntry != null) {
entry = oldEntry;
}
}

if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}
entry.getCommands().add(new CommandData<V, R>(mainPromise, messageDecoder, codec, command, params));
}

public void execute() {
get(executeAsync());
}

public Promise<Void> executeAsync() {
Promise<Void> promise = connectionManager.newPromise();
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
execute(e.getValue(), e.getKey(), promise, new AtomicInteger(commands.size()), 0);
}
return promise;
}

public void execute(final Entry entry, final int slot, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
final Promise<Void> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();

TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) {
return;
}
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.setFailure(ex.get());
return;
}
attemptPromise.cancel(true);

int count = attempt + 1;
execute(entry, slot, mainPromise, slots, count);
}
};

try {
org.redisson.client.RedisConnection connection;
if (entry.isReadOnlyMode()) {
connection = connectionManager.connectionReadOp(slot);
} else {
connection = connectionManager.connectionWriteOp(slot);
}

connection.send(new CommandsData(mainPromise, new ArrayList<CommandData<?, ?>>(entry.getCommands())));

ex.set(new RedisTimeoutException());
Timeout timeout = connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);

if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
}
} catch (RedisConnectionException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
attemptPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isCancelled()) {
return;
}
// TODO cancel timeout

if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
execute(entry, ex.getSlot(), mainPromise, slots, attempt);
return;
}

if (future.isSuccess()) {
if (slots.decrementAndGet() == 0) {
mainPromise.setSuccess(future.getNow());
}
} else {
mainPromise.setFailure(future.cause());
}
}
});
}

@Override
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys,
Object... params) {
throw new UnsupportedOperationException();
}

@Override
public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script,
List<Object> keys, Object... params) {
throw new UnsupportedOperationException();
}

@Override
public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys,
Object... params) {
throw new UnsupportedOperationException();
}

@Override
public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script,
List<Object> keys, Object... params) {
throw new UnsupportedOperationException();
}

@Override
public <R> R read(String key, SyncOperation<R> operation) {
throw new UnsupportedOperationException();
}

@Override
public <R> R write(String key, SyncOperation<R> operation) {
throw new UnsupportedOperationException();
}

@Override
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}

@Override
public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}

@Override
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}

@Override
public <T, R> R write(String key, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}

}
6 changes: 3 additions & 3 deletions src/main/java/org/redisson/CommandExecutorService.java
Expand Up @@ -53,9 +53,9 @@
*/ */
public class CommandExecutorService implements CommandExecutor { public class CommandExecutorService implements CommandExecutor {


private final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());


ConnectionManager connectionManager; final ConnectionManager connectionManager;


public CommandExecutorService(ConnectionManager connectionManager) { public CommandExecutorService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
Expand Down Expand Up @@ -266,7 +266,7 @@ public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> comm
return mainPromise; return mainPromise;
} }


private <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command, protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) { final Object[] params, final Promise<R> mainPromise, final int attempt) {
final Promise<R> attemptPromise = connectionManager.newPromise(); final Promise<R> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>(); final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/redisson/Redisson.java
Expand Up @@ -27,6 +27,7 @@
import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager; import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RAtomicLong; import org.redisson.core.RAtomicLong;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueue; import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBucket; import org.redisson.core.RBucket;
import org.redisson.core.RCountDownLatch; import org.redisson.core.RCountDownLatch;
Expand Down Expand Up @@ -303,5 +304,10 @@ public void flushdb() {
commandExecutor.writeAllAsync(RedisCommands.FLUSHDB).awaitUninterruptibly(); commandExecutor.writeAllAsync(RedisCommands.FLUSHDB).awaitUninterruptibly();
} }


@Override
public RBatch createBatch() {
return new RedissonBatch(connectionManager);
}

} }


91 changes: 91 additions & 0 deletions src/main/java/org/redisson/RedissonBatch.java
@@ -0,0 +1,91 @@
package org.redisson;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLongAsync;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueueAsync;
import org.redisson.core.RBucketAsync;
import org.redisson.core.RDequeAsync;
import org.redisson.core.RHyperLogLogAsync;
import org.redisson.core.RListAsync;
import org.redisson.core.RMapAsync;
import org.redisson.core.RQueueAsync;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RTopicAsync;

public class RedissonBatch implements RBatch {

CommandBatchExecutorService executorService;

public RedissonBatch(ConnectionManager connectionManager) {
super();
this.executorService = new CommandBatchExecutorService(connectionManager);
}

@Override
public <V> RBucketAsync<V> getBucket(String name) {
return new RedissonBucket<V>(executorService, name);
}

@Override
public <V> RHyperLogLogAsync<V> getHyperLogLog(String name) {
return new RedissonHyperLogLog<V>(executorService, name);
}

@Override
public <V> RListAsync<V> getList(String name) {
return new RedissonList<V>(executorService, name);
}

@Override
public <K, V> RMapAsync<K, V> getMap(String name) {
return new RedissonMap<K, V>(executorService, name);
}

@Override
public <V> RSetAsync<V> getSet(String name) {
return new RedissonSet<V>(executorService, name);
}

@Override
public <M> RTopicAsync<M> getTopic(String name) {
return new RedissonTopic<M>(executorService, name);
}

@Override
public <M> RTopicAsync<M> getTopicPattern(String pattern) {
return new RedissonTopicPattern<M>(executorService, pattern);
}

@Override
public <V> RQueueAsync<V> getQueue(String name) {
return new RedissonQueue<V>(executorService, name);
}

@Override
public <V> RBlockingQueueAsync<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<V>(executorService, name);
}

@Override
public <V> RDequeAsync<V> getDequeAsync(String name) {
return new RedissonDeque<V>(executorService, name);
}

@Override
public RAtomicLongAsync getAtomicLongAsync(String name) {
return new RedissonAtomicLong(executorService, name);
}

@Override
public RScriptAsync getScript() {
return new RedissonScript(executorService);
}

@Override
public void execute() {
executorService.execute();
}

}
10 changes: 10 additions & 0 deletions src/main/java/org/redisson/RedissonClient.java
Expand Up @@ -150,4 +150,14 @@ public interface RedissonClient {
*/ */
RScript getScript(); RScript getScript();


/**
* Return batch object which executes group of
* command in pipeline.
*
* See <a href="http://redis.io/topics/pipelining">http://redis.io/topics/pipelining</a>
*
* @return
*/
RBatch createBatch();

} }
5 changes: 2 additions & 3 deletions src/main/java/org/redisson/client/RedisConnection.java
Expand Up @@ -15,7 +15,6 @@
*/ */
package org.redisson.client; package org.redisson.client;


import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Codec;
Expand Down Expand Up @@ -81,8 +80,8 @@ public <T, R> void send(CommandData<T, R> data) {
channel.writeAndFlush(data); channel.writeAndFlush(data);
} }


public void send(List<CommandData<? extends Object, ? extends Object>> data) { public void send(CommandsData data) {
channel.writeAndFlush(new CommandsData(data)); channel.writeAndFlush(data);
} }


public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/redisson/client/handler/CommandDecoder.java
Expand Up @@ -47,6 +47,8 @@
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;


/** /**
* Redis protocol command decoder
*
* Code parts from Sam Pullara * Code parts from Sam Pullara
* *
* @author Nikita Koksharov * @author Nikita Koksharov
Expand Down Expand Up @@ -121,6 +123,8 @@ public Object decode(ByteBuf buf, State state) {
cmd.getPromise().setFailure(e); cmd.getPromise().setFailure(e);
} }
} }

commands.getPromise().setSuccess(null);
} }


ctx.channel().attr(CommandsQueue.REPLAY).remove(); ctx.channel().attr(CommandsQueue.REPLAY).remove();
Expand Down

0 comments on commit a5780e4

Please sign in to comment.