Skip to content

6. Distributed objects

Nikita Koksharov edited this page May 5, 2021 · 104 revisions
Clone this wiki locally

6.1. Object holder

Java implementation of Redis based RBucket object is a holder for any type of object. Size is limited to 512Mb.

Code example:

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

Code example of Async interface usage:

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

RFuture<Void> future = bucket.setAsync(new AnyObject(1));
RFuture<AnyObject> objfuture = bucket.getAsync();

RFuture<Boolean> tsFuture = bucket.trySetAsync(new AnyObject(3));
RFuture<Boolean> csFuture = bucket.compareAndSetAsync(new AnyObject(4), new AnyObject(5));
RFuture<AnyObject> gsFuture = bucket.getAndSetAsync(new AnyObject(6));

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RBucketReactive<AnyObject> bucket = redisson.getBucket("anyObject");

Mono<Void> mono = bucket.set(new AnyObject(1));
Mono<AnyObject> objMono = bucket.get();

Mono<Boolean> tsMono = bucket.trySet(new AnyObject(3));
Mono<Boolean> csMono = bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
Mono<AnyObject> gsMono = bucket.getAndSet(new AnyObject(6));

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RBucketRx<AnyObject> bucket = redisson.getBucket("anyObject");

Completable rx = bucket.set(new AnyObject(1));
Maybe<AnyObject> objRx = bucket.get();

Single<Boolean> tsRx = bucket.trySet(new AnyObject(3));
Single<Boolean> csMono = bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
Maybe<AnyObject> gsMono = bucket.getAndSet(new AnyObject(6));


Use RBuckets interface to execute operations over multiple RBucket objects:

Code example:

RBuckets buckets = redisson.getBuckets();

// get all bucket values
Map<String, V> loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3");

Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());

// sets all or nothing if some bucket is already exists
buckets.trySet(map);
// store all at once
buckets.set(map);

Code example of Async interface usage:

RBuckets buckets = redisson.getBuckets();

// get all bucket values
RFuture<Map<String, V>> bucketsFuture = buckets.getAsync("myBucket1", "myBucket2", "myBucket3");

Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());

// sets all or nothing if some bucket is already exists
RFuture<Boolean> tsFuture = buckets.trySetAsync(map);
// store all at once
RFuture<Void> sFuture = buckets.setAsync(map);

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RBucketsReactive buckets = redisson.getBuckets();

// get all bucket values
Mono<Map<String, V>> bucketsMono = buckets.getAsync("myBucket1", "myBucket2", "myBucket3");

Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());

// sets all or nothing if some bucket is already exists
Mono<Boolean> tsMono = buckets.trySet(map);
// store all at once
Mono<Void> sMono = buckets.set(map);

Code example of RxJava interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RBucketsRx buckets = redisson.getBuckets();

// get all bucket values
Single<Map<String, V>> bucketsRx = buckets.get("myBucket1", "myBucket2", "myBucket3");

Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());

// sets all or nothing if some bucket is already exists
Single<Boolean> tsRx = buckets.trySet(map);
// store all at once
Completable sRx = buckets.set(map);

6.2. Binary stream holder

Java implementation of Redis based RBinaryStream object holds sequence of bytes. It extends RBucket interface and size is limited to 512Mb.

Code example:

RBinaryStream stream = redisson.getBinaryStream("anyStream");

byte[] content = ...
stream.set(content);
stream.getAndSet(content);
stream.trySet(content);
stream.compareAndSet(oldContent, content);

Code example of Async interface usage:

RBinaryStream stream = redisson.getBinaryStream("anyStream");

byte[] content = ...
RFuture<Void> future = stream.set(content);
RFuture<byte[]> future = stream.getAndSet(content);
RFuture<Boolean> future = stream.trySet(content);
RFuture<Boolean> future = stream.compareAndSet(oldContent, content);

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RBinaryStreamReactive stream = redisson.getBinaryStream("anyStream");

ByteBuffer content = ...
Mono<Void> mono = stream.set(content);
Mono<byte[]> mono = stream.getAndSet(content);
Mono<Boolean> mono = stream.trySet(content);
Mono<Boolean> mono = stream.compareAndSet(oldContent, content);

Mono<Integer> mono = stream.write(content);
stream.position(0);
Mono<Integer> mono = stream.read(b);

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RBinaryStreamRx stream = redisson.getBinaryStream("anyStream");

