Skip to content

Asynchronous Connections

Mark Paluch edited this page Jun 13, 2020 · 6 revisions

❗️Documentation page for Lettuce 3.x. Find the recent documentation for Lettuce 4.x here: Asynchronous-API


This guide will give you an impression how and when to use the asynchronous API provided by Lettuce 3.x.

Motivation

Asynchronous methodologies allow you to utilize better system resources, instead of wasting threads waiting for network or disk I/O. Threads can be fully utilized to perform other work instead. Lettuce facilitates asynchronicity from building the client on top of netty that is a multithreaded, event-driven I/O framework. All communication is handled asynchronously. Once the foundation can process commands concurrently, it is convenient to take advantage from the asynchronicity. It is way harder to turn a blocking and synchronous working software into a concurrently processing system.

Understanding asynchronicity

Asynchronicity permits other processing to continue before the transmission has finished and the response of the transmission is processed. This means, in the context of Lettuce and especially Redis, that multiple commands can be issued serially without the need of waiting to finish the preceding command. This mode of operation is also known as Pipelining. The following example should give you an impression of the mode of operation:

  • Given client A and client B
  • Client A triggers command SET A=B
  • Client B triggers at the same time of Client A command SET C=D
  • Redis receives command from Client A
  • Redis receives command from Client B
  • Redis processes SET A=B and responds OK to Client A
  • Client A receives the response and stores the response in the response handle
  • Redis processes SET C=D and responds OK to Client B
  • Client B receives the response and stores the response in the response handle

Both clients from the example above can be either two threads or connections within an application or two physically separated clients.

Clients can operate concurrently to each other by either being separate processes, threads, event-loops, actors, fibers, etc. Redis processes incoming commands serially and operates mostly single-threaded. This means, commands are processed in the order they are received with some characteristic that we'll cover later.

Let's take the simplified example and enhance it by some program flow details:

  • Given client A
  • Client A triggers command SET A=B
  • Client A uses the asynchronous API and can perform other processing
  • Redis receives command from Client A
  • Redis processes SET A=B and responds OK to Client A
  • Client A receives the response and stores the response in the response handle
  • Client A can access now the response to its command without waiting (non-blocking)

The Client A takes advantage from not waiting on the result of the command so it can process computational work or issue another Redis command. The client can work with the command result as soon as the response is available.

Impact of asynchronicity to the synchronous API

While this guide helps you to understand the asynchronous API it is worthwhile to learn the impact on the synchronous API. The general approach of the synchronous API is no different than the asynchronous API. In both cases, the same facilities are used to invoke and transport commands to the Redis server. The only difference is a blocking behavior of the caller that is using the synchronous API. Blocking happens on command level and affects only the command completion part, meaning multiple clients using the synchronous API can invoke commands on the same connection and at the same time without blocking each other. A call on the synchronous API is unblocked at the moment a command response was processed.

  • Given client A and client B
  • Client A triggers command SET A=B on the synchronous API and waits for the result
  • Client B triggers at the same time of Client A command SET C=D on the synchronous API and waits for the result
  • Redis receives command from Client A
  • Redis receives command from Client B
  • Redis processes SET A=B and responds OK to Client A
  • Client A receives the response and unblocks the program flow of Client A
  • Redis processes SET C=D and responds OK to Client B
  • Client B receives the response and unblocks the program flow of Client B

However, there are some cases you should not share a connection among threads to avoid side-effects. The cases are:

  • Disabling flush-after-command to improve performance
  • The use of blocking operations like BLPOP. Blocking operations are queued on Redis until they can be executed. While one connection is blocked, other connections can issue commands to Redis. Once a command unblocks the blocking command (that said an LPUSH or RPUSH hits the list), the blocked connection is unblocked and can proceed after that.
  • Transactions
  • Using multiple databases

Result handles

