Skip to content

9. Distributed services

Nikita Koksharov edited this page May 18, 2019 · 78 revisions

9.1. Remote service

Redisson provides Java Remote Services to execute remote procedure call using Redis. Remote interface could have any type of method parameters and result object. Redis is used to store method request and corresponding execution result.

The RemoteService provides two types of RRemoteService instances:

  • Server side instance - executes remote method (worker instance). Example:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceImpl someServiceImpl = new SomeServiceImpl();

// register remote service before any remote invocation
// can handle only 1 invocation concurrently
remoteService.register(SomeServiceInterface.class, someServiceImpl);

// register remote service able to handle up to 12 invocations concurrently
remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
  • Client side instance - invokes remote method. Example:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);

String result = service.doSomeStuff(1L, "secondParam", new AnyParam());

Client and server side instances shall be using the same remote interface and backed by redisson instances created using the same server connection configuration. Client and server side instances could be run in same JVM. There are no limits to the amount of client and/or server instances. (Note: While redisson does not enforce any limits, limitations from Redis still apply.)

Remote invocations executes in parallel mode if 1+ workers are available.

The total number of parallel executors is calculated as such: T = R * N

T - total available parallel executors R - Redisson server side instance amount N - executors amount defined during service registration

Commands exceeding this number will be queued for the next available executor.

Remote invocations executes in sequential mode if only 1 workers are available. Only one command can be handled concurrently in this case and the rest of commands will be queued.

9.1.1. Remote service. Message flow

RemoteService creates two queues per invocation. One queue for request (being listened by server side instance) and another one is for ack-response and result-response (being listened by client side instance). Ack-response used to determine if method executor has got a request. If it doesn't during ack timeout then RemoteServiceAckTimeoutException will be thrown.

Below is depicted a message flow for each remote invocation.

9.1.2. Remote service. Fire-and-forget and ack-response modes

RemoteService offers some options for each remote invocation via org.redisson.core.RemoteInvocationOptions object. Such options allow to change timeouts and skip ack-response and/or result-response. Examples:

// 1 second ack timeout and 30 seconds execution timeout
RemoteInvocationOptions options = RemoteInvocationOptions.defaults();

// no ack but 30 seconds execution timeout
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck();

// 1 second ack timeout then forget the result
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();

// 1 minute ack timeout then forget about the result
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult();

// no ack and forget about the result (fire and forget)
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();

RRemoteService remoteService = redisson.getRemoteService();
YourService service = remoteService.get(YourService.class, options);

9.1.3. Remote service. Asynchronous, Reactive and RxJava2 calls

Remote method could be executed using Async, Reactive and RxJava2 Api.

Asynchronous Remote interface. Interface should be annotated with @RRemoteAsync. Method signatures matches with the methods in remote interface and return org.redisson.api.RFuture object.

Reactive Remote interface. Interface should be annotated with @RRemoteReactive. Method signatures matches with the methods in remote interface and return reactor.core.publisher.Mono object.

RxJava2 Remote interface. Interface should be annotated with @RRemoteRx. Method signatures matches with the methods in remote interface and return one of the following object: io.reactivex.Completable, io.reactivex.Single, io.reactivex.Maybe.

It's not necessary to list all methods, only those which are needed. Below is an example of Remote Service interface:

public interface RemoteInterface {

    Long someMethod1(Long param1, String param2);

    void someMethod2(MyObject param);

    MyObject someMethod3();

}

Asynchronous Remote interface and method call example:

@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync {

    RFuture<Long> someMethod1(Long param1, String param2);

    RFuture<Void> someMethod2(MyObject param);

}

RedissonClient redisson = Redisson.create(config);
RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class);

asyncService.someMethod1(1L, "myparam");

Reactive Remote interface and method call example:

@RRemoteReactive(RemoteInterface.class)
public interface RemoteInterfaceReactive {

    Mono<Long> someMethod1(Long param1, String param2);

    Mono<Void> someMethod2(MyObject param);

}