ByteBuffer content = ...
Completable rx = stream.set(content);
Maybe<byte[]> rx = stream.getAndSet(content);
Single<Boolean> rx = stream.trySet(content);
Single<Boolean> rx = stream.compareAndSet(oldContent, content);

Single<Integer> rx = stream.write(content);
stream.position(0);
Single<Integer> rx = stream.read(b);

Code example of java.io.InputStream and java.io.OutputStream interfaces usage:

RBinaryStream stream = redisson.getBinaryStream("anyStream");

InputStream is = stream.getInputStream();
byte[] readBuffer = ...
is.read(readBuffer);

OutputStream os = stream.getOuputStream();
byte[] contentToWrite = ...
os.write(contentToWrite);

Code example of java.nio.channels.SeekableByteChannel interface usage:

RBinaryStream stream = redisson.getBinaryStream("anyStream");

SeekableByteChannel sbc = stream.getChannel();
ByteBuffer readBuffer = ...
sbc.read(readBuffer);

sbc.position(0);

ByteBuffer contentToWrite = ...
sbc.write(contentToWrite);

sbc.truncate(234);

Code example of java.nio.channels.AsynchronousByteChannel interface usage:

RBinaryStream stream = redisson.getBinaryStream("anyStream");

AsynchronousByteChannel sbc = stream.getAsynchronousChannel();
ByteBuffer readBuffer = ...
sbc.read(readBuffer);

ByteBuffer contentToWrite = ...
sbc.write(contentToWrite);

6.3. Geospatial holder

Java implementation of Redis based RGeo object is a holder for geospatial items.

Code example:

RGeo<String> geo = redisson.getGeo("test");

geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"), 
        new GeoEntry(15.087269, 37.502669, "Catania"));

Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");

List<String> cities = geo.search(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));
Map<String, GeoPosition> citiesWithPositions = geo.searchWithPosition(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));

Code example of Async interface usage:

RGeo<String> geo = redisson.getGeo("test");

RFuture<Long> addFuture = geo.addAsync(new GeoEntry(13.361389, 38.115556, "Palermo"), 
        new GeoEntry(15.087269, 37.502669, "Catania"));

RFuture<Double> distanceFuture = geo.distAsync("Palermo", "Catania", GeoUnit.METERS);
RFuture<Map<String, GeoPosition>> positionsFuture = geo.posAsync("test2", "Palermo", "test3", "Catania", "test1");

RFuture<List<String>> citiesFuture = geo.searchAsync(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));
RFuture<Map<String, GeoPosition>> citiesWithPositions = geo.searchWithPositionAsync(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RGeoReactive<String> bucket = redisson.getGeo("test");

Mono<Long> addFuture = geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"), 
        new GeoEntry(15.087269, 37.502669, "Catania"));

Mono<Double> distanceFuture = geo.dist("Palermo", "Catania", GeoUnit.METERS);
Mono<Map<String, GeoPosition>> positionsFuture = geo.pos("test2", "Palermo", "test3", "Catania", "test1");

Mono<List<String>> citiesFuture = geo.search(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));
Mono<Map<String, GeoPosition>> citiesWithPositions = geo.searchWithPosition(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RGeoRx<String> bucket = redisson.getGeo("test");

Single<Long> addFuture = geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"), 
        new GeoEntry(15.087269, 37.502669, "Catania"));

Single<Double> distanceFuture = geo.dist("Palermo", "Catania", GeoUnit.METERS);
Single<Map<String, GeoPosition>> positionsFuture = geo.pos("test2", "Palermo", "test3", "Catania", "test1");

Single<List<String>> citiesFuture = geo.search(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));
Single<Map<String, GeoPosition>> citiesWithPositions = geo.searchWithPosition(GeoSearchArgs.from(15, 37).radius(200, GeoUnit.KILOMETERS));

6.4. BitSet

Java implementation of Redis based RBitSet object provides API similar to java.util.BitSet. It represents vector of bits that grows as needed. Size limited by Redis to 4 294 967 295 bits.

Code example:

RBitSet set = redisson.getBitSet("simpleBitset");

set.set(0, true);
set.set(1812, false);

set.clear(0);

set.and("anotherBitset");
set.xor("anotherBitset");

Code example of Async interface usage:

RBitSetAsync set = redisson.getBitSet("simpleBitset");

