7. Distributed collections

Rui Gu edited this page Jan 12, 2017 · 24 revisions

7.1. Map

The Redisson distributed Map for Java implements the java.util.concurrent.ConcurrentMap and java.util.Map interfaces. It keeps elements in insertion order. The map size is limited by Redis to 4 294 967 295 elements.

RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");

// use fast* methods when previous value is not required
map.fastPut("a", new SomeObject());
map.fastPutIfAbsent("d", new SomeObject());
map.fastRemove("b");

Future<SomeObject> putAsyncFuture = map.putAsync("321", new SomeObject());
Future<Boolean> fastPutAsyncFuture = map.fastPutAsync("321", new SomeObject());

map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

Locking on map keys:

RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
   MyValue v = map.get(k);
   // process value ...
} finally {
   keyLock.unlock();
}

7.1.1. Map eviction

The Redisson distributed Map for Java with eviction support is implemented by a separate RMapCache object, which extends the RMap interface. It keeps elements in insertion order and implements the java.util.concurrent.ConcurrentMap and java.util.Map interfaces. Redisson also has a Spring Cache integration, which is based on the Map and MapCache objects.

The current Redis implementation doesn't have map entry eviction functionality, so expired entries are cleaned up by org.redisson.EvictionScheduler. It removes 100 expired entries at a time. The task launch time is tuned automatically: it depends on the number of expired entries deleted during the previous run and varies between 1 second and 2 hours. Thus, if the clean task deletes 100 entries each time, it is executed every second (the minimum execution delay). But if the current number of expired entries is lower than the previous one, the execution delay is increased by a factor of 1.5.
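
The delay tuning described above can be sketched in plain Java. This is a minimal illustration of the stated rules only, not the actual EvictionScheduler code: the class and method names are hypothetical, and keeping the delay unchanged when neither rule applies is an assumption, since the text does not cover that case.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Redisson: models only the delay rules stated in the text.
final class EvictionDelaySketch {
    static final int BATCH_SIZE = 100;                              // entries removed per run
    static final long MIN_DELAY_MS = TimeUnit.SECONDS.toMillis(1);  // 1 second
    static final long MAX_DELAY_MS = TimeUnit.HOURS.toMillis(2);    // 2 hours

    // Returns the delay to wait before the next cleanup run.
    static long nextDelay(long currentDelayMs, int removedNow, int removedPreviously) {
        if (removedNow >= BATCH_SIZE) {
            // A full batch suggests a backlog: run again after the minimum delay.
            return MIN_DELAY_MS;
        }
        if (removedNow < removedPreviously) {
            // Fewer expired entries than last time: back off by 1.5x, capped at 2 hours.
            return Math.min(MAX_DELAY_MS, currentDelayMs * 3 / 2);
        }
        // Assumption: the text does not describe this case, so the delay is kept as is.
        return currentDelayMs;
    }

    public static void main(String[] args) {
        long delay = MIN_DELAY_MS;
        int previous = BATCH_SIZE;
        for (int removed : new int[] {100, 40, 10, 0}) {
            delay = nextDelay(delay, removed, previous);
            System.out.println("removed=" + removed + " -> next delay " + delay + " ms");
            previous = removed;
        }
    }
}
```

With a steady stream of full batches the delay stays at one second; once the number of removed entries starts to fall, each run stretches the delay by half until the two-hour cap is reached.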

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// ttl = 10 minutes
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);

// ttl = 3 seconds
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
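
How ttl and maxIdleTime interact can be modelled locally. The sketch below assumes that an entry expires as soon as either limit is exceeded and that a read restarts the idle timer; the class is hypothetical and is not how RMapCache stores its entries.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of one cache entry; not a Redisson class.
final class ExpiringEntrySketch {
    private final long createdAtMs;
    private final long ttlMs;      // 0 = no ttl limit (assumption of this sketch)
    private final long maxIdleMs;  // 0 = no idle limit (assumption of this sketch)
    private long lastAccessMs;

    ExpiringEntrySketch(long nowMs, long ttl, TimeUnit ttlUnit, long maxIdle, TimeUnit maxIdleUnit) {
        this.createdAtMs = nowMs;
        this.lastAccessMs = nowMs;
        this.ttlMs = ttlUnit.toMillis(ttl);
        this.maxIdleMs = maxIdleUnit.toMillis(maxIdle);
    }

    // Records a read at the given time, which restarts the idle timer.
    void touch(long nowMs) {
        lastAccessMs = nowMs;
    }

    // An entry is expired when its ttl has elapsed or it has been idle for too long.
    boolean isExpired(long nowMs) {
        boolean ttlPassed = ttlMs > 0 && nowMs - createdAtMs >= ttlMs;
        boolean idleTooLong = maxIdleMs > 0 && nowMs - lastAccessMs >= maxIdleMs;
        return ttlPassed || idleTooLong;
    }

    public static void main(String[] args) {
        // ttl = 10 minutes, maxIdleTime = 10 seconds, created at t = 0
        ExpiringEntrySketch entry =
                new ExpiringEntrySketch(0, 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
        System.out.println(entry.isExpired(9_000));   // still within the idle limit
        entry.touch(9_000);                           // a read restarts the idle timer
        System.out.println(entry.isExpired(15_000));  // 6 seconds idle
        System.out.println(entry.isExpired(600_000)); // ttl reached
    }
}
```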

7.1.2. Map local cache

When a Map is used mostly for read operations and/or network roundtrips are undesirable, Redisson offers the RLocalCachedMap object, which caches Map entries on the Redisson side. The following options can be supplied during object creation:

LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
     // LFU, LRU and NONE policies are available
     .evictionPolicy(EvictionPolicy.LFU)
     .cacheSize(1000)
     // if `true`, an invalidation message that removes the corresponding entry from the cache
     // is sent to all other RLocalCachedMap instances on each entry update/remove operation
     .invalidateEntryOnChange(false)
     // time to live for each map entry in cache (milliseconds)
     .timeToLive(10000)
     // or
     .timeToLive(10, TimeUnit.SECONDS)
     // max idle time for each map entry in cache (milliseconds)
     .maxIdle(10000)
     // or
     .maxIdle(10, TimeUnit.SECONDS);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);

map.put("1", 1);
map.put("2", 2);

map.fastPut("3", 4);
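
To illustrate what a size-bounded cache with an eviction policy does, here is a minimal LRU sketch built on java.util.LinkedHashMap. It only models the effect of cacheSize combined with the LRU policy on a single JVM; the class name is hypothetical and this is not the data structure RLocalCachedMap uses internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical local model of a size-bounded LRU cache; not a Redisson class.
final class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int cacheSize;

    LruCacheSketch(int cacheSize) {
        // accessOrder = true: iteration order goes from least to most recently accessed
        super(16, 0.75f, true);
        this.cacheSize = cacheSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; drops the least recently used entry when over capacity.
        return size() > cacheSize;
    }

    public static void main(String[] args) {
        LruCacheSketch<String, Integer> cache = new LruCacheSketch<>(2);
        cache.put("1", 1);
        cache.put("2", 2);
        cache.get("1");      // "1" becomes the most recently used entry
        cache.put("3", 4);   // capacity exceeded: "2" is evicted
        System.out.println(cache.keySet());
    }
}
```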

The object should be destroyed when it is no longer used, but calling the destroy method is not necessary if Redisson is shutting down.

RLocalCachedMap<String, Integer> map = ...
map.destroy();

7.1.3. Map data partitioning

Map data partitioning for Java is available only in cluster mode and is implemented by a separate RClusteredMap object, which extends the RMap interface. Read more details about it here.

RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");

SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");

map.fastPut("321", new SomeObject());
map.fastRemove("321");

This feature is available only in the Redisson PRO edition.

7.2. Multimap

The Redisson distributed Multimap for Java allows binding multiple values per key.
The number of keys is limited by Redis to 4 294 967 295 elements.

7.2.1. Set based Multimap

A Set based Multimap doesn't allow duplicate values per key.

RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

Set<SimpleValue> allValues = map.get(new SimpleKey("0"));

List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

7.2.2. List based Multimap

A List based Multimap for Java preserves insertion order and allows duplicate values mapped to a key.

RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

List<SimpleValue> allValues = map.get(new SimpleKey("0"));

Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
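
The difference between the Set based and List based variants can be reproduced with plain Java collections. The sketch below is a local analogy only: the helper names are hypothetical and String values stand in for SimpleValue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical local analogy for the two multimap flavours; not Redisson code.
final class MultimapSemanticsSketch {
    // Set semantics: a value already mapped to the key is not stored again.
    static <K, V> boolean putIntoSet(Map<K, Set<V>> map, K key, V value) {
        return map.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(value);
    }

    // List semantics: every value is appended, duplicates included.
    static <K, V> boolean putIntoList(Map<K, List<V>> map, K key, V value) {
        return map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> setMultimap = new HashMap<>();
        Map<String, List<String>> listMultimap = new HashMap<>();
        for (String value : new String[] {"1", "2", "1"}) {
            putIntoSet(setMultimap, "0", value);
            putIntoList(listMultimap, "0", value);
        }
        System.out.println(setMultimap.get("0"));  // duplicate "1" stored once
        System.out.println(listMultimap.get("0")); // duplicate "1" stored twice
    }
}
```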

7.2.3. Multimap eviction

The Multimap object for Java with eviction support is implemented by a separate MultimapCache object. There are RSetMultimapCache and RListMultimapCache objects for Set and List Multimaps respectively.

Expired entries are cleaned up by org.redisson.EvictionScheduler. It removes 100 expired entries at a time. The task launch time is tuned automatically: it depends on the number of expired entries deleted during the previous run and varies between 1 second and 2 hours. Thus, if the clean task deletes 100 entries each time, it is executed every second (the minimum execution delay). But if the current number of expired entries is lower than the previous one, the execution delay is increased by a factor of 1.5.

RSetMultimapCache example:

RSetMultimapCache<String, String> map = redisson.getSetMultimapCache("myMultimap");
map.put("1", "a");
map.put("1", "b");
map.put("1", "c");

map.put("2", "e");
map.put("2", "f");

map.expireKey("2", 10, TimeUnit.MINUTES);

7.3. Set

The Redisson distributed Set for Java implements the java.util.Set interface. It keeps elements unique via element state comparison. The set size is limited by Redis to 4 294 967 295 elements.

RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

Redisson PRO Set object also supports data partitioning (sharding) in cluster mode.

7.3.1. Set eviction

The Redisson distributed Set for Java with eviction support is implemented by a separate RSetCache object, which extends the RSet interface. It also implements the java.util.Set interface.

The current Redis implementation doesn't have set value eviction functionality, so expired values are cleaned up by org.redisson.EvictionScheduler. It removes 100 expired values at a time. The task launch time is tuned automatically: it depends on the number of expired values deleted during the previous run and varies between 1 second and 2 hours. Thus, if the clean task deletes 100 values each time, it is executed every second (the minimum execution delay). But if the current number of expired values is lower than the previous one, the execution delay is increased by a factor of 1.5.

RSetCache<SomeObject> set = redisson.getSetCache("anySet");
// ttl = 10 seconds
set.add(new SomeObject(), 10, TimeUnit.SECONDS);

7.3.2. Set data partitioning

Set data partitioning for Java is available only in cluster mode and is implemented by a separate RClusteredSet object, which extends the RSet interface. Read more details about it here.

RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

This feature is available only in the Redisson PRO edition.

7.4. SortedSet

The Redisson distributed SortedSet object for Java implements the java.util.SortedSet interface. It uses a comparator to sort elements and keep them unique.

RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // set object comparator
set.add(3);
set.add(1);
set.add(2);

set.removeAsync(0);
set.addAsync(5);
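
Because the comparator decides both ordering and uniqueness, two elements that compare as equal count as duplicates. The sketch below shows this with java.util.TreeSet as a local stand-in for the sorted set; the MyComparator shown here (ordering integers by absolute value) is a hypothetical example, since the original does not define it.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical comparator: orders integers by absolute value.
final class MyComparator implements Comparator<Integer> {
    @Override
    public int compare(Integer a, Integer b) {
        return Integer.compare(Math.abs(a), Math.abs(b));
    }
}

final class SortedSetComparatorSketch {
    public static void main(String[] args) {
        // TreeSet is used here as a local stand-in for a comparator-driven sorted set.
        TreeSet<Integer> set = new TreeSet<>(new MyComparator());
        set.add(3);
        set.add(-1);
        set.add(2);
        boolean added = set.add(-2); // compares as equal to 2, so it is rejected
        System.out.println(added);   // false
        System.out.println(set);     // [-1, 2, 3]
    }
}
```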

7.5. ScoredSortedSet

The Redisson distributed ScoredSortedSet object sorts elements by a score defined during element insertion. It keeps elements unique via element state comparison.

RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");

set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));

set.pollFirst();
set.pollLast();

int index = set.rank(new SomeObject(g, d)); // get element index
Double score = set.getScore(new SomeObject(g, d)); // get element score
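
What rank means for a score-ordered set can be shown with a small local model: the rank of an element is its 0-based position when elements are ordered by ascending score. The helper below is hypothetical, ignores ties between equal scores, and returns -1 for a missing element as a convention of this sketch only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical local model of score-based ranking; not Redisson code.
final class ScoreRankSketch {
    // Returns the 0-based position of the element in ascending score order,
    // or -1 if the element is absent (a convention of this sketch).
    static <V> int rank(Map<V, Double> scores, V element) {
        Double own = scores.get(element);
        if (own == null) {
            return -1;
        }
        int rank = 0;
        for (double score : scores.values()) {
            if (score < own) {
                rank++; // every element with a lower score sits before this one
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new HashMap<>();
        scores.put("a", 0.13);
        scores.put("b", 0.251);
        scores.put("c", 0.302);
        System.out.println(rank(scores, "c")); // 2
        System.out.println(rank(scores, "a")); // 0
    }
}
```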

7.6. LexSortedSet

This Redisson distributed Set object for Java allows String objects only, orders them lexicographically, and implements the java.util.Set<String> interface. It keeps elements unique via element state comparison.

RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");

set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);
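
The range calls above have close local counterparts in java.util.NavigableSet, which can help when reasoning about inclusive and exclusive bounds. The sketch below is an analogy, not the Redisson implementation: the helper names are hypothetical, the exclusive bound used for the head count is an assumption, and Java compares strings by UTF-16 code units, which matches byte-wise ordering only for ASCII input.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical local analogy for lexicographical range queries; not Redisson code.
final class LexRangeSketch {
    // Elements after `from`, optionally including `from` itself.
    static NavigableSet<String> rangeTail(NavigableSet<String> set, String from, boolean inclusive) {
        return set.tailSet(from, inclusive);
    }

    // Number of elements before `to`, optionally including `to` itself.
    static int countHead(NavigableSet<String> set, String to, boolean inclusive) {
        return set.headSet(to, inclusive).size();
    }

    // Elements between `from` and `to` with independently chosen bound inclusiveness.
    static NavigableSet<String> range(NavigableSet<String> set,
                                      String from, boolean fromInclusive,
                                      String to, boolean toInclusive) {
        return set.subSet(from, fromInclusive, to, toInclusive);
    }

    public static void main(String[] args) {
        NavigableSet<String> set = new TreeSet<>(Arrays.asList("d", "e", "f"));
        System.out.println(rangeTail(set, "d", false));        // [e, f]
        System.out.println(countHead(set, "e", false));        // 1
        System.out.println(range(set, "d", true, "z", false)); // [d, e, f]
    }
}
```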

7.7. List

The Redisson distributed List object for Java implements the java.util.List interface. It keeps elements in insertion order. The list size is limited by Redis to 4 294 967 295 elements.

RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());

7.8. Queue

The Redisson distributed unbounded Queue for Java implements the java.util.Queue interface.
The queue size is limited by Redis to 4 294 967 295 elements.

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

7.9. Deque

The Redisson distributed unbounded Deque for Java implements the java.util.Deque interface.
The deque size is limited by Redis to 4 294 967 295 elements.

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

7.10. Blocking Queue

The Redisson distributed unbounded BlockingQueue for Java implements the java.util.concurrent.BlockingQueue interface. The BlockingQueue size is limited by Redis to 4 294 967 295 elements.

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());

SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

The poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically on reconnection to the Redis server or on Redis server failover.

7.11. Bounded Blocking Queue

The Redisson distributed BoundedBlockingQueue for Java implements the java.util.concurrent.BlockingQueue interface. The BoundedBlockingQueue size is limited by Redis to 4 294 967 295 elements. The queue capacity should be defined once before usage:

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// returns `true` if the capacity was set successfully and `false` if it was already set
queue.trySetCapacity(2);

queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// blocks until free space is available in the queue
queue.put(new SomeObject());

SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

The poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically on reconnection to the Redis server or on Redis server failover.
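
The capacity behaviour defined by the java.util.concurrent.BlockingQueue contract can be tried out locally with java.util.concurrent.ArrayBlockingQueue. This is a single-JVM analogy with a hypothetical helper class, not the distributed implementation: offer reports whether the element fitted, while put would block until space is free.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical local analogy for a capacity-bounded blocking queue; not Redisson code.
final class BoundedQueueSketch {
    // Offers every value in turn and records whether each one was accepted.
    static boolean[] offerAll(BlockingQueue<Integer> queue, int... values) {
        boolean[] accepted = new boolean[values.length];
        for (int i = 0; i < values.length; i++) {
            accepted[i] = queue.offer(values[i]); // returns false instead of blocking when full
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // capacity = 2
        System.out.println(Arrays.toString(offerAll(queue, 1, 2, 3))); // [true, true, false]
        queue.poll();                        // frees one slot
        System.out.println(queue.offer(3));  // true
    }
}
```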

7.12. Blocking Deque

The unbounded Redisson distributed BlockingDeque for Java implements the java.util.concurrent.BlockingDeque interface. The BlockingDeque size is limited by Redis to 4 294 967 295 elements.

RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = deque.takeFirst();
Integer lastValue = deque.takeLast();
Integer polledFirst = deque.pollFirst(10, TimeUnit.MINUTES);
Integer polledLast = deque.pollLast(3, TimeUnit.MINUTES);

The poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically on reconnection to the Redis server or on Redis server failover.

7.13. Blocking Fair Queue

Queue consumers may sit in different parts of the network: some closer to Redis and some further away. The "further" consumers get fewer messages from the queue due to network delays. In turn, the "closer" consumers get more, which could lead to client overloading.

The blocking queue with fair polling guarantees access order for the poll and take methods and allows uniformly distributed consumption.

RBlockingFairQueue<SomeObject> queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());

SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

7.14. Delayed Queue

A delayed queue transfers each element to a destination queue after a specified delay. This can be useful for an exponential backoff strategy used for message delivery to a consumer. The destination queue can be any queue that implements the RQueue interface.

RQueue<String> destinationQueue = ...
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
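
For the exponential backoff use case mentioned above, the delay passed to offer can be derived from the retry attempt number. The helper below is a hypothetical sketch in plain Java: the doubling factor, the base delay and the cap are illustrative choices, not something prescribed by Redisson.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for computing redelivery delays; not part of Redisson.
final class BackoffSketch {
    // Delay for the given 0-based retry attempt: baseMs * 2^attempt, capped at maxMs.
    static long delayMillis(int attempt, long baseMs, long maxMs) {
        if (attempt < 0 || baseMs <= 0 || maxMs < baseMs) {
            throw new IllegalArgumentException("attempt >= 0 and 0 < baseMs <= maxMs required");
        }
        long delay = baseMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            // Double the delay, stopping at the cap without overflowing.
            delay = delay > maxMs / 2 ? maxMs : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        long base = TimeUnit.SECONDS.toMillis(1);
        long max = TimeUnit.MINUTES.toMillis(1);
        for (int attempt = 0; attempt <= 7; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delayMillis(attempt, base, max) + " ms");
        }
    }
}
```

A failed message could then be re-queued with a call such as delayedQueue.offer(msg, BackoffSketch.delayMillis(attempt, 1000, 60000), TimeUnit.MILLISECONDS), where msg and attempt come from the application.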

The object should be destroyed when it is no longer used, but calling the destroy method is not necessary if Redisson is shutting down.

RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();