Skip to content

Commit

Permalink
RKeys.deleteByPattern throws an error in cluster mode. Fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Feb 1, 2016
1 parent cffd15b commit b728aff
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 98 deletions.
147 changes: 65 additions & 82 deletions src/main/java/org/redisson/RedissonKeys.java
Expand Up @@ -152,103 +152,81 @@ public Future<String> randomKeyAsync() {
return commandExecutor.readRandomAsync(RedisCommands.RANDOM_KEY); return commandExecutor.readRandomAsync(RedisCommands.RANDOM_KEY);
} }


/**
* Find keys by key search pattern
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern
* @return
*/
@Override @Override
public Collection<String> findKeysByPattern(String pattern) { public Collection<String> findKeysByPattern(String pattern) {
return commandExecutor.get(findKeysByPatternAsync(pattern)); return commandExecutor.get(findKeysByPatternAsync(pattern));
} }


/**
* Find keys by key search pattern in async mode
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern
* @return
*/
@Override @Override
public Future<Collection<String>> findKeysByPatternAsync(String pattern) { public Future<Collection<String>> findKeysByPatternAsync(String pattern) {
return commandExecutor.readAllAsync(RedisCommands.KEYS, pattern); return commandExecutor.readAllAsync(RedisCommands.KEYS, pattern);
} }


/**
* Delete multiple objects by a key pattern
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern
* @return
*/
@Override @Override
public long deleteByPattern(String pattern) { public long deleteByPattern(String pattern) {
return commandExecutor.get(deleteByPatternAsync(pattern)); return commandExecutor.get(deleteByPatternAsync(pattern));
} }


/**
* Delete multiple objects by a key pattern in async mode
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern
* @return
*/
@Override @Override
public Future<Long> deleteByPatternAsync(String pattern) { public Future<Long> deleteByPatternAsync(String pattern) {
return commandExecutor.evalWriteAllAsync(RedisCommands.EVAL_LONG, new SlotCallback<Long, Long>() { if (!commandExecutor.getConnectionManager().isClusterMode()) {
AtomicLong results = new AtomicLong(); return commandExecutor.evalWriteAsync((String)null, null, RedisCommands.EVAL_LONG, "local keys = redis.call('keys', ARGV[1]) "
@Override + "local n = 0 "
public void onSlotResult(Long result) { + "redis.log(redis.LOG_WARNING, 'keys number ' .. #keys); "
results.addAndGet(result); + "for i=1, #keys,5000 do "
} + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
+ "end "
+ "return n;",Collections.emptyList(), pattern);
}


final Promise<Long> result = commandExecutor.getConnectionManager().newPromise();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(commandExecutor.getConnectionManager().getEntries().size());
final FutureListener<Long> listener = new FutureListener<Long>() {
@Override @Override
public Long onFinish() { public void operationComplete(Future<Long> future) throws Exception {
return results.get(); if (future.isSuccess()) {
count.addAndGet(future.getNow());
} else {
failed.set(future.cause());
}

checkExecution(result, failed, count, executed);
} }
}, "local keys = redis.call('keys', ARGV[1]) " };
+ "local n = 0 "
+ "for i=1, table.getn(keys),5000 do " for (ClusterSlotRange slot : commandExecutor.getConnectionManager().getEntries().keySet()) {
+ "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) " Future<Collection<String>> findFuture = commandExecutor.readAsync(slot.getStartSlot(), null, RedisCommands.KEYS, pattern);
+ "end " findFuture.addListener(new FutureListener<Collection<String>>() {
+ "return n;",Collections.emptyList(), pattern); @Override
public void operationComplete(Future<Collection<String>> future) throws Exception {
if (!future.isSuccess()) {
failed.set(future.cause());
checkExecution(result, failed, count, executed);
return;
}

Collection<String> keys = future.getNow();
if (keys.isEmpty()) {
checkExecution(result, failed, count, executed);
return;
}

Future<Long> deleteFuture = deleteAsync(keys.toArray(new String[keys.size()]));
deleteFuture.addListener(listener);
}
});
}

return result;
} }


