User's Guide

jodzga edited this page Feb 9, 2016 · 27 revisions

Getting ParSeq

See Getting ParSeq for instructions on how to integrate ParSeq with your project or to get the latest ParSeq binaries.

Key Concepts

Before getting into the details of using ParSeq it is worth describing a few terms that will be used frequently.

A Task is a basic unit of work in the ParSeq system. It is similar to a Java Callable, but its result can be set asynchronously. Tasks are executed by an Engine (see below), not by the user directly. Task implements Promise, which is like a fully asynchronous Java Future. Tasks can be transformed and composed to produce desired results. A Plan is a collection of tasks executed as a consequence of running a root task.

An Engine is used to run tasks. Normally an application has one instance of Engine.

Creating an Engine

The Engine is used to run tasks in the ParSeq framework. To construct an instance, use:

    import com.linkedin.parseq.Engine;
    import com.linkedin.parseq.EngineBuilder;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    // ...

    final int numCores = Runtime.getRuntime().availableProcessors();
    final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
    final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();

    final Engine engine = new EngineBuilder()
        .setTaskExecutor(taskScheduler)
        .setTimerScheduler(timerScheduler)
        .build();

With these settings ParSeq will have numCores + 1 threads available for executing tasks and 1 thread available for scheduling timers. These settings are a reasonable place to start, but can be customized for particular use cases.

To stop the engine, use:

    import java.util.concurrent.TimeUnit;

    // ...

    engine.shutdown();
    engine.awaitTermination(1, TimeUnit.SECONDS);
    taskScheduler.shutdown();
    timerScheduler.shutdown();

This initiates shutdown (no new tasks can be executed, but old tasks are allowed to finish) and waits up to 1 second for the engine to quiesce. It also shuts down the executors used by ParSeq, because ParSeq does not manage the lifecycle of these executors.
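
Because the executors belong to the application, it may be worth waiting for them to drain as well instead of only calling shutdown(). The following is a minimal sketch that uses only java.util.concurrent; the class and method names (ExecutorShutdown, shutdownGracefully) are hypothetical and are not part of ParSeq.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of ParSeq: stops an executor owned by the application.
final class ExecutorShutdown {
  private ExecutorShutdown() {}

  /**
   * Requests an orderly shutdown, waits up to the given timeout for submitted
   * work to finish, then interrupts whatever is still running.
   * Returns true if the executor terminated within the timeout.
   */
  static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdown(); // no new work is accepted, queued work may still finish
    try {
      if (executor.awaitTermination(timeout, unit)) {
        return true;
      }
      executor.shutdownNow(); // interrupt tasks that did not finish in time
      return false;
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

Under these assumptions, a call such as ExecutorShutdown.shutdownGracefully(taskScheduler, 1, TimeUnit.SECONDS) could take the place of the plain taskScheduler.shutdown() call once the engine has terminated.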

A Note About Rejection Policies

We do not recommend using the CallerRunsPolicy or the AbortPolicy with the underlying ParSeq task executor. The former may tie up unrelated threads (e.g. IO worker threads) in the execution of ParSeq tasks. The latter, as of v1.3.7, will cause a plan to be aborted if it cannot be rescheduled due to a RejectedExecutionException.

Instead we recommend using standard strategies for managing overload: back-pressure, load shedding, or degraded responses. In addition, the time spent on a plan can be bounded using timeouts.
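
One possible way to shed load before work reaches the engine is to cap the number of plans in flight. The sketch below uses a plain java.util.concurrent.Semaphore; PlanLimiter, trySubmit, and onPlanCompleted are hypothetical names chosen for illustration and are not part of ParSeq.

```java
import java.util.concurrent.Semaphore;

// Hypothetical admission gate, not part of ParSeq: caps the number of plans in flight.
final class PlanLimiter {
  private final Semaphore permits;

  PlanLimiter(int maxInFlight) {
    this.permits = new Semaphore(maxInFlight);
  }

  /**
   * Runs the submit action only if capacity is available.
   * Returns false when the request should be shed (for example, answered
   * with a degraded response) instead of being queued.
   */
  boolean trySubmit(Runnable submit) {
    if (!permits.tryAcquire()) {
      return false; // overloaded: shed this request
    }
    boolean submitted = false;
    try {
      submit.run(); // e.g. () -> engine.run(task)
      submitted = true;
      return true;
    } finally {
      if (!submitted) {
        permits.release(); // submission itself failed, give the permit back
      }
    }
  }

  /** Must be called once for every accepted plan when that plan completes. */
  void onPlanCompleted() {
    permits.release();
  }
}
```

The caller is responsible for invoking onPlanCompleted() when the root task finishes, whether it succeeded or failed; otherwise permits leak and the limiter eventually rejects everything.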

Creating and Running Tasks

Initially, tasks are created by integrating existing libraries with ParSeq. If a task involves non-blocking computation, it can be created using Task.action() or Task.callable(). We also provide Task.value() and Task.failure() for the most trivial cases. New tasks are created by transforming and composing existing tasks.

A few words about the ParSeq API: most of the methods that create new tasks have a version that accepts a task description. We recommend giving a short, clear description to every task. It provides great value when it comes to debugging and troubleshooting using ParSeq's tracing mechanisms.

Almost every method on the ParSeq Task interface creates a new instance of a Task that may refer to the task used to create it, e.g. it might depend on that task's result and cause it to run when executed by an engine.

Tasks are lazy. They are descriptions of computations that will happen when the task is executed by an engine. Once you've created a task, you can run it by submitting it to the engine:

    engine.run(task);

Transforming Tasks

The main mechanism for transforming tasks is the map() method. Suppose we only need the HTTP content type of the Google home page. Given an instance of Task<Response> that makes an HTTP HEAD request:

    Task<Response> head = HttpClient.head("http://www.google.com").task();

we can transform it into a task that returns the content type with the following code:

    Task<String> contentType =
      head.map("toContentType", response -> response.getContentType());

Note that the existing head task has not been modified. Instead, a new task was created that, when executed, will first run the head task and, after it is resolved, will apply the provided transformation. If the head task fails for any reason, the contentType task also fails and the provided transformation is not invoked. This mechanism is described in detail in the Handling Errors section.

Using ParSeq's tracing tools we would get the following diagram for the task above:

transforming-tasks-1.png

If we only need to consume the result produced by a task, we can use the andThen() method:

    Task<String> printContentType = contentType.andThen("print", System.out::println);

transforming-tasks-2.png

In the above example we used a Java 8 method reference, but we could just as well use a lambda expression:

    Task<String> printContentType = contentType.andThen("print", s -> System.out.println(s));

Similarly, if we need to consume a potential failure of a task, we can use the onFailure() method:

    Task<String> logFailure = contentType.onFailure("print stack trace", e -> e.printStackTrace());

transforming-tasks-3.png

Sometimes it is useful to treat the potential failure of a task more explicitly. The toTry() method transforms Task<T> into Task<Try<T>>, where the Try type explicitly represents the possibility of task failure:

    Task<Try<String>> contentType = 
        head.map("toContentType", response -> response.getContentType()).toTry();

    Task<Try<String>> logContentType =
        contentType.andThen("log", type -> {
          if (type.isFailed()) {
            type.getError().printStackTrace();
          } else {
            System.out.println("Content type: " + type.get());
          }
        });

transforming-tasks-4.png

Finally, the transform() method combines toTry() with map():

    Task<Response> get = HttpClient.get("http://www.google.com").task();

    Task<Optional<String>> contents = get.transform("getContents", tryGet -> {
      if (tryGet.isFailed()) {
        return Success.of(Optional.empty());
      } else {
        return Success.of(Optional.of(tryGet.get().getResponseBody()));
      }
    });

transforming-tasks-5.png

In the example above the contents task always completes successfully, returning the contents of the Google page wrapped in an Optional, or Optional.empty() if the HTTP GET request failed.

Composing Tasks

Many tasks are composed of other tasks that are run sequentially or in parallel.

Parallel Composition

Suppose we want to get a String consisting of the content types of a few different pages fetched in parallel. First, let's create a helper method that returns a task responsible for fetching the content type for a URL.

  private Task<String> getContentType(String url) {
    return HttpClient.get(url).task()
      .map("getContentType", response -> response.getContentType());
  }

We can use the Task.par() method to compose tasks to run in parallel:

    final Task<String> googleContentType = getContentType("http://www.google.com");
    final Task<String> bingContentType = getContentType("http://www.bing.com");

    final Task<String> contentTypes =
        Task.par(googleContentType, bingContentType)
            .map("concatenate", (google, bing) -> "Google: " + google + "\n" +
                                                  "Bing: "   + bing   + "\n");

Task.par() creates a new task that will run the googleContentType and bingContentType tasks in parallel. We transform the result into a String using the map() method.

Diagram representing the above example:

users-guide-par-example.png

Result of running the task above:

Google: text/html; charset=ISO-8859-1
Bing: text/html; charset=utf-8

Sequential Composition

We talk about sequential composition when we need to run tasks sequentially.

ParSeq provides the andThen() method, which can be used to run a task after the completion of another task. The second task is executed regardless of the result of the first task, i.e. even if the first task fails:

    // task that processes payment
    Task<PaymentStatus> processPayment = processPayment(...);

    // task that ships product
    Task<ShipmentInfo> shipProduct = shipProduct(...);

    Task<ShipmentInfo> shipAfterPayment =
        processPayment.andThen("shipProductAfterPayment", shipProduct);

andThen-1.png

In the above example the shipProduct task will run even if the processPayment task fails. Notice that shipProduct does not depend on the actual result of the task that precedes it.

In many situations the second task directly depends on the result of the first task. Let's discuss this using the following example. Suppose we would like to get information about the first image on a given web page. We will write a task that fetches the web page contents, finds the first reference to an image in it, fetches that image, and returns a short description of it. We will have two asynchronous tasks: fetching the page contents and fetching the first image. Obviously the second task depends on the actual result of the first task.

Let's decompose this problem into smaller pieces. First we'll need tasks to fetch data given a URL. We need one version that treats the contents as a String, to fetch the web page contents, and one that treats the contents as binary data, for the image:

  private Task<String> getAsString(String url) {
    return HttpClient.get(url).task()
        .map("bodyAsString", response -> response.getResponseBody());
  }

  private Task<byte[]> getAsBytes(String url) {
    return HttpClient.get(url).task()
        .map("bodyAsBytes", response -> response.getResponseBodyAsBytes());
  }

With the methods above we can write a method that returns info for an image (for simplicity just its length), given its URL:

  private Task<String> info(String url) {
    return getAsBytes(url).map("info", body -> url + ": length = " + body.length);
  }

We will also need a way to find the first image reference in the contents of a web page. A naive implementation (serving just as an example) might look like this:

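  private String findFirstImage(String body) {
    // Naive pattern: a .png, .gif or .jpg reference enclosed in quotes or parentheses
    Pattern pat = Pattern.compile("[\\('\"]([^\\(\\)'\"]+\\.(png|gif|jpg))[\\)'\"]");
    Matcher matcher = pat.matcher(body);
    if (!matcher.find()) {
      throw new IllegalStateException("No image reference found");
    }
    return matcher.group(1);
  }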
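
To see what the pattern captures, here is a standalone sketch that can be run without ParSeq. It is a variant rather than the guide's method: the class name FirstImageFinder is hypothetical, and it returns an Optional so that the "no image found" case is explicit instead of surfacing as an exception.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone variant of findFirstImage, for experimenting with the pattern.
final class FirstImageFinder {
  // The naive pattern from the text; the dot before the extension is escaped
  // so that it only matches a literal '.'.
  private static final Pattern IMAGE_REF =
      Pattern.compile("[\\('\"]([^\\(\\)'\"]+\\.(png|gif|jpg))[\\)'\"]");

  private FirstImageFinder() {}

  /** Returns the first quoted or parenthesized image reference, if there is one. */
  static Optional<String> findFirstImage(String body) {
    Matcher matcher = IMAGE_REF.matcher(body);
    return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
  }
}
```

For an input such as `<img src="/images/logo.png">` the captured group is the path between the quotes, while a body without any matching reference yields Optional.empty().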

We have all the pieces, so let's combine them. First approach, using the map() method:

getAsString(url).map("firstImageInfo", body -> info(url + findFirstImage(body)));

The problem is that the type of this expression is Task<Task<String>>. This situation (nested tasks) occurs when one task depends on the outcome of another task. In this case the task that fetches the image obviously depends on the outcome of the task that fetches the contents of the web page. There is a method that can flatten such an expression, unsurprisingly called Task.flatten():

Task.flatten(getAsString(url).map("firstImageInfo", body -> info(url + findFirstImage(body))));

The result of the above expression has type Task<String>. This is such a common pattern that there is a shorthand that combines flatten() and map(), called flatMap():

getAsString(url).flatMap("firstImageInfo", body -> info(url + findFirstImage(body)));
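
For readers who know the JDK's CompletableFuture, the same nesting problem and its fix may look familiar. The sketch below is only an analogy built on the standard library, not ParSeq code: thenApply plays the role of map() and thenCompose the role of flatMap(). The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture stands in for Task; none of this is ParSeq API.
final class FlattenAnalogy {
  private FlattenAnalogy() {}

  // Stand-in for an asynchronous step; it completes immediately for simplicity.
  static CompletableFuture<Integer> lengthOf(String s) {
    return CompletableFuture.completedFuture(s.length());
  }

  // Like map(): applying a function that itself returns a future nests the result.
  static CompletableFuture<CompletableFuture<Integer>> nested(CompletableFuture<String> body) {
    return body.thenApply(FlattenAnalogy::lengthOf);
  }

  // Like flatMap(): the inner future is unwrapped, so the type stays flat.
  static CompletableFuture<Integer> flat(CompletableFuture<String> body) {
    return body.thenCompose(FlattenAnalogy::lengthOf);
  }
}
```

One difference worth keeping in mind is that ParSeq tasks are lazy descriptions that run only when submitted to an engine, whereas a CompletableFuture pipeline starts executing as soon as its source completes.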

Now we can write a method that returns info about the first image of a given web page:

  private Task<String> firstImageInfo(String url) {
    return getAsString(url).flatMap("firstImageInfo", body -> info(url + findFirstImage(body)));
  }

Let's use it on google.com:

    final Task<String> firstImageInfo = firstImageInfo("http://www.google.com");

Result:

http://www.google.com/images/google_favicon_128.png: length = 3243

Trace obtained from this task:

users-guide-seq-example.png

Finally, let's combine sequential and parallel composition. We will, in parallel, get info about the first image on the google.com and bing.com web pages:

    final Task<String> googleInfo = firstImageInfo("http://www.google.com");
    final Task<String> bingInfo = firstImageInfo("http://www.bing.com");

    Task<String> infos = Task.par(googleInfo, bingInfo)
        .map("concatenate", (google, bing) -> "Google: " + google + "\n" +
                                              "Bing: "   + bing   + "\n");

Running the above task would yield the following trace:

users-guide-composition-example.png

And here is the result:

Google: http://www.google.com/images/google_favicon_128.png: length = 3243
Bing: http://www.bing.com/s/a/hpc12.png: length = 5574

Handling Errors

The main principle in ParSeq is that a task failure is always propagated to the tasks that depend on it. Generally there is no need for catching or re-throwing exceptions:

    Task<String> failing = Task.callable("hello", () -> {
      return "Hello World".substring(100);
    });

    Task<Integer> length = failing.map("length", s -> s.length());

In the example above the length task would fail with a java.lang.StringIndexOutOfBoundsException propagated from the failing task.

error-handling-1.png

Often degraded behavior is a better choice than simply propagating an exception. If there exists a reasonable fallback value, we can use the recover() method to recover from a failure:

    Task<String> failing = Task.callable("hello", () -> {
      return "Hello World".substring(100);
    });

    Task<Integer> length = failing.map("length", s -> s.length())
        .recover("withDefault0", e -> 0);

This time the length task recovers from the java.lang.StringIndexOutOfBoundsException with the fallback value 0. Notice that the recovery function accepts the exception that caused the failure as a parameter.

error-handling-2.png

Sometimes we don't have a fallback value ready to use, but we can compute it using another task, e.g. asynchronously. In those cases we can use the recoverWith() method. The difference between recover() and recoverWith() is that the function passed to the latter returns a Task that will be executed to obtain the fallback value. The following example shows how to use recoverWith() to fetch a user from the main DB when fetching it from the cache fails:

    Task<Person> user = fetchFromCache(id)
         .recoverWith(e -> fetchFromDB(id));

error-handling-3.png

Using Timeouts

It is a good idea to set a timeout on asynchronous tasks. ParSeq provides the withTimeout() method to do that:

    final Task<Response> google = HttpClient.get("http://google.com").task()
        .withTimeout(10, TimeUnit.MILLISECONDS);

In the above example the google task will fail with a TimeoutException if fetching the contents of google.com takes more than 10ms.

timeouts-1.png

Cancelling

ParSeq supports task cancellation. Cancelling a task means that the result of that task is no longer relevant. A task can be cancelled at any time. The implementation of a task can detect that it has been cancelled and react accordingly. A cancelled task completes with a CancellationException and thus behaves like a failed task, i.e. cancellation is automatically propagated to all tasks that depend on it. Even though cancelled tasks are effectively failed, methods that handle failures, such as recover(), recoverWith(), and onFailure(), are not called when a task is cancelled. The reason is that cancellation means the result of a task is not relevant, so normally there is no point in trying to recover from this situation. To cancel a task we can call the cancel() method.

Automatic Cancellation

Usually the main goal of a task is to calculate a value. You can think of a task as an asynchronous function. Once the value has been calculated there is no point in running the task again. Thus, ParSeq will run any task only once. The Engine will recognize a task that has been completed (or even started) and will not run it again.

It is possible for the value of a task to be known before it has finished running, or even before it has started. One such case occurs when we specify a timeout for a task. After the specified amount of time the task is failed by the timeoutTimer even though the original task might still be running. In such cases ParSeq will automatically cancel the original task with an EarlyFinishException and, because of failure propagation, all tasks that depend on it.

Recall the timeout example:

timeouts-1.png

The resulting task failed after 10ms, as represented by the red circle, while the original GET task was automatically cancelled with an EarlyFinishException, which is represented by the yellow color. See the Tracing section for details about the generated diagrams.

Tracing

See Tracing for details about ParSeq's tracing and visualization features.

Unit Testing

ParSeq comes with a test-classified artifact that contains BaseEngineTest, which can be used as a base class for ParSeq-related tests. It automatically creates and shuts down an engine for every test case and provides many useful methods for running and tracing tasks.

To use it, add the following dependency:

        <dependency>
            <groupId>com.linkedin.parseq</groupId>
            <artifactId>parseq</artifactId>
            <version>2.0.5</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

Integrating with ParSeq

This section describes how to integrate existing asynchronous libraries with ParSeq. We provide two examples that might be useful as further guides:

Integrating Asynchronous API

We will use the Task.async() method to create a task instance that will be completed asynchronously. It accepts either a Callable or a Function1 that returns an instance of a Promise. When the returned task is executed, it calls the provided callable or function and propagates the returned promise's completion to itself. This means that the task completes when the promise completes. The following example shows how an async HTTP client can be integrated with ParSeq; for the full source see WrappedRequestBuilder:

  public Task<Response> task(final String desc) {
    return Task.async(desc, () -> {
      final SettablePromise<Response> result = Promises.settable();

      _delegate.execute(new AsyncCompletionHandler<Response>() {
        @Override
        public Response onCompleted(final Response response) throws Exception {
          result.done(response);
          return response;
        }
        @Override
        public void onThrowable(Throwable t) {
          result.fail(t);
        }
      });
      return result;
    });
  }

In the example above we first create an instance of SettablePromise. Next, we invoke the HTTP client's method and pass a completion handler that propagates the result to the promise.

Integrating Blocking API

Unfortunately not every library provides an asynchronous API, e.g. JDBC libraries. We should never run blocking code directly inside a ParSeq task because this can affect other asynchronous tasks.

In order to integrate a blocking API with ParSeq we can use the Task.blocking() method. It accepts two parameters:

  • A Callable that executes the blocking code
  • An Executor instance on which the callable will be called

At runtime ParSeq will submit the blocking callable to the provided executor and will complete the task with its result once it has completed.

Managing the executors used for blocking calls is outside of ParSeq's responsibilities and is use-case specific, but we recommend always limiting the number of threads used by the executor, limiting the size of its work queue, and specifying a rejection policy other than CallerRunsPolicy.
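
An executor that follows these three recommendations could be built with the JDK's ThreadPoolExecutor roughly as follows. This is a sketch under assumptions: the class and method names are hypothetical, the right sizes depend on the workload, and AbortPolicy is just one possible choice of a policy other than CallerRunsPolicy. Note that this executor is meant only for blocking calls, not as the engine's own task executor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory, not part of ParSeq: a bounded executor for blocking calls.
final class BlockingExecutors {
  private BlockingExecutors() {}

  /**
   * Creates a fixed-size pool with a bounded work queue. When all threads are
   * busy and the queue is full, AbortPolicy makes execute() throw
   * RejectedExecutionException instead of running the call on the submitting thread.
   */
  static ExecutorService newBounded(int threads, int queueCapacity) {
    return new ThreadPoolExecutor(
        threads, threads,                        // core and maximum pool size
        0L, TimeUnit.MILLISECONDS,               // keep-alive for threads above core size (none here)
        new ArrayBlockingQueue<>(queueCapacity), // bounded work queue
        new ThreadPoolExecutor.AbortPolicy());   // reject rather than run on the caller
  }
}
```

With one thread and a queue capacity of one, for instance, a third concurrent submission is rejected while the first is still running and the second is waiting in the queue.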
