-
Notifications
You must be signed in to change notification settings - Fork 26
/
information_model.cljc
195 lines (164 loc) · 8.08 KB
/
information_model.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
;; Namespace holding the onyx-kafka plugin's information model: a pure-data
;; description of the plugin's catalog entries, lifecycle entries, and
;; display ordering, consumed by documentation and tooling.
(ns onyx.kafka.information-model)
(def model
  ;; Pure data; no runtime logic. Three top-level sections:
  ;;   :catalog-entry   - per-task-map-key docs, types, defaults, deprecations
  ;;   :lifecycle-entry - lifecycle calls each task requires
  ;;   :display-order   - preferred ordering of keys in rendered docs
  {:catalog-entry
   {:onyx.plugin.kafka/read-messages
    {:summary "An input task to read messages from a Kafka topic."
     :model {:kafka/topic
             {:doc "The topic name to read from."
              :type :string}
             :kafka/group-id
             {:doc "The consumer identity to store in ZooKeeper."
              :type :string}
             :kafka/partition
             {:doc "Partition to read from if auto-assignment is not used."
              :type :string
              :optional? true}
             :kafka/bootstrap-servers
             {:doc "The list of Kafka brokers to use for connection bootstrap."
              :type [:string]
              :optional? true}
             :kafka/zookeeper
             {:doc "The ZooKeeper connection string."
              :type :string
              :optional? true}
             :kafka/offset-reset
             {:doc "Offset bound to seek to when not found - `:earliest` or `:latest`."
              :choices [:earliest :latest]
              :type :keyword}
             :kafka/force-reset?
             ;; FIX: "messsage" -> "message"
             {:doc "Force to read from the beginning or end of the log, as specified by `:kafka/offset-reset`. If false, reads from the last acknowledged message if it exists."
              :deprecation-version "0.9.10.0"
              :deprecation-doc ":kafka/force-reset? deprecated as this functionality has been subsumed by onyx resume-point."
              :type :boolean}
             :kafka/chan-capacity
             {:doc "The buffer size of the Kafka reading channel."
              :type :long
              :default 1000
              :deprecation-version "0.9.10.0"
              :deprecation-doc ":kafka/chan-capacity deprecated as onyx-kafka no longer uses a separate producer thread."
              :optional? true}
             :kafka/receive-buffer-bytes
             ;; FIX: "size in the receive buffer" -> "size of the receive buffer"
             {:doc "The size of the receive buffer in the Kafka consumer."
              :type :long
              :default 65536
              :optional? true}
             :kafka/consumer-opts
             {:doc "A map of arbitrary configuration to merge into the underlying Kafka consumer base configuration. Map should contain keywords as keys, and the valid values described in the [Kafka Docs](http://kafka.apache.org/documentation.html#newconsumerconfigs). Please note that key values such as `fetch.min.bytes` must be in keyword form, i.e. `:fetch.min.bytes`."
              :type :map
              :optional? true}
             :kafka/fetch-size
             {:doc "The size in bytes to request from ZooKeeper per fetch request."
              :type :long
              :default 307200
              :deprecation-version "0.9.10.0"
              :deprecation-doc ":kafka/fetch-size deprecated. Use :kafka/receive-buffer-bytes instead."
              :optional? true}
             :kafka/empty-read-back-off
             {:doc "The amount of time to back off between reads when nothing was fetched from a consumer."
              :type :long
              :default 500
              :deprecation-version "0.9.10.0"
              :deprecation-doc ":kafka/empty-read-back-off deprecated in lieu of better use of :onyx/batch-timeout"
              :optional? true}
             :kafka/commit-interval
             {:doc "The interval in milliseconds to commit the latest acknowledged offset to ZooKeeper."
              :type :long
              :default 2000
              :optional? true}
             :kafka/deserializer-fn
             {:doc "A keyword that represents a fully qualified namespaced function to deserialize a record's value. Takes one argument, which must be a byte array."
              :type :keyword}
             :kafka/key-deserializer-fn
             {:doc "A keyword that represents a fully qualified namespaced function to deserialize a record's key. Takes one argument, which must be a byte array. Only used when `:kafka/wrap-with-metadata?` is true."
              :type :keyword
              :optional? true}
             :kafka/wrap-with-metadata?
             {:doc "Wraps message into map with keys `:offset`, `:partitions`, `:topic` and `:message` itself."
              :type :boolean
              :default false
              :optional? true}
             :kafka/start-offsets
             ;; FIX: example map had a stray comma (`{0 50, 1, 90}`), which is
             ;; not a valid Clojure map literal; corrected to `{0 50, 1 90}`
             ;; to match the prose that follows.
             {:doc "Allows a task to be supplied with the starting offsets for all partitions. Maps partition to offset, e.g. `{0 50, 1 90}` will start at offset 50 for partition 0, and offset 90 for partition 1."
              :type :map
              :optional? true}
             :kafka/target-offsets
             {:doc "Allows a task to be supplied with target offsets for all partitions. The consumer will read up to and including the target offset for each partition."
              :type :map
              :optional? true}}}
    :onyx.plugin.kafka/write-messages
    {:summary "Write messages to kafka."
     :model {:kafka/topic
             {:doc "The topic name to write to. Must either be supplied or otherwise all messages must contain a `:topic` key"
              :optional? true
              :type :string}
             :kafka/partition
             {:doc "Partition to write to, if you do not wish messages to be auto allocated to partitions. Must either be supplied in the task map, or all messages should contain a `:partition` key."
              :type :string
              :optional? true}
             :kafka/bootstrap-servers
             {:doc "The list of Kafka brokers to use for connection bootstrap."
              :type [:string]
              :optional? true}
             :kafka/zookeeper
             {:doc "The ZooKeeper connection string."
              :type :string
              :optional? true}
             :kafka/request-size
             {:doc "The maximum size of request messages. Maps to the `max.request.size` value of the internal kafka producer."
              :type :long
              :optional? true}
             :kafka/serializer-fn
             {:doc "A keyword that represents a fully qualified namespaced function to serialize a record's value. Takes one argument - the segment."
              :type :keyword}
             :kafka/key-serializer-fn
             {:doc "A keyword that represents a fully qualified namespaced function to serialize a record's key. Takes one argument - the segment."
              :type :keyword
              :optional? true}
             :kafka/producer-opts
             {:doc "A map of arbitrary configuration to merge into the underlying Kafka producer base configuration. Map should contain keywords as keys, and the valid values described in the [Kafka Docs](http://kafka.apache.org/documentation.html#producerconfigs). Please note that key values such as `buffer.memory` must be in keyword form, i.e. `:buffer.memory`."
              :type :map
              :optional? true}
             :kafka/no-seal?
             {:doc "Do not write :done to the topic when task receives the sentinel signal (end of batch job)."
              :type :boolean
              :default false
              :optional? true}}}}
   ;; Lifecycle calls each plugin task must be wired up with.
   :lifecycle-entry
   {:onyx.plugin.kafka/read-messages
    {:model
     [{:task.lifecycle/name :read-messages
       :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}]}
    :onyx.plugin.kafka/write-messages
    {:model
     [{:task.lifecycle/name :write-messages
       :lifecycle/calls :onyx.plugin.kafka/write-messages-calls}]}}
   ;; Preferred rendering order of task-map keys in generated documentation.
   :display-order
   {:onyx.plugin.kafka/read-messages
    [:kafka/topic
     :kafka/partition
     :kafka/group-id
     :kafka/zookeeper
     :kafka/bootstrap-servers
     :kafka/offset-reset
     :kafka/force-reset?
     :kafka/deserializer-fn
     :kafka/key-deserializer-fn
     :kafka/receive-buffer-bytes
     :kafka/commit-interval
     :kafka/wrap-with-metadata?
     :kafka/start-offsets
     :kafka/target-offsets
     :kafka/consumer-opts
     :kafka/empty-read-back-off
     :kafka/fetch-size
     :kafka/chan-capacity]
    :onyx.plugin.kafka/write-messages
    [:kafka/topic
     :kafka/zookeeper
     :kafka/bootstrap-servers
     :kafka/partition
     :kafka/serializer-fn
     :kafka/key-serializer-fn
     :kafka/request-size
     :kafka/no-seal?
     :kafka/producer-opts]}})