-
Notifications
You must be signed in to change notification settings - Fork 160
/
api.clj
365 lines (289 loc) · 12.5 KB
/
api.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
(ns crux.api
"Public API of Crux."
(:refer-clojure :exclude [sync])
(:require [clojure.spec.alpha :as s]
[crux.codec :as c])
(:import [crux.api Crux ICruxAPI ICruxIngestAPI
ICruxAsyncIngestAPI ICruxDatasource]
java.io.Closeable
java.util.Date
java.time.Duration))
;; Specs describing a Crux document and its well-known attributes.

;; An entity id may be anything accepted by `c/valid-id?` except a string.
(s/def :crux.db/id (s/and (complement string?) c/valid-id?))
;; Eviction marker: a plain boolean.
(s/def :crux.db/evicted? boolean?)
;; Arguments for a transaction function: a vector of arbitrary values.
(s/def :crux.db.fn/args (s/coll-of any? :kind vector?))
;; Body of a transaction function: the symbol `fn`, then an argument vector
;; holding at least one symbol, then any number of body forms.
(s/def :crux.db.fn/body (s/cat :fn #{'fn}
:args (s/coll-of symbol? :kind vector? :min-count 1)
:body (s/* any?)))
;; A document is a keyword-keyed map that must contain :crux.db/id and may
;; additionally carry a transaction function body and/or its arguments.
(s/def ::doc (s/and (s/map-of keyword? any?)
(s/keys :req [:crux.db/id] :opt [:crux.db.fn/body :crux.db.fn/args])))
(defn- date?
  "Returns true when x is an instance of java.util.Date, false otherwise."
  [x]
  (instance? Date x))
;; Each transaction operation is a sequence whose first element is the
;; operation keyword; `tx-op` dispatches on that element and every method
;; returns the spec for the matching operation shape.
(defmulti tx-op first)
;; [:crux.tx/put doc start-valid-time? end-valid-time?]
(defmethod tx-op :crux.tx/put [_] (s/cat :op #{:crux.tx/put}
:doc ::doc
:start-valid-time (s/? date?)
:end-valid-time (s/? date?)))
;; [:crux.tx/delete id start-valid-time? end-valid-time?]
(defmethod tx-op :crux.tx/delete [_] (s/cat :op #{:crux.tx/delete}
:id :crux.db/id
:start-valid-time (s/? date?)
:end-valid-time (s/? date?)))
;; [:crux.tx/cas old-doc-or-nil new-doc at-valid-time?]
(defmethod tx-op :crux.tx/cas [_] (s/cat :op #{:crux.tx/cas}
:old-doc (s/nilable ::doc)
:new-doc ::doc
:at-valid-time (s/? date?)))
;; [:crux.tx/evict id]
(defmethod tx-op :crux.tx/evict [_] (s/cat :op #{:crux.tx/evict}
:id :crux.db/id))
;; A document that, on top of the ::doc requirements, must carry
;; :crux.db.fn/args.
(s/def ::args-doc (s/and ::doc (s/keys :req [:crux.db.fn/args])))
;; [:crux.tx/fn id args-doc?]
(defmethod tx-op :crux.tx/fn [_] (s/cat :op #{:crux.tx/fn}
:id :crux.db/id
:args-doc (s/? ::args-doc)))
;; A transaction operation is validated by whichever spec the `tx-op`
;; multimethod selects from the operation's first element.
;;
;; The second argument to `s/multi-spec` is the retag. When it is a function,
;; spec invokes it only during generation, as
;; (retag generated-value dispatch-tag). `first` accepts a single argument, so
;; using it as the retag makes every generator built from ::tx-op throw an
;; ArityException. Each `tx-op` method pins its :op position to a one-element
;; set, so a generated operation already starts with the right keyword and can
;; be returned untouched.
(s/def ::tx-op (s/multi-spec tx-op (fn [generated-op _dispatch-tag] generated-op)))

;; A transaction is a vector of transaction operations.
(s/def ::tx-ops (s/coll-of ::tx-op :kind vector?))
(defn- conform-tx-ops
  "Normalises tx-ops into a vector of vectors. Within each operation, every
  element that is a java.util.Map is copied into a Clojure map; all other
  elements are passed through unchanged."
  [tx-ops]
  (letfn [(->clj-map [element]
            (if (instance? java.util.Map element)
              (into {} element)
              element))]
    (mapv (fn [operation]
            (mapv ->clj-map operation))
          tx-ops)))
(defprotocol PCruxNode
"Provides API access to Crux."
(db
[node]
[node ^Date valid-time]
[node ^Date valid-time ^Date transaction-time]
"Will return the latest value of the db currently known. Non-blocking.
When a valid time is specified then the returned db value contains only
those documents whose valid time is not after the specified time.
Non-blocking.
When both valid and transaction time are specified returns a db value
as of the valid and transaction time. Will block until the transaction
time is present in the index.")
(document [node content-hash]
"Reads a document from the document store based on its
content hash.")
(documents [node content-hashes-set]
"Reads the set of documents from the document store based on their
respective content hashes. Returns a map of content-hash->document.")
(history [node eid]
"Returns the transaction history of an entity, in reverse
chronological order. Includes corrections, but does not include
the actual documents.")
(history-range [node eid
^Date valid-time-start
^Date transaction-time-start
^Date valid-time-end
^Date transaction-time-end]
"Returns the transaction history of an entity, ordered by valid
time / transaction time in chronological order, earliest
first. Includes corrections, but does not include the actual
documents.
Giving nil as any of the date arguments makes the range open
ended for that value.")
(status [node]
"Returns the status of this node as a map.")
(submitted-tx-updated-entity? [node submitted-tx eid]
"Checks if a submitted tx did update an entity.
submitted-tx must be a map returned from `submit-tx`.
eid is an object that can be coerced into an entity id.
Returns true if the entity was updated in this transaction.")
(submitted-tx-corrected-entity? [node submitted-tx ^Date valid-time eid]
"Checks if a submitted tx did correct an entity as of valid time.
submitted-tx must be a map returned from `submit-tx`.
valid-time valid time of the correction to check.
eid is an object that can be coerced into an entity id.
Returns true if the entity was corrected in this transaction.")
(sync
[node ^Duration timeout]
[node ^Date transaction-time ^Duration timeout]
"If the transaction-time is supplied, blocks until indexing has
processed a tx with a greater-than transaction-time, otherwise
blocks until the node has caught up indexing the tx-log
backlog. Will throw an exception on timeout. The returned date is
the latest index time when this node has caught up as of this
call. This can be used as the second parameter in (db valid-time,
transaction-time) for consistent reads.
timeout – max time to wait, can be nil for the default.
Returns the latest known transaction time.")
(attribute-stats [node]
"Returns a frequencies map for indexed attributes."))
(defprotocol PCruxIngestClient
"Provides API access to Crux ingestion."
(submit-tx [node tx-ops]
"Writes transactions to the log for processing.
tx-ops datalog style transactions.
Returns a map with details about the submitted transaction,
including tx-time and tx-id.")
(new-tx-log-context ^java.io.Closeable [node]
"Returns a new transaction log context allowing for lazy reading
of the transaction log in a try-with-resources block using
(tx-log ^Closeable tx-log-context, from-tx-id, boolean with-ops?).
Returns an implementation specific context.")
(tx-log [node tx-log-context from-tx-id with-ops?]
"Reads the transaction log lazily. Optionally includes
operations, which allow the contents under the :crux.api/tx-ops
key to be piped into (submit-tx tx-ops) of another
Crux instance.
tx-log-context a context from (new-tx-log-context node)
from-tx-id optional transaction id to start from.
with-ops? should the operations with documents be included?
Returns a lazy sequence of the transaction log."))
;; PCruxNode implementation for any ICruxAPI instance: each protocol function
;; forwards its arguments, unchanged and in the same order, to the Java method
;; named in its body.
(extend-type ICruxAPI
  PCruxNode
  (db
    ([node]
     (.db node))
    ([node ^Date valid-time]
     (.db node valid-time))
    ([node ^Date valid-time ^Date transaction-time]
     (.db node valid-time transaction-time)))

  (document [node content-hash]
    (.document node content-hash))

  (documents [node content-hash-set]
    (.documents node content-hash-set))

  (history [node eid]
    (.history node eid))

  (history-range [node eid vt-start tt-start vt-end tt-end]
    (.historyRange node eid vt-start tt-start vt-end tt-end))

  (status [node]
    (.status node))

  (submitted-tx-updated-entity? [node submitted-tx eid]
    (.hasSubmittedTxUpdatedEntity node submitted-tx eid))

  (submitted-tx-corrected-entity? [node submitted-tx ^Date valid-time eid]
    (.hasSubmittedTxCorrectedEntity node submitted-tx valid-time eid))

  (sync
    ([node timeout]
     (.sync node timeout))
    ([node transaction-time timeout]
     (.sync node transaction-time timeout)))

  (attribute-stats [node]
    (.attributeStats node)))
;; PCruxIngestClient implementation for any ICruxIngestAPI instance.
(extend-type ICruxIngestAPI
  PCruxIngestClient
  ;; tx-ops are passed through conform-tx-ops before being handed to the
  ;; Java API.
  (submit-tx [client tx-ops]
    (let [conformed-ops (conform-tx-ops tx-ops)]
      (.submitTx client conformed-ops)))

  (new-tx-log-context ^java.io.Closeable [client]
    (.newTxLogContext client))

  (tx-log [client tx-log-context from-tx-id with-ops?]
    (.txLog client tx-log-context from-tx-id with-ops?)))
(defprotocol PCruxDatasource
"Represents the database as of a specific valid and
transaction time."
(entity [db eid]
"Queries a document map for an entity.
eid is an object which can be coerced into an entity id.
Returns the entity document map.")
(entity-tx [db eid]
"Returns the transaction details for an entity. Details
include tx-id and tx-time.
eid is an object that can be coerced into an entity id.")
(new-snapshot ^java.io.Closeable [db]
"Returns a new implementation specific snapshot allowing for lazy query results in a
try-with-resources block using (q db snapshot query).
Can also be used for
(history-ascending db snapshot eid) and
(history-descending db snapshot eid)
Returns an implementation specific snapshot.")
(q
[db query]
[db snapshot query]
"q[uery] a Crux db.
query param is a datalog query in map, vector or string form.
First signature will evaluate eagerly and will return a set or vector
of result tuples.
Second signature accepts a db snapshot, see `new-snapshot`.
Evaluates *lazily* and consequently returns a lazy sequence of result tuples.")
(history-ascending
[db snapshot eid]
"Retrieves entity history lazily in chronological order
from and including the valid time of the db while respecting
transaction time. Includes the documents.")
(history-descending
[db snapshot eid]
"Retrieves entity history lazily in reverse chronological order
from and including the valid time of the db while respecting
transaction time. Includes the documents.")
(valid-time [db]
"Returns the valid time of the db.
If valid time wasn't specified at the moment of the db value retrieval
then valid time will be time of the latest transaction.")
(transaction-time [db]
"Returns the time of the latest transaction applied to this db value.
If a tx time was specified when db value was acquired then returns
the specified time."))
;; PCruxDatasource implementation for any ICruxDatasource instance: each
;; protocol function forwards its arguments, unchanged and in the same order,
;; to the Java method named in its body.
(extend-type ICruxDatasource
  PCruxDatasource
  (entity [datasource eid]
    (.entity datasource eid))

  (entity-tx [datasource eid]
    (.entityTx datasource eid))

  (new-snapshot [datasource]
    (.newSnapshot datasource))

  (q
    ([datasource query]
     (.q datasource query))
    ([datasource snapshot query]
     (.q datasource snapshot query)))

  (history-ascending [datasource snapshot eid]
    (.historyAscending datasource snapshot eid))

  (history-descending [datasource snapshot eid]
    (.historyDescending datasource snapshot eid))

  (valid-time [datasource]
    (.validTime datasource))

  (transaction-time [datasource]
    (.transactionTime datasource)))
(defprotocol PCruxAsyncIngestClient
"Provides API access to Crux async ingestion."
(submit-tx-async [node tx-ops]
"Writes transactions to the log for processing. Non-blocking.
tx-ops datalog style transactions.
Returns a deref-able value holding a map with details about the
submitted transaction, including tx-time and tx-id."))
;; PCruxAsyncIngestClient implementation for any ICruxAsyncIngestAPI instance.
(extend-type ICruxAsyncIngestAPI
  PCruxAsyncIngestClient
  ;; tx-ops are passed through conform-tx-ops before being handed to the
  ;; Java API.
  (submit-tx-async [client tx-ops]
    (let [conformed-ops (conform-tx-ops tx-ops)]
      (.submitTxAsync client conformed-ops))))
(defn start-node
"NOTE: requires any dependencies on the classpath that the Crux modules may need.
options {:crux.node/topology e.g. \"crux.standalone/topology\"}
Options are specified as keywords using their long format name, like
:crux.kafka/bootstrap-servers etc. See the individual modules used in the specified
topology for option descriptions.
Returns a node which implements ICruxAPI and
java.io.Closeable. The latter allows the node to be stopped by
calling `(.close node)`.
Throws IndexVersionOutOfSyncException if the index needs rebuilding.
Throws NonMonotonicTimeException if the clock has moved backwards since
last run. Only applicable when using the event log."
^ICruxAPI [options]
;; Delegates directly to the static Java entry point.
(Crux/startNode options))
(defn new-api-client
"Creates a new remote API client implementing ICruxAPI. The remote
client requires valid and transaction time to be specified for all
calls to `db`.
NOTE: requires crux-http-client on the classpath, see
crux.remote-api-client/*internal-http-request-fn* for more
information.
url the URL to a Crux HTTP end-point.
Returns a remote API client."
^ICruxAPI [url]
;; Delegates directly to the static Java entry point.
(Crux/newApiClient url))
(defn new-ingest-client
"Starts an ingest client for transacting into Kafka without running a
full local node with index.
For valid options, see crux.kafka/default-options. Options are
specified as keywords using their long format name, like
:crux.kafka/bootstrap-servers etc.
options
{:crux.kafka/bootstrap-servers \"kafka-cluster-kafka-brokers.crux.svc.cluster.local:9092\"
:crux.kafka/group-id \"group-id\"
:crux.kafka/tx-topic \"crux-transaction-log\"
:crux.kafka/doc-topic \"crux-docs\"
:crux.kafka/create-topics true
:crux.kafka/doc-partitions 1
:crux.kafka/replication-factor 1}
Returns a crux.api.ICruxAsyncIngestAPI component that implements
java.io.Closeable, which allows the client to be stopped by calling
close."
^ICruxAsyncIngestAPI [options]
;; Delegates directly to the static Java entry point.
(Crux/newIngestClient options))