Skip to content

Commit

Permalink
retryAttempts, retryInterval, timeout methods added to RBatch object
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Apr 5, 2017
1 parent bdf473a commit d4e8e9e
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 34 deletions.
31 changes: 27 additions & 4 deletions redisson/src/main/java/org/redisson/RedissonBatch.java
Expand Up @@ -17,6 +17,7 @@


import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;


import org.redisson.api.RAtomicDoubleAsync; import org.redisson.api.RAtomicDoubleAsync;
import org.redisson.api.RAtomicLongAsync; import org.redisson.api.RAtomicLongAsync;
Expand Down Expand Up @@ -59,6 +60,10 @@ public class RedissonBatch implements RBatch {
private final CommandBatchService executorService; private final CommandBatchService executorService;
private final UUID id; private final UUID id;


private long timeout;
private int retryAttempts;
private long retryInterval;

protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager); this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler; this.evictionScheduler = evictionScheduler;
Expand Down Expand Up @@ -225,24 +230,42 @@ public <V> RSetCacheAsync<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null); return new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
} }


@Override
public RBatch retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
return this;
}

@Override
public RBatch retryInterval(long retryInterval, TimeUnit unit) {
this.retryInterval = unit.toMillis(retryInterval);
return this;
}

@Override
public RBatch timeout(long timeout, TimeUnit unit) {
this.timeout = unit.toMillis(timeout);
return this;
}

@Override @Override
public List<?> execute() { public List<?> execute() {
return executorService.execute(); return executorService.execute(timeout, retryAttempts, retryInterval);
} }


@Override @Override
public void executeSkipResult() { public void executeSkipResult() {
executorService.executeSkipResult(); executorService.executeSkipResult(timeout, retryAttempts, retryInterval);
} }


@Override @Override
public RFuture<Void> executeSkipResultAsync() { public RFuture<Void> executeSkipResultAsync() {
return executorService.executeSkipResultAsync(); return executorService.executeSkipResultAsync(timeout, retryAttempts, retryInterval);
} }


@Override @Override
public RFuture<List<?>> executeAsync() { public RFuture<List<?>> executeAsync() {
return executorService.executeAsync(); return executorService.executeAsync(timeout, retryAttempts, retryInterval);
} }


@Override @Override
Expand Down
45 changes: 44 additions & 1 deletion redisson/src/main/java/org/redisson/api/RBatch.java
Expand Up @@ -16,6 +16,7 @@
package org.redisson.api; package org.redisson.api;


import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;


import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
Expand All @@ -27,7 +28,7 @@
* from this interface are batched to separate queue and could be executed later * from this interface are batched to separate queue and could be executed later
* with <code>execute()</code> or <code>executeAsync()</code> methods. * with <code>execute()</code> or <code>executeAsync()</code> methods.
* <p> * <p>
* Please be ware, atomicity <b>is not</b> guaranteed. * Please be aware, atomicity <b>is not</b> guaranteed.
* *
* *
* @author Nikita Koksharov * @author Nikita Koksharov
Expand Down Expand Up @@ -426,4 +427,46 @@ public interface RBatch {
* *
*/ */
RFuture<Void> executeSkipResultAsync(); RFuture<Void> executeSkipResultAsync();

/**
* Defines timeout for Redis response.
* Starts to countdown when Redis command has been successfully sent.
* <p>
* <code>0</code> value means use <code>Config.setTimeout</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param timeout value
* @param unit value
* @return self instance
*/
RBatch timeout(long timeout, TimeUnit unit);

/**
* Defines time interval for another one attempt send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryInterval</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryInterval value
* @param unit value
* @return self instance
*/
RBatch retryInterval(long retryInterval, TimeUnit unit);

/**
* Defines attempts amount to re-send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryAttempts</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryAttempts value
* @return self instance
*/
RBatch retryAttempts(int retryAttempts);

} }
Expand Up @@ -127,14 +127,18 @@ protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
} }


public List<?> execute() { public List<?> execute() {
return get(executeAsync()); return get(executeAsync(0, 0, 0));
}

public List<?> execute(long responseTimeout, int retryAttempts, long retryInterval) {
return get(executeAsync(responseTimeout, retryAttempts, retryInterval));
} }


public RFuture<Void> executeAsyncVoid() { public RFuture<Void> executeAsyncVoid() {
return executeAsyncVoid(false); return executeAsyncVoid(false, 0, 0, 0);
} }


private RFuture<Void> executeAsyncVoid(boolean noResult) { private RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already executed!");
} }
Expand All @@ -146,11 +150,11 @@ private RFuture<Void> executeAsyncVoid(boolean noResult) {
if (noResult) { if (noResult) {
for (Entry entry : commands.values()) { for (Entry entry : commands.values()) {
RPromise<Object> s = connectionManager.newPromise(); RPromise<Object> s = connectionManager.newPromise();
BatchCommandData commandData = new BatchCommandData(s, null, RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); BatchCommandData<?, ?> offCommand = new BatchCommandData(s, null, RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(commandData); entry.getCommands().addFirst(offCommand);
RPromise<Object> s1 = connectionManager.newPromise(); RPromise<Object> s1 = connectionManager.newPromise();
BatchCommandData commandData1 = new BatchCommandData(s1, null, RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet()); BatchCommandData<?, ?> onCommand = new BatchCommandData(s1, null, RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.getCommands().add(commandData1); entry.getCommands().add(onCommand);
} }
} }


Expand All @@ -166,20 +170,24 @@ public void operationComplete(Future<Void> future) throws Exception {


AtomicInteger slots = new AtomicInteger(commands.size()); AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) { for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, true); execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, true, responseTimeout, retryAttempts, retryInterval);
} }
return voidPromise; return voidPromise;
} }


