Skip to content

Commit

Permalink
RBatchOptions.executionMode setting added. #1479
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 11, 2018
1 parent a73ceb9 commit df15195
Show file tree
Hide file tree
Showing 12 changed files with 801 additions and 184 deletions.
6 changes: 3 additions & 3 deletions redisson/src/main/java/org/redisson/RedissonBatch.java
Expand Up @@ -61,7 +61,7 @@ public class RedissonBatch implements RBatch {
private final BatchOptions options;

public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) {
this.executorService = new CommandBatchService(connectionManager);
this.executorService = new CommandBatchService(connectionManager, options);
this.evictionScheduler = evictionScheduler;
this.options = options;
}
Expand Down Expand Up @@ -264,12 +264,12 @@ public RBatch timeout(long timeout, TimeUnit unit) {

@Override
public BatchResult<?> execute() {
return executorService.execute(options);
return executorService.execute(BatchOptions.defaults());
}

@Override
public RFuture<BatchResult<?>> executeAsync() {
return executorService.executeAsync(options);
return executorService.executeAsync(BatchOptions.defaults());
}

@Override
Expand Down
79 changes: 68 additions & 11 deletions redisson/src/main/java/org/redisson/api/BatchOptions.java
Expand Up @@ -18,21 +18,61 @@
import java.util.concurrent.TimeUnit;

/**
* Configuration for Batch.
* Configuration for Batch objecct.
*
* @author Nikita Koksharov
*
*/
public class BatchOptions {

public enum ExecutionMode {

/**
* Store batched invocations in Redis and execute them atomically as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
*/
REDIS_READ_ATOMIC,

/**
* Store batched invocations in Redis and execute them atomically as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
*/
REDIS_WRITE_ATOMIC,

/**
* Store batched invocations in memory on Redisson side and execute them on Redis.
* <p>
* Default mode
*
*/
IN_MEMORY,

/**
* Store batched invocations on Redisson side and executes them atomically on Redis as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
*/
IN_MEMORY_ATOMIC,

}

private ExecutionMode executionMode = ExecutionMode.IN_MEMORY;

private long responseTimeout;
private int retryAttempts;
private long retryInterval;

private long syncTimeout;
private int syncSlaves;
private boolean skipResult;
private boolean atomic;

private BatchOptions() {
}
Expand Down Expand Up @@ -122,20 +162,14 @@ public int getSyncSlaves() {
}

/**
* Switches batch to atomic mode. Redis atomically executes all commands of this batch as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
* Use {@link #executionMode(ExecutionMode)} setting instead
*
* @return self instance
*/
@Deprecated
public BatchOptions atomic() {
atomic = true;
executionMode = ExecutionMode.IN_MEMORY_ATOMIC;
return this;
}
public boolean isAtomic() {
return atomic;
}

/**
* Inform Redis not to send reply. This allows to save network traffic for commands with batch with big response.
Expand All @@ -152,4 +186,27 @@ public boolean isSkipResult() {
return skipResult;
}

/**
* Sets execution mode.
*
* @see ExecutionMode
*
* @param executionMode - batch execution mode
* @return self instance
*/
public BatchOptions executionMode(ExecutionMode executionMode) {
this.executionMode = executionMode;
return this;
}
public ExecutionMode getExecutionMode() {
return executionMode;
}

@Override
public String toString() {
return "BatchOptions [queueStore=" + executionMode + "]";
}



}
Expand Up @@ -198,6 +198,8 @@ private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCo
}
}
}

ThreadLocal<List<CommandData<?, ?>>> commandsData = new ThreadLocal<List<CommandData<?, ?>>>();

private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandsData commandBatch) throws Exception {
Expand All @@ -214,9 +216,22 @@ private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueComm
|| RedisCommands.EXEC.getName().equals(cmd.getName())
|| RedisCommands.WAIT.getName().equals(cmd.getName())) {
commandData = (CommandData<Object, Object>) commandBatch.getCommands().get(i);
if (RedisCommands.EXEC.getName().equals(cmd.getName())) {
if (commandBatch.getAttachedCommands() != null) {
commandsData.set(commandBatch.getAttachedCommands());
} else {
commandsData.set(commandBatch.getCommands());
}
}
}

decode(in, commandData, null, ctx.channel());
try {
decode(in, commandData, null, ctx.channel());
} finally {
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) {
commandsData.remove();
}
}