RedissonReactiveClient redisson = Redisson.createReactive(config);
RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceReactive asyncService = remoteService.get(RemoteInterfaceReactive.class);

asyncService.someMethod1(1L, "myparam");

RxJava2 Remote interface and method call example:

@RRemoteRx(RemoteInterface.class)
public interface RemoteInterfaceRx {

    Single<Long> someMethod1(Long param1, String param2);

    Completable someMethod2(MyObject param);

}

RedissonRxClient redisson = Redisson.createRx(config);
RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceReactive asyncService = remoteService.get(RemoteInterfaceRx.class);

asyncService.someMethod1(1L, "myparam");

9.1.4. Remote service. Asynchronous, Reactive and RxJava2 call cancellation

Remote service provides ability to cancel invocation in any stages of its execution. There are three stages:

  1. Remote invocation request in queue
  2. Remote invocation request received by remote service but not lunched and Ack-response hasn't send yet
  3. Remote invocation execution in progress

To handle third stage you need to check for Thread.currentThread().isInterrupted() status in your Remote service code. Here is an example:

public interface MyRemoteInterface {

    Long myBusyMethod(Long param1, String param2);

}

@RRemoteAsync(MyRemoteInterface.class)
public interface MyRemoteInterfaceAsync {

    RFuture<Long> myBusyMethod(Long param1, String param2);

}

@RRemoteReactive(MyRemoteInterface.class)
public interface MyRemoteInterfaceReactive {

    Mono<Long> myBusyMethod(Long param1, String param2);

}

@RRemoteRx(MyRemoteInterface.class)
public interface MyRemoteInterfaceRx {

    Single<Long> myBusyMethod(Long param1, String param2);

}


// remote service implementation
public class MyRemoteServiceImpl implements MyRemoteInterface {

   public Long myBusyMethod(Long param1, String param2) {
       for (long i = 0; i < Long.MAX_VALUE; i++) {
           iterations.incrementAndGet();
           if (Thread.currentThread().isInterrupted()) {
                System.out.println("interrupted! " + i);
                return;
           }
       }
   }

}

RRemoteService remoteService = redisson.getRemoteService();
ExecutorService executor = Executors.newFixedThreadPool(5);
// register remote service using separate
// ExecutorService used to execute remote invocation
MyRemoteInterface serviceImpl = new MyRemoteServiceImpl();
remoteService.register(MyRemoteInterface.class, serviceImpl, 5, executor);

// call Asynchronous method
MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class);
RFuture<Long> future = asyncService.myBusyMethod(1L, "someparam");
// cancel invocation
future.cancel(true);

// call Reactive method
MyRemoteInterfaceReactive reactiveService = remoteService.get(MyRemoteInterfaceReactive.class);
Mono<Long> mono = reactiveService.myBusyMethod(1L, "someparam");
Disposable disp = mono.doOnSubscribe(s -> s.request(1)).subscribe();
// cancel invocation
disp.dispose();

// call RxJava2 method
MyRemoteInterfaceRx asyncService = remoteService.get(MyRemoteInterfaceRx.class);
Single<Long> single = asyncService.myBusyMethod(1L, "someparam");
Disposable disp = single.subscribe();
// cancel invocation
disp.dispose();

9.2. Live Object service

9.2.1. Introduction

A Live Object can be understood as an enhanced version of standard Java object, of which an instance reference can be shared not only between threads in a single JVM, but can also be shared between different JVMs across different machines.Wikipedia discribes it as:

Live distributed object (also abbreviated as live object) refers to a running instance of a distributed multi-party (or peer-to-peer) protocol, viewed from the object-oriented perspective, as an entity that has a distinct identity, may encapsulate internal state and threads of execution, and that exhibits a well-defined externally visible behavior.

Redisson Live Object (RLO) realised this idea by mapping all the fields inside a Java class to a redis hash through a runtime-constructed proxy class. All the get/set methods of each field are translated to hget/hset commands operated on the redis hash, making it accessable to/from any clients connected to the same redis server. As we all know, the field values of an object represent its state; having them stored in a remote repository, redis, makes it a distributed object. This object is a Redisson Live Object.

