-
Notifications
You must be signed in to change notification settings - Fork 160
/
connect.clj
218 lines (196 loc) · 8.02 KB
/
connect.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
(ns crux.kafka.connect
(:require [cheshire.core :as json]
[cheshire.generate]
[clojure.tools.logging :as log]
[crux.codec :as c]
[crux.io :as cio]
[cognitect.transit :as transit])
(:import [org.apache.kafka.connect.data Schema Schema$Type Struct Field]
org.apache.kafka.connect.sink.SinkRecord
org.apache.kafka.connect.source.SourceRecord
java.io.ByteArrayOutputStream
[java.util UUID Map]
[com.fasterxml.jackson.core JsonGenerator JsonParseException]
crux.kafka.connect.CruxSinkConnector
crux.kafka.connect.CruxSourceConnector
crux.codec.EDNId
crux.api.ICruxAPI))
;; Teach Cheshire how to encode EDNId values: they are emitted as a JSON
;; string holding the printed form of the original (pre-wrapping) id.
(cheshire.generate/add-encoder
 EDNId
 (fn [edn-id ^JsonGenerator generator]
   (let [original-id (c/edn-id->original-id edn-id)]
     (.writeString generator (str original-id)))))
(defn- map->edn
  "Converts a (possibly nested) java.util.Map into a Clojure map with
  keyword keys.

  Values are converted recursively: nested Maps become keyword-keyed maps
  and java.util.List values become vectors whose elements are converted
  the same way, so maps nested inside lists are keywordized too. This
  matches the shape produced by `(json/parse-string s true)` for the same
  data. Any other value is returned unchanged. A nil `m` yields `{}`."
  [m]
  (letfn [(value->edn [v]
            (cond
              (instance? Map v) (map->edn v)
              ;; Lists may themselves contain maps/lists that need converting.
              (instance? java.util.List v) (mapv value->edn v)
              :else v))]
    (into {}
          (map (fn [[k v]]
                 [(keyword k) (value->edn v)]))
          m)))
(defn- get-struct-contents
  "Recursively converts a Kafka Connect value into Clojure data.

  - A Struct becomes a map from keywordized field name to the converted
    field value; a Struct whose schema has no fields becomes `{}`.
  - Any java.util.List becomes a vector of converted elements.
  - Any java.util.Map becomes a map with keywordized keys and converted
    values.
  - Every other value is returned unchanged."
  [val]
  (cond
    (instance? Struct val)
    (let [^Struct struct val]
      ;; `into {}` (rather than `reduce conj`) keeps the result a map even
      ;; when the schema declares no fields.
      (into {}
            (map (fn [^Field field]
                   [(keyword (.name field))
                    (get-struct-contents (.get struct field))]))
            (.fields (.schema struct))))

    ;; Match on the interfaces so that list/map implementations other than
    ;; ArrayList/HashMap are converted as well.
    (instance? java.util.List val)
    (mapv get-struct-contents val)

    (instance? java.util.Map val)
    (into {}
          (map (fn [[k v]]
                 [(keyword k) (get-struct-contents v)]))
          val)

    :else val))
(defn- struct->edn
  "Converts the Struct `s` to Clojure data via `get-struct-contents`, logs
  the result at info level and returns it. `schema` is accepted but not
  consulted here."
  [^Schema schema ^Struct s]
  (let [edn (get-struct-contents s)]
    (log/info "map val: " edn)
    edn))
