-
Notifications
You must be signed in to change notification settings - Fork 160
/
kafka_ingest_client.clj
36 lines (29 loc) · 1.19 KB
/
kafka_ingest_client.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
(ns crux.bootstrap.kafka-ingest-client
(:require [crux.bootstrap :as b]
[crux.db :as db]
[crux.kafka :as k])
(:import crux.api.ICruxAsyncIngestAPI
java.io.Closeable))
;; Ingest-only client record. Every API method forwards to the `crux.db`
;; functions using the record's `tx-log` field; `close-fn`, when truthy, is a
;; zero-argument function invoked on `close`.
(defrecord CruxKafkaIngestClient [tx-log close-fn]
  ICruxAsyncIngestAPI

  ;; Returns whatever `db/submit-tx` returns, without dereferencing it.
  (submitTxAsync [this ops]
    (db/submit-tx tx-log ops))

  ;; Same submission as `submitTxAsync`, but dereferences the result before
  ;; returning it.
  (submitTx [this ops]
    (deref (db/submit-tx tx-log ops)))

  (newTxLogContext [this]
    (db/new-tx-log-context tx-log))

  ;; Reading the log together with its documents is rejected up front; only
  ;; the plain log read is forwarded to `db/tx-log`.
  (txLog [this ctx from-tx-id with-documents?]
    (if with-documents?
      (throw (IllegalArgumentException. "with-documents? not supported"))
      (db/tx-log tx-log ctx from-tx-id)))

  Closeable

  ;; A record built without a close function closes as a no-op.
  (close [this]
    (when-let [shutdown close-fn]
      (shutdown))))
;; Module map handed to `b/start-modules` by `new-ingest-client`.
;; Each key names a module and each value is the corresponding definition
;; taken from the `crux.kafka` namespace (aliased `k`).
;; NOTE(review): the shape of those definitions and whether entry order
;; matters to `b/start-modules` are not visible from this file -- confirm
;; before reordering or extending this map.
(def ingest-client-config {:tx-log k/tx-log
:admin-client k/admin-client
:admin-wrapper k/admin-wrapper
:producer k/producer})
(defn new-ingest-client
  "Starts the modules listed in `ingest-client-config` and returns a
  `CruxKafkaIngestClient` wrapping them.

  `options` is merged over `b/default-options`, so caller-supplied keys win.
  `b/start-modules` yields a pair of the started modules and a close function;
  the close function is stored under `:close-fn` (called when the client is
  closed) and the merged options under `:options`, which is kept as an extra
  entry on the record rather than a declared field.

  NOTE(review): presumably the started-modules map carries a `:tx-log` entry
  that fills the record's `tx-log` field -- confirm against
  `b/start-modules`."
  ^ICruxAsyncIngestAPI [options]
  (let [effective-options (merge b/default-options options)
        [modules shutdown-fn] (b/start-modules ingest-client-config effective-options)]
    (-> modules
        (assoc :close-fn shutdown-fn
               :options effective-options)
        (map->CruxKafkaIngestClient))))