Every command invocation on the asynchronous API creates a RedisFuture<T> that can be canceled, awaited and subscribed (listener). A Future<T> or RedisFuture<T> is a pointer to the result that is initially unknown since the computation of its value is yet incomplete. A RedisFuture<T> provides operations for synchronization and chaining.

SettableFuture<String> future = SettableFuture.create();

System.out.println("Current state: " + future.isDone());

future.set("my value");

System.out.println("Current state: " + future.isDone());
System.out.println("Got value: " + future.get());

The example prints the following lines:

Current state: false
Current state: true
Got value: my value

Attaching a listener to a future allows chaining. Promises can be used synonymous to futures, but not every future is a promise. A promise guarantees a callback/notification and thus it has come to its name.

A simple listener that gets called once the future completes:

final SettableFuture<String> future = SettableFuture.create();

future.addListener(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("Got value: " + future.get());
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}, MoreExecutors.sameThreadExecutor());

System.out.println("Current state: " + future.isDone());
future.set("my value");
System.out.println("Current state: " + future.isDone());

The value processing moves from the caller into a listener that is then called by whoever completes the future. The example prints the following lines:

Current state: false
Got value: my value
Current state: true

The code from above requires exception handling since calls to the get() method can lead to exceptions. Exceptions raised during the computation of the Future<T> are transported within an ExecutionException. Another exception that may be thrown is the InterruptedException. This is because calls to get() are blocking calls and the blocked thread can be interrupted at any time. Just think about a system shutdown.

You can find the full reference for the Future<T> type in the Java 7 API documentation.

Creating futures using Lettuce

Lettuce futures can be used for initial and chaining operations. When using Lettuce futures, you will notice the non-blocking behavior. This is because all I/O and command processing are handled asynchronously using the netty EventLoop. The Lettuce RedisFuture<T> extends a ListenableFuture<T> so all methods of the base type are available.

Lettuce exposes its futures on the Standalone, Sentinel, Publish/Subscribe and Cluster APIs.

Connecting to Redis is insanely simple:

RedisClient client = new RedisClient("localhost", 6379);
RedisStringAsyncConnection<String, String> connection = client.connectAsync();

In the next step, obtaining a value from a key requires the GET operation:

RedisFuture<String> future = connection.get("key");

Consuming futures

The first thing you want to do when working with futures is to consume them. Consuming a futures means obtaining the value. Here is an example that blocks the calling thread and prints the value:

RedisFuture<String> future = connection.get("key");
String value = future.get();
System.out.println(value);

Invocations to the get() method (pull-style) block the calling thread at least until the value is computed but in the worst case indefinitely. Using timeouts is always a good idea to not exhaust your threads.

try {
    RedisFuture<String> future = connection.get("key");
    String value = future.get(1, TimeUnit.MINUTES);
    System.out.println(value);
} catch (Exception e) {
    e.printStackTrace();
}

The example will wait at most 1 minute for the future to complete. If the timeout exceeds, a TimeoutException is thrown to signal the timeout.

Futures can also be consumed in a push style, meaning when the RedisFuture<T> is completed, a follow-up action is triggered:

final RedisFuture<String> future = connection.get("key");

future.addListener(new Runnable() {
    @Override
    public void run() {
        try {
            String value = future.get();

            System.out.println(value);
        } catch (Exception e) {
        }
    }
}, MoreExecutors.sameThreadExecutor());

Lettuce futures are completed on the netty EventLoop. Consuming and chaining futures on the default thread is always a good idea except for one case: Blocking/long-running operations. As a rule of thumb, never block the event loop. If you need to chain futures using blocking calls, pass in a custom executor that does not execute the Runnable on the same thread.

Executor sharedExecutor = ...
RedisFuture<String> future = connection.get("key");

future.addListener(new Runnable<>() {
    @Override
    public void run() {
        // ...
    }
}, sharedExecutor);

Synchronizing futures