By using RLO, sharing an object between applications and/or servers is the same as sharing one in a standalone application. This removes the need for serialization and deserialization, and at the same time reduces the complexity of the programming model: Changes made to one field is (almost^) immediately accessable to other processes, applications and servers. (^Redis' eventual consistant replication rule still applies when connected to slave nodes)

Since the redis server is a single-threaded application, all field access to the live object is automatically executed in atomic fashion: a value will not be changed when you are reading it.

With RLO, you can treat the redis server as a shared Heap space for all connected JVMs.

9.2.2. Usage

Redisson provides different annotations for Live Object. @RId and @REntity annotation are required to create and use Live Object.

@REntity
public class MyObject {

    @RId
    private String id;
    @RIndex
    private String value;
    private MyObject parent;

    public MyObject(String id) {
        this.id = id;
    }

    public MyObject() {
    }

    // getters and setters

}

To start use it you should attach, merge or persist instance of this Live Object.

RLiveObjectService service = redisson.getLiveObjectService();
MyLiveObject myObject = new MyLiveObject();
myObject1.setId("1");
// current state of myObject is now persisted and attached to Redis
myObject = service.persist(myObject);

MyLiveObject myObject = new MyLiveObject("1");
// current state of myObject is now cleared and attached to Redis
myObject = service.attach(myObject);

MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// current state of myObject is now merged to already existed object and attached to Redis
myObject = service.merge(myObject);
myObject.setValue("somevalue");

// get Live Object by Id
MyLiveObject myObject = service.get(MyLiveObject.class, "1");

// find Live Objects by value field
Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.in("value", "somevalue", "somevalue2"));

Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.and(Conditions.in("value", "somevalue", "somevalue2"), Conditions.eq("secondfield", "test")));

"parent" field has a link to another instances of Live Object of the same type, but the type could be different. Redisson stores this link into Redis as a reference object and not the whole object state, so you continue to work with reference object as Live Object.

//Redisson Live Object behavior:
MyObject myObject = service.get(MyObject.class, "1");
MyObject myParentObject = service.get(MyObject.class, "2");
myObject.setValue(myParentObject);

Field types in the RLO can be almost anything, from Java util classes to collection/map types and of course your own custom objects, as long as it can be encoded and decoded by a supplied codec. More details about the codec can be found in the Advanced Usage section.

As much as I like to say it's free with no limits, there are still some restrictions on the choices of field types you can have. The field annotated with RId can not be an array type, i.e. int[], long[], double[], byte[], etc. More details and explainations can be found in Restrictions section

In order to keep RLOs behaving as closely to standard Java objects as possible, Redisson automatically converts the following standard Java field types to its counter types supported by Redisson RObject.

Standard Java Class Converted Redisson Class
SortedSet.class RedissonSortedSet.class
Set.class RedissonSet.class
ConcurrentMap.class RedissonMap.class
Map.class RedissonMap.class
BlockingDeque.class RedissonBlockingDeque.class
Deque.class RedissonDeque.class
BlockingQueue.class RedissonBlockingQueue.class
Queue.class RedissonQueue.class
List.class RedissonList.class

The conversion prefers the one nearer to the top of the table if a field type matches more than one entries. i.e. LinkedList implements Deque, List, Queue, it will be converted to a RedissonDeque because of this.

Instances of these Redisson classes retains their states/values/entries in Redis too, changes to them are directly reflected into Redis without keeping values in local VM.

9.2.3. Advanced Usage

As described before, RLO classes are proxy classes which can be fabricated when needed and then get cached in a RedissonClient instance against its original class. This process can be a bit slow and it is recommended to pre-register all the Redisson Live Object classes via RedissonLiveObjectService for any kind of delay-sensitive applications. The service can also be used to unregister a class if it is no longer needed. And of course it can be used to check if the class has already been registered.

