Skip to content

Commit

Permalink
Made the use of the ordered execution context in Java optional
Browse files Browse the repository at this point in the history
  • Loading branch information
jroper committed Mar 12, 2014
1 parent 0dd7a41 commit fd82e1a
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 144 deletions.
3 changes: 2 additions & 1 deletion framework/project/Build.scala
Expand Up @@ -138,7 +138,8 @@ object PlayBuild extends Build {
ProblemFilters.exclude[MissingMethodProblem]("play.core.server.netty.RequestBodyHandler.newRequestBodyHandler"),
ProblemFilters.exclude[MissingMethodProblem]("play.core.server.netty.RequestBodyHandler.newRequestBodyUpstreamHandler"),
ProblemFilters.exclude[MissingMethodProblem]("play.core.server.netty.PlayDefaultUpstreamHandler.play$core$server$netty$PlayDefaultUpstreamHandler$$bodyEnumerator$1"),
ProblemFilters.exclude[MissingMethodProblem]("play.core.server.netty.PlayDefaultUpstreamHandler.newRequestBodyHandler")
ProblemFilters.exclude[MissingMethodProblem]("play.core.server.netty.PlayDefaultUpstreamHandler.newRequestBodyHandler"),
ProblemFilters.exclude[MissingClassProblem]("play.libs.F$Promise$PromiseActor")
),
sourceGenerators in Compile <+= (dependencyClasspath in TemplatesCompilerProject in Runtime, packageBin in TemplatesCompilerProject in Compile, scalaSource in Compile, sourceManaged in Compile, streams) map ScalaTemplates
)
Expand Down
151 changes: 13 additions & 138 deletions framework/src/play/src/main/java/play/libs/F.java
Expand Up @@ -6,7 +6,6 @@
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import play.core.Invoker;

/**
* Defines a set of functional programming style helpers.
Expand Down Expand Up @@ -86,11 +85,10 @@ public Promise( scala.concurrent.Future<A> promise) {
}

/*
* reset underlying shared actors
* useful for mainly in tests
* No longer used
*/
@Deprecated
public static void resetActors() {
actors = null;
}

/**
Expand Down Expand Up @@ -239,31 +237,8 @@ public <B> Promise<Either<A,B>> or(Promise<B> another) {
* @param action The action to perform.
*/
public void onRedeem(final Callback<A> action) {
final play.mvc.Http.Context context = play.mvc.Http.Context.current.get();
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
new play.api.libs.concurrent.PlayPromise<A>(promise).onRedeem(new scala.runtime.AbstractFunction1<A,scala.runtime.BoxedUnit>() {
public scala.runtime.BoxedUnit apply(A a) {
try {
run(new Function<A,Object>() {
public Object apply(A a) {
try {
action.invoke(a);
return 0;
} catch(RuntimeException e) {
throw e;
} catch(Throwable t) {
throw new RuntimeException(t);
}
}
}, a, context, classLoader);
} catch (RuntimeException e) {
throw e;
} catch (Throwable t) {
throw new RuntimeException(t);
}
return null;
}
},Invoker.executionContext());
promise.onSuccess(play.core.j.JavaPromise.callbackToPartialFunction(action),
HttpExecution.defaultContext());
}

/**
Expand All @@ -277,20 +252,8 @@ public Object apply(A a) {
* @return A wrapped promise that maps the type from <code>A</code> to <code>B</code>.
*/
public <B> Promise<B> map(final Function<A, B> function) {
final play.mvc.Http.Context context = play.mvc.Http.Context.current.get();
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
return new Promise<B>(
promise.flatMap(new scala.runtime.AbstractFunction1<A,scala.concurrent.Future<B>>() {
public scala.concurrent.Future<B> apply(A a) {
try {
return run(function, a, context, classLoader);
} catch (RuntimeException e) {
throw e;
} catch(Throwable t) {
throw new RuntimeException(t);
}
}
},Invoker.executionContext())
promise.map(play.core.j.JavaPromise.functionToScalaFunction(function), HttpExecution.defaultContext())
);
}

