-
Notifications
You must be signed in to change notification settings - Fork 202
/
kinesis.clj
322 lines (284 loc) · 12.4 KB
/
kinesis.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
(ns amazonica.aws.kinesis
(:require [amazonica.core :as amz]
[taoensso.nippy :as nippy]
[clojure.algo.generic.functor :as functor])
(:import [com.amazonaws.auth
AWSCredentialsProvider
AWSCredentials
AWSCredentialsProviderChain
DefaultAWSCredentialsProviderChain]
com.amazonaws.internal.StaticCredentialsProvider
com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient
[com.amazonaws.regions
Region
Regions]
[com.amazonaws.services.kinesis
AmazonKinesisClient]
[com.amazonaws.services.kinesis.model
Record]
[com.amazonaws.services.kinesis.clientlibrary.interfaces
IRecordProcessorCheckpointer]
[com.amazonaws.services.kinesis.clientlibrary.interfaces.v2
IRecordProcessorFactory
IRecordProcessor
IShutdownNotificationAware]
[com.amazonaws.services.kinesis.clientlibrary.exceptions
InvalidStateException
KinesisClientLibDependencyException
ShutdownException
ThrottlingException]
[com.amazonaws.services.kinesis.clientlibrary.lib.worker
InitialPositionInStream
KinesisClientLibConfiguration
Worker
Worker$Builder
ShutdownReason]
[com.amazonaws.services.kinesis.metrics.interfaces
MetricsLevel]
java.nio.ByteBuffer
java.util.UUID))
;; Have the compiler warn about Java interop calls it can only resolve by
;; reflection at runtime. Type hints in this namespace (e.g. ^Record,
;; ^IRecordProcessorCheckpointer) let those calls be resolved statically.
(set! *warn-on-reflection* true)
;; NOTE(review): presumably amazonica.core interns one var per
;; AmazonKinesisClient operation into this namespace. The alter-var-root
;; forms below require vars such as put-record and get-records to exist.
;; Confirm against amazonica.core/set-client.
(amz/set-client AmazonKinesisClient *ns*)
(defn- ->bytes
  "Coerce `data` into a ByteBuffer for use as a Kinesis record payload.
  A ByteBuffer is returned as-is. Any other value is serialized with Nippy
  and the resulting bytes are wrapped in a ByteBuffer."
  [data]
  (if-not (instance? ByteBuffer data)
    (-> data nippy/freeze ByteBuffer/wrap)
    data))
;; Wrap the generated put-record so the payload may be any value: anything
;; that is not already a ByteBuffer is Nippy-serialized by ->bytes.
;; Call shape: (put-record [cred] stream data partition-key [seq-number]),
;; where the optional leading credentials are split off by amz/parse-args.
;; NOTE(review): the trailing positional argument is presumably
;; SequenceNumberForOrdering; confirm against the generated fn.
(alter-var-root
 #'amazonica.aws.kinesis/put-record
 (fn [generated-put-record]
   (fn [& args]
     (let [{:keys [cred] :as parsed} (amz/parse-args (first args) (rest args))
           [stream payload partition-key & [sequence-number]] (:args parsed)
           ;; nil entries (such as missing credentials) are dropped before
           ;; partial application so the remaining args stay positional.
           bound-call (apply partial
                             generated-put-record
                             (remove nil? [cred stream (->bytes payload) partition-key]))]
       (if-not sequence-number
         (bound-call)
         (bound-call sequence-number))))))
