-
Notifications
You must be signed in to change notification settings - Fork 160
/
bootstrap.clj
157 lines (133 loc) · 6.12 KB
/
bootstrap.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
(ns crux.bootstrap
(:require [clojure.java.io :as io]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[clojure.java.io :as io]
[crux.backup :as backup]
[crux.codec :as c]
[crux.db :as db]
[crux.io :as cio]
[crux.index :as idx]
[crux.kv :as kv]
[crux.lru :as lru]
[crux.query :as q]
[crux.status :as status]
[crux.tx :as tx])
(:import java.io.Closeable
java.net.InetAddress
crux.api.ICruxAPI))
(s/check-asserts (if-let [check-asserts (System/getProperty "clojure.spec.compile-asserts")]
(Boolean/parseBoolean check-asserts)
true))
(def default-options {:bootstrap-servers "localhost:9092"
:group-id (.trim ^String (:out (clojure.java.shell/sh "hostname")))
:tx-topic "crux-transaction-log"
:doc-topic "crux-docs"
:create-topics true
:doc-partitions 1
:replication-factor 1
:db-dir "data"
:kv-backend "crux.kv.rocksdb.RocksKv"
:server-port 3000
:await-tx-timeout 10000
:doc-cache-size (* 128 1024)
:object-store "crux.index.KvObjectStore"})
(defrecord CruxVersion [version revision]
status/Status
(status-map [this]
{:crux.version/version version
:crux.version/revision revision}))
(def crux-version
(memoize
(fn []
(when-let [pom-file (io/resource "META-INF/maven/juxt/crux-uberjar/pom.properties")]
(with-open [in (io/reader pom-file)]
(let [{:strs [version
revision]} (cio/load-properties in)]
(->CruxVersion version revision)))))))
(defrecord CruxNode [kv-store tx-log indexer object-store options close-fn]
ICruxAPI
(db [this]
(let [tx-time (tx/latest-completed-tx-time (db/read-index-meta indexer :crux.tx-log/consumer-state))]
(q/db kv-store object-store tx-time tx-time options)))
(db [this valid-time]
(let [transact-time (tx/latest-completed-tx-time (db/read-index-meta indexer :crux.tx-log/consumer-state))]
(.db this valid-time transact-time)))
(db [_ valid-time transact-time]
(q/db kv-store object-store valid-time transact-time options))
(document [_ content-hash]
(with-open [snapshot (kv/new-snapshot kv-store)]
(db/get-single-object object-store snapshot (c/new-id content-hash))))
(history [_ eid]
(with-open [snapshot (kv/new-snapshot kv-store)]
(mapv c/entity-tx->edn (idx/entity-history snapshot eid))))
(historyRange [_ eid valid-time-start transaction-time-start valid-time-end transaction-time-end]
(with-open [snapshot (kv/new-snapshot kv-store)]
(->> (idx/entity-history-range snapshot eid valid-time-start transaction-time-start valid-time-end transaction-time-end)
(mapv c/entity-tx->edn)
(sort-by (juxt :crux.db/valid-time :crux.tx/tx-time)))))
(status [this]
(apply merge (map status/status-map (cons (crux-version) (vals this)))))
(attributeStats [this]
(idx/read-meta kv-store :crux.kv/stats))
(submitTx [_ tx-ops]
@(db/submit-tx tx-log tx-ops))
(hasSubmittedTxUpdatedEntity [this submitted-tx eid]
(.hasSubmittedTxCorrectedEntity this submitted-tx (:crux.tx/tx-time submitted-tx) eid))
(hasSubmittedTxCorrectedEntity [_ submitted-tx valid-time eid]
(tx/await-tx-time indexer (:crux.tx/tx-time submitted-tx) (:crux.tx-log/await-tx-timeout options))
(q/submitted-tx-updated-entity? kv-store submitted-tx valid-time eid))
(newTxLogContext [_]
(db/new-tx-log-context tx-log))
(txLog [_ tx-log-context from-tx-id with-documents?]
(for [tx-log-entry (db/tx-log tx-log tx-log-context from-tx-id)]
(if with-documents?
(update tx-log-entry
:crux.api/tx-ops
#(with-open [snapshot (kv/new-snapshot kv-store)]
(tx/enrich-tx-ops-with-documents snapshot object-store %)))
tx-log-entry)))
(sync [_ timeout]
(tx/await-no-consumer-lag
indexer
(cond-> options
timeout (assoc :crux.tx-log/await-tx-timeout (.toMillis timeout)))))
(sync [_ tx-time timeout]
(tx/await-tx-time indexer tx-time (when timeout {:crux.tx-log/await-tx-timeout (.toMillis timeout)})))
backup/INodeBackup
(write-checkpoint [this {:keys [crux.backup/checkpoint-directory] :as opts}]
(kv/backup kv-store (io/file checkpoint-directory "kv-store"))
(when (satisfies? tx-log backup/INodeBackup)
(backup/write-checkpoint tx-log opts)))
Closeable
(close [_]
(when close-fn (close-fn))))
(defn start-kv-store ^java.io.Closeable [{:keys [db-dir
kv-backend
sync?
crux.index/check-and-store-index-version]
:as options
:or {check-and-store-index-version true}}]
(s/assert :crux.kv/options options)
(let [kv (-> (kv/new-kv-store kv-backend)
(lru/new-cache-providing-kv-store)
(kv/open options))]
(try
(if check-and-store-index-version
(idx/check-and-store-index-version kv)
kv)
(catch Throwable t
(.close ^Closeable kv)
(throw t)))))
(defn start-object-store ^java.io.Closeable [partial-node {:keys [object-store]
:or {object-store (:object-store default-options)}
:as options}]
(-> (db/require-and-ensure-object-store-record object-store)
(cio/new-record)
(db/init partial-node options)))
(defn install-uncaught-exception-handler! []
(when-not (Thread/getDefaultUncaughtExceptionHandler)
(Thread/setDefaultUncaughtExceptionHandler
(reify Thread$UncaughtExceptionHandler
(uncaughtException [_ thread throwable]
(log/error throwable "Uncaught exception:"))))))