Skip to content

Commit

Permalink
Config to set Caffeine's internal executor
Browse files Browse the repository at this point in the history
  • Loading branch information
mkurz committed May 13, 2020
1 parent 89d7c75 commit 6b17ee9
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 37 deletions.
Expand Up @@ -44,13 +44,12 @@
public interface CaffeineCacheComponents extends ConfigurationComponents, AkkaComponents {
default AsyncCacheApi cacheApi(String name) {
CaffeineCacheManager caffeineCacheManager =
new CaffeineCacheManager(config().getConfig("play.cache.caffeine"));
new CaffeineCacheManager(config().getConfig("play.cache.caffeine"), actorSystem());

play.api.cache.AsyncCacheApi scalaAsyncCacheApi =
new CaffeineCacheApi(
NamedCaffeineCacheProvider$.MODULE$.getNamedCache(
name, caffeineCacheManager, configuration()),
executionContext());
name, caffeineCacheManager, configuration()));
return new DefaultAsyncCacheApi(scalaAsyncCacheApi);
}

Expand Down
Expand Up @@ -4,6 +4,7 @@

package play.cache.caffeine;

import akka.actor.ActorSystem;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.typesafe.config.Config;

Expand All @@ -22,6 +23,7 @@
* <li>{@code weak-values}=[condition]: sets {@link Caffeine#weakValues}.
* <li>{@code soft-values}=[condition]: sets {@link Caffeine#softValues}.
* <li>{@code record-stats}=[condition]: sets {@link Caffeine#recordStats}.
* <li>{@code executor}=[string]: sets {@link Caffeine#executor}.
* </ul>
*
* It is illegal to use the following configurations together:
Expand All @@ -37,15 +39,17 @@
public final class CaffeineParser {
private final Caffeine<Object, Object> cacheBuilder;
private final Config config;
private final ActorSystem actorSystem;

private CaffeineParser(Config config) {
private CaffeineParser(Config config, ActorSystem actorSystem) {
this.cacheBuilder = Caffeine.newBuilder();
this.config = Objects.requireNonNull(config);
this.actorSystem = actorSystem;
}

/** Returns a configured {@link Caffeine} cache builder. */
public static Caffeine<Object, Object> from(Config config) {
CaffeineParser parser = new CaffeineParser(config);
public static Caffeine<Object, Object> from(Config config, ActorSystem actorSystem) {
CaffeineParser parser = new CaffeineParser(config, actorSystem);
config.entrySet().stream().map(Map.Entry::getKey).forEach(parser::parse);
return parser.cacheBuilder;
}
Expand Down Expand Up @@ -74,6 +78,11 @@ private void parse(String key) {
case "record-stats":
conditionally(key, cacheBuilder::recordStats);
break;
case "executor":
if (!config.getIsNull(key)) {
cacheBuilder.executor(actorSystem.dispatchers().lookup(config.getString(key)));
}
break;
default:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion cache/play-caffeine-cache/src/main/resources/reference.conf
Expand Up @@ -15,14 +15,15 @@ play {
weak-keys = false
soft-values = false
record-stats = false
executor = ${play.cache.dispatcher}
}
caches {}
}
# The caches to bind
bindCaches = []
# The name of the default cache to use in caffeine
defaultCache = "play"
# The dispatcher used for get, set, remove,... operations on the cache. By default Play's default dispatcher is used.
# The dispatcher internally used by Caffeine. By default Caffeine uses ForkJoinPool.commonPool().
dispatcher = null
}
}
Expand Up @@ -4,9 +4,7 @@

package play.api.cache.caffeine

import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.function.BiFunction

import javax.inject.Inject
import javax.inject.Provider
Expand All @@ -21,13 +19,13 @@ import play.cache.caffeine.NamedCaffeineCache
import play.api.cache._
import play.api.inject._
import play.api.Configuration
import play.api.libs.streams.Execution.trampoline
import play.cache.NamedCacheImpl
import play.cache.SyncCacheApiAdapter
import play.cache.{ AsyncCacheApi => JavaAsyncCacheApi }
import play.cache.{ DefaultAsyncCacheApi => JavaDefaultAsyncCacheApi }
import play.cache.{ SyncCacheApi => JavaSyncCacheApi }

import scala.compat.java8.FunctionConverters
import scala.compat.java8.FutureConverters
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext
Expand All @@ -43,17 +41,15 @@ trait CaffeineCacheComponents {
implicit def executionContext: ExecutionContext

lazy val caffeineCacheManager: CaffeineCacheManager = new CaffeineCacheManager(
configuration.underlying.getConfig("play.cache.caffeine")
configuration.underlying.getConfig("play.cache.caffeine"),
actorSystem
)

/**
* Use this to create with the given name.
*/
def cacheApi(name: String): AsyncCacheApi = {
val ec = configuration
.get[Option[String]]("play.cache.dispatcher")
.fold(executionContext)(actorSystem.dispatchers.lookup(_))
new CaffeineCacheApi(NamedCaffeineCacheProvider.getNamedCache(name, caffeineCacheManager, configuration))(ec)
new CaffeineCacheApi(NamedCaffeineCacheProvider.getNamedCache(name, caffeineCacheManager, configuration))
}

lazy val defaultCacheApi: AsyncCacheApi = cacheApi(configuration.underlying.getString("play.cache.defaultCache"))
Expand Down Expand Up @@ -108,10 +104,12 @@ class CaffeineCacheModule
})

@Singleton
class CacheManagerProvider @Inject() (configuration: Configuration) extends Provider[CaffeineCacheManager] {
class CacheManagerProvider @Inject() (configuration: Configuration, actorSystem: ActorSystem)
extends Provider[CaffeineCacheManager] {
lazy val get: CaffeineCacheManager = {
val cacheManager: CaffeineCacheManager = new CaffeineCacheManager(
configuration.underlying.getConfig("play.cache.caffeine")
configuration.underlying.getConfig("play.cache.caffeine"),
actorSystem
)
cacheManager
}
Expand Down Expand Up @@ -142,12 +140,8 @@ private[play] class NamedAsyncCacheApiProvider(key: BindingKey[NamedCaffeineCach
@Inject private var defaultEc: ExecutionContext = _
@Inject private var configuration: Configuration = _
@Inject private var actorSystem: ActorSystem = _
private lazy val ec: ExecutionContext = configuration
.get[Option[String]]("play.cache.dispatcher")
.map(actorSystem.dispatchers.lookup(_))
.getOrElse(defaultEc)
lazy val get: AsyncCacheApi =
new CaffeineCacheApi(injector.instanceOf(key))(ec)
new CaffeineCacheApi(injector.instanceOf(key))
}

private[play] class NamedSyncCacheApiProvider(key: BindingKey[AsyncCacheApi]) extends Provider[SyncCacheApi] {
Expand Down Expand Up @@ -211,8 +205,7 @@ class SyncCaffeineCacheApi @Inject() (val cache: NamedCaffeineCache[Any, Any]) e
/**
* Cache implementation of [[AsyncCacheApi]]
*/
class CaffeineCacheApi @Inject() (val cache: NamedCaffeineCache[Any, Any])(implicit context: ExecutionContext)
extends AsyncCacheApi {
class CaffeineCacheApi @Inject() (val cache: NamedCaffeineCache[Any, Any]) extends AsyncCacheApi {
override lazy val sync: SyncCaffeineCacheApi = new SyncCaffeineCacheApi(cache)

def set(key: String, value: Any, expiration: Duration): Future[Done] = {
Expand All @@ -226,7 +219,7 @@ class CaffeineCacheApi @Inject() (val cache: NamedCaffeineCache[Any, Any])(impli
else
FutureConverters
.toScala(resultJFuture)
.map(valueFromCache => Some(valueFromCache.asInstanceOf[ExpirableCacheValue[T]].value))
.map(valueFromCache => Some(valueFromCache.asInstanceOf[ExpirableCacheValue[T]].value))(trampoline)
}

def remove(key: String): Future[Done] = {
Expand All @@ -236,12 +229,11 @@ class CaffeineCacheApi @Inject() (val cache: NamedCaffeineCache[Any, Any])(impli

def getOrElseUpdate[A: ClassTag](key: String, expiration: Duration)(orElse: => Future[A]): Future[A] = {
lazy val orElseAsJavaFuture = FutureConverters
.toJava(orElse.map(ExpirableCacheValue(_, Some(expiration)).asInstanceOf[Any]))
.toJava(orElse.map(ExpirableCacheValue(_, Some(expiration)).asInstanceOf[Any])(trampoline))
.toCompletableFuture
lazy val orElseAsJavaBiFunction = FunctionConverters.asJavaBiFunction((_: Any, _: Executor) => orElseAsJavaFuture)

val resultAsJavaFuture = cache.get(key, orElseAsJavaBiFunction)
FutureConverters.toScala(resultAsJavaFuture).map(_.asInstanceOf[ExpirableCacheValue[A]].value)
val resultAsJavaFuture = cache.get(key, (_: Any, _: Executor) => orElseAsJavaFuture)
FutureConverters.toScala(resultAsJavaFuture).map(_.asInstanceOf[ExpirableCacheValue[A]].value)(trampoline)
}

def removeAll(): Future[Done] = {
Expand Down
Expand Up @@ -7,13 +7,14 @@ package play.api.cache.caffeine
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

import akka.actor.ActorSystem
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import com.typesafe.config.Config
import play.cache.caffeine.CaffeineParser
import play.cache.caffeine.NamedCaffeineCache

class CaffeineCacheManager(private var config: Config) {
class CaffeineCacheManager(private var config: Config, private var actorSystem: ActorSystem) {
private val cacheMap: ConcurrentMap[String, NamedCaffeineCache[_, _]] =
new ConcurrentHashMap(16)

Expand All @@ -40,7 +41,7 @@ class CaffeineCacheManager(private var config: Config) {
if (caches.hasPath(cacheName))
caches.getConfig(cacheName).withFallback(defaults)
else defaults
cacheBuilder = CaffeineParser.from(cacheConfig).expireAfter(defaultExpiry)
cacheBuilder = CaffeineParser.from(cacheConfig, actorSystem).expireAfter(defaultExpiry)
cacheBuilder
}
}
Expand Up @@ -119,9 +119,9 @@ By default, Play will try to create caches with names from `play.cache.bindCache

## Setting the execution context

By default, all Caffeine and EhCache operations are blocking, and async implementations will block threads in the default execution context.
Usually this is okay if you are using Play's default configuration, which only stores elements in memory since reads should be relatively fast.
However, depending on how cache was configured, this blocking I/O might be too costly.
By default, Caffeine and EhCache store elements in memory. Therefore reads from and writes to the cache should be very fast, because there is hardly any blocking I/O.
However, depending on how a cache was configured (e.g. by using [EhCache's `DiskStore`](http://www.ehcache.org/generated/2.10.4/html/ehc-all/#page/Ehcache_Documentation_Set%2Fco-store_storage_tiers.html)), there might be blocking I/O which can become too costly, because even the async implementations will block threads in the default execution context.

For such a case you can configure a different [Akka dispatcher](https://doc.akka.io/docs/akka/2.6/dispatchers.html?language=scala#looking-up-a-dispatcher) and set it via `play.cache.dispatcher` so the cache plugin makes use of it:

```
Expand All @@ -136,6 +136,9 @@ contexts {
}
```

When using Caffeine, this will set Caffeine's [internal executor](https://github.com/ben-manes/caffeine/blob/v2.8.1/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Caffeine.java#L281-L303).
For EhCache, Play will run any EhCache operation in a Future on a thread of the given dispatcher.

## Caching HTTP responses

You can easily create a smart cached action using standard `Action` composition.
Expand Down
Expand Up @@ -119,9 +119,9 @@ By default, Play will try to create caches with names from `play.cache.bindCache

## Setting the execution context

By default, all Caffeine and EhCache operations are blocking, and async implementations will block threads in the default execution context.
Usually this is okay if you are using Play's default configuration, which only stores elements in memory since reads should be relatively fast.
However, depending on how cache was configured, this blocking I/O might be too costly.
By default, Caffeine and EhCache store elements in memory. Therefore reads from and writes to the cache should be very fast, because there is hardly any blocking I/O.
However, depending on how a cache was configured (e.g. by using [EhCache's `DiskStore`](http://www.ehcache.org/generated/2.10.4/html/ehc-all/#page/Ehcache_Documentation_Set%2Fco-store_storage_tiers.html)), there might be blocking I/O which can become too costly, because even the async implementations will block threads in the default execution context.

For such a case you can configure a different [Akka dispatcher](https://doc.akka.io/docs/akka/2.6/dispatchers.html?language=scala#looking-up-a-dispatcher) and set it via `play.cache.dispatcher` so the cache plugin makes use of it:

```
Expand All @@ -136,6 +136,9 @@ contexts {
}
```

When using Caffeine, this will set Caffeine's [internal executor](https://github.com/ben-manes/caffeine/blob/v2.8.1/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Caffeine.java#L281-L303).
For EhCache, Play will run any EhCache operation in a Future on a thread of the given dispatcher.

## Caching HTTP responses

You can easily create smart cached actions using standard Action composition.
Expand Down
5 changes: 5 additions & 0 deletions project/BuildSettings.scala
Expand Up @@ -263,6 +263,11 @@ object BuildSettings {
ProblemFilters.exclude[DirectMissingMethodProblem]("play.api.mvc.Result.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("play.api.mvc.Result.this"),
ProblemFilters.exclude[IncompatibleSignatureProblem]("play.api.mvc.Result.unapply"),
// Config which sets Caffeine's internal executor, also switched to trampoline where useful
ProblemFilters.exclude[DirectMissingMethodProblem]("play.api.cache.caffeine.CacheManagerProvider.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("play.api.cache.caffeine.CaffeineCacheApi.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("play.api.cache.caffeine.CaffeineCacheManager.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("play.cache.caffeine.CaffeineParser.from"),
),
unmanagedSourceDirectories in Compile += {
val suffix = CrossVersion.partialVersion(scalaVersion.value) match {
Expand Down

0 comments on commit 6b17ee9

Please sign in to comment.