This repository has been archived by the owner on Apr 24, 2024. It is now read-only.
/
LruCache.scala
170 lines (152 loc) · 6.86 KB
/
LruCache.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*
* Copyright (C) 2011-2012 spray.cc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cc.spray.caching
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
import annotation.tailrec
import akka.dispatch.{Promise, ExecutionContext, Future}
import akka.util.Duration
object LruCache {
  /**
   * Factory for LRU caches.
   *
   * Returns a [[cc.spray.caching.ExpiringLruCache]] when at least one of `timeToLive` / `timeToIdle` is both
   * non-zero and finite, otherwise a [[cc.spray.caching.SimpleLruCache]].
   * A duration that is zero or not finite is handed to the expiring cache as `0` milliseconds.
   *
   * @param maxCapacity the maximum capacity passed on to the created cache
   * @param initialCapacity the initial capacity passed on to the created cache
   * @param timeToLive the time-to-live; `Duration.Zero` by default
   * @param timeToIdle the time-to-idle; `Duration.Zero` by default
   */
  def apply[V](maxCapacity: Int = 500, initialCapacity: Int = 16,
               timeToLive: Duration = Duration.Zero, timeToIdle: Duration = Duration.Zero): Cache[V] = {
    // a duration only triggers expiration support if it is non-zero and finite
    def expires(d: Duration) = d != Duration.Zero && d.isFinite
    def millisOrZero(d: Duration) = if (expires(d)) d.toMillis else 0L

    val needsExpiration = expires(timeToLive) || expires(timeToIdle)
    if (!needsExpiration)
      new SimpleLruCache[V](maxCapacity, initialCapacity)
    else
      new ExpiringLruCache[V](maxCapacity, initialCapacity, millisOrZero(timeToLive), millisOrZero(timeToIdle))
  }
}
/**
 * A thread-safe implementation of [[cc.spray.caching.Cache]].
 * The cache has a defined maximum number of entries it can store. After the maximum capacity has been reached new
 * entries cause old ones to be evicted in a least-recently-used manner, i.e. the entries that haven't been accessed
 * for the longest time are evicted first.
 *
 * @param maxCapacity the maximum capacity handed to the underlying store, must not be negative
 * @param initialCapacity the initial capacity handed to the underlying store, must be <= maxCapacity
 */
final class SimpleLruCache[V](val maxCapacity: Int, val initialCapacity: Int) extends Cache[V] {
  require(maxCapacity >= 0, "maxCapacity must not be negative")
  require(initialCapacity <= maxCapacity, "initialCapacity must be <= maxCapacity")

  private[caching] val store = new ConcurrentLinkedHashMap.Builder[Any, Future[V]]
    .initialCapacity(initialCapacity)
    .maximumWeightedCapacity(maxCapacity)
    .build()

  /** Returns the future currently stored for the given key, if any. */
  def get(key: Any): Option[Future[V]] = Option(store.get(key))

  /**
   * Returns the future already stored for the given key or, if there is none, evaluates `future`
   * and caches its outcome under the key.
   *
   * A placeholder promise is stored before `future` is evaluated. If the produced future completes
   * with an error the entry is removed again, so a later call can retry.
   * If evaluating `future` itself throws, the placeholder is failed with that exception and removed
   * before the exception is rethrown to the caller; otherwise a promise that never completes would
   * stay cached and be handed to all subsequent callers for this key.
   */
  def fromFuture(key: Any)(future: => Future[V])(implicit executor: ExecutionContext): Future[V] = {
    val promise = Promise[V]()
    store.putIfAbsent(key, promise) match {
      case null =>
        val valueFuture =
          try future
          catch {
            case e: Exception =>
              // fail the placeholder for anyone who already obtained it, then drop it from the store
              promise.tryComplete(Left(e))
              store.remove(key, promise)
              throw e
          }
        valueFuture.onComplete { value =>
          promise.complete(value)
          // in case of exceptions we remove the cache entry (i.e. try again later)
          if (value.isLeft) store.remove(key, promise)
        }
      case existingFuture => existingFuture
    }
  }

  /** Removes the entry for the given key and returns the future that was stored for it, if any. */
  def remove(key: Any): Option[Future[V]] = Option(store.remove(key))

  /** Removes all entries from the cache. */
  def clear() { store.clear() }
}
/**
 * A thread-safe implementation of [[cc.spray.caching.Cache]].
 * The cache has a defined maximum number of entries it can store. After the maximum capacity has been reached new
 * entries cause old ones to be evicted in a least-recently-used manner, i.e. the entries that haven't been accessed
 * for the longest time are evicted first.
 * In addition this implementation optionally supports time-to-live as well as time-to-idle expiration.
 * The former provides an upper limit to the time period an entry is allowed to remain in the cache while the latter
 * limits the maximum time an entry is kept without having been accessed. If both values are non-zero the time-to-live
 * has to be strictly greater than the time-to-idle.
 * Note that expired entries are only evicted upon next access (or by being thrown out by the capacity constraint), so
 * they might prevent garbage collection of their values for longer than expected.
 *
 * @param maxCapacity the maximum capacity handed to the underlying store
 * @param initialCapacity the initial capacity handed to the underlying store
 * @param timeToLive the time-to-live in millis, zero for disabling ttl-expiration
 * @param timeToIdle the time-to-idle in millis, zero for disabling tti-expiration
 */