RFuture<Boolean> setFuture = set.setAsync(0, true);
RFuture<Boolean> setFuture = set.setAsync(1812, false);

RFuture<Void> clearFuture = set.clearAsync(0);

RFuture<Void> andFuture = set.andAsync("anotherBitset);
RFuture<Void> xorFuture = set.xorAsync("anotherBitset");

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RBitSetReactive stream = redisson.getBitSet("simpleBitset");

Mono<Boolean> setMono = set.set(0, true);
Mono<Boolean> setMono = set.set(1812, false);

Mono<Void> clearMono = set.clear(0);

Mono<Void> andMono = set.and("anotherBitset);
Mono<Void> xorMono = set.xor("anotherBitset");

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RBitSetRx stream = redisson.getBitSet("simpleBitset");

Single<Boolean> setRx = set.set(0, true);
Single<Boolean> setRx = set.set(1812, false);

Completable clearRx = set.clear(0);

Completable andRx = set.and("anotherBitset);
Completable xorRx = set.xor("anotherBitset");

6.4.1. BitSet data partitioning (distributed roaring bitmap)

This feature available only in Redisson PRO edition.

Redis based distributed BitSet data partitioning for Java available only in cluster mode and implemented by separate RClusteredBitSet object. It uses distributed implementation of roaring bitmap structure. Size is limited by whole Redis Cluster memory. More details about partitioning here.

Code example:

RClusteredBitSet set = redisson.getClusteredBitSet("simpleBitset");
set.set(0, true);
set.set(1812, false);
set.clear(0);
set.addAsync("e");
set.xor("anotherBitset");

6.5. AtomicLong

Java implementation of Redis based AtomicLong object provides API similar to java.util.concurrent.atomic.AtomicLong object.

Code example:

RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();

Code example of Async interface usage:

RAtomicLongAsync atomicLong = redisson.getAtomicLong("myAtomicLong");

RFuture<Void> setFuture = atomicLong.setAsync(3);
RFuture<Long> igFuture = atomicLong.incrementAndGetAsync();
RFuture<Long> getFuture = atomicLong.getAsync();

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RAtomicLongReactive atomicLong = redisson.getAtomicLong("myAtomicLong");

Mono<Void> setMono = atomicLong.set(3);
Mono<Long> igMono = atomicLong.incrementAndGet();
RFuture<Long> getMono = atomicLong.getAsync();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RAtomicLongRx atomicLong = redisson.getAtomicLong("myAtomicLong");

Completable setMono = atomicLong.set(3);
Single<Long> igMono = atomicLong.incrementAndGet();
Single<Long> getMono = atomicLong.getAsync();

6.6. AtomicDouble

Java implementation of Redis based AtomicDouble object.

Code example:

RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();

Code example of Async interface usage:

RAtomicDoubleAsync atomicDouble = redisson.getAtomicDouble("myAtomicDouble");

RFuture<Void> setFuture = atomicDouble.setAsync(2.81);
RFuture<Double> agFuture = atomicDouble.addAndGetAsync(4.11);
RFuture<Double> getFuture = atomicDouble.getAsync();

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RAtomicDoubleReactive atomicDouble = redisson.getAtomicDouble("myAtomicDouble");

Mono<Void> setMono = atomicDouble.set(2.81);
Mono<Double> agMono = atomicDouble.addAndGet(4.11);
Mono<Double> getMono = atomicDouble.get();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RAtomicDoubleRx atomicDouble = redisson.getAtomicDouble("myAtomicDouble");

Completable setMono = atomicDouble.set(2.81);
Single<Double> igMono = atomicDouble.addAndGet(4.11);
Single<Double> getMono = atomicDouble.get();

6.7. Topic

Java implementation of Redis based RTopic object implements Publish / Subscribe mechanism. It allows to subscribe on events published with multiple instances of RTopic object with the same name.

Listeners are re-subscribed automatically after reconnection to Redis or Redis failover. All messages sent during absence of connection to Redis are lost. Use Reliable Topic for reliable delivery.

Code example:

RTopic topic = redisson.getTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RTopic topic = redisson.getTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());

Code example of Async interface usage:

RTopicAsync topic = redisson.getTopic("myTopic");
RFuture<Integer> listenerFuture = topic.addListenerAsync(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RTopicAsync topic = redisson.getTopic("myTopic");
RFuture<Long> publishFuture = topic.publishAsync(new SomeObject());

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RTopicReactive topic = redisson.getTopic("myTopic");
Mono<Integer> listenerMono = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RTopicReactive topic = redisson.getTopic("myTopic");
Mono<Long> publishMono = topic.publish(new SomeObject());

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RTopicRx topic = redisson.getTopic("myTopic");
Single<Integer> listenerMono = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RTopicRx topic = redisson.getTopic("myTopic");
Single<Long> publishMono = topic.publish(new SomeObject());

6.7.1. Topic pattern

Java implementation of Redis based RPatternTopic object. It allows to subscribe to multiple topics by specified glob-style pattern.

Listeners are re-subscribed automatically after reconnection to Redis or Redis failover.

Pattern examples:

  • topic? subscribes to topic1, topicA ...
  • topic?_my subscribes to topic_my, topic123_my, topicTEST_my ...
  • topic[ae] subscribes to topica and topice only

Code example:

// subscribe to all topics by `topic*` pattern
RPatternTopic patternTopic = redisson.getPatternTopic("topic*");
int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
        //...
    }
});

