Skip to content

Commit

Permalink
Command batches cluster redirect handling fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Apr 15, 2016
1 parent b60b170 commit 0d59eae
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 23 deletions.
Expand Up @@ -18,7 +18,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;


class RedisRedirectException extends RedisException { public class RedisRedirectException extends RedisException {


private static final long serialVersionUID = 181505625075250011L; private static final long serialVersionUID = 181505625075250011L;


Expand Down
28 changes: 12 additions & 16 deletions src/main/java/org/redisson/client/handler/CommandDecoder.java
Expand Up @@ -111,7 +111,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
decode(in, cmd, null, ctx.channel()); decode(in, cmd, null, ctx.channel());
} }
} catch (IOException e) { } catch (IOException e) {
cmd.getPromise().tryFailure(e); cmd.tryFailure(e);
} }
} else if (data instanceof CommandsData) { } else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data; CommandsData commands = (CommandsData)data;
Expand Down Expand Up @@ -175,14 +175,10 @@ private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueComm
decode(in, cmd, null, ctx.channel()); decode(in, cmd, null, ctx.channel());
i++; i++;
} catch (IOException e) { } catch (IOException e) {
cmd.getPromise().tryFailure(e); cmd.tryFailure(e);
} }
if (!cmd.getPromise().isSuccess()) { if (!cmd.isSuccess()) {
if (!(cmd.getPromise().cause() instanceof RedisMovedException error = (RedisException) cmd.cause();
|| cmd.getPromise().cause() instanceof RedisAskException
|| cmd.getPromise().cause() instanceof RedisLoadingException)) {
error = (RedisException) cmd.getPromise().cause();
}
} }
} }


Expand All @@ -197,7 +193,7 @@ private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueComm
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
} }
} }

ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());


state(null); state(null);
Expand All @@ -222,24 +218,24 @@ private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> p
String[] errorParts = error.split(" "); String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]); int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2]; String addr = errorParts[2];
data.getPromise().tryFailure(new RedisMovedException(slot, addr)); data.tryFailure(new RedisMovedException(slot, addr));
} else if (error.startsWith("ASK")) { } else if (error.startsWith("ASK")) {
String[] errorParts = error.split(" "); String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]); int slot = Integer.valueOf(errorParts[1]);
String addr = errorParts[2]; String addr = errorParts[2];
data.getPromise().tryFailure(new RedisAskException(slot, addr)); data.tryFailure(new RedisAskException(slot, addr));
} else if (error.startsWith("LOADING")) { } else if (error.startsWith("LOADING")) {
data.getPromise().tryFailure(new RedisLoadingException(error data.tryFailure(new RedisLoadingException(error
+ ". channel: " + channel + " data: " + data)); + ". channel: " + channel + " data: " + data));
} else if (error.startsWith("OOM")) { } else if (error.startsWith("OOM")) {
data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("OOM ")[1] data.tryFailure(new RedisOutOfMemoryException(error.split("OOM ")[1]
+ ". channel: " + channel + " data: " + data)); + ". channel: " + channel + " data: " + data));
} else if (error.contains("-OOM ")) { } else if (error.contains("-OOM ")) {
data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1] data.tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1]
+ ". channel: " + channel + " data: " + data)); + ". channel: " + channel + " data: " + data));
} else { } else {
if (data != null) { if (data != null) {
data.getPromise().tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data)); data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
} else { } else {
log.error("Error: {} channel: {} data: {}", error, channel, data); log.error("Error: {} channel: {} data: {}", error, channel, data);
} }
Expand Down Expand Up @@ -346,7 +342,7 @@ private void handleResult(CommandData<Object, Object> data, List<Object> parts,
if (parts != null) { if (parts != null) {
parts.add(result); parts.add(result);
} else { } else {
if (!data.getPromise().trySuccess(result) && data.getPromise().cause() instanceof RedisTimeoutException) { if (!data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result); log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result);
} }
} }
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/redisson/client/handler/StateLevel.java
@@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.handler; package org.redisson.client.handler;


import java.util.List; import java.util.List;
Expand Down
62 changes: 62 additions & 0 deletions src/main/java/org/redisson/client/protocol/BatchCommandData.java
@@ -0,0 +1,62 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;

import java.util.concurrent.atomic.AtomicReference;

import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec;

import io.netty.util.concurrent.Promise;

