Skip to content

Commit

Permalink
Merge pull request #9615 from gurkankaymak/8605-AtomicCacheApi
Browse files Browse the repository at this point in the history
Updates cache api to make getOrElseUpdate methods atomic
  • Loading branch information
mergify[bot] committed Oct 15, 2019
2 parents 7aebb5b + 03b9ec5 commit e7fb86f
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 40 deletions.
Expand Up @@ -8,6 +8,11 @@

import javax.annotation.Nonnull;

/**
* @deprecated Deprecated as of 2.8.0. This is an implementation detail and it was not supposed to
* be public.
*/
@Deprecated
public final class CaffeineDefaultExpiry implements Expiry<Object, Object> {
@Override
public long expireAfterCreate(@Nonnull Object key, @Nonnull Object value, long currentTime) {
Expand Down
Expand Up @@ -4,8 +4,9 @@

package play.api.cache.caffeine

import java.time
import java.util.concurrent.TimeUnit
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.function.BiFunction

import javax.inject.Inject
import javax.inject.Provider
Expand All @@ -26,13 +27,11 @@ import play.cache.{ AsyncCacheApi => JavaAsyncCacheApi }
import play.cache.{ DefaultAsyncCacheApi => JavaDefaultAsyncCacheApi }
import play.cache.{ SyncCacheApi => JavaSyncCacheApi }

import scala.compat.java8.DurationConverters
import scala.compat.java8.FunctionConverters
import scala.compat.java8.FutureConverters
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration.Infinite
import scala.reflect.ClassTag

/**
Expand Down Expand Up @@ -190,71 +189,67 @@ private[play] case class CaffeineCacheExistsException(msg: String, cause: Throwa
class SyncCaffeineCacheApi @Inject()(val cache: NamedCaffeineCache[Any, Any]) extends SyncCacheApi {

private val syncCache: Cache[Any, Any] = cache.synchronous()
private val variableExpiration = syncCache.policy.expireVariably.get

override def set(key: String, value: Any, expiration: Duration): Unit = {
val duration = expiration match {
case _: Infinite => time.Duration.ofSeconds(Long.MaxValue)
case finiteDuration: FiniteDuration if finiteDuration.lteq(0.second) => time.Duration.ofSeconds(1)
case finiteDuration: FiniteDuration => DurationConverters.toJava(finiteDuration)
}
variableExpiration.put(key, value, duration.getSeconds, TimeUnit.SECONDS)
syncCache.put(key, ExpirableCacheValue(value, Some(expiration)))
Done
}

override def remove(key: String): Unit = syncCache.invalidate(key)

override def getOrElseUpdate[A: ClassTag](key: String, expiration: Duration)(orElse: => A): A = {
get[A](key) match {
case Some(value) => value
case None =>
val value = orElse
set(key, value, expiration)
value
}
syncCache.get(key, _ => ExpirableCacheValue(orElse, Some(expiration))).asInstanceOf[ExpirableCacheValue[A]].value
}

override def get[T](key: String)(implicit ct: ClassTag[T]): Option[T] = {
Option(syncCache.getIfPresent(key))
Option(syncCache.getIfPresent(key).asInstanceOf[ExpirableCacheValue[T]])
.filter { v =>
Primitives.wrap(ct.runtimeClass).isInstance(v) ||
ct == ClassTag.Nothing || (ct == ClassTag.Unit && v == ((): Unit))
Primitives.wrap(ct.runtimeClass).isInstance(v.value) ||
ct == ClassTag.Nothing || (ct == ClassTag.Unit && v.value == ((): Unit))
}
.asInstanceOf[Option[T]]
.map(_.value)
}
}

/**
* Cache implementation of [[AsyncCacheApi]]. Since Cache is synchronous by default, this uses [[SyncCaffeineCacheApi]].
* Cache implementation of [[AsyncCacheApi]]
*/
class CaffeineCacheApi @Inject()(val cache: NamedCaffeineCache[Any, Any])(implicit context: ExecutionContext)
extends AsyncCacheApi {

override lazy val sync: SyncCaffeineCacheApi = new SyncCaffeineCacheApi(cache)

def set(key: String, value: Any, expiration: Duration): Future[Done] = Future {
def set(key: String, value: Any, expiration: Duration): Future[Done] = {
sync.set(key, value, expiration)
Done
Future.successful(Done)
}

def get[T: ClassTag](key: String): Future[Option[T]] = Future {
sync.get(key)
def get[T: ClassTag](key: String): Future[Option[T]] = {
val resultJFuture = cache.getIfPresent(key)
if (resultJFuture == null) Future.successful(None)
else
FutureConverters
.toScala(resultJFuture)
.map(valueFromCache => Some(valueFromCache.asInstanceOf[ExpirableCacheValue[T]].value))
}

def remove(key: String): Future[Done] = Future {
def remove(key: String): Future[Done] = {
sync.remove(key)
Done
Future.successful(Done)
}

def getOrElseUpdate[A: ClassTag](key: String, expiration: Duration)(orElse: => Future[A]): Future[A] = {
get[A](key).flatMap {
case Some(value) => Future.successful(value)
case None => orElse.flatMap(value => set(key, value, expiration).map(_ => value))
}
lazy val orElseAsJavaFuture = FutureConverters
.toJava(orElse.map(ExpirableCacheValue(_, Some(expiration)).asInstanceOf[Any]))
.toCompletableFuture
lazy val orElseAsJavaBiFunction = FunctionConverters.asJavaBiFunction((_: Any, _: Executor) => orElseAsJavaFuture)

val resultAsJavaFuture = cache.get(key, orElseAsJavaBiFunction)
FutureConverters.toScala(resultAsJavaFuture).map(_.asInstanceOf[ExpirableCacheValue[A]].value)
}

def removeAll(): Future[Done] = Future {
def removeAll(): Future[Done] = {
cache.synchronous.invalidateAll
Done
Future.successful(Done)
}
}
Expand Up @@ -10,7 +10,6 @@ import java.util.concurrent.ConcurrentMap
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import com.typesafe.config.Config
import play.cache.caffeine.CaffeineDefaultExpiry
import play.cache.caffeine.CaffeineParser
import play.cache.caffeine.NamedCaffeineCache

Expand All @@ -34,7 +33,7 @@ class CaffeineCacheManager(private var config: Config) {

private[caffeine] def getCacheBuilder(cacheName: String): Caffeine[_, _] = {
var cacheBuilder: Caffeine[_, _] = null
val defaultExpiry: CaffeineDefaultExpiry = new CaffeineDefaultExpiry()
val defaultExpiry: DefaultCaffeineExpiry = new DefaultCaffeineExpiry
val caches: Config = config.getConfig("caches")
val defaults: Config = config.getConfig("defaults")
var cacheConfig: Config = null
Expand Down
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package play.api.cache.caffeine

import akka.annotation.InternalApi
import com.github.benmanes.caffeine.cache.Expiry

import scala.concurrent.duration.Duration
import scala.concurrent.duration._

@InternalApi
private[caffeine] class DefaultCaffeineExpiry extends Expiry[String, ExpirableCacheValue[Any]] {

def expireAfterCreate(key: String, value: ExpirableCacheValue[Any], currentTime: Long): Long = {
calculateExpirationTime(value.durationMaybe)
}

def expireAfterUpdate(
key: String,
value: ExpirableCacheValue[Any],
currentTime: Long,
currentDuration: Long
): Long = {
calculateExpirationTime(value.durationMaybe)
}

def expireAfterRead(key: String, value: ExpirableCacheValue[Any], currentTime: Long, currentDuration: Long): Long = {
currentDuration
}

private def calculateExpirationTime(durationMaybe: Option[Duration]): Long = {
durationMaybe match {
case Some(duration) if duration.isFinite && duration.lteq(0.second) => 1.second.toNanos
case Some(duration) if duration.isFinite => duration.toNanos
case _ => Long.MaxValue
}
}

}
@@ -0,0 +1,12 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package play.api.cache.caffeine

import akka.annotation.InternalApi

import scala.concurrent.duration.Duration

@InternalApi
private[caffeine] case class ExpirableCacheValue[V](value: V, durationMaybe: Option[Duration] = None)
Expand Up @@ -5,9 +5,12 @@
package play.api.cache.caffeine

import java.util.concurrent.Executors

import javax.inject.Inject
import javax.inject.Provider

import org.specs2.mock.Mockito
import org.mockito.Mockito.verify
import org.mockito.Mockito.never
import play.api.cache.AsyncCacheApi
import play.api.cache.SyncCacheApi
import play.api.inject._
Expand Down Expand Up @@ -99,6 +102,55 @@ class CaffeineCacheApiSpec extends PlaySpecification {
Await.result(cacheApi.removeAll(), 1.second) must be(akka.Done)
Await.result(cacheApi.get("foo"), 1.second) must beNone
}

"put and return the value given with orElse function if there is no value with the given key" in new WithApplication() {
val syncCacheApi = app.injector.instanceOf[SyncCacheApi]
val result: String = syncCacheApi.getOrElseUpdate("aaa")("ddd")
result mustEqual "ddd"
val resultFromCacheMaybe = syncCacheApi.get("aaa")
resultFromCacheMaybe must beSome("ddd")
}

"asynchronously put and return the value given with orElse function if there is no value with the given key" in new WithApplication() {
val asyncCacheApi = app.injector.instanceOf[AsyncCacheApi]
val resultFuture = asyncCacheApi.getOrElseUpdate[String]("aaa")(Future.successful("ddd"))
val result = Await.result(resultFuture, 2.seconds)
result mustEqual "ddd"
val resultFromCacheFuture = asyncCacheApi.get("aaa")
val resultFromCacheMaybe = Await.result(resultFromCacheFuture, 2.seconds)
resultFromCacheMaybe must beSome("ddd")
}

"expire the item after the given amount of time is passed" in new WithApplication() {
val syncCacheApi = app.injector.instanceOf[SyncCacheApi]
val expiration = 1.second
val result: String = syncCacheApi.getOrElseUpdate("aaa", expiration)("ddd")
result mustEqual "ddd"
Thread.sleep(expiration.toMillis) // be sure that expire duration passes
val resultMaybe = syncCacheApi.get("aaa")
resultMaybe must beNone
}

"SyncCacheApi.getOrElseUpdate method should not evaluate the orElse part if the cache contains an item with the given key" in new WithApplication() {
val syncCacheApi = app.injector.instanceOf[SyncCacheApi]
syncCacheApi.set("aaa", "bbb")
trait OrElse { lazy val orElse: String = "ccc" }
val mockOrElse = Mockito.mock[OrElse]
val result = syncCacheApi.getOrElseUpdate[String]("aaa")(mockOrElse.orElse)
result mustEqual "bbb"
verify(mockOrElse, never).orElse
}

"AsyncCacheApi.getOrElseUpdate method should not evaluate the orElse part if the cache contains an item with the given key" in new WithApplication() {
val asyncCacheApi = app.injector.instanceOf[AsyncCacheApi]
asyncCacheApi.set("aaa", "bbb")
trait OrElse { lazy val orElse: Future[String] = Future.successful("ccc") }
val mockOrElse = Mockito.mock[OrElse]
val resultFuture = asyncCacheApi.getOrElseUpdate[String]("aaa")(mockOrElse.orElse)
val result = Await.result(resultFuture, 2.seconds)
result mustEqual "bbb"
verify(mockOrElse, never).orElse
}
}
}

Expand Down
Expand Up @@ -102,6 +102,11 @@ Some new methods were added to improve the Java API too:

xxx

### Cache Api changes
* `Caffeine` implementations of the `getOrElseUpdate` methods in both `SyncCacheApi` and `AsyncCacheApi` are now atomic. (Note that EhCache implementations of `getOrElseUpdate` methods are still non-atomic)
* `Caffeine` implementations of the `getOrElseUpdate` methods lazily evaluate the `orElse` part, that means if the item with the given key exists in cache then the `orElse` part is not executed
* `play.cache.caffeine.CaffeineDefaultExpiry` class is now deprecated

### Internal changes

Many changes have been made to Play's internal APIs. These APIs are used internally and don't follow a normal deprecation process. Changes may be mentioned below to help those who integrate directly with Play internal APIs.
Expand Down
1 change: 1 addition & 0 deletions project/BuildSettings.scala
Expand Up @@ -293,6 +293,7 @@ object BuildSettings {
ProblemFilters.exclude[DirectMissingMethodProblem]("play.cache.caffeine.NamedCaffeineCache.putAll"),
ProblemFilters.exclude[DirectMissingMethodProblem]("play.cache.caffeine.NamedCaffeineCache.getAllPresent"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("play.cache.caffeine.NamedCaffeineCache.this"),
ProblemFilters.exclude[MissingClassProblem]("play.cache.caffeine.CaffeineDefaultExpiry"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"play.api.libs.Files#DefaultTemporaryFileCreator#DefaultTemporaryFile.atomicMoveWithFallback"
),
Expand Down

0 comments on commit e7fb86f

Please sign in to comment.