Code example of Async interface usage:

RPatternTopicAsync patternTopic = redisson.getPatternTopic("topic*");
RFuture<Integer> listenerFuture = patternTopic.addListenerAsync(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
        //...
    }
});

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RTopicReactive patternTopic = redisson.getPatternTopic("topic*");
Mono<Integer> listenerMono = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
        //...
    }
});

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RTopicRx patternTopic = redisson.getPatternTopic("topic*");
Single<Integer> listenerSingle = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
        //...
    }
});

6.8. Bloom filter

Redis based distributed RBloomFilter bloom filter for Java. Number of contained bits is limited to 2^32.

Must be initialized with capacity size by tryInit(expectedInsertions, falseProbability) method before usage.

RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// initialize bloom filter with 
// expectedInsertions = 55000000
// falseProbability = 0.03
bloomFilter.tryInit(55000000L, 0.03);

bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));

bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
bloomFilter.count();

6.8.1. Bloom filter data partitioning

This feature available only in Redisson PRO edition.

Redis based distributed Bloom filter for Java with data partitioning support available only in cluster mode and implemented by separate RClusteredBloomFilter object. This implementation uses more efficient distributed Redis memory allocation algorithm. It allows to "shrink" memory space consumed by unused bits across all Redis nodes. State of each instance is partitioned across all nodes in Redis cluster. Number of contained bits is limited to 2^64. More details about partitioning here.

RClusteredBloomFilter<SomeObject> bloomFilter = redisson.getClusteredBloomFilter("sample");
// initialize bloom filter with 
// expectedInsertions = 255000000
// falseProbability = 0.03
bloomFilter.tryInit(255000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

6.9. HyperLogLog

Redis based distributed RHyperLogLog object for Java. Probabilistic data structure that lets you maintain counts of millions of items with extreme space efficiency.

It has Async, Reactive and RxJava3 interfaces.

RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);

log.count();

6.10. LongAdder

Java implementation of Redis based LongAdder object provides API similar to java.util.concurrent.atomic.LongAdder object.

It maintains internal LongAdder object on client side and provides superior performance for both increment and decrement operations. Up to 12000x faster than similar AtomicLong object. Suitable for distributed metric objects.

Code example:

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

Object should be destroyed if it's not used anymore, but it's not necessary to call destroy method if Redisson goes shutdown.

RLongAdder atomicLong = ...
atomicLong.destroy();

6.11. DoubleAdder

Java implementation of Redis based DoubleAdder object provides API similar to java.util.concurrent.atomic.DoubleAdder object.

It maintains internal DoubleAdder object on client side and provides superior performance for both increment and decrement operations. Up to 12000x faster than similar AtomicDouble object. Suitable for distributed metric objects.

Code example:

RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();

Object should be destroyed if it's not used anymore, but it's not necessary to call destroy method if Redisson goes shutdown.

RLongDouble atomicDouble = ...
atomicDouble.destroy();

6.12. RateLimiter

Redis based distributed RateLimiter object for Java restricts the total rate of calls either from all threads regardless of Redisson instance or from all threads working with the same Redisson instance. Doesn't guarantee fairness.

Code example:

RRateLimiter limiter = redisson.getRateLimiter("myLimiter");
// Initialization required only once.
// 5 permits per 2 seconds
limiter.trySetRate(RateType.OVERALL, 5, 2, RateIntervalUnit.SECONDS);

