-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
213 lines (174 loc) · 8.24 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
(ns com.interrupt.streaming.core
(:require [onyx.test-helper :refer [with-test-env load-config]]
[onyx.plugin.kafka]
[onyx.api]
[franzy.clients.producer.client :as producer]
[franzy.clients.consumer.client :as consumer]
[franzy.clients.producer.protocols :refer :all]
[franzy.clients.consumer.protocols :refer :all]
[franzy.serialization.serializers :as serializers]
[franzy.serialization.deserializers :as deserializers]
[franzy.admin.zookeeper.client :as client]
#_[franzy.admin.topics :as topics]
[franzy.clients.producer.defaults :as pd]
[franzy.clients.consumer.defaults :as cd])
(:import [java.util UUID]))
;; Connection endpoints — hostnames assume docker-compose style service names.
(def zookeeper-url "zookeeper:2181")
(def kafka-url "kafka:9092")
;; Kafka topic names used throughout this namespace.
(def topic-scanner-command "scanner-command")
(def topic-scanner "scanner")
;; Shared Kafka (de)serializers: record keys are Clojure keywords, values are EDN.
(def key-serializer (serializers/keyword-serializer))
(def value-serializer (serializers/edn-serializer))
(def key-deserializer (deserializers/keyword-deserializer))
(def value-deserializer (deserializers/edn-deserializer))
;; NOTE(review): disabled with #_ — depends on franzy.admin.topics, which is
;; also commented out in the ns :require. If re-enabled, note that `def` inside
;; a defn creates namespace-level vars as a side effect; prefer `let` bindings.
#_(defn one-setup-topics []
    (def zk-utils (client/make-zk-utils {:servers [zookeeper-url]} false))
    (def two (topics/create-topic! zk-utils topic-scanner-command 10))
    (def three (topics/create-topic! zk-utils topic-scanner 10))
    (topics/all-topics zk-utils))
(defn two-write-to-topic
  "Sends a single EDN message to a Kafka topic and prints the broker's
   acknowledgement. The zero-arity form preserves the original behavior:
   writes {:foo :bar} under key :a to partition 0 of `topic-scanner-command`.
   The 3-arity form generalizes the previously hard-coded topic, partition,
   and message."
  ([]
   (two-write-to-topic topic-scanner-command 0 {:foo :bar}))
  ([topic partition message]
   (let [;; Use a vector if you wish for multiple servers in your cluster
         pc {:bootstrap.servers [kafka-url]
             :group.id "group.one"}
         ;; Serializes producer record keys that may be keywords
         key-serializer (serializers/keyword-serializer)
         ;; Serializes producer record values as EDN, built-in
         value-serializer (serializers/edn-serializer)
         ;; Default producer options suffice here
         options (pd/make-default-producer-options)]
     ;; with-open guarantees the producer is closed even if the send throws.
     (with-open [p (producer/make-producer pc key-serializer value-serializer options)]
       (let [send-fut (send-async! p topic partition :a message options)]
         ;; Deref blocks until the send future completes.
         (println "Async send results:" @send-fut))))))
;; REPL helper: subscribes to `topic-scanner`, polls once, and prints several
;; views over the consumed records (count, per-topic, per-partition, and a
;; transduced filter over keys), then clears the subscription.
(defn three-consume-from-topic []
  (let [cc {:bootstrap.servers [kafka-url]
            :group.id "group.one"
            ;; Start from the earliest offset when no committed offset exists.
            :auto.offset.reset :earliest
            ;; Commit offsets automatically, once per second.
            :enable.auto.commit true
            :auto.commit.interval.ms 1000}
        options (cd/make-default-consumer-options {})]
    (with-open [c (consumer/make-consumer cc key-deserializer value-deserializer options)]
      ;; Note! - The subscription will read your committed offsets to position the consumer accordingly
      ;; If you see no data, try changing the consumer group temporarily
      ;; If still nothing, have a look inside Kafka itself, perhaps with franzy-admin!
      ;; Alternatively, you can set up another thread that produces to your topic while you consume, and all should be well
      (subscribe-to-partitions! c [topic-scanner])
      ;; Let's see what we subscribed to
      (println "Partitions subscribed to:" (partition-subscriptions c))
      ;; now we poll and see if there's any fun stuff for us
      (let [cr (poll! c)
            ;; a naive transducer, written the long way
            filter-xf (filter (fn [cr] (= (:key cr) :inconceivable)))
            ;; a naive transducer for viewing the values, again long way
            value-xf (map (fn [cr] (:value cr)))
            ;; composed transduction: values of records whose key is :inconceivable
            inconceivable-transduction (comp filter-xf value-xf)]
        (println "Record count:" (record-count cr))
        (println "Records by topic:" (records-by-topic cr topic-scanner))
        ;; The source data is a seq, be careful!
        (println "Records from a topic that doesn't exist:" (records-by-topic cr "no-one-of-consequence"))
        (println "Records by topic partition:" (records-by-topic-partition cr topic-scanner 0))
        ;; The source data is a list, so no worries here....
        (println "Records by a topic partition that doesn't exist:" (records-by-topic-partition cr "no-one-of-consequence" 99))
        (println "Topic Partitions in the result set:" (record-partitions cr))
        (clojure.pprint/pprint (into [] inconceivable-transduction cr))
        ;(println "Now just the values of all distinct records:")
        (println "Put all the records into a vector (calls IReduceInit):" (into [] cr))
        ;; done listening to this topic for now — drop the subscription
        (clear-subscriptions! c)
        (println "After clearing subscriptions, a stunning development! We are now subscribed to the following partitions:"
                 (partition-subscriptions c))))))
;; Onyx workflow DAG: read-commands -> process-commands -> write-messages.
(def workflow
  [[:read-commands :process-commands]
   [:process-commands :write-messages]])
