Skip to content

Commit

Permalink
RBatch response ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 22, 2015
1 parent a5780e4 commit 67d6fef
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 16 deletions.
86 changes: 76 additions & 10 deletions src/main/java/org/redisson/CommandBatchExecutorService.java
@@ -1,6 +1,7 @@
package org.redisson;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -28,13 +29,36 @@

public class CommandBatchExecutorService extends CommandExecutorService {


public static class CommandEntry implements Comparable<CommandEntry> {

final CommandData<?, ?> command;
final int index;

public CommandEntry(CommandData<?, ?> command, int index) {
super();
this.command = command;
this.index = index;
}

public CommandData<?, ?> getCommand() {
return command;
}

@Override
public int compareTo(CommandEntry o) {
return index - o.index;
}

}

public static class Entry {

Queue<CommandData<?, ?>> commands = PlatformDependent.newMpscQueue();
Queue<CommandEntry> commands = PlatformDependent.newMpscQueue();

volatile boolean readOnlyMode = true;

public Queue<CommandData<?, ?>> getCommands() {
public Queue<CommandEntry> getCommands() {
return commands;
}

Expand All @@ -48,7 +72,11 @@ public boolean isReadOnlyMode() {

}

private final ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();
private final AtomicInteger index = new AtomicInteger();

private ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();

private boolean executed;

public CommandBatchExecutorService(ConnectionManager connectionManager) {
super(connectionManager);
Expand All @@ -57,6 +85,9 @@ public CommandBatchExecutorService(ConnectionManager connectionManager) {
@Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
Entry entry = commands.get(slot);
if (entry == null) {
entry = new Entry();
Expand All @@ -69,17 +100,48 @@ protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object>
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}
entry.getCommands().add(new CommandData<V, R>(mainPromise, messageDecoder, codec, command, params));
entry.getCommands().add(new CommandEntry(new CommandData<V, R>(mainPromise, messageDecoder, codec, command, params), index.incrementAndGet()));
}

public void execute() {
get(executeAsync());
public List<?> execute() {
return get(executeAsync());
}

public Promise<Void> executeAsync() {
Promise<Void> promise = connectionManager.newPromise();
public Future<List<?>> executeAsync() {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}

if (commands.isEmpty()) {
return connectionManager.getGroup().next().newSucceededFuture(null);
}
executed = true;

Promise<Void> voidPromise = connectionManager.newPromise();
final Promise<List<?>> promise = connectionManager.newPromise();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}

List<CommandEntry> entries = new ArrayList<CommandEntry>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> result = new ArrayList<Object>();
for (CommandEntry commandEntry : entries) {
result.add(commandEntry.getCommand().getPromise().getNow());
}
promise.setSuccess(result);
commands = null;
}
});
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
execute(e.getValue(), e.getKey(), promise, new AtomicInteger(commands.size()), 0);
execute(e.getValue(), e.getKey(), voidPromise, new AtomicInteger(commands.size()), 0);
}
return promise;
}
Expand Down Expand Up @@ -113,7 +175,11 @@ public void run(Timeout timeout) throws Exception {
connection = connectionManager.connectionWriteOp(slot);
}

connection.send(new CommandsData(mainPromise, new ArrayList<CommandData<?, ?>>(entry.getCommands())));
ArrayList<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
connection.send(new CommandsData(mainPromise, list));

ex.set(new RedisTimeoutException());
Timeout timeout = connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/org/redisson/RedissonBatch.java
@@ -1,5 +1,7 @@
package org.redisson;

import java.util.List;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLongAsync;
import org.redisson.core.RBatch;
Expand All @@ -14,6 +16,8 @@
import org.redisson.core.RSetAsync;
import org.redisson.core.RTopicAsync;

import io.netty.util.concurrent.Future;

public class RedissonBatch implements RBatch {

CommandBatchExecutorService executorService;
Expand Down Expand Up @@ -84,8 +88,13 @@ public RScriptAsync getScript() {
}

@Override
public void execute() {
executorService.execute();
public List<?> execute() {
return executorService.execute();
}

@Override
public Future<List<?>> executeAsync() {
return executorService.executeAsync();
}

}
24 changes: 23 additions & 1 deletion src/main/java/org/redisson/core/RBatch.java
Expand Up @@ -15,6 +15,10 @@
*/
package org.redisson.core;

import java.util.List;

import io.netty.util.concurrent.Future;

/**
* Interface for using pipeline feature.
*
Expand Down Expand Up @@ -129,6 +133,24 @@ public interface RBatch {
*/
RScriptAsync getScript();

void execute();
/**
* Executes all operations accumulated during async methods invocations.
*
* In cluster configurations operations grouped by slot ids
* so may be executed on different servers. Thus command execution order could be changed
*
* @return
*/
List<?> execute();

/**
* Executes all operations accumulated during async methods invocations asynchronously.
*
* In cluster configurations operations grouped by slot ids
* so may be executed on different servers. Thus command execution order could be changed
*
* @return
*/
Future<List<?>> executeAsync();

}
30 changes: 27 additions & 3 deletions src/test/java/org/redisson/RedissonBatchTest.java
@@ -1,6 +1,7 @@
package org.redisson;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Assert;
Expand All @@ -9,19 +10,42 @@

public class RedissonBatchTest extends BaseTest {

@Test(expected=IllegalStateException.class)
public void testTwice() {
RBatch batch = redisson.createBatch();
batch.getMap("test").putAsync("1", "2");
batch.execute();
batch.execute();
}


@Test
public void testEmpty() {
RBatch batch = redisson.createBatch();
batch.execute();
}

@Test
public void test() {
RBatch batch = redisson.createBatch();
batch.getMap("test").fastPutAsync("1", "2");
batch.getMap("test").fastPutAsync("2", "3");
batch.getMap("test").putAsync("2", "5");
batch.getAtomicLongAsync("counter").incrementAndGetAsync();
batch.getAtomicLongAsync("counter").incrementAndGetAsync();
batch.execute();

List<?> res = batch.execute();
Assert.assertEquals(5, res.size());
Assert.assertTrue((Boolean)res.get(0));
Assert.assertTrue((Boolean)res.get(1));
Assert.assertEquals("3", res.get(2));
Assert.assertEquals(1L, res.get(3));
Assert.assertEquals(2L, res.get(4));

Map<String, String> map = new HashMap<String, String>();
map.put("1", "2");
map.put("2", "3");
Assert.assertEquals(redisson.getMap("test"), map);
map.put("2", "5");
Assert.assertEquals(map, redisson.getMap("test"));

Assert.assertEquals(redisson.getAtomicLong("counter").get(), 2);
}
Expand Down

0 comments on commit 67d6fef

Please sign in to comment.