Skip to content

Commit

Permalink
Feature - Async, Rx, Reactive interfaces implemented for RBloomFilter…
Browse files Browse the repository at this point in the history
… object. #5836
  • Loading branch information
Nikita Koksharov committed May 10, 2024
1 parent 04b206c commit ce70bf3
Show file tree
Hide file tree
Showing 10 changed files with 587 additions and 103 deletions.
244 changes: 145 additions & 99 deletions redisson/src/main/java/org/redisson/RedissonBloomFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,21 @@
package org.redisson;

import io.netty.buffer.ByteBuf;
import org.redisson.api.RBitSetAsync;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RFuture;
import org.redisson.client.RedisException;
import org.redisson.client.codec.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;

import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -102,21 +101,29 @@ public boolean add(T object) {
}

@Override
public long add(Collection<T> objects) {
public RFuture<Boolean> addAsync(T object) {
CompletionStage<Boolean> f = addAsync(Arrays.asList(object)).thenApply(r -> r > 0);
return new CompletableFutureWrapper<>(f);
}

@Override
public RFuture<Long> addAsync(Collection<T> objects) {
CompletionStage<Void> future = CompletableFuture.completedFuture(null);
if (size == 0) {
readConfig();
future = readConfigAsync();
}

List<Long> allIndexes = index(objects);
CompletionStage<Long> f = future.thenCompose(r -> {
List<Long> allIndexes = index(objects);

List<Object> params = new ArrayList<>();
params.add(size);
params.add(hashIterations);
int s = allIndexes.size() / objects.size();
params.add(s);
params.addAll(allIndexes);
List<Object> params = new ArrayList<>();
params.add(size);
params.add(hashIterations);
int s = allIndexes.size() / objects.size();
params.add(s);
params.addAll(allIndexes);

return get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG,
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" +
Expand All @@ -137,7 +144,15 @@ public long add(Collection<T> objects) {
"end; " +
"return c;",
Arrays.asList(configName, getRawName()),
params.toArray()));
params.toArray());
});

return new CompletableFutureWrapper<>(f);
}

@Override
public long add(Collection<T> objects) {
return get(addAsync(objects));
}

private long[] hash(long hash1, long hash2, int iterations, long size) {
Expand All @@ -155,42 +170,51 @@ private long[] hash(long hash1, long hash2, int iterations, long size) {
}

@Override
public long contains(Collection<T> objects) {
public RFuture<Long> containsAsync(Collection<T> objects) {
CompletionStage<Void> future = CompletableFuture.completedFuture(null);
if (size == 0) {
readConfig();
future = readConfigAsync();
}

List<Long> allIndexes = index(objects);

List<Object> params = new ArrayList<>();
params.add(size);
params.add(hashIterations);
params.add(objects.size());
params.addAll(allIndexes);

return get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" +

"local k = 0;" +
"local c = 0;" +
"local cc = (#ARGV - 3) / ARGV[3];" +
"for i = 4, #ARGV, 1 do " +
"local r = redis.call('getbit', KEYS[2], ARGV[i]); " +
"if r == 0 then " +
"k = k + 1;" +
"end; " +
"if ((i - 4) + 1) % cc == 0 then " +
"if k > 0 then " +
"c = c + 1;" +
CompletionStage<Long> f = future.thenCompose(r -> {
List<Long> allIndexes = index(objects);

List<Object> params = new ArrayList<>();
params.add(size);
params.add(hashIterations);
params.add(objects.size());
params.addAll(allIndexes);

return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" +

"local k = 0;" +
"local c = 0;" +
"local cc = (#ARGV - 3) / ARGV[3];" +
"for i = 4, #ARGV, 1 do " +
"local r = redis.call('getbit', KEYS[2], ARGV[i]); " +
"if r == 0 then " +
"k = k + 1;" +
"end; " +
"if ((i - 4) + 1) % cc == 0 then " +
"if k > 0 then " +
"c = c + 1;" +
"end; " +
"k = 0; " +
"end; " +
"end; " +
"k = 0; " +
"end; " +
"end; " +
"return ARGV[3] - c;",
Arrays.asList(configName, getRawName()),
params.toArray()));
"return ARGV[3] - c;",
Arrays.asList(configName, getRawName()),
params.toArray());
});
return new CompletableFutureWrapper<>(f);
}

@Override
public long contains(Collection<T> objects) {
return get(containsAsync(objects));
}

