diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 65de5907a89..116f0a98406 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -28,7 +28,6 @@ import org.redisson.api.RFuture; import org.redisson.api.RLock; -import org.redisson.client.RedisConnectionClosedException; import org.redisson.client.RedisResponseTimeoutException; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -236,9 +235,6 @@ public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws Inte long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } - } catch (RedisConnectionClosedException e) { - unlockInner(Arrays.asList(lock)); - lockAcquired = false; } catch (RedisResponseTimeoutException e) { unlockInner(Arrays.asList(lock)); lockAcquired = false; @@ -321,8 +317,7 @@ public void operationComplete(Future future) throws Exception { lockAcquired = future.getNow(); } - if (future.cause() instanceof RedisConnectionClosedException - || future.cause() instanceof RedisResponseTimeoutException) { + if (future.cause() instanceof RedisResponseTimeoutException) { unlockInnerAsync(Arrays.asList(lock), threadId); } diff --git a/redisson/src/main/java/org/redisson/client/RedisConnectionClosedException.java b/redisson/src/main/java/org/redisson/client/RedisConnectionClosedException.java deleted file mode 100644 index 0c5fae97a7c..00000000000 --- a/redisson/src/main/java/org/redisson/client/RedisConnectionClosedException.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client; - -/** - * - * @author Nikita Koksharov - * - */ -public class RedisConnectionClosedException extends RedisConnectionException { - - private static final long serialVersionUID = -5162298227713965182L; - - public RedisConnectionClosedException(String msg) { - super(msg); - } - -} diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java index eb0dc07439f..8d7f35e8e95 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -21,7 +21,6 @@ import java.util.regex.Pattern; import org.redisson.client.ChannelName; -import org.redisson.client.RedisConnectionClosedException; import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.QueueCommand; @@ -82,11 +81,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { command.getChannelPromise().tryFailure( new WriteRedisConnectionException("Channel has been closed! Can't write command: " + LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel())); - - if (command.getChannelPromise().isSuccess() && !command.getCommand().isBlockingCommand()) { - command.getCommand().tryFailure(new RedisConnectionClosedException("Command " - + LogHelper.toString(command.getCommand()) + " succesfully sent, but channel " + ctx.channel() + " has been closed!")); - } } super.channelInactive(ctx); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 63a382938aa..1934e7f8c80 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -219,7 +219,7 @@ protected RPromise createPromise() { @Override public RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -227,21 +227,21 @@ public RFuture readAsync(RedisClient client, MasterSlaveEntry entry, C public RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); int slot = connectionManager.calcSlot(name); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); int slot = connectionManager.calcSlot(key); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -291,7 +291,7 @@ public void operationComplete(Future future) throws Exception { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null); + async(true, new NodeSource(entry), codec, command, params, promise, 0, true); } return mainPromise; } @@ -336,7 +336,7 @@ public void operationComplete(Future future) throws Exception { }); MasterSlaveEntry entry = nodes.remove(0); - async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false, null); + async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false); } @Override @@ -392,7 +392,7 @@ public void operationComplete(Future future) throws Exception { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true, null); + async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true); } return mainPromise; } @@ -419,7 +419,7 @@ private NodeSource getNodeSource(byte[] key) { public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(true, source, codec, command, params, mainPromise, 0, false, null); + async(true, source, codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -427,20 +427,20 @@ public RFuture readAsync(String key, Codec codec, RedisCommand comm public RFuture readAsync(byte[] key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(true, source, codec, command, params, mainPromise, 0, false, null); + async(true, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null); + async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -510,7 +510,7 @@ public void operationComplete(Future future) throws Exception { for (MasterSlaveEntry entry : entries) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true, null); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true); } return mainPromise; } @@ -590,7 +590,7 @@ private RFuture evalAsync(final NodeSource nodeSource, boolean readOnl args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(false, nodeSource, codec, command, args.toArray(), promise, 0, false, null); + async(false, nodeSource, codec, command, args.toArray(), promise, 0, false); promise.addListener(new FutureListener() { @Override @@ -613,8 +613,7 @@ public void operationComplete(Future future) throws Exception { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(pps)); - async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false, - null); + async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false); } }); } else { @@ -636,7 +635,7 @@ public void operationComplete(Future future) throws Exception { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false, null); + async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false); return mainPromise; } @@ -649,20 +648,20 @@ public RFuture writeAsync(String key, RedisCommand command, Object. public RFuture writeAsync(String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(false, source, codec, command, params, mainPromise, 0, false, null); + async(false, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture writeAsync(byte[] key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(false, source, codec, command, params, mainPromise, 0, false, null); + async(false, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt, - final boolean ignoreRedirect, final RFuture connFuture) { + final boolean ignoreRedirect) { if (mainPromise.isCancelled()) { free(params); return; @@ -764,7 +763,7 @@ public void run(Timeout t) throws Exception { count, details.getCommand(), LogHelper.toString(details.getParams())); } details.removeMainPromiseListener(); - async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect, connFuture); + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect); AsyncDetails.release(details); } @@ -798,7 +797,7 @@ public void operationComplete(Future connFuture) throws Excepti details.getWriteFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(details, connection); + checkWriteFuture(details, ignoreRedirect, connection); } }); @@ -831,7 +830,7 @@ protected void free(final Object[] params) { } } - private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { + private void checkWriteFuture(final AsyncDetails details, final boolean ignoreRedirect, final RedisConnection connection) { ChannelFuture future = details.getWriteFuture(); if (future.isCancelled() || details.getAttemptPromise().isDone()) { return; @@ -884,9 +883,26 @@ private void checkWriteFuture(final AsyncDetails details, final Red TimerTask timeoutTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { + if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) { + if (!details.getAttemptPromise().cancel(false)) { + return; + } + + int count = details.getAttempt() + 1; + if (log.isDebugEnabled()) { + log.debug("attempt {} for command {} and params {}", + count, details.getCommand(), LogHelper.toString(details.getParams())); + } + details.removeMainPromiseListener(); + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect); + AsyncDetails.release(details); + return; + } + details.getAttemptPromise().tryFailure( - new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand() - + " with params: " + LogHelper.toString(details.getParams()) + " channel: " + connection.getChannel())); + new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured" + + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts. Command: " + details.getCommand() + + ", params: " + LogHelper.toString(details.getParams()) + ", channel: " + connection.getChannel())); } }; @@ -905,17 +921,9 @@ public void operationComplete(Future future) throws Exception { final Timeout scheduledFuture; if (popTimeout != 0) { // handling cases when connection has been lost - final Channel orignalChannel = connection.getChannel(); scheduledFuture = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - // re-connection hasn't been made - // and connection is still active -// if (orignalChannel == connection.getChannel() -// && connection.isActive()) { -// return; -// } - if (details.getAttemptPromise().trySuccess(null)) { connection.forceFastReconnectAsync(); } @@ -1004,7 +1012,7 @@ protected void checkAttemptFuture(final NodeSource source, final AsyncDet } async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } @@ -1012,14 +1020,14 @@ protected void checkAttemptFuture(final NodeSource source, final AsyncDet if (future.cause() instanceof RedisAskException && !ignoreRedirect) { RedisAskException ex = (RedisAskException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } if (future.cause() instanceof RedisLoadingException) { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } @@ -1029,7 +1037,7 @@ protected void checkAttemptFuture(final NodeSource source, final AsyncDet @Override public void run(Timeout timeout) throws Exception { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); } }, 1, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 02bfa4d0594..c5f02724eb2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -153,7 +153,7 @@ public void add(RFuture future, List services) { @Override public void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { if (executed.get()) { throw new IllegalStateException("Batch already has been executed!"); } @@ -187,7 +187,7 @@ public void async(boolean readOnlyMode, NodeSource nodeSource, throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC"); } - super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true, connFuture); + super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true); } AsyncSemaphore semaphore = new AsyncSemaphore(0); @@ -439,10 +439,9 @@ public void run() { final CountableListener>> listener = new CountableListener>>(mainPromise, result); listener.setCounter(connections.size()); for (final Map.Entry entry : commands.entrySet()) { - ConnectionEntry connection = connections.get(entry.getKey()); final RPromise> execPromise = new RedissonPromise>(); async(false, new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, - new Object[] {}, execPromise, 0, false, connection.getConnectionFuture()); + new Object[] {}, execPromise, 0, false); execPromise.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { @@ -647,6 +646,8 @@ private void execute(final Entry entry, final NodeSource source, final RPromise< final RPromise attemptPromise = new RedissonPromise(); final AsyncDetails details = new AsyncDetails(); + details.init(null, attemptPromise, + entry.isReadOnlyMode(), source, null, null, null, mainPromise, attempt); final RFuture connectionFuture; if (entry.isReadOnlyMode()) { @@ -751,7 +752,7 @@ public void run(Timeout t) throws Exception { @Override public void operationComplete(Future connFuture) throws Exception { checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), - options.getResponseTimeout(), attempts, options.getExecutionMode()); + options.getResponseTimeout(), attempts, options.getExecutionMode(), slots); } }); @@ -808,8 +809,8 @@ protected void free(final Entry entry) { } } - private void checkWriteFuture(Entry entry, final RPromise attemptPromise, AsyncDetails details, - final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts) { + private void checkWriteFuture(final Entry entry, final RPromise attemptPromise, final AsyncDetails details, + final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts, final AtomicInteger slots, final RPromise mainPromise) { if (future.isCancelled() || attemptPromise.isDone()) { return; } @@ -817,7 +818,9 @@ private void checkWriteFuture(Entry entry, final RPromise attemptPromise, if (!future.isSuccess()) { details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause())); if (details.getAttempt() == attempts) { - attemptPromise.tryFailure(details.getException()); + if (!attemptPromise.tryFailure(details.getException())) { + log.error(details.getException().getMessage()); + } } return; } @@ -827,6 +830,21 @@ private void checkWriteFuture(Entry entry, final RPromise attemptPromise, TimerTask timerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { + if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) { + if (!details.getAttemptPromise().cancel(false)) { + return; + } + + int count = details.getAttempt() + 1; + if (log.isDebugEnabled()) { + log.debug("attempt {} for command {} and params {}", + count, details.getCommand(), LogHelper.toString(details.getParams())); + } + details.removeMainPromiseListener(); + execute(entry, details.getSource(), mainPromise, slots, count, options); + return; + } + attemptPromise.tryFailure( new RedisResponseTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); } @@ -842,7 +860,8 @@ public void run(Timeout timeout) throws Exception { private void checkConnectionFuture(final Entry entry, final NodeSource source, final RPromise mainPromise, final RPromise attemptPromise, final AsyncDetails details, - RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts, ExecutionMode executionMode) { + RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts, + ExecutionMode executionMode, final AtomicInteger slots) { if (connFuture.isCancelled()) { return; } @@ -881,7 +900,7 @@ private void checkConnectionFuture(final Entry entry, final NodeSource source, details.getWriteFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts); + checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts, slots, mainPromise); } }); diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java index c40b324cc58..51ea0b61bef 100644 --- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java @@ -24,7 +24,6 @@ import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonReactiveClient; -import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.command.CommandAsyncExecutor; @@ -64,8 +63,8 @@ public Publisher superReactive(Supplier> supplier) { @Override public void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { - batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture); + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { + batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect); } public RFuture> executeAsync(BatchOptions options) { diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java index ebe3f0845be..e15583d297b 100644 --- a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java +++ b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java @@ -25,7 +25,6 @@ import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonRxClient; -import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.command.CommandAsyncExecutor; @@ -65,8 +64,8 @@ public Flowable superReactive(Callable> supplier) { @Override public void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { - batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture); + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { + batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect); } public RFuture> executeAsync(BatchOptions options) { diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index f6fe65a0ad3..06f3a3bc7cf 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -150,7 +150,7 @@ public void testConnectionLeakAfterError() throws InterruptedException { BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); RBatch batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 300000; i++) { + for (int i = 0; i < 400000; i++) { batch.getBucket("test").setAsync(123); } @@ -171,6 +171,8 @@ public void testConnectionLeakAfterError() throws InterruptedException { assertThat(redisson.getBucket("test1").get()).isEqualTo(1); assertThat(redisson.getBucket("test2").get()).isEqualTo(2); + + redisson.shutdown(); } @Test @@ -228,6 +230,7 @@ public void testSyncSlaves() throws FailedToStartRedisException, IOException, In assertThat(result.getSyncedSlaves()).isEqualTo(1); process.shutdown(); + redisson.shutdown(); } @Test @@ -238,7 +241,8 @@ public void testWriteTimeout() { RBatch batch = redisson.createBatch(batchOptions); RMapCacheAsync map = batch.getMapCache("test"); - for (int i = 0; i < 200000; i++) { + int total = 200000; + for (int i = 0; i < total; i++) { RFuture f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES); if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { f.syncUninterruptibly(); @@ -246,7 +250,7 @@ public void testWriteTimeout() { } batch.execute(); - assertThat(redisson.getMapCache("test").size()).isEqualTo(200000); + assertThat(redisson.getMapCache("test").size()).isEqualTo(total); redisson.shutdown(); } @@ -339,6 +343,7 @@ public void testAtomicSyncSlaves() throws FailedToStartRedisException, IOExcepti } process.shutdown(); + redisson.shutdown(); }