public class BatchCommandData<T, R> extends CommandData<T, R> {

private final AtomicReference<RedisRedirectException> redirectError = new AtomicReference<RedisRedirectException>();

public BatchCommandData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
super(promise, codec, command, params);
}

@Override
public boolean tryFailure(Throwable cause) {
if (redirectError.get() != null) {
return false;
}
if (cause instanceof RedisRedirectException) {
return redirectError.compareAndSet(null, (RedisRedirectException) cause);
}

return super.tryFailure(cause);
}

@Override
public boolean isSuccess() {
return redirectError.get() == null && super.isSuccess();
}

@Override
public Throwable cause() {
if (redirectError.get() != null) {
return redirectError.get();
}
return super.cause();
}

public void clearError() {
redirectError.set(null);
}

}
12 changes: 12 additions & 0 deletions src/main/java/org/redisson/client/protocol/CommandData.java
Expand Up @@ -59,6 +59,18 @@ public MultiDecoder<Object> getMessageDecoder() {
public Promise<R> getPromise() { public Promise<R> getPromise() {
return promise; return promise;
} }

public Throwable cause() {
return promise.cause();
}

public boolean isSuccess() {
return promise.isSuccess();
}

public boolean tryFailure(Throwable cause) {
return promise.tryFailure(cause);
}


public Codec getCodec() { public Codec getCodec() {
return codec; return codec;
Expand Down
@@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder; package org.redisson.client.protocol.decoder;


public interface DecoderState { public interface DecoderState {
Expand Down
24 changes: 18 additions & 6 deletions src/main/java/org/redisson/command/CommandBatchService.java
Expand Up @@ -31,6 +31,7 @@
import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.BatchCommandData;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
Expand All @@ -52,16 +53,16 @@ public class CommandBatchService extends CommandReactiveService {


public static class CommandEntry implements Comparable<CommandEntry> { public static class CommandEntry implements Comparable<CommandEntry> {


final CommandData<?, ?> command; final BatchCommandData<?, ?> command;
final int index; final int index;


public CommandEntry(CommandData<?, ?> command, int index) { public CommandEntry(BatchCommandData<?, ?> command, int index) {
super(); super();
this.command = command; this.command = command;
this.index = index; this.index = index;
} }


public CommandData<?, ?> getCommand() { public BatchCommandData<?, ?> getCommand() {
return command; return command;
} }


Expand Down Expand Up @@ -89,14 +90,20 @@ public void setReadOnlyMode(boolean readOnlyMode) {
public boolean isReadOnlyMode() { public boolean isReadOnlyMode() {
return readOnlyMode; return readOnlyMode;
} }

public void clearErrors() {
for (CommandEntry commandEntry : commands) {
commandEntry.getCommand().clearError();
}
}


} }


private final AtomicInteger index = new AtomicInteger(); private final AtomicInteger index = new AtomicInteger();


private ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap(); private ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();


private boolean executed; private volatile boolean executed;


public CommandBatchService(ConnectionManager connectionManager) { public CommandBatchService(ConnectionManager connectionManager) {
super(connectionManager); super(connectionManager);
Expand All @@ -106,7 +113,7 @@ public CommandBatchService(ConnectionManager connectionManager) {
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) { Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already has been executed!");
} }
Entry entry = commands.get(nodeSource.getSlot()); Entry entry = commands.get(nodeSource.getSlot());
if (entry == null) { if (entry == null) {
Expand All @@ -120,7 +127,9 @@ protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
if (!readOnlyMode) { if (!readOnlyMode) {
entry.setReadOnlyMode(false); entry.setReadOnlyMode(false);
} }
entry.getCommands().add(new CommandEntry(new CommandData<V, R>(mainPromise, codec, command, params), index.incrementAndGet()));
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codec, command, params);
entry.getCommands().add(new CommandEntry(commandData, index.incrementAndGet()));
} }


public List<?> execute() { public List<?> execute() {
Expand Down Expand Up @@ -278,15 +287,18 @@ public void operationComplete(Future<Void> future) throws Exception {


if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt); execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt);
return; return;
} }
if (future.cause() instanceof RedisLoadingException) { if (future.cause() instanceof RedisLoadingException) {
entry.clearErrors();
execute(entry, source, mainPromise, slots, attempt); execute(entry, source, mainPromise, slots, attempt);
return; return;
} }
Expand Down

0 comments on commit 0d59eae

Please sign in to comment.