-
Notifications
You must be signed in to change notification settings - Fork 160
/
lru.clj
206 lines (169 loc) · 5.66 KB
/
lru.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
(ns crux.lru
(:require [crux.db :as db]
[crux.index :as idx]
[crux.io :as cio]
[crux.kv :as kv])
(:import [clojure.lang Counted ILookup]
java.io.Closeable
java.util.concurrent.locks.StampedLock
java.util.function.Function
java.util.LinkedHashMap))
;; Compile the rest of this namespace with unchecked primitive arithmetic and
;; have the compiler warn wherever a math operation falls back to boxed numbers.
(set! *unchecked-math* :warn-on-boxed)
(defprotocol LRUCache
  "Operations supported by the caches created with `new-cache`."
  (compute-if-absent [this k stored-key-fn f]
    "Returns the value cached under `k`. On a miss, `stored-key-fn` is applied
    to `k` to obtain the key that is actually stored, and `f` is applied to
    that key to produce the value.

    `stored-key-fn` is sometimes used to copy the key to prevent memory
    leaks; pass `identity` when no copy is needed.")
  (evict [this k]
    "Removes the entry stored under `k`, if any."))
(defn new-cache
  "Creates a cache bounded to `size` entries, backed by an access-ordered
  LinkedHashMap that drops its eldest entry once the bound is exceeded.

  The returned object implements LRUCache, ILookup and Counted. Lookups,
  insertions, evictions and `toString` go through a single StampedLock.
  Lookups take the write lock too, because a lookup on an access-ordered
  LinkedHashMap reorders its entries."
  [^long size]
  (let [cache (proxy [LinkedHashMap] [size 0.75 true]
                (removeEldestEntry [_]
                  (> (count this) size)))
        lock (StampedLock.)]
    (reify
      Object
      (toString [this]
        ;; Rendering iterates the backing map, so writers must be kept out to
        ;; avoid a ConcurrentModificationException. Iterating does not reorder
        ;; entries, hence the read lock is enough here.
        (cio/with-read-lock lock
          (.toString cache)))

      LRUCache
      (compute-if-absent [this k stored-key-fn f]
        (let [v (.valAt this k ::not-found)] ; use ::not-found as values can be falsy
          (if (= ::not-found v)
            ;; Miss: compute outside the lock. Concurrent callers may both
            ;; compute a value for the same key; computeIfAbsent keeps and
            ;; returns whichever value was stored first.
            (let [k (stored-key-fn k)
                  v (f k)]
              (cio/with-write-lock lock
                ;; lock the cache only after potentially heavy value and key calculations are done
                (.computeIfAbsent cache k (reify Function
                                            (apply [_ k]
                                              v)))))
            v)))

      (evict [_ k]
        (cio/with-write-lock lock
          (.remove cache k)))

      ILookup
      (valAt [this k]
        (cio/with-write-lock lock
          (.get cache k)))

      (valAt [this k default]
        (cio/with-write-lock lock
          (.getOrDefault cache k default)))

      Counted
      (count [_]
        (.size cache)))))
(defn- ensure-iterator-open
  "Throws IllegalStateException when the value held by `closed-state` is
  truthy; otherwise returns nil."
  [closed-state]
  (let [closed? (deref closed-state)]
    (if closed?
      (throw (IllegalStateException. "Iterator closed."))
      nil)))
(defn- open-delegate
  "Returns `delegate` after checking through `closed-state` that the iterator
  has not been closed; throws IllegalStateException otherwise."
  [delegate closed-state]
  (ensure-iterator-open closed-state)
  delegate)

;; Wraps the kv iterator `i`. Navigation calls run under the read side of
;; `lock` and fail once `closed-state` holds true; `close` only flips that
;; flag (under the write side of `lock`) and leaves `i` itself untouched.
(defrecord CachedIterator [i ^StampedLock lock closed-state]
  kv/KvIterator
  (seek [_ k]
    (cio/with-read-lock lock
      (kv/seek (open-delegate i closed-state) k)))

  (next [_]
    (cio/with-read-lock lock
      (kv/next (open-delegate i closed-state))))

  (prev [_]
    (cio/with-read-lock lock
      (kv/prev (open-delegate i closed-state))))

  (value [_]
    (cio/with-read-lock lock
      (kv/value (open-delegate i closed-state))))

  Closeable
  (close [_]
    (cio/with-write-lock lock
      ;; closing an iterator that is already closed is reported as an error
      (ensure-iterator-open closed-state)
      (reset! closed-state true))))
;; Wraps a kv snapshot and keeps every CachedIterator it hands out in
;; `iterators-state`, so that iterators the caller has closed can be handed
;; out again instead of opening new ones.
(defrecord CachedSnapshot [^Closeable snapshot close-snapshot? ^StampedLock lock iterators-state]
  kv/KvSnapshot
  (new-iterator [_]
    (let [^CachedIterator reusable (some (fn [^CachedIterator candidate]
                                           (when @(.closed-state candidate)
                                             candidate))
                                         @iterators-state)]
      (cond
        ;; nothing to recycle: open a fresh iterator and remember it
        (nil? reusable)
        (let [delegate (kv/new-iterator snapshot)
              wrapped (->CachedIterator delegate lock (atom false))]
          (swap! iterators-state conj wrapped)
          wrapped)

        ;; claim the closed iterator by flipping its flag back to open
        (compare-and-set! (.closed-state reusable) true false)
        reusable

        ;; the flag changed before we could claim it, so look again
        :else
        (recur))))

  (get-value [_ k]
    (kv/get-value snapshot k))

  Closeable
  (close [_]
    ;; mark every tracked iterator closed and close its underlying kv iterator
    (doseq [^CachedIterator tracked @iterators-state]
      (cio/with-write-lock lock
        (reset! (.closed-state tracked) true)
        (let [^Closeable delegate (.i tracked)]
          (.close delegate))))
    (when close-snapshot?
      (.close snapshot))))
(defn new-cached-snapshot
  "Wraps `snapshot` in a CachedSnapshot with its own lock and an empty set of
  tracked iterators. When `close-snapshot?` is truthy, closing the result
  also closes `snapshot`."
  ^crux.lru.CachedSnapshot [snapshot close-snapshot?]
  (let [lock (StampedLock.)
        iterators (atom #{})]
    (CachedSnapshot. snapshot close-snapshot? lock iterators)))
(defprotocol CacheProvider
  "Implemented by objects that hand out caches looked up by name."
  (get-named-cache [this cache-name]
    "Returns the cache associated with `cache-name`."))
;; TODO: this should be changed to something more sensible. The caches
;; are attached to the kv store only to simplify API usage, as the kv
;; instance is the main object. Potentially these caches should simply
;; live in the main node directly, but that requires passing more stuff
;; around to the lower levels.
;; A KvStore that forwards every operation to the wrapped `kv` and also acts
;; as a CacheProvider: caches are created on first request with `cache-size`
;; and kept by name in the `cache-state` atom.
(defrecord CacheProvidingKvStore [kv cache-state cache-size]
  kv/KvStore
  (open [this options]
    (let [opened (kv/open kv options)]
      (assoc this :kv opened)))

  (new-snapshot [_]
    ;; the cached snapshot owns the underlying one and closes it
    (-> (kv/new-snapshot kv)
        (new-cached-snapshot true)))

  (store [_ kvs]
    (kv/store kv kvs))

  (delete [_ ks]
    (kv/delete kv ks))

  (fsync [_]
    (kv/fsync kv))

  (backup [_ dir]
    (kv/backup kv dir))

  (count-keys [_]
    (kv/count-keys kv))

  (db-dir [_]
    (kv/db-dir kv))

  (kv-name [_]
    (kv/kv-name kv))

  Closeable
  (close [_]
    (let [^Closeable delegate kv]
      (.close delegate)))

  CacheProvider
  (get-named-cache [this cache-name]
    (let [ensure-cache (fn [existing]
                         (if existing
                           existing
                           (new-cache cache-size)))
          caches (swap! cache-state update cache-name ensure-cache)]
      (get caches cache-name))))
;; An Index whose `seek-values` results are memoised per key in `index-cache`.
;; Stepping with `next-values` is not supported.
(defrecord CachedIndex [idx index-cache]
  db/Index
  (db/seek-values [this k]
    (let [load-values #(db/seek-values idx %)]
      ;; keys are stored as-is, hence `identity` as the stored-key-fn
      (compute-if-absent index-cache k identity load-values)))

  (db/next-values [this]
    (throw (UnsupportedOperationException.))))
(defn new-cached-index
  "Wraps `idx` in a CachedIndex backed by a fresh cache of `cache-size`
  entries."
  [idx cache-size]
  (let [index-cache (new-cache cache-size)]
    (CachedIndex. idx index-cache)))
(def ^:const default-query-cache-size
  "Default for the `::query-cache-size` option."
  10240)
(def options
  "The `kv/options` extended with the `::query-cache-size` option."
  (assoc kv/options
         ::query-cache-size {:doc "Query Cache Size"
                             :default default-query-cache-size
                             :crux.config/type :crux.config/nat-int}))
(defn start-kv-store
  "Wraps `kv` in a CacheProvidingKvStore (unless it already is one), opens it
  with `options` and returns the result.

  Option keys read here:
    :crux.kv/check-and-store-index-version - when truthy, the value returned
      by `idx/check-and-store-index-version` for the opened store is returned
      instead of the store itself.
    :crux.lru/query-cache-size - size used for the named caches of a newly
      created CacheProvidingKvStore; falls back to `default-query-cache-size`
      when the key is absent.

  If the index version check throws, the opened store is closed before the
  exception is rethrown."
  ^java.io.Closeable [kv {:keys [crux.kv/check-and-store-index-version
                                 crux.lru/query-cache-size]
                          ;; without this default a missing key would store nil as the
                          ;; cache size and break the first `get-named-cache` call
                          :or {query-cache-size default-query-cache-size}
                          :as options}]
  (let [kv (if (instance? CacheProvidingKvStore kv)
             kv
             (->CacheProvidingKvStore kv (atom {}) query-cache-size))
        kv (kv/open kv options)]
    (try
      (if check-and-store-index-version
        (idx/check-and-store-index-version kv)
        kv)
      (catch Throwable t
        ;; do not leak the store we just opened
        (.close ^Closeable kv)
        (throw t)))))