public void executeSkipResult() { public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) {
get(executeSkipResultAsync()); get(executeSkipResultAsync(timeout, retryAttempts, retryInterval));
} }


public RFuture<Void> executeSkipResultAsync() { public RFuture<Void> executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) {
return executeAsyncVoid(true); return executeAsyncVoid(true, timeout, retryAttempts, retryInterval);
} }


public RFuture<List<?>> executeAsync() { public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, 0);
}

public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already executed!");
} }
Expand All @@ -200,7 +208,7 @@ public void operationComplete(Future<Void> future) throws Exception {
return; return;
} }


List<BatchCommandData> entries = new ArrayList<BatchCommandData>(); List<BatchCommandData<?, ?>> entries = new ArrayList<BatchCommandData<?, ?>>();
for (Entry e : commands.values()) { for (Entry e : commands.values()) {
entries.addAll(e.getCommands()); entries.addAll(e.getCommands());
} }
Expand All @@ -223,12 +231,13 @@ public void operationComplete(Future<Void> future) throws Exception {


AtomicInteger slots = new AtomicInteger(commands.size()); AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) { for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, false); execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, false, responseTimeout, retryAttempts, retryInterval);
} }
return promise; return promise;
} }


private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots, final int attempt, final boolean noResult) { private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval) {
if (mainPromise.isCancelled()) { if (mainPromise.isCancelled()) {
return; return;
} }
Expand Down Expand Up @@ -272,7 +281,11 @@ public void run(Timeout timeout) throws Exception {
return; return;
} }


if (attempt == connectionManager.getConfig().getRetryAttempts()) { int attempts = connectionManager.getConfig().getRetryAttempts();
if (retryAttempts > 0) {
attempts = retryAttempts;
}
if (attempt == attempts) {
if (details.getException() == null) { if (details.getException() == null) {
details.setException(new RedisTimeoutException("Batch command execution timeout")); details.setException(new RedisTimeoutException("Batch command execution timeout"));
} }
Expand All @@ -284,17 +297,22 @@ public void run(Timeout timeout) throws Exception {
} }


int count = attempt + 1; int count = attempt + 1;
execute(entry, source, mainPromise, slots, count, noResult); execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval);
} }
}; };


Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); long interval = connectionManager.getConfig().getRetryInterval();
if (retryInterval > 0) {
interval = retryInterval;
}

Timeout timeout = connectionManager.newTimeout(retryTimerTask, interval, TimeUnit.MILLISECONDS);
details.setTimeout(timeout); details.setTimeout(timeout);


connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult); checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout);
} }
}); });


Expand All @@ -309,26 +327,28 @@ public void operationComplete(Future<Void> future) throws Exception {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors(); entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt, noResult); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors(); entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt, noResult); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
return; return;
} }
if (future.cause() instanceof RedisLoadingException) { if (future.cause() instanceof RedisLoadingException) {
entry.clearErrors(); entry.clearErrors();
execute(entry, source, mainPromise, slots, attempt, noResult); execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
return; return;
} }
if (future.cause() instanceof RedisTryAgainException) { if (future.cause() instanceof RedisTryAgainException) {
entry.clearErrors(); entry.clearErrors();
connectionManager.newTimeout(new TimerTask() { connectionManager.newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
execute(entry, source, mainPromise, slots, attempt, noResult); execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
} }
}, 1, TimeUnit.SECONDS); }, 1, TimeUnit.SECONDS);
return; return;
Expand All @@ -347,7 +367,7 @@ public void run(Timeout timeout) throws Exception {
} }


private void checkWriteFuture(final RPromise<Void> attemptPromise, AsyncDetails details, private void checkWriteFuture(final RPromise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future, boolean noResult) { final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout) {
if (attemptPromise.isDone() || future.isCancelled()) { if (attemptPromise.isDone() || future.isCancelled()) {
return; return;
} }
Expand All @@ -356,21 +376,26 @@ private void checkWriteFuture(final RPromise<Void> attemptPromise, AsyncDetails
details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause())); details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause()));
} else { } else {
details.getTimeout().cancel(); details.getTimeout().cancel();
TimerTask timeoutTask = new TimerTask() { TimerTask timerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure( attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel()));
} }
}; };
Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout); long timeout = connectionManager.getConfig().getTimeout();
if (responseTimeout > 0) {
timeout = responseTimeout;
}
Timeout timeoutTask = connectionManager.newTimeout(timerTask, timeout, TimeUnit.MILLISECONDS);
details.setTimeout(timeoutTask);
} }
} }


private void checkConnectionFuture(final Entry entry, final NodeSource source, private void checkConnectionFuture(final Entry entry, final NodeSource source,
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details, final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
RFuture<RedisConnection> connFuture, final boolean noResult) { RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout) {
if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
return; return;
} }
Expand Down Expand Up @@ -402,7 +427,7 @@ private void checkConnectionFuture(final Entry entry, final NodeSource source,
details.getWriteFuture().addListener(new ChannelFutureListener() { details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(attemptPromise, details, connection, future, noResult); checkWriteFuture(attemptPromise, details, connection, future, noResult, responseTimeout);
} }
}); });


Expand Down

0 comments on commit d4e8e9e

Please sign in to comment.