Skip to content

Commit

Permalink
Switched to CompletionStage
Browse files Browse the repository at this point in the history
Fixes #4691
  • Loading branch information
jroper committed Oct 9, 2015
1 parent a2a49f3 commit 235275c
Show file tree
Hide file tree
Showing 73 changed files with 1,268 additions and 745 deletions.
7 changes: 7 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
language: scala
sudo: false
# This is needed as long as the travis build environment is JDK 1.8.0 < u40 (at time of writing it is u31)
# Otherwise, FSpec fails due to deadlocks caused by CompletableFuture.thenCompose blocking in the trampoline
# executor.
addons:
apt:
packages:
- oracle-java8-installer
jdk:
- oraclejdk8
env:
Expand Down
11 changes: 6 additions & 5 deletions documentation/manual/detailedTopics/configuration/ThreadPools.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Class loaders and thread locals need special handling in a multithreaded environ
In a Play application the [thread context class loader](https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#getContextClassLoader--) may not always be able to load application classes. You should explicitly use the application class loader to load classes.

Java
: @[using-app-classloader](code/ThreadPoolsJava.java)
: @[using-app-classloader](code/detailedtopics/ThreadPoolsJava.java)

Scala
: @[using-app-classloader](code/ThreadPools.scala)
Expand All @@ -97,12 +97,13 @@ In some cases you may not be able to explicitly use the application classloader.

Java code in Play uses a `ThreadLocal` to find out about contextual information such as the current HTTP request. Scala code doesn't need to use `ThreadLocal`s because it can use implicit parameters to pass context instead. `ThreadLocal`s are used in Java so that Java code can access contextual information without needing to pass context parameters everywhere.

Java `ThreadLocal`s, along with the correct context `ClassLoader`, are propagated automatically by `ExecutionContextExecutor` objects provided through the `HttpExecution` class. (An `ExecutionContextExecutor` is both a Scala `ExecutionContext` and a Java `Executor`.) These special `ExecutionContextExecutor` objects are automatically created and used by Java actions and Java `Promise` methods. The default objects wrap the default user thread pool. If you want to do your own threading then you should use the `HttpExecution` class' helper methods to get an `ExecutionContextExecutor` object yourself.
The problem with using thread locals however is that as soon as control switches to another thread, you lose thread local information. So if you were to map a `CompletionStage` using `thenApply`, and then try to access the HTTP context (eg, the session or request), it won't work. To address this, Play provides an [`HttpExecutionContext`](api/java/play/libs/concurrent/HttpExecutionContext.html). This allows you to capture the current context in an `Executor`, which you can then pass to the `CompletionStage` `*Async` methods such as `thenApplyAsync()`, and when the executor executes your callback, it will ensure the thread local context is setup so that you can access the request/session/flash/response objects.

In the example below, a user thread pool is wrapped to create a new `ExecutionContext` that propagates thread locals correctly.
To use the `HttpExecutionContext`, inject it into your component, and then pass the current context anytime a `CompletionStage` is interacted with. For example:

@[async-explicit-ec-imports](../../working/javaGuide/main/async/code/javaguide/async/controllers/Application.java)
@[async-explicit-ec](../../working/javaGuide/main/async/code/javaguide/async/controllers/Application.java)
@[http-execution-context](code/detailedtopics/httpec/MyController.java)

If you have a custom executor, you can wrap it in an `HttpExecutionContext` simply by passing it to the `HttpExecutionContext`s constructor.

## Best practices

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package detailedtopics.configuration.threadpools;
package detailedtopics;

import org.junit.Test;
import play.Play;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package detailedtopics.httpec;

//#http-execution-context
import play.libs.concurrent.HttpExecutionContext;
import play.libs.ws.WSClient;
import play.mvc.*;

import javax.inject.Inject;
import java.util.concurrent.CompletionStage;

public class MyController extends Controller {
@Inject HttpExecutionContext ec;
@Inject WSClient ws;

public CompletionStage<Result> index() {
String checkUrl = request().getQueryString("url");
return ws.url(checkUrl).get().thenApplyAsync((response) -> {
session().put("lastStatus", Integer.toString(response.getStatus()));
return ok();
}, ec.current());
}
}
//#http-execution-context
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
//#cleanup
import javax.inject.*;
import play.inject.ApplicationLifecycle;
import play.libs.F;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

@Singleton
public class MessageQueueConnection {
Expand All @@ -20,7 +20,7 @@ public MessageQueueConnection(ApplicationLifecycle lifecycle) {

lifecycle.addStopHook(() -> {
connection.stop();
return F.Promise.pure(null);
return CompletableFuture.completedFuture(null);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@
import java.io.IOException;

import org.asynchttpclient.AsyncHttpClientConfig;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;

import play.libs.F;
import play.libs.ws.WSClient;
import play.libs.ws.WSResponse;
import play.libs.ws.ning.NingWSClient;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

//#imports
import javax.inject.Inject;
import play.api.Play;
import play.Mode;
import play.routing.RoutingDsl;
Expand All @@ -32,7 +31,7 @@
public class JavaEmbeddingPlay {

@Test
public void simple() throws IOException {
public void simple() throws Exception {
//#simple
Server server = Server.forRouter(new RoutingDsl()
.GET("/hello/:to").routeTo(to ->
Expand All @@ -45,11 +44,15 @@ public void simple() throws IOException {
try {
withClient(ws -> {
//#http-port
F.Promise<WSResponse> response = ws.url(
CompletionStage<WSResponse> response = ws.url(
"http://localhost:" + server.httpPort() + "/hello/world"
).get();
//#http-port
assertThat(response.get(10000).getBody(), equalTo("Hello world"));
try {
assertThat(response.toCompletableFuture().get(10, TimeUnit.SECONDS).getBody(), equalTo("Hello world"));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} finally {
//#stop
Expand All @@ -59,7 +62,7 @@ public void simple() throws IOException {
}

@Test
public void config() throws IOException {
public void config() throws Exception {
//#config
Server server = Server.forRouter(new RoutingDsl()
.GET("/hello/:to").routeTo(to ->
Expand All @@ -71,8 +74,14 @@ public void config() throws IOException {
//#config

try {
withClient(ws ->
assertThat(ws.url("http://localhost:19000/hello/world").get().get(10000).getBody(), equalTo("Hello world"))
withClient(ws -> {
try {
assertThat(ws.url("http://localhost:19000/hello/world").get().toCompletableFuture().get(10,
TimeUnit.SECONDS).getBody(), equalTo("Hello world"));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
);
} finally {
server.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import org.junit.Test;
import play.Application;
import play.Play;
import play.libs.F;
import play.test.*;
import scala.compat.java8.FutureConverters;

import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.*;
Expand All @@ -20,26 +20,26 @@ public class JavaPlugins {
@Test
public void pluginsShouldBeAccessible() {
final AtomicReference<MyComponent> myComponentRef = new AtomicReference<MyComponent>();
Application app = fakeApplication(new HashMap<String, Object>(), Arrays.asList(MyPlugin.class.getName()));
running(app, new Runnable() {
public void run() {
//#access-plugin
MyComponent myComponent = Play.application().plugin(MyPlugin.class).getMyComponent();
//#access-plugin
assertTrue(myComponent.started);
myComponentRef.set(myComponent);
}
Application app = fakeApplication(new HashMap<>(), Arrays.asList(MyPlugin.class.getName()));
running(app, () -> {
//#access-plugin
MyComponent myComponent = Play.application().plugin(MyPlugin.class).getMyComponent();
//#access-plugin
assertTrue(myComponent.started);
myComponentRef.set(myComponent);
});
assertTrue(myComponentRef.get().stopped);
}

@Test
public void actorExampleShouldWork() {
Application app = fakeApplication(new HashMap<String, Object>(), Arrays.asList(Actors.class.getName()));
running(app, new Runnable() {
public void run() {
ActorRef actor = Actors.getMyActor();
assertEquals("hi", F.Promise.wrap(ask(actor, "hi", 20000)).get(20000));
Application app = fakeApplication(new HashMap<>(), Arrays.asList(Actors.class.getName()));
running(app, () -> {
ActorRef actor = Actors.getMyActor();
try {
assertEquals("hi", FutureConverters.toJava(ask(actor, "hi", 20000)).toCompletableFuture().get(20, TimeUnit.SECONDS));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

//#default
import play.http.HttpRequestHandler;
import play.libs.F;
import play.mvc.Action;
import play.mvc.Http;
import play.mvc.Result;
import java.util.concurrent.CompletionStage;

import java.lang.reflect.Method;

Expand All @@ -17,7 +17,7 @@ public class RequestHandler implements HttpRequestHandler {
public Action createAction(Http.Request request, Method actionMethod) {
return new Action.Simple() {
@Override
public F.Promise<Result> call(Http.Context ctx) {
public CompletionStage<Result> call(Http.Context ctx) {
return delegate.call(ctx);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
//#imports
import play.api.routing.Router;
import play.routing.RoutingDsl;
import play.libs.F;
import java.util.concurrent.CompletableFuture;

import static play.mvc.Controller.*;
//#imports

Expand Down Expand Up @@ -77,7 +78,7 @@ public void async() {
//#async
Router router = new RoutingDsl()
.GET("/api/items/:id").routeAsync((Integer id) ->
F.Promise.pure(ok("Getting item " + id))
CompletableFuture.completedFuture(ok("Getting item " + id))
)
.build();
//#async
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
*/
package javaguide.testhelpers {

import java.util.concurrent.{CompletionStage, CompletableFuture}

import akka.stream.Materializer
import play.api.mvc.{Action, Request}
import play.core.j.{JavaHandlerComponents, JavaHelpers, JavaActionAnnotations, JavaAction}
import play.http.DefaultHttpRequestHandler
import play.mvc.{Controller, Http, Result}
import play.api.test.Helpers
import play.libs.F
import java.lang.reflect.Method

abstract class MockJavaAction extends Controller with Action[Http.RequestBody] {
Expand Down Expand Up @@ -40,8 +41,8 @@ abstract class MockJavaAction extends Controller with Action[Http.RequestBody] {

def invocation = {
method.invoke(this) match {
case r: Result => F.Promise.pure(r)
case f: F.Promise[_] => f.asInstanceOf[F.Promise[Result]]
case r: Result => CompletableFuture.completedFuture(r)
case f: CompletionStage[_] => f.asInstanceOf[CompletionStage[Result]]
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions documentation/manual/working/javaGuide/main/akka/JavaAkka.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ To create and/or use an actor, you need an `ActorSystem`. This can be obtained

The most basic thing that you can do with an actor is send it a message. When you send a message to an actor, there is no response, it's fire and forget. This is also known as the _tell_ pattern.

In a web application however, the _tell_ pattern is often not useful, since HTTP is a protocol that has requests and responses. In this case, it is much more likely that you will want to use the _ask_ pattern. The ask pattern returns a Scala `Future`, which you can then wrap in a Play `Promise`, and then map to your own result type.
In a web application however, the _tell_ pattern is often not useful, since HTTP is a protocol that has requests and responses. In this case, it is much more likely that you will want to use the _ask_ pattern. The ask pattern returns a Scala `Future`, which you can convert to a Java `CompletionStage` using `scala.compat.java8.FutureConverts.toJava`, and then map to your own result type.

Below is an example of using our `HelloActor` with the ask pattern:

Expand All @@ -36,7 +36,7 @@ Below is an example of using our `HelloActor` with the ask pattern:
A few things to notice:

* The ask pattern needs to be imported, it's often most convenient to static import the `ask` method.
* The returned future is wrapped in a `Promise`. The resulting promise is a `Promise<Object>`, so when you access its value, you need to cast it to the type you are expecting back from the actor.
* The returned future is converted to a `CompletionStage`. The resulting promise is a `CompletionStage<Object>`, so when you access its value, you need to cast it to the type you are expecting back from the actor.
* The ask pattern requires a timeout, we have supplied 1000 milliseconds. If the actor takes longer than that to respond, the returned promise will be completed with a timeout error.
* Since we're creating the actor in the constructor, we need to scope our controller as `Singleton`, so that a new actor isn't created every time this controller is used.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@

import play.Application;
import play.inject.guice.GuiceApplicationBuilder;
import play.libs.F.Promise;
import play.mvc.Result;
import play.test.*;
import scala.compat.java8.FutureConverters;
import scala.concurrent.duration.Duration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
Expand All @@ -40,8 +38,12 @@ public void testask() throws Exception {
running(app, () -> {
javaguide.akka.ask.Application controller = app.injector().instanceOf(javaguide.akka.ask.Application.class);

String message = contentAsString(controller.sayHello("world").get(1000));
assertThat(message, equalTo("Hello, world"));
try {
String message = contentAsString(controller.sayHello("world").toCompletableFuture().get(1, TimeUnit.SECONDS));
assertThat(message, equalTo("Hello, world"));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

Expand All @@ -54,8 +56,12 @@ public void injected() throws Exception {
running(app, () -> {
javaguide.akka.inject.Application controller = app.injector().instanceOf(javaguide.akka.inject.Application.class);

String message = contentAsString(controller.getConfig().get(1000));
assertThat(message, equalTo("foo"));
try {
String message = contentAsString(controller.getConfig().toCompletableFuture().get(1, TimeUnit.SECONDS));
assertThat(message, equalTo("foo"));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

Expand All @@ -68,11 +74,14 @@ public void factoryinjected() throws Exception {
running(app, () -> {
ActorRef parent = app.injector().instanceOf(play.inject.Bindings.bind(ActorRef.class).qualifiedWith("parent-actor"));

String message = (String) Promise.wrap(ask(parent, new ParentActorProtocol.GetChild("my.config"), 1000)).flatMap(child ->
Promise.wrap(ask((ActorRef) child, new ConfiguredChildActorProtocol.GetConfig(), 1000))
).get(5000);

assertThat(message, equalTo("foo"));
try {
String message = (String) FutureConverters.toJava(ask(parent, new ParentActorProtocol.GetChild("my.config"), 1000)).thenCompose(child ->
FutureConverters.toJava(ask((ActorRef) child, new ConfiguredChildActorProtocol.GetConfig(), 1000))
).toCompletableFuture().get(5, TimeUnit.SECONDS);
assertThat(message, equalTo("foo"));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

Expand All @@ -87,7 +96,7 @@ public void async() throws Exception {
Application app = fakeApplication();
running(app, () -> {
Result result = MockJavaActionHelper.call(new MockJavaAction() {
public Promise<Result> index() {
public CompletionStage<Result> index() {
return new javaguide.akka.async.Application().index();
}
}, fakeRequest(), app.getWrappedApplication().materializer());
Expand Down
Loading

0 comments on commit 235275c

Please sign in to comment.