;; Agent used to serialize println output coming from concurrent peers.
(def printer (agent nil))
(defn echo-segments
  "Onyx lifecycle hook: asynchronously prints every segment in the event's
   batch via the `printer` agent (so output from concurrent peers doesn't
   interleave), then returns an empty map as the lifecycle contract expects."
  [event lifecycle]
  (let [peer-id (:onyx.core/id event)
        batch   (:onyx.core/batch event)]
    (send printer
          (fn [_]
            (run! #(println (format "Peer %s saw segment %s" peer-id %))
                  batch))))
  {})
;; Lifecycle calls map: after each batch, log its segments with `echo-segments`.
(def identity-lifecycle
  {:lifecycle/after-batch echo-segments})
(defn catalog
  "Builds the Onyx catalog: a Kafka input task (:read-commands), an identity
   pass-through task (:process-commands), and a Kafka output task
   (:write-messages). `batch-size` defaults to 10 (the previously hard-coded
   value) and is applied uniformly to all three tasks."
  ([zookeeper-url topic-read topic-write]
   (catalog zookeeper-url topic-read topic-write 10))
  ([zookeeper-url topic-read topic-write batch-size]
   [{:onyx/name :read-commands
     :onyx/type :input
     :onyx/medium :kafka
     :onyx/plugin :onyx.plugin.kafka/read-messages
     ;; Wrap each consumed message with its Kafka metadata.
     :kafka/wrap-with-metadata? true
     :onyx/min-peers 1
     :onyx/max-peers 1
     :onyx/batch-size batch-size
     :kafka/zookeeper zookeeper-url
     :kafka/topic topic-read
     :kafka/deserializer-fn :com.interrupt.streaming.core/deserialize-kafka-message
     :kafka/offset-reset :earliest
     :onyx/fn ::spy
     :onyx/doc "Read commands from a Kafka topic"}
    {:onyx/name :process-commands
     :onyx/type :function
     :onyx/min-peers 1
     :onyx/max-peers 1
     :onyx/batch-size batch-size
     ;; Pass-through placeholder; swap in real command processing here.
     :onyx/fn :clojure.core/identity}
    {:onyx/name :write-messages
     :onyx/type :output
     :onyx/medium :kafka
     :onyx/plugin :onyx.plugin.kafka/write-messages
     :onyx/min-peers 1
     :onyx/max-peers 1
     :onyx/batch-size batch-size
     :kafka/zookeeper zookeeper-url
     :kafka/topic topic-write
     :kafka/serializer-fn :com.interrupt.streaming.core/serialize-kafka-message
     :kafka/request-size 307200
     ;; Shape each segment into {:message ... :key ...} before writing.
     :onyx/fn ::wrap-message
     :onyx/doc "Writes messages to a Kafka topic"}]))
(defn spy
  "Prints the segment for debugging and returns it unchanged, so it can sit
   transparently in an Onyx task's :onyx/fn position."
  [segment]
  (doto segment
    (->> (prn "Spying segment: "))))
;; NOTE(review): disabled with #_ — would wire `identity-lifecycle` onto the
;; :process-commands task; enable by passing it as :lifecycles in the job map.
#_(defn build-lifecycles []
    [{:lifecycle/task :process-commands
      :lifecycle/calls :com.interrupt.streaming.core/identity-lifecycle
      :onyx/doc "Lifecycle for logging segments"}])
;; EDN (de)serializers shared by the Kafka message helper fns below.
(def serializer (serializers/edn-serializer))
(def deserializer (deserializers/edn-deserializer))
(defn deserialize-kafka-message
  "Decodes a raw Kafka message payload from EDN back into Clojure data.
   nil is passed as the topic argument to `.deserialize`."
  [payload]
  (.deserialize deserializer nil payload))
(defn serialize-kafka-message
  "Encodes a Clojure segment as EDN for writing to Kafka.
   nil is passed as the topic argument to `.serialize`."
  [payload]
  (.serialize serializer nil payload))
(defn wrap-message
  "Wraps a segment into the {:message ... :key ...} map shape consumed by the
   :write-messages output task. NOTE(review): the :key is a fixed UUID string,
   so every record carries the same key — confirm this is intended."
  [segment]
  (hash-map :key "2430d750-c423-4c36-990f-9fc75e416322"
            :message segment))
;; REPL walkthrough: create topics, produce, consume, then run the Onyx job.
(comment
  ;; 1 - create the Kafka topics (requires the commented-out franzy.admin require)
  (one-setup-topics)
  ;; 2 - write a test message to the command topic
  (two-write-to-topic)
  ;; 3 - consume and inspect what was written
  (three-consume-from-topic)
  ;; 4 - stand up an Onyx env + peers and submit the job
  (let [tenancy-id (UUID/randomUUID)
        config (load-config "dev-config.edn")
        env-config (assoc (:env-config config)
                          :onyx/tenancy-id tenancy-id
                          :zookeeper/address zookeeper-url)
        peer-config (assoc (:peer-config config)
                           :onyx/tenancy-id tenancy-id
                           :zookeeper/address zookeeper-url)
        job {:workflow workflow
             :catalog (catalog zookeeper-url topic-scanner-command topic-scanner)
             ;; :lifecycles (build-lifecycles)
             :task-scheduler :onyx.task-scheduler/balanced}
        env (onyx.api/start-env env-config)
        peer-group (onyx.api/start-peer-group peer-config)
        v-peers (onyx.api/start-peers 5 peer-group)
        submission (onyx.api/submit-job peer-config job)
        ;; Blocks until the submitted job completes.
        job-submission-result (onyx.api/await-job-completion peer-config (:job-id submission))]
    (println "Submit Job result:" submission)
    (println "Await job completion:" job-submission-result)))