-
Notifications
You must be signed in to change notification settings - Fork 160
/
jnr.clj
137 lines (116 loc) · 4.54 KB
/
jnr.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
(ns crux.kv.lmdb.jnr
"LMDB KV backend for Crux (alternative)."
(:require [clojure.java.io :as io]
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s]
[crux.kv :as kv]
[crux.lru :as lru]
[crux.memory :as mem])
(:import java.io.Closeable
org.agrona.ExpandableDirectByteBuffer
[org.lmdbjava CopyFlags Cursor Dbi DbiFlags DirectBufferProxy Env EnvFlags Env$MapFullException GetOp PutFlags Txn]))
(defrecord LMDBJNRIterator [^Txn tx ^Cursor cursor ^ExpandableDirectByteBuffer eb]
kv/KvIterator
(seek [this k]
(when (.get cursor (mem/ensure-off-heap k eb) GetOp/MDB_SET_RANGE)
(.key cursor)))
(next [this]
(when (.next cursor)
(.key cursor)))
(prev [this]
(when (.prev cursor)
(.key cursor)))
(value [this]
(.val cursor))
Closeable
(close [_]
(.close cursor)))
(defrecord LMDBJNRSnapshot [^Dbi dbi ^Txn tx]
kv/KvSnapshot
(new-iterator [_]
(->LMDBJNRIterator tx (.openCursor dbi tx) (ExpandableDirectByteBuffer.)))
(get-value [_ k]
(.get dbi tx (mem/->off-heap k)))
Closeable
(close [_]
(.close tx)))
(def ^:dynamic ^{:tag 'long} *mapsize-increase-factor* 1)
(def ^:const max-mapsize-increase-factor 32)
(def ^:private default-env-flags [EnvFlags/MDB_NOTLS
EnvFlags/MDB_NORDAHEAD])
(def ^:private no-sync-env-flags [EnvFlags/MDB_MAPASYNC
EnvFlags/MDB_NOSYNC
EnvFlags/MDB_NOMETASYNC])
(defn- increase-mapsize [^Env env ^long factor]
(let [new-mapsize (* factor (.mapSize (.info env)))]
(log/debug "Increasing mapsize to:" new-mapsize)
(.setMapSize env new-mapsize)))
(defrecord LMDBJNRKv [db-dir ^Env env ^Dbi dbi]
kv/KvStore
(open [this {:keys [crux.kv/db-dir crux.kv/sync? crux.kv.lmdb.java/env-flags] :as options}]
(let [env (.open (Env/create DirectBufferProxy/PROXY_DB)
(io/file db-dir)
(into-array EnvFlags (cond-> default-env-flags
(not sync?) (concat no-sync-env-flags))))
^String db-name nil]
(try
(assoc this
:env env
:dbi (.openDbi env db-name ^"[Lorg.lmdbjava.DbiFlags;" (make-array DbiFlags 0))
:db-dir db-dir)
(catch Throwable t
(.close env)
(throw t)))))
(new-snapshot [_]
(->LMDBJNRSnapshot dbi (.txnRead env)))
(store [this kvs]
(try
(with-open [tx (.txnWrite env)]
(let [kb (ExpandableDirectByteBuffer.)
vb (ExpandableDirectByteBuffer.)]
(doseq [[k v] kvs]
(.put dbi tx (mem/ensure-off-heap k kb) (mem/ensure-off-heap v vb) (make-array PutFlags 0)))
(.commit tx)))
(catch Env$MapFullException e
(binding [*mapsize-increase-factor* (* 2 *mapsize-increase-factor*)]
(when (> *mapsize-increase-factor* max-mapsize-increase-factor)
(throw (IllegalStateException. "Too large size of keys to store at once.")))
(increase-mapsize env *mapsize-increase-factor*)
(kv/store this kvs)))))
(delete [this ks]
(try
(with-open [tx (.txnWrite env)]
(let [kb (ExpandableDirectByteBuffer.)]
(doseq [k ks]
(.delete dbi tx (mem/ensure-off-heap k kb)))
(.commit tx)))
(catch Env$MapFullException e
(binding [*mapsize-increase-factor* (* 2 *mapsize-increase-factor*)]
(when (> *mapsize-increase-factor* max-mapsize-increase-factor)
(throw (IllegalStateException. "Too large size of keys to delete at once.")))
(increase-mapsize env *mapsize-increase-factor*)
(kv/delete this ks)))))
(fsync [this]
(.sync env true))
(backup [_ dir]
(let [file (io/file dir)]
(when (.exists file)
(throw (IllegalArgumentException. (str "Directory exists: " (.getAbsolutePath file)))))
(.mkdirs file)
(.copy env file (make-array CopyFlags 0))) )
(count-keys [_]
(with-open [tx (.txnRead env)]
(.entries (.stat dbi tx))))
(db-dir [this]
(str db-dir))
(kv-name [this]
(.getName (class this)))
Closeable
(close [_]
(.close env)))
(def kv {:start-fn (fn [_ {:keys [crux.kv/db-dir] :as options}]
(lru/start-kv-store (map->LMDBJNRKv {:db-dir db-dir}) options))
:args (merge lru/options
{::env-flags {:doc "LMDB Flags"
:crux.config/type [any? identity]}})})
(def kv-store {:crux.node/kv-store kv})