if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())
&& commandData.getPromise().isSuccess()) {
Expand All @@ -230,7 +245,7 @@ private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueComm
}
Object res = iter.next();

handleResult((CommandData<Object, Object>) command, null, res, false, ctx.channel());
completeResponse((CommandData<Object, Object>) command, res, ctx.channel());
}

if (RedisCommands.MULTI.getName().equals(command.getCommand().getName())) {
Expand Down Expand Up @@ -365,13 +380,33 @@ protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object>
}
}

@SuppressWarnings("unchecked")
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts)
throws IOException {
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel);
if (state().isMakeCheckpoint()) {
checkpoint();
if (parts == null && commandsData.get() != null) {
List<CommandData<?, ?>> commands = commandsData.get();
for (int i = respParts.size(); i < size; i++) {
int suffix = 0;
if (RedisCommands.MULTI.getName().equals(commands.get(0).getCommand().getName())) {
suffix = 1;
}
CommandData<Object, Object> commandData = (CommandData<Object, Object>) commands.get(i+suffix);
decode(in, commandData, respParts, channel);
if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) {
data.tryFailure(commandData.cause());
}

if (state().isMakeCheckpoint()) {
checkpoint();
}
}
} else {
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel);
if (state().isMakeCheckpoint()) {
checkpoint();
}
}
}

Expand Down Expand Up @@ -402,9 +437,13 @@ private void handleResult(CommandData<Object, Object> data, List<Object> parts,
if (parts != null) {
parts.add(result);
} else {
if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result));
}
completeResponse(data, result, channel);
}
}

protected void completeResponse(CommandData<Object, Object> data, Object result, Channel channel) {
if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result));
}
}

Expand All @@ -425,9 +464,11 @@ protected Decoder<Object> selectDecoder(CommandData<Object, Object> data, List<O
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (parts != null) {
MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
Decoder<Object> mDecoder = multiDecoder.getDecoder(parts.size(), state());
if (mDecoder != null) {
decoder = mDecoder;
if (multiDecoder != null) {
Decoder<Object> mDecoder = multiDecoder.getDecoder(parts.size(), state());
if (mDecoder != null) {
decoder = mDecoder;
}
}
}
if (decoder == null) {
Expand Down
Expand Up @@ -28,20 +28,31 @@
public class CommandsData implements QueueCommand {

private final List<CommandData<?, ?>> commands;
private final List<CommandData<?, ?>> attachedCommands;
private final RPromise<Void> promise;
private final boolean skipResult;
private final boolean atomic;

public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands) {
this(promise, commands, false, false);
this(promise, commands, null);
}

public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands) {
this(promise, commands, attachedCommands, false, false);
}


public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean skipResult, boolean atomic) {
this(promise, commands, null, skipResult, atomic);
}

public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands, boolean skipResult, boolean atomic) {
super();
this.promise = promise;
this.commands = commands;
this.skipResult = skipResult;
this.atomic = atomic;
this.attachedCommands = attachedCommands;
}

public RPromise<Void> getPromise() {
Expand All @@ -56,6 +67,10 @@ public boolean isSkipResult() {
return skipResult;
}

public List<CommandData<?, ?>> getAttachedCommands() {
return attachedCommands;
}

public List<CommandData<?, ?>> getCommands() {
return commands;
}
Expand Down
59 changes: 59 additions & 0 deletions redisson/src/main/java/org/redisson/command/BatchPromise.java
@@ -0,0 +1,59 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.command;

import java.util.concurrent.atomic.AtomicBoolean;

import org.redisson.api.RFuture;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;

/**
*
* @author Nikita Koksharov
*
*/
public class BatchPromise<T> extends RedissonPromise<T> {

private final AtomicBoolean executed;
private final RFuture<Void> sentPromise = new RedissonPromise<Void>();

public BatchPromise(AtomicBoolean executed) {
super();
this.executed = executed;
}

public RFuture<Void> getSentPromise() {
return sentPromise;
}

@Override
public RPromise<T> sync() throws InterruptedException {
if (executed.get()) {
return super.sync();
}
return (RPromise<T>) sentPromise.sync();
}

@Override
public RPromise<T> syncUninterruptibly() {
if (executed.get()) {
return super.syncUninterruptibly();
}
return (RPromise<T>) sentPromise.syncUninterruptibly();
}

}

0 comments on commit df15195

Please sign in to comment.