RLiveObjectService service = redisson.getLiveObjectService();
service.registerClass(MyClass.class);
service.unregisterClass(MyClass.class);
Boolean registered = service.isClassRegistered(MyClass.class);

9.2.4. Annotations

@REntity

Applied to a class. The behaviour of each type of RLO can be customised through properties of the @REntity annotation. You can specify each of those properties to gain fine control over its behaviour.

  • namingScheme - You can specify a naming scheme which tells Redisson how to assign key names for each instance of this class. It is used to create a reference to an existing Redisson Live Object and materialising a new one in redis. It defaults to use Redisson provided DefaultNamingScheme.
  • codec - You can tell Redisson which Codec class you want to use for the RLO. Redisson will use an instance pool to locate the instance based on the class type. It defaults to JsonJacksonCodec provided by Redisson.
  • fieldTransformation - You can also specify a field transformation mode for the RLO. As mentioned before, in order to keep everything as close to standard Java as possible, Redisson will automatically transform fields with commonly-used Java util classes to Redisson compatible classes. This uses ANNOTATION_BASED as the default value. You can set it to IMPLEMENTATION_BASED which will skip the transformation.

@RId

Applied to a field. Defines primary key field of this class. The value of this field is used to create a reference to existing RLO. The field with this annotation is the only field that has its value also kept in the local VM. You can only have one RId annotation per class.

You can supply a generator strategy to the @RId annotation if you want the value of this field to be programatically generated. The default generator is null.

@RIndex

Applied to a field. Specifies that the field is used in search index. Allows to execute search query based on that field through RLiveObjectService.find method. Search method accepts different type of conditions: IN, OR, AND, EQ in any combination. Usage example:

public class MyObject {
@RIndex
String field1;
@RIndex
String field2;
@RIndex
String field3;

}