(defn- record->edn
  "Turns the value of a SinkRecord into Clojure data.

  Handled shapes, tried in this order:
  1. a Struct value with a value schema -> `struct->edn`;
  2. a schemaless Map whose keys are exactly \"payload\" and \"schema\" ->
     the payload is parsed as JSON (string) or converted with `map->edn`
     (Map); any other payload type throws IllegalArgumentException;
  3. any other Map -> `map->edn`;
  4. a string -> parsed as JSON, falling back to EDN when JSON parsing
     fails with a JsonParseException.

  Anything else throws IllegalArgumentException."
  [^SinkRecord record]
  (let [value-schema (.valueSchema record)
        v (.value record)
        map-value? (instance? Map v)
        ;; Thunk so the key comparison only runs when this branch is reached.
        envelope? (fn []
                    (and map-value?
                         (nil? value-schema)
                         (= #{"payload" "schema"} (set (keys v)))))]
    (cond
      (and (instance? Struct v) value-schema)
      (struct->edn value-schema v)

      (envelope?)
      (let [payload (.get ^Map v "payload")]
        (if (string? payload)
          (json/parse-string payload true)
          (if (instance? Map payload)
            (map->edn payload)
            (throw (IllegalArgumentException. (str "Unknown JSON payload type: " record))))))

      map-value?
      (map->edn v)

      (string? v)
      (try
        (json/parse-string v true)
        (catch JsonParseException e
          (log/debug e "Failed to parse as JSON, trying EDN: " v)
          (c/read-edn-string-with-readers v)))

      :else
      (throw (IllegalArgumentException. (str "Unknown message type: " record))))))
(defn- coerce-eid
  "Coerces `id` into an entity id. An id accepted by `c/valid-id?` is passed
  through `c/id-edn-reader`; otherwise a string is turned into a keyword.
  Returns nil when neither applies."
  [id]
  (if (and id (c/valid-id? id))
    (c/id-edn-reader id)
    (when (string? id)
      (keyword id))))
(defn- find-eid
  "Picks the entity id for `doc`. Candidates, first truthy wins: the doc's
  `:crux.db/id`; the doc attribute named by the connector's id-key setting
  in `props`; the record key. The candidate is coerced with `coerce-eid`;
  when that yields nothing a random UUID is used."
  [props ^SinkRecord record doc]
  (let [from-configured-key (fn []
                              ;; nil-safe at each step, like some->>
                              (when-some [attr (some-> (get props CruxSinkConnector/ID_KEY_CONFIG)
                                                       (keyword))]
                                (get doc attr)))
        candidate (or (get doc :crux.db/id)
                      (from-configured-key)
                      (.key record))]
    (if-let [eid (coerce-eid candidate)]
      eid
      (UUID/randomUUID))))
(defn transform-sink-record
  "Builds a single transaction operation from a SinkRecord.

  A record with a nil value and a key becomes a `:crux.tx/delete` of the
  coerced key. Every other record is converted to a document with
  `record->edn` and becomes a `:crux.tx/put` with `:crux.db/id` set to the
  id chosen by `find-eid`. The record and the resulting op are logged at
  info level."
  [props ^SinkRecord record]
  (log/info "sink record:" record)
  (let [record-key (.key record)
        delete? (and (nil? (.value record)) record-key)
        tx-op (if delete?
                [:crux.tx/delete (coerce-eid record-key)]
                (let [doc (record->edn record)
                      eid (find-eid props record doc)]
                  [:crux.tx/put (assoc doc :crux.db/id eid)]))]
    (log/info "tx op:" tx-op)
    tx-op))
(defn submit-sink-records
  "Transforms every record with `transform-sink-record` and submits the
  resulting operations to `api` as one transaction. Does nothing and
  returns nil when `records` is empty."
  [^ICruxAPI api props records]
  (when (seq records)
    (let [tx-ops (mapv #(transform-sink-record props %) records)]
      (.submitTx api tx-ops))))
(defn- write-transit
  "Serialises `x` as verbose Transit JSON and returns it as a string.
  EDNId values are written with the tag \"crux/id\", represented by their
  original id."
  [x]
  (with-open [out (ByteArrayOutputStream.)]
    (let [writer (transit/writer out :json-verbose
                                 {:handlers
                                  {EDNId
                                   (transit/write-handler
                                    "crux/id"
                                    c/edn-id->original-id)}})]
      (transit/write writer x)
      ;; The writer emits UTF-8 bytes. Decode them explicitly instead of
      ;; relying on the platform default charset, which would corrupt
      ;; non-ASCII content on JVMs whose default is not UTF-8.
      (.toString out "UTF-8"))))
(defn- tx-op-with-explicit-valid-time
  "Returns `tx-op` with `tx-time` appended when the operation carries no
  further arguments after its required ones: a two-element `:crux.tx/put`
  or `:crux.tx/delete`, or a three-element `:crux.tx/cas`.

  Every other operation - including ops that already have extra
  arguments and op types not listed here - is returned unchanged."
  [[op :as tx-op] tx-time]
  (let [bare-arity (case op
                     (:crux.tx/put :crux.tx/delete) 2
                     :crux.tx/cas 3
                     ;; Default clause: without it `case` throws on any
                     ;; other op instead of passing the op through.
                     nil)]
    (if (= bare-arity (count tx-op))
      (conj tx-op tx-time)
      tx-op)))
(defn- tx-log-entry->tx-source-records
  "Builds the source records for one transaction log entry in \"tx\" mode:
  a single-element vector holding one SourceRecord with no key, whose
  value is the formatted vector of the entry's operations (each passed
  through `tx-op-with-explicit-valid-time`), whose offset is the tx id
  and whose timestamp is the tx time in milliseconds."
  [source-partition topic formatter {:crux.api/keys [tx-ops]
                                     :crux.tx/keys [tx-id tx-time]}]
  (let [source-offset {"offset" tx-id}
        explicit-ops (mapv #(tx-op-with-explicit-valid-time % tx-time) tx-ops)
        payload (formatter explicit-ops)]
    [(SourceRecord. source-partition
                    source-offset
                    topic
                    nil   ; partition
                    nil   ; key schema
                    nil   ; key
                    Schema/STRING_SCHEMA
                    payload
                    (inst-ms tx-time))]))
(defn- tx-op->id+doc
  "Extracts `[id doc]` from a transaction operation, or returns nil when
  the operation has more elements than handled here.

  - two-element `:crux.tx/put`: the document's `:crux.db/id` and the document;
  - two-element `:crux.tx/delete` / `:crux.tx/evict`: `[id]` (no document);
  - three-element `:crux.tx/cas`: the new document's `:crux.db/id` and the
    new document.

  Any other op keyword throws (there is no default `case` clause)."
  [[op :as tx-op]]
  (let [size (count tx-op)
        id+doc (fn [doc] [(:crux.db/id doc) doc])]
    (case op
      :crux.tx/put
      (when (= 2 size)
        (id+doc (second tx-op)))

      (:crux.tx/delete :crux.tx/evict)
      (when (= 2 size)
        [(second tx-op)])

      :crux.tx/cas
      (when (= 3 size)
        (id+doc (nth tx-op 2))))))
(defn- tx-log-entry->doc-source-records
  "Builds the source records for one transaction log entry in \"doc\" mode.

  `:crux.tx/fn` operations are skipped. For each remaining operation the
  id and document are taken from `tx-op->id+doc`; operations without an
  id produce no record. Each record is keyed by the string form of
  `(c/new-id id)` and carries the formatted document as its value, or a
  nil value when there is no document. Returns a lazy sequence."
  [source-partition topic formatter {:crux.api/keys [tx-ops]
                                     :crux.tx/keys [tx-id tx-time]}]
  (log/info "tx-ops:" tx-ops)
  (keep (fn [[op :as tx-op]]
          (when (not= :crux.tx/fn op)
            (let [[id doc] (tx-op->id+doc tx-op)
                  hashed-id (str (c/new-id id))]
              (log/info "tx-op:" tx-op "id:" id "hashed id:" hashed-id "doc:" doc)
              (when id
                (SourceRecord. source-partition
                               {"offset" tx-id}
                               topic
                               nil   ; partition
                               Schema/STRING_SCHEMA
                               hashed-id
                               Schema/OPTIONAL_STRING_SCHEMA
                               (when (some? doc)
                                 (formatter doc))
                               (inst-ms tx-time))))))
        tx-ops))
(defn poll-source-records
  "Reads the next batch of transaction log entries from `api` and converts
  them to source records.

  Reading starts one past the \"offset\" in `source-offset` (from 0 when
  there is no offset). `props` supplies the url, topic, output format
  (\"edn\", \"json\" or \"transit\"), mode (\"tx\" or \"doc\") and batch
  size (a string parsed as a long) via the CruxSourceConnector config
  keys. Returns a vector of records; the tx log context is closed before
  returning."
  [^ICruxAPI api source-offset props]
  (with-open [tx-log-context (.newTxLogContext api)]
    (let [setting (fn [k] (get props k))
          url (setting CruxSourceConnector/URL_CONFIG)
          topic (setting CruxSourceConnector/TOPIC_CONFIG)
          format (setting CruxSourceConnector/FORMAT_CONFIG)
          mode (setting CruxSourceConnector/MODE_CONFIG)
          batch-size (setting CruxSourceConnector/TASK_BATCH_SIZE_CONFIG)
          source-partition {"url" url}
          formatter (case format
                      "edn" cio/pr-edn-str
                      "json" json/generate-string
                      "transit" write-transit)
          entry->records (case mode
                           "tx" tx-log-entry->tx-source-records
                           "doc" tx-log-entry->doc-source-records)
          from-tx-id (inc (long (or (get source-offset "offset") -1)))]
      (log/info "source offset:" source-offset "tx-id:" from-tx-id "format:" format "mode:" mode)
      (let [limit (Long/parseLong batch-size)
            entries (take limit (.txLog api tx-log-context from-tx-id true))
            ;; Fold eagerly so every record is realised while the context is open.
            records (reduce (fn [acc entry]
                              (into acc (entry->records source-partition topic formatter entry)))
                            []
                            entries)]
        (when (seq records)
          (log/info "source records:" records))
        records))))