/**
* Delete multiple objects by name
*
* @param keys - object names
* @return number of removed keys
*/
@Override @Override
public long delete(String ... keys) { public long delete(String ... keys) {
return commandExecutor.get(deleteAsync(keys)); return commandExecutor.get(deleteAsync(keys));
} }


/**
* Delete multiple objects by name in async mode
*
* @param keys - object names
* @return number of removed keys
*/
@Override @Override
public Future<Long> deleteAsync(String ... keys) { public Future<Long> deleteAsync(String ... keys) {
if (!commandExecutor.getConnectionManager().isClusterMode()) { if (!commandExecutor.getConnectionManager().isClusterMode()) {
Expand Down Expand Up @@ -286,18 +264,7 @@ public void operationComplete(Future<List<?>> future) throws Exception {
failed.set(future.cause()); failed.set(future.cause());
} }


if (executed.decrementAndGet() == 0) { checkExecution(result, failed, count, executed);
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.setFailure(ex);
} else {
result.setFailure(failed.get());
}
} else {
result.setSuccess(count.get());
}
}
} }
}; };


Expand Down Expand Up @@ -356,4 +323,20 @@ public Future<Void> flushallAsync() {
return commandExecutor.writeAllAsync(RedisCommands.FLUSHALL); return commandExecutor.writeAllAsync(RedisCommands.FLUSHALL);
} }


private void checkExecution(final Promise<Long> result, final AtomicReference<Throwable> failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.setFailure(ex);
} else {
result.setFailure(failed.get());
}
} else {
result.setSuccess(count.get());
}
}
}

} }
6 changes: 6 additions & 0 deletions src/main/java/org/redisson/command/CommandAsyncExecutor.java
Expand Up @@ -54,8 +54,12 @@ public interface CommandAsyncExecutor {


<T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);


<T, R> Future<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

<T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);


<T, R> Future<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

<T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);


<T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
Expand All @@ -68,6 +72,8 @@ public interface CommandAsyncExecutor {


<T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params); <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params);


<T, R> Future<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);

<T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params); <T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params);




Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/redisson/command/CommandAsyncService.java
Expand Up @@ -240,6 +240,12 @@ public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> comma
return mainPromise; return mainPromise;
} }


public <T, R> Future<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}

@Override @Override
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
Expand All @@ -258,6 +264,11 @@ public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> e
return evalAsync(source, true, codec, evalCommandType, script, keys, params); return evalAsync(source, true, codec, evalCommandType, script, keys, params);
} }


@Override
public <T, R> Future<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params);
}

@Override @Override
public <T, R> Future<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
Expand All @@ -270,6 +281,11 @@ public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T>
return evalAsync(source, false, codec, evalCommandType, script, keys, params); return evalAsync(source, false, codec, evalCommandType, script, keys, params);
} }


public <T, R> Future<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params);
}