Collection<MyObject> objects = RLiveObjectService.find(MyObject.class, Conditions.or(Conditions.and(Conditions.eq("field1", "value"), Conditions.eq("field2", "value")), Conditions.in("field3", "value1", "value2"));

@RObjectField

Applied to a field. Allows to specify namingScheme and/or codec different from what is specified in @REntity.

@RCascade

Applied to a field. Specifies that the defined cascade types are applied to the object/objects contained in Live Object field. Different cascade types are available:

RCascadeType.ALL - Includes all cascade types
RCascadeType.PERSIST - Cascade persist operation during RLiveObjectService.persist() method invocation
RCascadeType.DETACH - Cascade detach operation during RLiveObjectService.detach() method invocation
RCascadeType.MERGE - Cascade merge operation during RLiveObjectService.merge() method invocation
RCascadeType.DELETE - Cascade delete operation during RLiveObjectService.delete() method invocation.

9.2.5. Restrictions

As mentioned above, the type of the RId field cannot be an Array type. This is due to the DefaultNamingScheme which cannot serialize and deserialize the Array type as of yet. This restriction can be lifted once the DefaultNamingScheme is improved. Since the RId field is encoded as part of the key name used by the underlying RMap, it makes no sense to create a RLO with just have one field. It is better to use a RBucket for this type of usage.

9.3. Distributed executor service

9.3.1. Distributed executor service. Overview

Redisson provides implementation of java.util.concurrent.ExecutorService interface to run java.util.concurrent.Callable, java.lang.Runnable and Lambda tasks on different Redisson nodes. Redisson instance could be injected right into task object to process Redis data and execute distributed computations in fast and efficient way.

9.3.2. Distributed executor service. Tasks

Redisson node doesn't require presence of task classes in classpath. Task classes are loaded automatically by Redisson node ClassLoader. Thus each new task class doesn't require restart of Redisson node.

Example with Callable task:

public class CallableTask implements Callable<Long> {

    @RInject
    private RedissonClient redissonClient;

    @Override
    public Long call() throws Exception {
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        Long result = 0;
        for (Integer value : map.values()) {
            result += value;
        }
        return result;
    }

}

Example with Runnable task:

public class RunnableTask implements Runnable {

    @RInject
    private RedissonClient redissonClient;

    private long param;

    public RunnableTask() {
    }

    public RunnableTask(long param) {
        this.param = param;
    }

    @Override
    public void run() {
        RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");
        atomic.addAndGet(param);
    }

}

Follow options could be supplied during ExecutorService aquisition:

ExecutorOptions options = ExecutorOptions.defaults()

// Defines task retry interval at the end of which task is executed again.
// ExecutorService worker re-schedule task execution retry every 5 seconds.
//
// Set 0 to disable.
//
// Default is 5 minutes
options.taskRetryInterval(10, TimeUnit.MINUTES);
RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
executorService.submit(new RunnableTask(123));

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit(new CallableTask());
Long result = future.get();

Example with Lambda task:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit((Callable & Serializable)() -> {
      System.out.println("task has been executed!");
});
Long result = future.get();

Each Redisson node has ready to use RedissonClient which injected to task each time via @RInject annotation.

9.3.3. Distributed executor service. Task execution cancellation

It's easy to cancel any submitted task via RFuture.cancel() or RExecutorService.cancelTask() method. To handle case then task execution is already in progress you need to check Thread status for interruption with Thread.currentThread().isInterrupted() invocation:

public class CallableTask implements Callable<Long> {

    @RInject
    private RedissonClient redissonClient;

    @Override
    public Long call() throws Exception {
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        Long result = 0;
        // map contains many entries
        for (Integer value : map.values()) {
           if (Thread.currentThread().isInterrupted()) {
                // task has been canceled
                return null;
           }
           result += value;
        }
        return result;
    }

}

RExecutorService executorService = redisson.getExecutorService("myExecutor");
Future<Long> future = executorService.submit(new CallableTask());
// or
RFuture<Long> future = executorService.submitAsync(new CallableTask());
// ...
future.cancel(true);

9.4. Distributed scheduled executor service

9.4.1. Distributed scheduled executor service. Overview

Redisson provides implementation of java.util.concurrent.ScheduledExecutorService interface to schedule java.util.concurrent.Callable, java.lang.Runnable and Lambda tasks on different Redisson nodes. Tasks and result objects stored in Redis queues. Scheduled task is a job which needs to be execute in the future at a particular time one or more times.

9.4.2. Distributed scheduled executor service. Scheduling a task

Redisson node doesn't require presence of task classes in classpath. Task classes are loaded automatically by Redisson node ClassLoader. Thus each new task class doesn't require restart of Redisson node.

Example with Callable task:

public class CallableTask implements Callable<Long> {

    @RInject
    private RedissonClient redissonClient;

    @Override
    public Long call() throws Exception {
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        Long result = 0;
        for (Integer value : map.values()) {
            result += value;
        }
        return result;
    }

}

Follow options could be supplied during ExecutorService aquisition:

ExecutorOptions options = ExecutorOptions.defaults()

// Defines task retry interval at the end of which task is executed again.
// ExecutorService worker re-schedule task execution retry every 5 seconds.
//
// Set 0 to disable.
//
// Default is 5 minutes
options.taskRetryInterval(10, TimeUnit.MINUTES);
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<Long> future = executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES);
Long result = future.get();

Example with Lambda task:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
ScheduledFuture<Long> future = executorService.schedule((Callable & Serializable)() -> {
      System.out.println("task has been executed!");
}, 10, TimeUnit.MINUTES);
Long result = future.get();

Example with Runnable task:

public class RunnableTask implements Runnable {

    @RInject
    private RedissonClient redissonClient;

    private long param;

    public RunnableTask() {
    }

    public RunnableTask(long param) {
        this.param= param;
    }