;; Wrap the generated put-records so each record's :data may be any value.
;; Non-ByteBuffer payloads are Nippy-serialized by ->bytes.
;; Call shape: (put-records [cred] stream records), where each record is a
;; map holding a :data entry.
(alter-var-root
 #'amazonica.aws.kinesis/put-records
 (fn [generated-put-records]
   (fn [& args]
     (let [{:keys [cred] :as parsed} (amz/parse-args (first args) (rest args))
           [stream records] (:args parsed)
           encoded (map #(update-in % [:data] ->bytes) records)
           request [:stream-name stream :records encoded]]
       (if (some? cred)
         (apply generated-put-records cred request)
         (apply generated-put-records request))))))
(defn unwrap
  "Get the contents of the given buffer as a byte-array, decoding as
  Nippy bytes if they appear to be Nippy encoded. If the ByteBuffer
  does not appear to contain Nippy data, the bytes found will be
  returned unchanged. This technique is inspired by ptaoussanis/faraday.

  The bytes considered are those between the buffer's position and its
  limit. When the buffer is a writable heap buffer whose view covers its
  entire backing array, that array is used directly with no copy.
  Otherwise, for read-only, direct or sliced buffers, the remaining bytes
  are copied out of a duplicate. In every case the caller's buffer
  position is left untouched."
  [^java.nio.ByteBuffer byte-buffer]
  (let [^bytes raw (if (and (.hasArray byte-buffer)
                            (zero? (.arrayOffset byte-buffer))
                            (zero? (.position byte-buffer))
                            (== (.limit byte-buffer)
                                (alength (.array byte-buffer))))
                     ;; Common case (e.g. ByteBuffer/wrap of a whole array):
                     ;; the backing array is exactly the buffer's contents.
                     (.array byte-buffer)
                     ;; .array would throw (read-only/direct) or return bytes
                     ;; outside the buffer's view (slices), so copy instead.
                     ;; Reading from a duplicate leaves the original's position
                     ;; untouched.
                     (let [view (.duplicate byte-buffer)
                           ^bytes out (byte-array (.remaining view))]
                       (.get view out)
                       out))
        serialized? (#'nippy/try-parse-header raw)]
    (if-not serialized?
      raw ; No Nippy header => assume non-nippy binary data
      (try ; Header match _may_ have been a fluke (though v. unlikely)
        (nippy/thaw raw)
        (catch Exception _
          raw)))))
;; Make get-shard-iterator return only the :shard-iterator value of the
;; generated call's response instead of the whole response map.
(alter-var-root
 #'amazonica.aws.kinesis/get-shard-iterator
 (fn [generated-get-shard-iterator]
   (comp :shard-iterator generated-get-shard-iterator)))
;; Wrap the generated get-records so each returned record's :data is passed
;; through a deserializer. The deserializer is the request's :deserializer
;; entry, or unwrap when none is given.
;; The request may be one map argument or inline key/value pairs, optionally
;; preceded by credentials (split off by amz/parse-args).
(alter-var-root
 #'amazonica.aws.kinesis/get-records
 (fn [generated-get-records]
   (fn [& args]
     (let [{:keys [cred] :as parsed} (amz/parse-args (first args) (rest args))
           raw-args (:args parsed)
           request (if (= (count raw-args) 1)
                     (first raw-args)
                     (apply hash-map raw-args))
           deserialize (or (:deserializer request) unwrap)
           response (apply generated-get-records (remove nil? [cred request]))
           decode-record (fn [record] (update-in record [:data] deserialize))]
       (assoc response
              :records (functor/fmap decode-record (:records response)))))))
(defn marshall
  "Convert a Kinesis model Record into a map with the keys
  :approximate-arrival-timestamp, :encryption-type, :sequence-number,
  :partition-key and :data. The arrival timestamp goes through
  amazonica.core/marshall. The payload ByteBuffer is passed to
  `deserializer`, and its result becomes :data."
  [deserializer ^Record record]
  (let [arrived    (amz/marshall (.getApproximateArrivalTimestamp record))
        encryption (.getEncryptionType record)
        seq-number (.getSequenceNumber record)
        pkey       (.getPartitionKey record)
        payload    (deserializer (.getData record))]
    {:approximate-arrival-timestamp arrived
     :encryption-type               encryption
     :sequence-number               seq-number
     :partition-key                 pkey
     :data                          payload}))
(defn- mark-checkpoint
  "Make a single checkpoint attempt through `checkpointer`.

  Returns true when the caller should stop retrying. That happens when the
  checkpoint succeeded or a ShutdownException was thrown. Returns false
  after an InvalidStateException, a KinesisClientLibDependencyException or
  a ThrottlingException. In the throttling case it first prints a notice
  and sleeps for 3 seconds."
  [^IRecordProcessorCheckpointer checkpointer]
  (let [outcome (try
                  (.checkpoint checkpointer)
                  ::done
                  (catch ShutdownException _ ::shutdown)
                  (catch InvalidStateException _ ::invalid-state)
                  (catch KinesisClientLibDependencyException _ ::dependency-failure)
                  (catch ThrottlingException _ ::throttled))]
    (case outcome
      (::done ::shutdown) true
      ::throttled (do (println "sleeping for 3s due to throttling....")
                      (Thread/sleep 3000)
                      false)
      false)))
(defn- processor-factory
  "Build an IRecordProcessorFactory (KCL interfaces.v2) whose processors
  hand each batch of records to the user-supplied `processor` fn. Before
  the handoff, every record is converted with `marshall` using
  `deserializer`.

  A checkpoint is attempted (up to 5 times, through mark-checkpoint) when:
  - `processor` returns a truthy value for a batch;
  - `checkpoint` is truthy and the current time is past the next scheduled
    time-driven checkpoint. The schedule then moves `checkpoint` seconds
    ahead, and the first batch always qualifies;
  - the processor is shut down with reason TERMINATE;
  - a shutdown is requested via IShutdownNotificationAware."
  [processor deserializer checkpoint]
  (letfn [(checkpoint-with-retries [checkpointer]
            ;; mark-checkpoint returns true once no further attempt is needed.
            (loop [attempts-left 5]
              (when (pos? attempts-left)
                (or (mark-checkpoint checkpointer)
                    (recur (dec attempts-left))))))]
    (reify IRecordProcessorFactory
      (createProcessor [_this]
        (let [next-check (atom 0)]
          (reify
            IRecordProcessor
            (initialize [_this _initialization-input])
            (shutdown [_this shutdown-input]
              (let [reason (.getShutdownReason shutdown-input)
                    checkpointer (.getCheckpointer shutdown-input)]
                (when (or (= reason ShutdownReason/TERMINATE)
                          (= reason "TERMINATE"))
                  (checkpoint-with-retries checkpointer))))
            (processRecords [_this process-records-input]
              (let [batch (into [] (.getRecords process-records-input))
                    checkpointer (.getCheckpointer process-records-input)
                    wants-checkpoint? (processor (mapv #(marshall deserializer %) batch))]
                (when (or wants-checkpoint?
                          (and checkpoint
                               (> (System/currentTimeMillis) @next-check)))
                  (when checkpoint
                    (reset! next-check (+' (System/currentTimeMillis)
                                           (*' 1000 checkpoint))))
                  (checkpoint-with-retries checkpointer))))
            IShutdownNotificationAware
            (shutdownRequested [_this checkpointer]
              (checkpoint-with-retries checkpointer))))))))
(defn- kinesis-client-lib-configuration
  "Instantiate a KinesisClientLibConfiguration instance.

  `provider` is the AWSCredentialsProvider given to the configuration's
  constructor. The options map must supply :app and :stream. Both go
  through `name`, so keywords, symbols or strings all work. :worker-id
  defaults to a random UUID string.

  Each remaining option is applied with its matching `.withXxx` call:
  - Boolean flags (:call-process-records-even-for-empty-record-list,
    :cleanup-leases-upon-shard-completion,
    :validate-sequence-number-before-checkpointing) are applied whenever
    they are non-nil. An explicit false is therefore honoured instead of
    being silently dropped.
  - Every other option is applied only when truthy. Absent options keep
    whatever the configuration constructor set.
  :initial-position-in-stream and :metrics-level are given by name (e.g. a
  keyword) and resolved with the respective enum's valueOf."
  ^KinesisClientLibConfiguration [^AWSCredentialsProvider provider
                                  {:keys [app
                                          stream
                                          worker-id
                                          endpoint
                                          dynamodb-endpoint
                                          billing-mode
                                          initial-position-in-stream
                                          ^java.util.Date initial-position-in-stream-date
                                          failover-time-millis
                                          shard-sync-interval-millis
                                          max-records
                                          idle-time-between-reads-in-millis
                                          call-process-records-even-for-empty-record-list
                                          parent-shard-poll-interval-millis
                                          cleanup-leases-upon-shard-completion
                                          common-client-config
                                          kinesis-client-config
                                          dynamodb-client-config
                                          cloud-watch-client-config
                                          user-agent
                                          task-backoff-time-millis
                                          metrics-level
                                          metrics-buffer-time-millis
                                          metrics-max-queue-size
                                          validate-sequence-number-before-checkpointing
                                          region-name
                                          initial-lease-table-read-capacity
                                          initial-lease-table-write-capacity]
                                   :or {worker-id (str (UUID/randomUUID))}}]
  (cond-> (KinesisClientLibConfiguration. (name app)
                                          (name stream)
                                          provider
                                          (name worker-id))
    endpoint
    (.withKinesisEndpoint endpoint)
    dynamodb-endpoint
    (.withDynamoDBEndpoint dynamodb-endpoint)
    billing-mode
    (.withBillingMode billing-mode)
    initial-position-in-stream
    (.withInitialPositionInStream
     (InitialPositionInStream/valueOf (name initial-position-in-stream)))
    initial-position-in-stream-date
    (.withTimestampAtInitialPositionInStream
     initial-position-in-stream-date)
    failover-time-millis
    (.withFailoverTimeMillis failover-time-millis)
    shard-sync-interval-millis
    (.withShardSyncIntervalMillis shard-sync-interval-millis)
    max-records
    (.withMaxRecords max-records)
    idle-time-between-reads-in-millis
    (.withIdleTimeBetweenReadsInMillis idle-time-between-reads-in-millis)
    ;; Boolean flag: gate on some? so an explicit false is applied.
    (some? call-process-records-even-for-empty-record-list)
    (.withCallProcessRecordsEvenForEmptyRecordList
     call-process-records-even-for-empty-record-list)
    parent-shard-poll-interval-millis
    (.withParentShardPollIntervalMillis
     parent-shard-poll-interval-millis)
    ;; Boolean flag: gate on some? so an explicit false is applied.
    (some? cleanup-leases-upon-shard-completion)
    (.withCleanupLeasesUponShardCompletion
     cleanup-leases-upon-shard-completion)
    common-client-config
    (.withCommonClientConfig common-client-config)
    kinesis-client-config
    (.withKinesisClientConfig kinesis-client-config)
    dynamodb-client-config
    (.withDynamoDBClientConfig dynamodb-client-config)
    cloud-watch-client-config
    (.withCloudWatchClientConfig cloud-watch-client-config)
    user-agent
    (.withUserAgent user-agent)
    task-backoff-time-millis
    (.withTaskBackoffTimeMillis task-backoff-time-millis)
    metrics-level
    (.withMetricsLevel (MetricsLevel/valueOf (name metrics-level)))
    metrics-buffer-time-millis
    (.withMetricsBufferTimeMillis metrics-buffer-time-millis)
    metrics-max-queue-size
    (.withMetricsMaxQueueSize metrics-max-queue-size)
    ;; Boolean flag: gate on some? so an explicit false is applied.
    (some? validate-sequence-number-before-checkpointing)
    (.withValidateSequenceNumberBeforeCheckpointing validate-sequence-number-before-checkpointing)
    region-name
    (.withRegionName region-name)
    initial-lease-table-read-capacity
    (.withInitialLeaseTableReadCapacity initial-lease-table-read-capacity)
    initial-lease-table-write-capacity
    (.withInitialLeaseTableWriteCapacity initial-lease-table-write-capacity)))
(defn- dynamodb-streams-adapter-client
  "Build an AmazonDynamoDBStreamsAdapterClient for use as the Worker's
  Kinesis client. It is created from `provider` (when non-nil) and the
  Kinesis ClientConfiguration held by `config`. `region-name` and then
  `endpoint` are applied to it when they are non-nil."
  ^AmazonDynamoDBStreamsAdapterClient
  [provider ^KinesisClientLibConfiguration config ^String region-name ^String endpoint]
  (let [client-config (.getKinesisClientConfiguration config)
        client (if provider
                 (AmazonDynamoDBStreamsAdapterClient. ^AWSCredentialsProvider provider client-config)
                 (AmazonDynamoDBStreamsAdapterClient. client-config))]
    (when region-name
      (.setRegion client (Region/getRegion (Regions/fromName region-name))))
    (when endpoint
      (.setEndpoint client endpoint))
    client))

(defn worker
  "Instantiate a kinesis Worker.

  Accepts either a single options map or inline key/value pairs. Returns a
  two-element vector [worker worker-identifier]. The Worker is built but
  not started.

  Options read here (all options are also forwarded to
  kinesis-client-lib-configuration):
  :processor     fn called with each batch of marshalled records
  :deserializer  fn applied to each record's ByteBuffer (default: unwrap)
  :checkpoint    seconds between time-driven checkpoints (default 60;
                 an explicit false/nil disables them)
  :credentials   passed to amazonica.core/get-credentials
  :region-name   AWS region name, e.g. \"eu-west-1\"
  :endpoint      Kinesis endpoint. Defaults to
                 \"kinesis.us-east-1.amazonaws.com\" only when no
                 :region-name is given. The KCL lets an explicit endpoint
                 override the region name, so defaulting it alongside a
                 region would silently point the worker at us-east-1.
  :dynamodb-adaptor-client?  when truthy, read through an
                 AmazonDynamoDBStreamsAdapterClient instead of a plain
                 Kinesis client."
  [& args]
  (let [opts (if (associative? (first args))
               (first args)
               (apply hash-map args))
        {:keys [processor deserializer checkpoint credentials dynamodb-adaptor-client? ^String region-name]
         :or {checkpoint 60
              deserializer unwrap}} opts
        ;; Like map destructuring's :or, an explicitly supplied :endpoint
        ;; (even nil) wins. The us-east-1 fallback only applies when the
        ;; caller named no region, since the endpoint would override it.
        ^String endpoint (get opts :endpoint
                              (when-not region-name
                                "kinesis.us-east-1.amazonaws.com"))
        factory (processor-factory processor deserializer checkpoint)
        creds (amz/get-credentials credentials)
        provider (if (instance? AWSCredentials creds)
                   (StaticCredentialsProvider. creds)
                   creds)
        config (kinesis-client-lib-configuration provider (assoc opts :endpoint endpoint))
        worker-identifier (.getWorkerIdentifier config)
        builder (cond-> (-> (Worker$Builder.)
                            (.recordProcessorFactory ^IRecordProcessorFactory factory)
                            (.config ^KinesisClientLibConfiguration config))
                  dynamodb-adaptor-client?
                  ;; this will result in some warnings at debug from the kinesis client lib as it will try to set the region/endpoint on this client.
                  ;; These are safe to ignore as we pre-configure the correct values
                  (.kinesisClient
                   (dynamodb-streams-adapter-client provider config region-name endpoint)))]
    [(.build builder) worker-identifier]))
(defn worker!
  "Instantiate a new kinesis Worker and invoke its run method in a
  separate thread. Return the identifier of the Worker.

  Accepts the same arguments as `worker`. The run loop is started with
  future-call and the future is not kept. Any exception escaping
  Worker.run is therefore captured in that future rather than thrown to
  the caller."
  [& args]
  (let [[^Worker kcl-worker identifier] (apply worker args)]
    (future-call (fn [] (.run kcl-worker)))
    identifier))