Expand All @@ -305,22 +268,11 @@ public scala.concurrent.Future<B> apply(A a) {
* @return A wrapped promise that will only throw an exception if the supplied <code>function</code> throws an
* exception.
*/
public Promise<A> recover(final Function<Throwable,A> function) {
final play.mvc.Http.Context context = play.mvc.Http.Context.current.get();
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
public Promise<A> recover(final Function<Throwable, A> function) {
return new Promise<A>(
play.core.j.JavaPromise.recover(promise, new scala.runtime.AbstractFunction1<Throwable, scala.concurrent.Future<A>>() {
public scala.concurrent.Future<A> apply(Throwable t) {
try {
return run(function,t, context, classLoader);
} catch (RuntimeException e) {
throw e;
} catch(Throwable e) {
throw new RuntimeException(e);
}
}
},Invoker.executionContext())
);
promise.recover(play.core.j.JavaPromise.functionToScalaPartialFunction(function),
HttpExecution.defaultContext())
);
}

/**
Expand All @@ -334,25 +286,18 @@ public scala.concurrent.Future<A> apply(Throwable t) {
* @return A wrapped promise for a result of type <code>B</code>
*/
public <B> Promise<B> flatMap(final Function<A,Promise<B>> function) {
final play.mvc.Http.Context context = play.mvc.Http.Context.current.get();
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
return new Promise<B>(
promise.flatMap(new scala.runtime.AbstractFunction1<A,scala.concurrent.Future<Promise<B>>>() {
public scala.concurrent.Future<Promise<B>> apply(A a) {
promise.flatMap(new scala.runtime.AbstractFunction1<A,scala.concurrent.Future<B>>() {
public scala.concurrent.Future<B> apply(A a) {
try {
return run(function, a, context, classLoader);
return function.apply(a).getWrappedPromise();
} catch (RuntimeException e) {
throw e;
} catch(Throwable t) {
throw new RuntimeException(t);
}
}
},Invoker.executionContext()).flatMap(new scala.runtime.AbstractFunction1<Promise<B>,scala.concurrent.Future<B>>() {
public scala.concurrent.Future<B> apply(Promise<B> p) {
return p.promise;
}
},Invoker.executionContext())
);
}, HttpExecution.defaultContext()));
}

/**
Expand All @@ -364,76 +309,6 @@ public scala.concurrent.Future<A> getWrappedPromise() {
return promise;
}

// -- Utils

static Integer nb = 64;

static List<akka.actor.ActorRef> actors;
static {
actors = new ArrayList<akka.actor.ActorRef>(nb);
for(int i=0; i<nb; i++) {
actors.add(play.core.Invoker$.MODULE$.system().actorOf(new akka.actor.Props(PromiseActor.class), "promise-actor-" + i));
}
}
static List<akka.actor.ActorRef> actors() {
return actors;
}

static <A,B> scala.concurrent.Future<B> run(Function<A,B> f, A a, play.mvc.Http.Context context, ClassLoader classLoader) {
Long id;
if(context == null) {
id = 0l;
} else {
id = context.id();
}
return play.core.j.JavaPromise.akkaAsk(
actors().get((int)(id % actors().size())),
Tuple4(f, a, context, classLoader),
akka.util.Timeout.apply(60000 * 60 * 1) // Let's wait 1h here. Unfortunately we can't avoid a timeout.
).map(new scala.runtime.AbstractFunction1<Object,B> () {
public B apply(Object o) {
Either<Throwable,B> r = (Either<Throwable,B>)o;
if(r.left.isDefined()) {
Throwable t = r.left.get();
if(t instanceof RuntimeException) {
throw (RuntimeException)t;
} else {
throw new RuntimeException(t);
}
}

return r.right.get();
}
},Invoker.executionContext());
}

// Executes the Promise functions (capturing exception), with the given ThreadLocal context.
// This Actor is used as Agent to ensure function execution ordering for a given context.
public static class PromiseActor extends akka.actor.UntypedActor {

public void onReceive(Object msg) {
if (msg instanceof Tuple4) {
Tuple4 tuple = (Tuple4) msg;
Function f = (Function) tuple._1;
Object a = tuple._2;
play.mvc.Http.Context context = (play.mvc.Http.Context) tuple._3;
ClassLoader classLoader = (ClassLoader) tuple._4;
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(classLoader);
play.mvc.Http.Context.current.set(context);
getSender().tell(Either.Right(f.apply(a)), null);
} catch(Throwable t) {
getSender().tell(Either.Left(t), null);
} finally {
play.mvc.Http.Context.current.remove();
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
}

}

}

/**
Expand Down
54 changes: 54 additions & 0 deletions framework/src/play/src/main/java/play/libs/HttpExecution.java
@@ -0,0 +1,54 @@
package play.libs;

import play.Play;
import play.core.Invoker;
import play.core.j.HttpExecutionContext;
import play.core.j.OrderedExecutionContext;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;

/**
* ExecutionContexts that preserve the current thread's context ClassLoader and
* Http.Context.
*/
public class HttpExecution {

/**
* An ExecutionContext that executes work on the given ExecutionContext. The
* current thread's context ClassLoader and Http.Context are captured when
* this method is called and preserved for all executed tasks.
*/
public static ExecutionContextExecutor fromThread(ExecutionContext delegate) {
return HttpExecutionContext.fromThread(delegate);
}

/**
* An ExecutionContext that executes work on the application's internal
* ActorSystem dispatcher. The current thread's context ClassLoader and
* Http.Context are captured when this method is called and preserved
* for all executed tasks.
*/
public static ExecutionContextExecutor defaultContext() {
if (Invoker.useOrderedExecutionContext()) {
return orderedContext();
} else {
return HttpExecutionContext.fromThread(Invoker.executionContext());
}
}

private static ExecutionContext orderedExecutionContext = new OrderedExecutionContext(Invoker.system(), 64);

/**
* An ExecutionContext that executes work for a given Http.Context in the
* same actor each time, ensuring ordered execution of that work. The
* current thread's context ClassLoader and Http.Context are captured when
* this method is called and preserved for all executed tasks.
*
* This ExecutionContext gives the legacy behaviour of Play's F.Promise
* class.
*/
public static ExecutionContextExecutor orderedContext() {
return HttpExecutionContext.fromThread(orderedExecutionContext);
}

}
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package play.core.j

import play.mvc.Http
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }

