-
Notifications
You must be signed in to change notification settings - Fork 160
/
kafka_ingest_client.clj
54 lines (46 loc) · 2 KB
/
kafka_ingest_client.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
(ns crux.kafka-ingest-client
(:require [crux.db :as db]
[crux.kafka :as k]
[crux.topology :as topo]
[crux.node :as n]
[crux.tx :as tx]
[crux.tx.conform :as txc])
(:import crux.api.ICruxAsyncIngestAPI
java.io.Closeable
(org.apache.kafka.clients.producer KafkaProducer)))
(defrecord CruxKafkaIngestClient [tx-log document-store close-fn]
ICruxAsyncIngestAPI
(submitTxAsync [_ tx-ops]
(let [conformed-tx-ops (mapv txc/conform-tx-op tx-ops)]
(db/submit-docs document-store (into {} (mapcat :docs) conformed-tx-ops))
(db/submit-tx tx-log (mapv txc/->tx-event conformed-tx-ops))))
(submitTx [this tx-ops]
@(.submitTxAsync this tx-ops))
(openTxLog ^crux.api.ICursor [_ after-tx-id with-ops?]
(when with-ops?
(throw (IllegalArgumentException. "with-ops? not supported")))
(db/open-tx-log tx-log after-tx-id))
Closeable
(close [_]
(when close-fn (close-fn))))
(defrecord IngestOnlyDocumentStore [^KafkaProducer producer doc-topic]
db/DocumentStore
(submit-docs [this id-and-docs]
(k/submit-docs id-and-docs this))
(fetch-docs [this ids]
(throw (UnsupportedOperationException. "Can't fetch docs from ingest-only Kafka document store"))))
(def topology
{::n/tx-log k/tx-log
::n/document-store {:start-fn (fn [{:keys [::k/producer]} {:keys [::k/doc-topic]}]
(->IngestOnlyDocumentStore producer doc-topic))
:deps [::k/producer]}
::k/admin-client k/admin-client
::k/producer k/producer
::k/latest-submitted-tx-consumer k/latest-submitted-tx-consumer})
(defn new-ingest-client ^ICruxAsyncIngestAPI [options]
(let [[{::n/keys [tx-log document-store]} close-fn]
(topo/start-topology (merge {::n/topology topology}
options))]
(map->CruxKafkaIngestClient {:tx-log tx-log
:document-store document-store
:close-fn close-fn})))