@Override @Override
public <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) { public <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
return evalAllAsync(false, command, callback, script, keys, params); return evalAllAsync(false, command, callback, script, keys, params);
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/org/redisson/core/RKeys.java
Expand Up @@ -68,23 +68,25 @@ public interface RKeys extends RKeysAsync {
Collection<String> findKeysByPattern(String pattern); Collection<String> findKeysByPattern(String pattern);


/** /**
* Delete multiple objects by a key pattern * Delete multiple objects by a key pattern.
* * <p/>
* Method executes in <b>NON atomic way</b> in cluster mode due to lua script limitations.
* <p/>
* Supported glob-style patterns: * Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo * h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello * h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo * h[ae]llo subscribes to hello and hallo, but not hillo
* *
* @param pattern * @param pattern
* @return * @return number of removed keys
*/ */
long deleteByPattern(String pattern); long deleteByPattern(String pattern);


/** /**
* Delete multiple objects by name * Delete multiple objects by name
* *
* @param keys - object names * @param keys - object names
* @return * @return number of removed keys
*/ */
long delete(String ... keys); long delete(String ... keys);


Expand Down
12 changes: 7 additions & 5 deletions src/main/java/org/redisson/core/RKeysAsync.java
Expand Up @@ -51,23 +51,25 @@ public interface RKeysAsync {
Future<Collection<String>> findKeysByPatternAsync(String pattern); Future<Collection<String>> findKeysByPatternAsync(String pattern);


/** /**
* Delete multiple objects by a key pattern in async mode * Delete multiple objects by a key pattern.
* * <p/>
* Method executes in <b>NON atomic way</b> in cluster mode due to lua script limitations.
* <p/>
* Supported glob-style patterns: * Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo * h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello * h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo * h[ae]llo subscribes to hello and hallo, but not hillo
* *
* @param pattern * @param pattern
* @return * @return number of removed keys
*/ */
Future<Long> deleteByPatternAsync(String pattern); Future<Long> deleteByPatternAsync(String pattern);


/** /**
* Delete multiple objects by name in async mode * Delete multiple objects by name
* *
* @param keys - object names * @param keys - object names
* @return * @return number of removed keys
*/ */
Future<Long> deleteAsync(String ... keys); Future<Long> deleteAsync(String ... keys);


Expand Down
38 changes: 31 additions & 7 deletions src/test/java/org/redisson/RedissonKeysTest.java
Expand Up @@ -66,12 +66,25 @@ public void testRandomKey() {


@Test @Test
public void testDeleteByPattern() { public void testDeleteByPattern() {
RBucket<String> bucket = redisson.getBucket("test1"); RBucket<String> bucket = redisson.getBucket("test0");
bucket.set("someValue"); bucket.set("someValue3");
assertThat(bucket.isExists()).isTrue();

RBucket<String> bucket2 = redisson.getBucket("test9");
bucket2.set("someValue4");
assertThat(bucket.isExists()).isTrue();

RMap<String, String> map = redisson.getMap("test2"); RMap<String, String> map = redisson.getMap("test2");
map.fastPut("1", "2"); map.fastPut("1", "2");
assertThat(map.isExists()).isTrue();


Assert.assertEquals(2, redisson.getKeys().deleteByPattern("test?")); RMap<String, String> map2 = redisson.getMap("test3");
map2.fastPut("1", "5");
assertThat(map2.isExists()).isTrue();


Assert.assertEquals(4, redisson.getKeys().deleteByPattern("test?"));
Assert.assertEquals(0, redisson.getKeys().deleteByPattern("test?"));
} }


@Test @Test
Expand All @@ -90,13 +103,24 @@ public void testFindKeys() {


@Test @Test
public void testMassDelete() { public void testMassDelete() {
RBucket<String> bucket = redisson.getBucket("test"); RBucket<String> bucket0 = redisson.getBucket("test0");
bucket.set("someValue"); bucket0.set("someValue");
RBucket<String> bucket1 = redisson.getBucket("test1");
bucket1.set("someValue");
RBucket<String> bucket2 = redisson.getBucket("test2");
bucket2.set("someValue");
RBucket<String> bucket3 = redisson.getBucket("test3");
bucket3.set("someValue");
RBucket<String> bucket10 = redisson.getBucket("test10");
bucket10.set("someValue");

RBucket<String> bucket12 = redisson.getBucket("test12");
bucket12.set("someValue");
RMap<String, String> map = redisson.getMap("map2"); RMap<String, String> map = redisson.getMap("map2");
map.fastPut("1", "2"); map.fastPut("1", "2");


Assert.assertEquals(2, redisson.getKeys().delete("test", "map2")); Assert.assertEquals(7, redisson.getKeys().delete("test0", "test1", "test2", "test3", "test10", "test12", "map2"));
Assert.assertEquals(0, redisson.getKeys().delete("test", "map2")); Assert.assertEquals(0, redisson.getKeys().delete("test0", "test1", "test2", "test3", "test10", "test12", "map2"));
} }


@Test @Test
Expand Down

0 comments on commit b728aff

Please sign in to comment.