final class ExpiringLruCache[V](maxCapacity: Long, initialCapacity: Int,
                                timeToLive: Long, timeToIdle: Long) extends Cache[V] {
  require(timeToLive >= 0, "timeToLive must not be negative")
  require(timeToIdle >= 0, "timeToIdle must not be negative")
  require(timeToLive == 0 || timeToIdle == 0 || timeToLive > timeToIdle,
    "timeToLive must be greater than timeToIdle, if both are non-zero")

  private[caching] val store = new ConcurrentLinkedHashMap.Builder[Any, Entry[V]]
    .initialCapacity(initialCapacity)
    .maximumWeightedCapacity(maxCapacity)
    .build()

  /**
   * Returns the future stored for the given key if its entry is still alive, refreshing the entry's
   * last-access timestamp. An expired entry is removed and `None` is returned.
   */
  @tailrec
  def get(key: Any): Option[Future[V]] = store.get(key) match {
    case null => None
    case entry if (isAlive(entry)) =>
      entry.refresh()
      Some(entry.promise)
    case entry =>
      // remove entry, but only if it hasn't been removed and reinserted in the meantime
      if (store.remove(key, entry)) None // successfully removed
      else get(key) // nope, try again
  }

  /**
   * Returns the future of the live entry stored for the given key or, if there is no live entry,
   * inserts a new entry and evaluates `future` to fill it.
   *
   * If the produced future completes with an error the new entry is removed again, so a later call
   * can retry. If evaluating `future` itself throws, the new entry's promise is failed with that
   * exception and the entry is removed before the exception is rethrown to the caller; otherwise an
   * entry whose promise never completes would stay in the store until it expires or is evicted.
   */
  def fromFuture(key: Any)(future: => Future[V])(implicit executor: ExecutionContext): Future[V] = {
    def insert(): Future[V] = {
      val newEntry = new Entry(Promise[V]())

      // evaluates the by-name `future`, cleaning up the freshly inserted entry if evaluation throws
      def generate(): Future[V] =
        try future
        catch {
          case e: Exception =>
            newEntry.promise.tryComplete(Left(e))
            store.remove(key, newEntry)
            throw e
        }

      val valueFuture = store.put(key, newEntry) match {
        case null => generate()
        case entry =>
          if (isAlive(entry)) {
            // we date back the new entry we just inserted
            // in the meantime someone might have already seen the too fresh timestamp we just put in,
            // but since the original entry is also still alive this doesn't matter
            newEntry.created = entry.created
            entry.promise
          } else generate()
      }
      valueFuture.onComplete { value =>
        newEntry.promise.tryComplete(value)
        // in case of exceptions we remove the cache entry (i.e. try again later)
        if (value.isLeft) store.remove(key, newEntry)
      }
    }

    store.get(key) match {
      case null => insert()
      case entry if (isAlive(entry)) =>
        entry.refresh()
        entry.promise
      case entry => insert()
    }
  }

  /**
   * Removes the entry for the given key. Returns the entry's promise if the entry was still alive,
   * `None` if there was no entry or it had already expired.
   */
  def remove(key: Any) = store.remove(key) match {
    case null => None
    case entry if (isAlive(entry)) => Some(entry.promise)
    case entry => None
  }

  /** Removes all entries from the cache. */
  def clear() { store.clear() }

  // an entry is alive as long as neither its (enabled) time-to-live nor its (enabled) time-to-idle has elapsed
  private def isAlive(entry: Entry[V]) = {
    val now = System.currentTimeMillis
    (timeToLive == 0 || (now - entry.created) < timeToLive) &&
      (timeToIdle == 0 || (now - entry.lastAccessed) < timeToIdle)
  }
}
/**
 * A cache slot as stored by `ExpiringLruCache`: wraps the promise holding the cached value and
 * carries the two timestamps (in millis) that the expiration checks read.
 */
private[caching] class Entry[T](val promise: Promise[T]) {
  /** Creation timestamp; mutable so that it can be dated back by the owning cache. */
  @volatile var created = System.currentTimeMillis

  /** Timestamp of the most recent access, updated via `refresh()`. */
  @volatile var lastAccessed = System.currentTimeMillis

  /** Marks the entry as accessed right now. */
  def refresh(): Unit = {
    // racing writers may overwrite a slightly newer timestamp; that is acceptable here
    lastAccessed = System.currentTimeMillis
  }

  /** Renders the completed value or exception, or "pending" while the promise is not yet completed. */
  override def toString = promise.value.map {
    case Right(result) => result.toString
    case Left(error) => error.toString
  }.getOrElse("pending")
}