Skip to content

Commit

Permalink
Ability to cancel BRPOP and BLPOP async command execution. #446
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Mar 22, 2016
1 parent 9503089 commit 9c33b27
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/redisson/client/RedisConnection.java
Expand Up @@ -174,8 +174,8 @@ public boolean isClosed() {
return closed;
}

public void forceReconnect() {
channel.close();
public ChannelFuture forceReconnectAsync() {
return channel.close();
}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/redisson/command/CommandAsyncService.java
Expand Up @@ -462,6 +462,18 @@ private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final Red
int timeoutTime = connectionManager.getConfig().getTimeout();
if (skipTimeout.contains(details.getCommand().getName())) {
Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString());
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!future.isCancelled()) {
return;
}
// cancel handling for commands from skipTimeout collection
if (details.getAttemptPromise().cancel(true)) {
connection.forceReconnectAsync();
}
}
});
if (popTimeout == 0) {
return;
}
Expand Down
50 changes: 43 additions & 7 deletions src/test/java/org/redisson/RedissonBlockingQueueTest.java
@@ -1,9 +1,6 @@
package org.redisson;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.*;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -22,8 +19,47 @@
import org.junit.Test;
import org.redisson.core.RBlockingQueue;

import io.netty.util.concurrent.Future;

public class RedissonBlockingQueueTest extends BaseTest {

@Test
public void testTakeAsyncCancel() {
Config config = createConfig();
config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);

RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
for (int i = 0; i < 10; i++) {
Future<Integer> f = queue1.takeAsync();
f.cancel(true);
}
assertThat(queue1.add(1)).isTrue();
assertThat(queue1.add(2)).isTrue();
assertThat(queue1.size()).isEqualTo(2);

redisson.shutdown();
}

@Test
public void testPollAsyncCancel() {
Config config = createConfig();
config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1);

RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
for (int i = 0; i < 10; i++) {
Future<Integer> f = queue1.pollAsync(1, TimeUnit.SECONDS);
f.cancel(true);
}
assertThat(queue1.add(1)).isTrue();
assertThat(queue1.add(2)).isTrue();
assertThat(queue1.size()).isEqualTo(2);

redisson.shutdown();
}


@Test
public void testPollFromAny() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Expand Down Expand Up @@ -225,14 +261,14 @@ public void run() {
try {
// blocking
int item = queue.take();
assertTrue(item > 0 && item <= total);
assertThat(item > 0 && item <= total).isTrue();
} catch (InterruptedException exception) {
fail();
Assert.fail();
}
count++;
}

assertThat(counter.get(), equalTo(total));
assertThat(counter.get()).isEqualTo(total);
queue.delete();
}

Expand Down

0 comments on commit 9c33b27

Please sign in to comment.