Permalink
Browse files

Add spray-caching module, CachingDirectives to spray-routing

  • Loading branch information...
1 parent 00d23fc commit 9f9d8390681e12929a0d5c08f350ecb672c014fc @sirthias sirthias committed Jul 24, 2012
View
@@ -26,6 +26,16 @@ object Build extends Build with DocSupport {
// Modules
// -------------------------------------------------------------------------------------------------------------------
+ lazy val sprayCaching = Project("spray-caching", file("spray-caching"))
+ .dependsOn(sprayUtil)
+ .settings(moduleSettings: _*)
+ .settings(libraryDependencies ++=
+ provided(akkaActor) ++
+ compile(clHashMap) ++
+ test(specs2)
+ )
+
+
lazy val sprayCan = Project("spray-can", file("spray-can"))
.dependsOn(sprayIo, sprayHttp, sprayUtil)
.settings(moduleSettings: _*)
@@ -72,7 +82,7 @@ object Build extends Build with DocSupport {
lazy val sprayRouting = Project("spray-routing", file("spray-routing"))
- .dependsOn(sprayHttp, sprayHttpx, sprayUtil)
+ .dependsOn(sprayCaching % "optional", sprayHttp, sprayHttpx, sprayUtil)
.settings(moduleSettings: _*)
.settings(libraryDependencies ++=
compile(shapeless) ++
@@ -81,7 +91,7 @@ object Build extends Build with DocSupport {
lazy val sprayRoutingTests = Project("spray-routing-tests", file("spray-routing-tests"))
- .dependsOn(sprayHttp, sprayHttpx, sprayTestKit, sprayUtil)
+ .dependsOn(sprayCaching, sprayHttp, sprayHttpx, sprayRouting, sprayTestKit, sprayUtil)
.settings(moduleSettings: _*)
.settings(libraryDependencies ++=
compile(shapeless) ++
@@ -17,7 +17,7 @@ object Dependencies {
val akkaActor = "com.typesafe.akka" % "akka-actor" % "2.0.2"
val akkaSlf4j = "com.typesafe.akka" % "akka-slf4j" % "2.0.2"
val akkaTestKit = "com.typesafe.akka" % "akka-testkit" % "2.0.2"
- val clHashMap = "com.googlecode.concurrentlinkedhashmap" % "concurrentlinkedhashmap-lru" % "1.2"
+ val clHashMap = "com.googlecode.concurrentlinkedhashmap" % "concurrentlinkedhashmap-lru" % "1.3.1"
val jetty7Async = "org.eclipse.jetty" % "jetty-continuation" % "7.5.1.v20110908"
val jettyWebApp = "org.eclipse.jetty" % "jetty-webapp" % "8.1.0.v20120127"
val liftJson = "net.liftweb" % "lift-json_2.9.1" % "2.4"
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2011-2012 spray.cc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cc.spray.caching
+
+import akka.util.NonFatal
+import akka.dispatch._
+
+
+/**
+ * General interface implemented by all spray cache implementations.
+ */
+trait Cache[V] {
+
+ /**
+ * Selects the (potentially non-existing) cache entry with the given key.
+ */
+ def apply(key: Any) = new Key(key)
+
+ class Key(key: Any) {
+
+ /**
+ * Wraps the given expression with caching support.
+ */
+ def apply(expr: => V)(implicit executor: ExecutionContext): Future[V] = apply { promise =>
+ try {
+ promise.success(expr)
+ } catch {
+ case NonFatal(e) => promise.failure(e)
+ }
+ }
+
+ /**
+ * Wraps the given function with caching support.
+ */
+ def apply(func: Promise[V] => Unit)(implicit executor: ExecutionContext): Future[V] = fromFuture(key) {
+ val p = Promise[V]()
+ func(p)
+ p
+ }
+ }
+
+ /**
+ * Retrieves the future instance that is currently in the cache for the given key.
+ * Returns None if the key has no corresponding cache entry.
+ */
+ def get(key: Any): Option[Future[V]]
+
+ /**
+ * Supplies a cache entry for the given key from the given expression.
+ */
+ def fromFuture(key: Any)(future: => Future[V])(implicit executor: ExecutionContext): Future[V]
+
+ /**
+ * Removes the cache item for the given key. Returns the removed item if it was found (and removed).
+ */
+ def remove(key: Any): Option[Future[V]]
+
+ /**
+ * Clears the cache by removing all entries.
+ */
+ def clear()
+}
@@ -0,0 +1,170 @@
+/*
+ * Copyright (C) 2011-2012 spray.cc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cc.spray.caching
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
+import annotation.tailrec
+import akka.dispatch.{Promise, ExecutionContext, Future}
+import akka.util.Duration
+
+
+object LruCache {
+ /**
+ * Creates a new instance of either []cc.spray.caching.ExpiringLruCache]] or [[cc.spray.caching.SimpleLruCache]],
+ * depending on whether a non-zero and finite timeToLive and/or timeToIdle is set or not.
+ */
+ def apply[V](maxCapacity: Int = 500, initialCapacity: Int = 16,
+ timeToLive: Duration = Duration.Zero, timeToIdle: Duration = Duration.Zero): Cache[V] = {
+ import Duration._
+ def isNonZeroFinite(d: Duration) = d != Zero && d.isFinite
+ def millis(d: Duration) = if (isNonZeroFinite(d)) d.toMillis else 0L
+ if (isNonZeroFinite(timeToLive) || isNonZeroFinite(timeToIdle))
+ new ExpiringLruCache[V](maxCapacity, initialCapacity, millis(timeToLive), millis(timeToIdle))
+ else
+ new SimpleLruCache[V](maxCapacity, initialCapacity)
+ }
+}
+
+/**
+ * A thread-safe implementation of [[cc.spray.caching.cache]].
+ * The cache has a defined maximum number of entries is can store. After the maximum capacity has been reached new
+ * entries cause old ones to be evicted in a last-recently-used manner, i.e. the entries that haven't been accessed for
+ * the longest time are evicted first.
+ */
+final class SimpleLruCache[V](val maxCapacity: Int, val initialCapacity: Int) extends Cache[V] {
+ require(maxCapacity >= 0, "maxCapacity must not be negative")
+ require(initialCapacity <= maxCapacity, "initialCapacity must be <= maxCapacity")
+
+ private[caching] val store = new ConcurrentLinkedHashMap.Builder[Any, Future[V]]
+ .initialCapacity(initialCapacity)
+ .maximumWeightedCapacity(maxCapacity)
+ .build()
+
+ def get(key: Any) = Option(store.get(key))
+
+ def fromFuture(key: Any)(future: => Future[V])(implicit executor: ExecutionContext): Future[V] = {
+ val promise = Promise[V]()
+ store.putIfAbsent(key, promise) match {
+ case null => future.onComplete { value =>
+ promise.complete(value)
+ // in case of exceptions we remove the cache entry (i.e. try again later)
+ if (value.isLeft) store.remove(key, promise)
+ }
+ case existingFuture => existingFuture
+ }
+ }
+
+ def remove(key: Any) = Option(store.remove(key))
+
+ def clear() { store.clear() }
+}
+
+/**
+ * A thread-safe implementation of [[cc.spray.caching.cache]].
+ * The cache has a defined maximum number of entries is can store. After the maximum capacity has been reached new
+ * entries cause old ones to be evicted in a last-recently-used manner, i.e. the entries that haven't been accessed for
+ * the longest time are evicted first.
+ * In addition this implementation optionally supports time-to-live as well as time-to-idle expiration.
+ * The former provides an upper limit to the time period an entry is allowed to remain in the cache while the latter
+ * limits the maximum time an entry is kept without having been accessed. If both values are non-zero the time-to-live
+ * has to be strictly greater than the time-to-idle.
+ * Note that expired entries are only evicted upon next access (or by being thrown out by the capacity constraint), so
+ * they might prevent gargabe collection of their values for longer than expected.
+ *
+ * @param timeToLive the time-to-live in millis, zero for disabling ttl-expiration
+ * @param timeToIdle the time-to-idle in millis, zero for disabling tti-expiration
+ */
+final class ExpiringLruCache[V](maxCapacity: Long, initialCapacity: Int,
+ timeToLive: Long, timeToIdle: Long) extends Cache[V] {
+ require(timeToLive >= 0, "timeToLive must not be negative")
+ require(timeToIdle >= 0, "timeToIdle must not be negative")
+ require(timeToLive == 0 || timeToIdle == 0 || timeToLive > timeToIdle,
+ "timeToLive must be greater than timeToIdle, if both are non-zero")
+
+ private[caching] val store = new ConcurrentLinkedHashMap.Builder[Any, Entry[V]]
+ .initialCapacity(initialCapacity)
+ .maximumWeightedCapacity(maxCapacity)
+ .build()
+
+ @tailrec
+ def get(key: Any): Option[Future[V]] = store.get(key) match {
+ case null => None
+ case entry if (isAlive(entry)) =>
+ entry.refresh()
+ Some(entry.promise)
+ case entry =>
+ // remove entry, but only if it hasn't been removed and reinserted in the meantime
+ if (store.remove(key, entry)) None // successfully removed
+ else get(key) // nope, try again
+ }
+
+ def fromFuture(key: Any)(future: => Future[V])(implicit executor: ExecutionContext): Future[V] = {
+ def insert() = {
+ val newEntry = new Entry(Promise[V]())
+ val valueFuture = store.put(key, newEntry) match {
+ case null => future
+ case entry => if (isAlive(entry)) {
+ // we date back the new entry we just inserted
+ // in the meantime someone might have already seen the too fresh timestamp we just put in,
+ // but since the original entry is also still alive this doesn't matter
+ newEntry.created = entry.created
+ entry.promise
+ } else future
+ }
+ valueFuture.onComplete { value =>
+ newEntry.promise.tryComplete(value)
+ // in case of exceptions we remove the cache entry (i.e. try again later)
+ if (value.isLeft) store.remove(key, newEntry)
+ }
+ }
+ store.get(key) match {
+ case null => insert()
+ case entry if (isAlive(entry)) =>
+ entry.refresh()
+ entry.promise
+ case entry => insert()
+ }
+ }
+
+ def remove(key: Any) = store.remove(key) match {
+ case null => None
+ case entry if (isAlive(entry)) => Some(entry.promise)
+ case entry => None
+ }
+
+ def clear() { store.clear() }
+
+ private def isAlive(entry: Entry[V]) = {
+ val now = System.currentTimeMillis
+ (timeToLive == 0 || (now - entry.created) < timeToLive) &&
+ (timeToIdle == 0 || (now - entry.lastAccessed) < timeToIdle)
+ }
+}
+
+private[caching] class Entry[T](val promise: Promise[T]) {
+ @volatile var created = System.currentTimeMillis
+ @volatile var lastAccessed = System.currentTimeMillis
+ def refresh() {
+ // we dont care whether we overwrite a potentially newer value
+ lastAccessed = System.currentTimeMillis
+ }
+ override def toString = promise.value match {
+ case Some(Right(value)) => value.toString
+ case Some(Left(exception)) => exception.toString
+ case None => "pending"
+ }
+}
Oops, something went wrong.

0 comments on commit 9f9d839

Please sign in to comment.