// acquire 3 permits or block until they became available       
limiter.acquire(3);

Code example of Async interface usage:

RRateLimiter limiter = redisson.getRateLimiter("myLimiter");
// Initialization required only once.
// 5 permits per 2 seconds
RFuture<Boolean> setRateFuture = limiter.trySetRate(RateType.OVERALL, 5, 2, RateIntervalUnit.SECONDS);

// acquire 3 permits or block until they became available       
RFuture<Void> aquireFuture = limiter.acquire(3);

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RRateLimiterReactive limiter = redisson.getRateLimiter("myLimiter");
// Initialization required only once.
// 5 permits per 2 seconds
Mono<Boolean> setRateMono = limiter.trySetRate(RateType.OVERALL, 5, 2, RateIntervalUnit.SECONDS);

// acquire 3 permits or block until they became available       
Mono<Void> aquireMono = limiter.acquire(3);

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RRateLimiterRx limiter = redisson.getRateLimiter("anyObject");

// Initialization required only once.
// 5 permits per 2 seconds
Single<Boolean> setRateRx = limiter.trySetRate(RateType.OVERALL, 5, 2, RateIntervalUnit.SECONDS);

// acquire 3 permits or block until they became available       
Completable aquireRx = limiter.acquire(3);

6.13. Reliable Topic

Java implementation of Redis based RReliableTopic object implements Publish / Subscribe mechanism with reliable delivery of messages. In case of Redis connection interruption all missed messages are delivered after reconnection to Redis. Message considered as delivered when it was received by Redisson and submited for processing by topic listeners.

Each RReliableTopic object instance (subscriber) has own watchdog which is started when the first listener was registered. Subscriber expires after org.redisson.config.Config#reliableTopicWatchdogTimeout timeout if watchdog didn't extend it to the next timeout time interval. This prevents against infinity grow of stored messages in topic due to Redisson client crash or any other reason when subscriber unable to consume messages.

Topic listeners are resubscribed automatically after reconnection to Redis or Redis failover.

Code example:

RReliableTopic topic = redisson.getReliableTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(CharSequence channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
long subscribersReceivedMessage = topic.publish(new SomeObject());

Code example of Async interface usage:

RReliableTopicAsync topic = redisson.getReliableTopic("anyTopic");
RFuture<String> listenerFuture = topic.addListenerAsync(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(CharSequence channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RReliableTopicAsync topic = redisson.getReliableTopic("anyTopic");
RFuture<Long> future = topic.publishAsync(new SomeObject());

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();

RReliableTopicReactive topic = redisson.getReliableTopic("anyTopic");
Mono<String> listenerMono = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(CharSequence channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RReliableTopicReactive topic = redisson.getReliableTopic("anyTopic");
Mono<Long> publishMono = topic.publish(new SomeObject());

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();

RReliableTopicRx topic = redisson.getReliableTopic("anyTopic");
Single<String> listenerRx = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(CharSequence channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RReliableTopicRx topic = redisson.getReliableTopic("anyTopic");
Single<Long> publisRx = topic.publish(new SomeObject());

6.14. Id generator

Redis based Java Id generator RReliableTopic generates unique numbers but not monotonically increased. At first request batch of id numbers is allocated and cached on Java side till it's exhausted. This approach allows to generate ids faster than RAtomicLong.

Default allocation size is 5000. Default start value is 0.

Code example:

RIdGenerator generator = redisson.getIdGenerator("generator");

// Initialize with start value = 12 and allocation size = 20000
generator.tryInit(12, 20000);

long id = generator.nextId();

Code example of Async interface usage:

RIdGenerator generator = redisson.getIdGenerator("generator");

// Initialize with start value = 12 and allocation size = 20000
RFuture<Boolean> initFuture = generator.tryInitAsync(12, 20000);

RFuture<Long> idFuture = generator.nextIdAsync();

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RIdGenerator generator = redisson.getIdGenerator("generator");

// Initialize with start value = 12 and allocation size = 20000
Mono<Boolean> initMono = generator.tryInit(12, 20000);

Mono<Long> idMono = generator.nextId();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RIdGenerator generator = redisson.getIdGenerator("generator");

// Initialize with start value = 12 and allocation size = 20000
Single<Boolean> initRx = generator.tryInit(12, 20000);

Single<Long> idRx = generator.nextId();