-
Notifications
You must be signed in to change notification settings - Fork 160
/
standalone.clj
180 lines (149 loc) · 7.83 KB
/
standalone.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
(ns crux.standalone
(:require [clojure.tools.logging :as log]
[crux.db :as db]
[crux.kv :as kv]
[crux.object-store :as os]
[crux.node :as n]
[crux.tx :as tx]
[crux.codec :as c]
[crux.io :as cio]
[crux.index :as idx]
[crux.lru :as lru]
[crux.topology :as topo]
[clojure.set :as set])
(:import (crux.api ITxLog)
(java.io Closeable)
(java.util Date)
(java.util.concurrent ArrayBlockingQueue
ExecutorService Executors TimeUnit
ThreadPoolExecutor ThreadPoolExecutor$DiscardPolicy)))
(defn- open-tx-log
  "Opens a closeable iterator over the standalone event log.

  Takes a snapshot of `event-log-kv-store`, seeks to the tx-event key encoded
  for `from-tx-id` (or for tx-id 0 when `from-tx-id` is nil) and lazily walks
  forward for as long as the keys satisfy `c/tx-event-key?`. When `from-tx-id`
  is supplied, the entry the seek lands on is skipped, so iteration starts
  after it. Each element is the decoded key with the value read through
  `os/<-nippy-buffer` assoc'd under `:crux.tx.event/tx-events`.

  Closing the returned tx-log closes the iterator and then the snapshot. If
  anything throws before the tx-log has been handed back to the caller, the
  iterator and snapshot are closed here (nobody else holds a reference to
  them) and the error is rethrown."
  ^crux.api.ITxLog [{:keys [event-log-kv-store]} from-tx-id]
  (let [snapshot (kv/new-snapshot event-log-kv-store)]
    (try
      (let [iterator (kv/new-iterator snapshot)]
        (try
          (letfn [(tx-log [k]
                    (lazy-seq
                     (when (some-> k c/tx-event-key?)
                       ;; The value has to be read before the iterator is advanced:
                       ;; the iterator is stateful and shared by the whole seq.
                       (cons (assoc (c/decode-tx-event-key-from k)
                                    :crux.tx.event/tx-events (os/<-nippy-buffer (kv/value iterator)))
                             (tx-log (kv/next iterator))))))]
            (let [k (kv/seek iterator (c/encode-tx-event-key-to nil {::tx/tx-id (or from-tx-id 0)}))]
              (->> (when k (tx-log (if from-tx-id (kv/next iterator) k)))
                   (db/->closeable-tx-log-iterator (fn []
                                                     (cio/try-close iterator)
                                                     (cio/try-close snapshot))))))
          (catch Throwable t
            ;; Setup failed: release the iterator, the outer catch releases the snapshot.
            (cio/try-close iterator)
            (throw t))))
      (catch Throwable t
        (cio/try-close snapshot)
        (throw t)))))