    @Override
    public void run() {
        RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");
        atomic.addAndGet(param);
    }

}

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<?> future1 = executorService.schedule(new RunnableTask(123), 10, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future2 = executorService.scheduleAtFixedRate(new RunnableTask(123), 10, 25, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future3 = executorService.scheduleWithFixedDelay(new RunnableTask(123), 5, 10, TimeUnit.HOURS);

9.4.3. Distributed scheduled executor service. Scheduling a task with cron expression

Tasks scheduler service allows to define more complex schedule with cron expressions. Which fully compatible with Quartz cron format.

Example:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

9.4.4. Distributed scheduled executor service. Task scheduling cancellation

Scheduled executor service provides two ways to cancel scheduled task via RScheduledFuture.cancel() or RScheduledExecutorService.cancelTask method. To handle case then task execution is already in progress you need to check Thread status for interruption with Thread.currentThread().isInterrupted() invocation:

public class RunnableTask implements Callable<Long> {

    @RInject
    private RedissonClient redissonClient;

    @Override
    public Long call() throws Exception {
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        Long result = 0;
        // map contains many entries
        for (Integer value : map.values()) {
           if (Thread.currentThread().isInterrupted()) {
                // task has been canceled
                return null;
           }
           result += value;
        }
        return result;
    }

}

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
RScheduledFuture<Long> future = executorService.scheduleAsync(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
future.cancel(true);
// or
String taskId = future.getTaskId();
// ...
executorService.cancelScheduledTask(taskId);

9.5. Distributed MapReduce service

9.5.1 Distributed MapReduce service. Overview

Redisson provides MapReduce programming model to process large amount of data stored in Redis. Based on ideas from different implementations and Google's MapReduce publication. Supported by different objects: RMap, RMapCache, RLocalCachedMap, RClusteredMap, RClusteredMapCache, RClusteredLocalCachedMap, RClusteredSet, RClusteredSetCache, RSet, RSetCache, RList, RSortedSet, RScoredSortedSet, RQueue, RBlockingQueue, RDeque, RBlockingDeque, RPriorityQueue and RPriorityDeque.

MapReduce model based on few objects: RMapper/RCollectionMapper, RReducer and RCollator.

All tasks for map and reduce phases are executed across Redisson Nodes.

For RLocalCachedMap, RClusteredMap, RClusteredMapCache, RClusteredLocalCachedMap, RClusteredSet, RClusteredSetCache objects Map phase is splitted to multiple sub-tasks and executes in parallel. For other objects Map phase is executed in single sub-task. Reduce phase is always splitted to multiple sub-tasks and executes in parallel. Sub-tasks amount equals to total amount of registered MapReduce workers.

1. RMapper applied only for Map objects and transforms each Map's entry to intermediate key/value pair.

public interface RMapper<KIn, VIn, KOut, VOut> extends Serializable {

    void map(KIn key, VIn value, RCollector<KOut, VOut> collector);
    
}

2. RCollectionMapper applied only for Collection objects and transforms each Collection's entry to intermediate key/value pair.

public interface RCollectionMapper<VIn, KOut, VOut> extends Serializable {

    void map(VIn value, RCollector<KOut, VOut> collector);
    
}

3. RReducer reduces a list of intermediate key/value pairs.

public interface RReducer<K, V> extends Serializable {

    V reduce(K reducedKey, Iterator<V> values);
    
}

4. RCollator converts reduced result to a single object.

public interface RCollator<K, V, R> extends Serializable {

    R collate(Map<K, V> resultMap);
    
}

Each type of task has ability to inject RedissonClient using @RInject annotation like this:

    public class WordMapper implements RMapper<String, String, String, Integer> {

        @RInject
        private RedissonClient redissonClient;

        @Override
        public void map(String key, String value, RCollector<String, Integer> collector) {

            // ...

            redissonClient.getAtomicLong("mapInvocations").incrementAndGet();
        }
        
    }

By default MapReduce time execution is not limited, but timeout could be defined if required:

             = list.<String, Integer>mapReduce()
                   .mapper(new WordMapper())
                   .reducer(new WordReducer())
                   .timeout(60, TimeUnit.MINUTES);

9.5.2 Distributed MapReduce service. Map example

MapReduce is available for map type objects: RMap, RMapCache and RLocalCachedMap.

Below is word count example using MapReduce:

  1. Mapper object applied to each Map entry and split value by space to separate words.
    public class WordMapper implements RMapper<String, String, String, Integer> {

        @Override
        public void map(String key, String value, RCollector<String, Integer> collector) {
            String[] words = value.split("[^a-zA-Z]");
            for (String word : words) {
                collector.emit(word, 1);
            }
        }
        
    }
  1. Reducer object calculates the total sum for each word.
    public class WordReducer implements RReducer<String, Integer> {

        @Override
        public Integer reduce(String reducedKey, Iterator<Integer> iter) {
            int sum = 0;
            while (iter.hasNext()) {
               Integer i = (Integer) iter.next();
               sum += i;
            }
            return sum;
        }
        
    }
  1. Collator object counts total amount of words.
    public class WordCollator implements RCollator<String, Integer, Integer> {

        @Override
        public Integer collate(Map<String, Integer> resultMap) {
            int result = 0;
            for (Integer count : resultMap.values()) {
                result += count;
            }
            return result;
        }
        
    }
  1. Here is how to run all together:
    RMap<String, String> map = redisson.getMap("wordsMap");
    map.put("line1", "Alice was beginning to get very tired"); 
    map.put("line2", "of sitting by her sister on the bank and");
    map.put("line3", "of having nothing to do once or twice she");
    map.put("line4", "had peeped into the book her sister was reading");
    map.put("line5", "but it had no pictures or conversations in it");
    map.put("line6", "and what is the use of a book");
    map.put("line7", "thought Alice without pictures or conversation");

    RMapReduce<String, String, String, Integer> mapReduce
             = map.<String, Integer>mapReduce()
                  .mapper(new WordMapper())
                  .reducer(new WordReducer());

    // count occurrences of words
    Map<String, Integer> mapToNumber = mapReduce.execute();

    // count total words amount
    Integer totalWordsAmount = mapReduce.execute(new WordCollator());

9.5.3 Distributed MapReduce service. Collection example

MapReduce is available for collection type objects: RSet, RSetCache, RList, RSortedSet, RScoredSortedSet, RQueue, RBlockingQueue, RDeque, RBlockingDeque, RPriorityQueue and RPriorityDeque.

Below is word count example using MapReduce:

    public class WordMapper implements RCollectionMapper<String, String, Integer> {

        @Override
        public void map(String value, RCollector<String, Integer> collector) {
            String[] words = value.split("[^a-zA-Z]");
            for (String word : words) {
                collector.emit(word, 1);
            }
        }
        
    }
    public class WordReducer implements RReducer<String, Integer> {

        @Override
        public Integer reduce(String reducedKey, Iterator<Integer> iter) {
            int sum = 0;
            while (iter.hasNext()) {
               Integer i = (Integer) iter.next();
               sum += i;
            }
            return sum;
        }
        
    }
    public class WordCollator implements RCollator<String, Integer, Integer> {

        @Override
        public Integer collate(Map<String, Integer> resultMap) {
            int result = 0;
            for (Integer count : resultMap.values()) {
                result += count;
            }
            return result;
        }
        
    }
    RList<String> list = redisson.getList("myList");
    list.add("Alice was beginning to get very tired"); 
    list.add("of sitting by her sister on the bank and");
    list.add("of having nothing to do once or twice she");
    list.add("had peeped into the book her sister was reading");
    list.add("but it had no pictures or conversations in it");
    list.add("and what is the use of a book");
    list.add("thought Alice without pictures or conversation");

    RCollectionMapReduce<String, String, Integer> mapReduce
             = list.<String, Integer>mapReduce()
                   .mapper(new WordMapper())
                   .reducer(new WordReducer());

    // count occurrences of words
    Map<String, Integer> mapToNumber = mapReduce.execute();

    // count total words amount
    Integer totalWordsAmount = mapReduce.execute(new WordCollator());
You can’t perform that action at this time.