private List<Long> index(Collection<T> objects) {
Expand All @@ -208,22 +232,27 @@ public boolean contains(T object) {
return contains(Arrays.asList(object)) > 0;
}

protected RBitSetAsync createBitSet(CommandBatchService executorService) {
return new RedissonBitSet(executorService, getName());
@Override
public RFuture<Boolean> containsAsync(T object) {
CompletionStage<Boolean> f = containsAsync(Arrays.asList(object)).thenApply(r -> r > 0);
return new CompletableFutureWrapper<>(f);
}

@Override
public long count() {
CommandBatchService executorService = new CommandBatchService(commandExecutor);
RFuture<Map<String, String>> configFuture = executorService.readAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName);
RBitSetAsync bs = createBitSet(executorService);
RFuture<Long> cardinalityFuture = bs.cardinalityAsync();
executorService.execute();

readConfig(commandExecutor.getNow(configFuture.toCompletableFuture()));
return get(countAsync());
}

return Math.round(-size / ((double) hashIterations) * Math.log(1 - commandExecutor.getNow(cardinalityFuture.toCompletableFuture()) / ((double) size)));
@Override
public RFuture<Long> countAsync() {
CompletionStage<Void> f = readConfigAsync();
CompletionStage<Long> res = f.thenCompose(r -> {
RedissonBitSet bs = new RedissonBitSet(commandExecutor, getName());
return bs.cardinalityAsync().thenApply(c -> {
return Math.round(-size / ((double) hashIterations) * Math.log(1 - c / ((double) size)));
});
});
return new CompletableFutureWrapper<>(res);
}

@Override
Expand All @@ -237,12 +266,12 @@ public RFuture<Long> sizeInMemoryAsync() {
return super.sizeInMemoryAsync(keys);
}

private void readConfig() {
private CompletionStage<Void> readConfigAsync() {
RFuture<Map<String, String>> future = commandExecutor.readAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName);
Map<String, String> config = commandExecutor.get(future);

readConfig(config);
return future.thenAccept(config -> {
readConfig(config);
});
}

private void readConfig(Map<String, String> config) {
Expand All @@ -260,6 +289,11 @@ protected long getMaxSize() {

@Override
public boolean tryInit(long expectedInsertions, double falseProbability) {
return get(tryInitAsync(expectedInsertions, falseProbability));
}

@Override
public RFuture<Boolean> tryInitAsync(long expectedInsertions, double falseProbability) {
if (falseProbability > 1) {
throw new IllegalArgumentException("Bloom filter false probability can't be greater than 1");
}
Expand All @@ -276,29 +310,21 @@ public boolean tryInit(long expectedInsertions, double falseProbability) {
}
hashIterations = optimalNumOfHashFunctions(expectedInsertions, size);

CommandBatchService executorService = new CommandBatchService(commandExecutor);
executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == false and hashIterations == false, 'Bloom filter config has been changed')",
Arrays.<Object>asList(configName), size, hashIterations);
executorService.writeAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Void>("HMSET", new VoidReplayConvertor()), configName,
"size", size, "hashIterations", hashIterations,
"expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString());
try {
executorService.execute();
} catch (RedisException e) {
if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) {
throw e;
}
readConfig();
return false;
}
return commandExecutor.evalWriteAsync(configName, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 1 then " +
"return 0;" +
"end; " +

return true;
"redis.call('hset', KEYS[1], 'size', ARGV[1]);" +
"redis.call('hset', KEYS[1], 'hashIterations', ARGV[2]);" +
"redis.call('hset', KEYS[1], 'expectedInsertions', ARGV[3]);" +
"redis.call('hset', KEYS[1], 'falseProbability', ARGV[4]);" +
"return 1;",
Arrays.asList(configName),
size, hashIterations, expectedInsertions, falseProbability);
}


@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), configName);
Expand All @@ -316,26 +342,53 @@ public RFuture<Boolean> clearExpireAsync() {

@Override
public long getExpectedInsertions() {
Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "expectedInsertions"));
return check(result);
return get(getExpectedInsertionsAsync());
}

@Override
public RFuture<Long> getExpectedInsertionsAsync() {
return readSettingAsync(RedisCommands.EVAL_LONG, LongCodec.INSTANCE, "expectedInsertions");
}

private <T> RFuture<T> readSettingAsync(RedisCommand<T> evalCommandType, Codec codec, String settingName) {
return commandExecutor.evalReadAsync(configName, codec, evalCommandType,
"if redis.call('exists', KEYS[1]) == 0 then " +
"assert(false, 'Bloom filter is not initialized')" +
"end; " +

"return redis.call('hget', KEYS[1], ARGV[1]);",
Arrays.asList(configName),
settingName);
}

@Override
public double getFalseProbability() {
Double result = get(commandExecutor.readAsync(configName, DoubleCodec.INSTANCE, RedisCommands.HGET, configName, "falseProbability"));
return check(result);
return get(getFalseProbabilityAsync());
}

@Override
public RFuture<Double> getFalseProbabilityAsync() {
return readSettingAsync(RedisCommands.EVAL_DOUBLE, DoubleCodec.INSTANCE, "falseProbability");
}

@Override
public long getSize() {
Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "size"));
return check(result);
return get(getSizeAsync());
}

@Override
public RFuture<Long> getSizeAsync() {
return readSettingAsync(RedisCommands.EVAL_LONG, LongCodec.INSTANCE, "size");
}

@Override
public int getHashIterations() {
Integer result = get(commandExecutor.readAsync(configName, IntegerCodec.INSTANCE, RedisCommands.HGET, configName, "hashIterations"));
return check(result);
return get(getHashIterationsAsync());
}

@Override
public RFuture<Integer> getHashIterationsAsync() {
return readSettingAsync(RedisCommands.EVAL_INTEGER, LongCodec.INSTANCE, "hashIterations");
}

@Override
Expand Down Expand Up @@ -381,11 +434,4 @@ public RFuture<Boolean> renamenxAsync(String newName) {
return new CompletableFutureWrapper<>(f);
}

private <V> V check(V result) {
if (result == null) {
throw new IllegalStateException("Bloom filter is not initialized!");
}
return result;
}

}
18 changes: 18 additions & 0 deletions redisson/src/main/java/org/redisson/RedissonReactive.java
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,24 @@ public RBitSetReactive getBitSet(CommonOptions options) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(ca, params.getName()), RBitSetReactive.class);
}

@Override
public <V> RBloomFilterReactive<V> getBloomFilter(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(commandExecutor, name), RBloomFilterReactive.class);
}

@Override
public <V> RBloomFilterReactive<V> getBloomFilter(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(codec, commandExecutor, name), RBloomFilterReactive.class);
}

@Override
public <V> RBloomFilterReactive<V> getBloomFilter(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonBloomFilter<V>(params.getCodec(), ca, params.getName()), RBloomFilterReactive.class);
}

@Override
public RFunctionReactive getFunction() {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionReactive.class);
Expand Down
Loading

0 comments on commit ce70bf3

Please sign in to comment.