object HttpExecutionContext {

/**
* Create an HttpExecutionContext with values from the current thread.
*/
def fromThread(delegate: ExecutionContext): ExecutionContextExecutor =
new HttpExecutionContext(Thread.currentThread().getContextClassLoader, Http.Context.current.get(), delegate)

/**
* Create an ExecutionContext that will, when prepared, be created with values from that thread.
*/
def unprepared(delegate: ExecutionContext) = new ExecutionContext {
def execute(runnable: Runnable) = delegate.execute(runnable) // FIXME: Make calling this an error once SI-7383 is fixed
def reportFailure(t: Throwable) = delegate.reportFailure(t)
override def prepare(): ExecutionContext = fromThread(delegate)
}
}

/**
* Manages execution to ensure that the given context ClassLoader and Http.Context are set correctly
* in the current thread. Actual execution is performed by a delegate ExecutionContext.
*/
class HttpExecutionContext(contextClassLoader: ClassLoader, httpContext: Http.Context, delegate: ExecutionContext) extends ExecutionContextExecutor {
def execute(runnable: Runnable) = delegate.execute(new Runnable {
def run() {
val thread = Thread.currentThread()
val oldContextClassLoader = thread.getContextClassLoader
val oldHttpContext = Http.Context.current.get()
thread.setContextClassLoader(contextClassLoader)
Http.Context.current.set(httpContext)
try {
runnable.run()
} finally {
thread.setContextClassLoader(oldContextClassLoader)
Http.Context.current.set(oldHttpContext)
}
}
})
def reportFailure(t: Throwable) = delegate.reportFailure(t)
}
17 changes: 12 additions & 5 deletions framework/src/play/src/main/scala/play/core/j/JavaPromise.scala
Expand Up @@ -33,15 +33,22 @@ object JavaPromise {

def timeout: Future[Nothing] = Promise.timeout

def recover[A](promise: Future[A], f: Throwable => Future[A], ec: ExecutionContext): Future[A] = {
promise.extend1 {
case Thrown(e) => f(e)
case Redeemed(a) => Promise.pure(a)
}(ec).flatMap(p => p)(ec)
def recover[A](promise: Future[A], f: Throwable => A, ec: ExecutionContext): Future[A] = {
promise.recover {
case t => f(t)
}(ec)
}

def pure[A](a: A) = Promise.pure(a)

def throwing[A](throwable: Throwable) = Promise.pure[A](throw throwable)

def functionToScalaFunction[A, B](f: F.Function[A, B]): A => B = a => f.apply(a)
def functionToScalaPartialFunction[A, B](f: F.Function[A, B]): PartialFunction[A, B] = {
case a => f.apply(a)
}
def callbackToPartialFunction[A](c: F.Callback[A]): PartialFunction[A, Unit] = {
case a => c.invoke(a)
}

}
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package play.core.j

import akka.actor.{ Actor, ActorSystem, Props }
import scala.concurrent.ExecutionContext
import play.mvc.Http
import play.api.Logger

/**
* Executes work in a fixed-sized pool of actors. If an Http.Context is associated
* with the current thread then that id will be used to dispatch work to the same
* actor every time, resulting in ordered execution of work for that context.
*
* The ExecutionContext preserves the execution behaviour of F.Promise from Play.
*/
class OrderedExecutionContext(actorSystem: ActorSystem, size: Int) extends ExecutionContext {
private val actors = Array.fill(size)(actorSystem.actorOf(Props[OrderedExecutionContext.RunActor]))

def execute(runnable: Runnable) = {
val httpContext = Http.Context.current.get()
val id: Long = if (httpContext == null) 0L else httpContext.id()
val actor = actors((id % size).toInt)
actor ! runnable
}

def reportFailure(t: Throwable) = Logger.error("Failure in OrderedExecutionContext", t)
}

object OrderedExecutionContext {
/**
* Used by the OrderedExecutionContext to run work in an actor.
*/
class RunActor extends Actor {
def receive = {
case runnable: Runnable => runnable.run()
}
}
}

0 comments on commit fd82e1a

Please sign in to comment.