(defn- index-txs
  "Indexes every event-log transaction after the indexer's latest completed tx.

  Opens the tx-log from `(::tx/tx-id (db/latest-completed-tx indexer))` and,
  for each transaction: collects the doc hashes referenced by its events,
  fetches those docs from `document-store`, indexes them with each key passed
  through `c/new-id`, then indexes the transaction itself. If the running
  thread has been interrupted, iteration stops after the current transaction
  has been indexed. The tx-log is closed on exit."
  [{:keys [indexer document-store] :as deps}]
  (with-open [tx-log (open-tx-log deps (::tx/tx-id (db/latest-completed-tx indexer)))]
    (->> (iterator-seq tx-log)
         (run! (fn [{:keys [crux.tx.event/tx-events] :as tx}]
                 (let [doc-hashes (->> tx-events (into #{} (mapcat tx/tx-event->doc-hashes)))]
                   (db/index-docs indexer (->> (db/fetch-docs document-store doc-hashes)
                                               (into {} (map (juxt (comp c/new-id key) val)))))
                   (db/index-tx indexer (select-keys tx [::tx/tx-time ::tx/tx-id]) tx-events)
                   ;; `run!` is built on `reduce`, so returning `reduced` ends the walk early.
                   ;; `Thread/interrupted` also clears the thread's interrupt flag.
                   (when (Thread/interrupted)
                     (reduced nil))))))))
(defn- submit-tx
  "Appends one transaction to the event log and schedules indexing.

  First argument: a map holding the `!submitted-tx` promise to deliver and the
  `tx-events` to persist. Second argument: the tx-log dependencies.

  If `tx-submit-executor` has already been shut down, `::closed` is delivered
  and nothing is written: a submission that is reported as rejected must not
  end up in the event log. Otherwise the next tx-id is one more than the stored
  `::latest-submitted-tx-id` (0 when none is stored), the events and the
  updated meta entry are written in a single `kv/store` call, the
  `{:crux.tx/tx-id .. :crux.tx/tx-time ..}` map is delivered, and an
  `index-txs` run is submitted to `tx-indexer-executor`."
  [{:keys [!submitted-tx tx-events]}
   {:keys [^ExecutorService tx-submit-executor ^ExecutorService tx-indexer-executor
           event-log-kv-store]
    :as deps}]
  (if (.isShutdown tx-submit-executor)
    (deliver !submitted-tx ::closed)
    (let [tx-time (Date.)
          tx-id (inc (or (idx/read-meta event-log-kv-store ::latest-submitted-tx-id) -1))
          next-tx {:crux.tx/tx-id tx-id, :crux.tx/tx-time tx-time}]
      (kv/store event-log-kv-store [[(c/encode-tx-event-key-to nil next-tx)
                                     (os/->nippy-buffer tx-events)]
                                    (idx/meta-kv ::latest-submitted-tx-id tx-id)])
      (deliver !submitted-tx next-tx)
      (.submit tx-indexer-executor ^Runnable #(index-txs deps)))))
;; Tx-log backed by a local KV event log.
;;
;; Submissions are handed to `tx-submit-executor`; indexing runs are handed to
;; `tx-indexer-executor`. Closing the record shuts both executors down.
(defrecord StandaloneTxLog [^ExecutorService tx-submit-executor
                            ^ExecutorService tx-indexer-executor
                            indexer kv-store object-store document-store
                            event-log-kv-store]
  db/TxLog
  ;; Returns a delay that, when dereferenced, yields the submitted tx map.
  ;; Throws IllegalStateException "TxLog is closed." either immediately (the
  ;; executor is already shut down) or on deref (it was shut down before the
  ;; task ran). Any other failure inside the task is delivered through the
  ;; promise and rethrown on deref; without that the exception would stay
  ;; inside the discarded Future and the deref would block forever.
  (submit-tx [this tx-ops]
    (when (.isShutdown tx-submit-executor)
      (throw (IllegalStateException. "TxLog is closed.")))

    (let [!submitted-tx (promise)]
      (.submit tx-submit-executor
               ^Runnable (fn []
                           (try
                             (submit-tx {:!submitted-tx !submitted-tx
                                         :tx-events (mapv tx/tx-op->tx-event tx-ops)}
                                        this)
                             (catch Throwable t
                               ;; No-op if the promise was already delivered.
                               (deliver !submitted-tx t)))))

      (delay
        (let [submitted-tx @!submitted-tx]
          (cond
            (= ::closed submitted-tx) (throw (IllegalStateException. "TxLog is closed."))
            (instance? Throwable submitted-tx) (throw submitted-tx)
            :else submitted-tx)))))

  ;; Returns {::tx/tx-id n} for the stored ::latest-submitted-tx-id, or nil
  ;; when no such meta entry has been stored.
  (latest-submitted-tx [this]
    (when-let [tx-id (idx/read-meta event-log-kv-store ::latest-submitted-tx-id)]
      {::tx/tx-id tx-id}))

  ;; Delegates to the namespace-private `open-tx-log` function.
  (open-tx-log [this from-tx-id]
    (open-tx-log this from-tx-id))

  Closeable
  ;; Lets already queued submissions finish (`shutdown`), cancels indexing
  ;; (`shutdownNow`), then waits up to 5 seconds for each executor and logs a
  ;; warning if it has not terminated by then.
  (close [_]
    (try
      (.shutdown tx-submit-executor)
      (catch Exception e
        (log/warn e "Error shutting down tx-submit-executor")))

    (try
      (.shutdownNow tx-indexer-executor)
      (catch Exception e
        (log/warn e "Error shutting down tx-indexer-executor")))

    (or (.awaitTermination tx-submit-executor 5 TimeUnit/SECONDS)
        (log/warn "waited 5s for tx-submit-executor to exit, no dice."))

    (or (.awaitTermination tx-indexer-executor 5 TimeUnit/SECONDS)
        (log/warn "waited 5s for tx-indexer-executor to exit, no dice."))))
(defn- ->tx-log
  "Start function for the `::n/tx-log` topology component.

  Builds a `StandaloneTxLog` from the started dependencies, backed by two
  executors: a single-thread executor for submissions and a one-thread pool
  with a one-slot queue and a discard policy for indexing runs. One indexing
  run is submitted before the tx-log is returned."
  [{:keys [::n/indexer ::n/kv-store ::n/object-store ::n/document-store ::event-log]} _]
  (let [submit-pool (Executors/newSingleThreadExecutor (cio/thread-factory "standalone-tx-log"))
        indexer-pool (ThreadPoolExecutor. 1 1
                                          100 TimeUnit/MILLISECONDS
                                          (ArrayBlockingQueue. 1)
                                          (cio/thread-factory "standalone-tx-indexer")
                                          (ThreadPoolExecutor$DiscardPolicy.))
        event-log-kv (:kv-store event-log)
        standalone-log (->StandaloneTxLog submit-pool indexer-pool
                                          indexer kv-store object-store document-store
                                          event-log-kv)]
    ;; A restarted node may be behind its event log, so catch up right away
    ;; rather than waiting for the first submission.
    (.submit indexer-pool ^Runnable (fn [] (index-txs standalone-log)))
    standalone-log))
;; Document store that keeps documents in the event log's object store.
(defrecord StandaloneDocumentStore [event-log-kv-store event-log-object-store]
  db/DocumentStore
  ;; Puts the given id/doc pairs into the event log's object store.
  (submit-docs [_ id-and-docs]
    (db/put-objects event-log-object-store id-and-docs))

  ;; Looks the ids up against a fresh snapshot of the event-log KV store;
  ;; the snapshot is closed once the lookup call returns or throws.
  (fetch-docs [_ ids]
    (let [snap (kv/new-snapshot event-log-kv-store)]
      (try
        (db/get-objects event-log-object-store snap ids)
        (finally
          (.close snap))))))
(defn- ->document-store
  "Start function for the `::n/document-store` topology component: wraps the
  event log's `:kv-store` and `:object-store` in a `StandaloneDocumentStore`."
  [deps _]
  (let [event-log (::event-log deps)]
    (->StandaloneDocumentStore (:kv-store event-log)
                               (:object-store event-log))))
;; Options accepted by the ::event-log topology component (see `topology`).
;; Each entry maps an option key to its :doc string, an optional :default or
;; :required? flag, and its :crux.config/type.
;; NOTE(review): ::event-log-kv-store is typed :crux.topology/module while
;; ::event-log-object-store is typed :crux.config/module; confirm whether the
;; difference is intentional.
(def ^:private event-log-args
{::event-log-kv-store {:doc "The KV store to use for the standalone event log"
:default 'crux.kv.memdb/kv
:crux.config/type :crux.topology/module}
::event-log-dir {:doc "The directory to persist the standalone event log to"
:required? false
:crux.config/type :crux.config/string}
::event-log-sync? {:doc "Sync the event-log backed KV store to disk after every write."
:default true
:crux.config/type :crux.config/boolean}
::event-log-object-store {:doc "The object store to use for the standalone event log"
:default 'crux.object-store/kv-object-store
:crux.config/type :crux.config/module}})
;; Pairs the event log's KV store with the object store started on top of it.
(defrecord EventLog [kv-store object-store]
  Closeable
  ;; Closes the KV store first, then the object store, via `cio/try-close`.
  (close [_]
    (doseq [component [kv-store object-store]]
      (cio/try-close component))))
(defn ->event-log
  "Start function for the `::event-log` topology component.

  Renames `::event-log-dir` to `:crux.kv/db-dir` and `::event-log-sync?` to
  `:crux.kv/sync?` in `args`, then starts the configured KV store and, with
  that store as its `::n/kv-store` dependency, the configured object store.
  Both receive the renamed args. Returns an `EventLog` holding the two.

  If starting the object store throws, the KV store that was just started is
  closed before the error is rethrown, since no caller holds a reference to
  it yet."
  [deps {::keys [event-log-kv-store event-log-object-store] :as args}]
  (let [args (-> args
                 (set/rename-keys {::event-log-dir :crux.kv/db-dir
                                   ::event-log-sync? :crux.kv/sync?}))
        kv-store (topo/start-component event-log-kv-store {} args)
        object-store (try
                       (topo/start-component event-log-object-store {::n/kv-store kv-store} args)
                       (catch Throwable t
                         (cio/try-close kv-store)
                         (throw t)))]
    (->EventLog kv-store object-store)))
;; Standalone node topology: `n/base-topology` with three entries merged in.
;; ::event-log is started by `->event-log` and configured by `event-log-args`;
;; ::n/tx-log and ::n/document-store are started by the standalone functions
;; above and both list ::event-log among their :deps.
(def topology
(merge n/base-topology
{::event-log {:start-fn ->event-log
:args event-log-args}
::n/tx-log {:start-fn ->tx-log
:deps [::n/indexer ::n/kv-store ::n/object-store ::n/document-store ::event-log]}
::n/document-store {:start-fn ->document-store
:deps [::event-log]}}))