A key point when using futures is the synchronization. Futures are usually used to:

  1. Trigger multiple invocations without the urge to wait for the predecessors (Batching)
  2. Invoking a command without awaiting the result at all (Fire&Forget)
  3. Invoking a command and perform other computing in the meantime (Decoupling)
  4. Adding concurrency to certain computational efforts (Concurrency)

There are several ways how to wait or get notified in case a future completes. Certain synchronization techniques apply to some motivations why you want to use futures.

Blocking synchronization

Blocking synchronization comes handy if you perform batching/add concurrency to certain parts of your system. An example to batching can be setting/retrieving multiple values and awaiting the results before a certain point within processing.

List<RedisFuture<String>> futures = new ArrayList<RedisFuture<String>>();

for (int i = 0; i < 10; i++) {
    futures.add(connection.get("key-" + i, "value-" + i));
}

LettuceFutures.awaitAll(1, TimeUnit.MINUTES, futures.toArray(new RedisFuture[futures.size()]));

The code from above does not wait until a certain command completes before it issues another one. The synchronization is done after all commands are issued. The example code can easily be turned into a Fire&Forget pattern by omitting the call to LettuceFutures.awaitAll().

A single future execution can be also awaited, meaning an opt-in to wait for a certain time but without raising an exception:

RedisFuture<String> future = connection.get("key");

if(!future.await(1, TimeUnit.MINUTES)) {
    System.out.println("Could not complete within the timeout");
}

Calling await() is friendlier to call since it throws only an InterruptedException in case the blocked thread is interrupted. You are already familiar with the get() method for synchronization, so we will not bother you with this one.

At last, there is another way to synchronize futures in a blocking way. The major caveat is that you will become responsible to handle thread interruptions. If you do not handle that aspect, you will not be able to shut down your system properly if it is in a running state.

RedisFuture<String> future = connection.get("key");
while (!future.isDone()) {
    // do something ...
}

While the isDone() method does not aim primarily for synchronization use, it might come handy to perform other computational efforts while the command is executed.

Chaining synchronization

Futures can be synchronized/chained in a non-blocking style to improve thread utilization. Chaining works very well in systems relying on event-driven characteristics. Future chaining builds up a chain of one or more futures that are executed serially, and every chain member handles a part in the computation. Java 6's and 7's java.util.concurrent.Future is nearly unusable for chaining. Therefore the Lettuce 3.x API is based on Guava's ListenableFuture that allows very basic chaining:

set.addListener(new Runnable() {
    @Override
    public void run() {
            ...;
    }
}, MoreExecutors.sameThreadExecutor());

Error handling

Error handling is an indispensable component of every real world application and should to be considered from the beginning on. Futures provide some mechanisms to deal with errors.

In general, you want to react in the following ways:

  • Return a default value instead
  • Use a backup future
  • Retry the future

Any Redis errors will cause to return null on calling get(). If you are interested in the error, so you can access the message using getError() on your future. RedisException won't be thrown (in contrast to synchronous execution).

Retrying futures and recovery using futures is not part of the Java Future<T>. See Lettuce 4.0 Reactive API for comfortable ways handling with exceptions.

Examples

Basic operations

RedisStringsConnection<String, String> async = client.connectAsync();
RedisFuture<String> set = async.set("key", "value");
RedisFuture<String> get = async.get("key");

set.get() == "OK"
get.get() == "value"

Waiting for a future with a timeout

RedisStringsConnection<String, String> async = client.connectAsync();
RedisFuture<String> set = async.set("key", "value");
RedisFuture<String> get = async.get("key");

set.await(1, SECONDS) == true
set.get() == "OK"
get.get(1, TimeUnit.MINUTES) == "value"

Using listeners for a future

RedisStringsConnection<String, String> async = client.connectAsync();
RedisFuture<String> set = async.set("key", "value");

Runnable listener = new Runnable() {
    @Override
    public void run() {
            ...;
    }
};

set.addListener(listener, MoreExecutors.sameThreadExecutor());
Clone this wiki locally