-
Notifications
You must be signed in to change notification settings - Fork 95
/
consumer_set.rb
273 lines (234 loc) · 9.35 KB
/
consumer_set.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# frozen_string_literal: true
module Racecar
class ConsumerSet
MAX_POLL_TRIES = 10
def initialize(config, logger, instrumenter = NullInstrumenter)
@config, @logger = config, logger
@instrumenter = instrumenter
raise ArgumentError, "Subscriptions must not be empty when subscribing" if @config.subscriptions.empty?
@consumers = []
@consumer_id_iterator = (0...@config.subscriptions.size).cycle
@previous_retries = 0
@last_poll_read_nil_message = false
@paused_tpls = Hash.new { |h, k| h[k] = {} }
end
# Polls at most one message from the current consumer. Returns nil when
# nothing could be fetched within the time budget.
def poll(max_wait_time_ms = @config.max_wait_time_ms)
  single_message_batch = batch_poll(max_wait_time_ms, 1)
  single_message_batch.first
end
# batch_poll collects messages until any of the following occurs:
# - max_wait_time_ms time has passed
# - max_messages have been collected
# - a nil message was polled (end of topic, Kafka stalled, etc.)
#
# The messages are from a single topic, but potentially from more than one partition.
#
# Any errors during polling are retried in an exponential backoff fashion. If an error
# occurs, but there is no time left for a backoff and retry, it will return the
# already collected messages and only retry on the next call.
# Collects up to max_messages messages, stopping early when the time budget
# runs out or a poll comes back empty (see the comment block above for the
# full contract). Errors during polling are handled by poll_with_retries.
def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages)
  started_at = Time.now
  time_left_ms = max_wait_time_ms
  maybe_select_next_consumer

  collected = []
  until time_left_ms <= 0 || collected.size >= max_messages
    time_left_ms = remaining_time_ms(max_wait_time_ms, started_at)
    message = poll_with_retries(time_left_ms)
    break if message.nil?
    collected << message
  end
  collected
end
# Stores the offset of a processed message on its consumer so the next auto
# commit picks it up. The case where we are no longer subscribed to the
# message's partition (rdkafka :state, -172) is logged and swallowed.
def store_offset(message)
  current.store_offset(message)
rescue Rdkafka::RdkafkaError => e
  raise e unless e.code == :state # -172
  @logger.warn "Attempted to store_offset, but we're not subscribed to it: #{ErroneousStateError.new(e)}"
end
# Commits offsets on every consumer created so far, ignoring consumers that
# have nothing to commit.
def commit
  each_subscribed { |consumer| commit_rescue_no_offset(consumer) }
end
# Closes all created consumers and forgets any paused-partition bookkeeping.
def close
  each_subscribed { |consumer| consumer.close }
  @paused_tpls.clear
end
# Returns (memoizing per subscription index) the rdkafka consumer for the
# current position of the iterator, creating, wiring up its rebalance
# listener and subscribing it on first access.
def current
  @consumers[@consumer_id_iterator.peek] ||= begin
    subscription = current_subscription
    rebalance_listener = RebalanceListener.new(@config.consumer_class, @instrumenter)

    config = Rdkafka::Config.new(rdkafka_config(subscription))
    config.consumer_rebalance_listener = rebalance_listener
    new_consumer = config.consumer
    rebalance_listener.rdkafka_consumer = new_consumer

    # Joining the consumer group happens on subscribe; instrument it.
    @instrumenter.instrument('join_group') do
      new_consumer.subscribe subscription.topic
    end
    new_consumer
  end
end
def each_subscribed
if block_given?
@consumers.each { |c| yield c }
else
@consumers.each
end
end
# Pauses consumption of a single partition and rewinds it to +offset+ so the
# same messages are redelivered once the partition is resumed. The pause is
# remembered in @paused_tpls so it can be restored after a rebalance.
def pause(topic, partition, offset)
  consumer, filtered_tpl = find_consumer_by(topic, partition)
  unless consumer
    @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  consumer.pause(filtered_tpl)
  # Seek back so that paused messages get polled again after resuming.
  seek_target = OpenStruct.new(topic: topic, partition: partition, offset: offset)
  consumer.seek(seek_target)
  @paused_tpls[topic][partition] = [consumer, filtered_tpl]
end
# Resumes consumption of a previously paused partition. Falls back to the
# remembered pause bookkeeping when the current assignment no longer
# includes the partition (e.g. after a rebalance).
def resume(topic, partition)
  consumer, filtered_tpl = find_consumer_by(topic, partition)
  consumer, filtered_tpl = @paused_tpls[topic][partition] if !consumer && @paused_tpls[topic][partition]

  unless consumer
    @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  consumer.resume(filtered_tpl)
  @paused_tpls[topic].delete(partition)
  @paused_tpls.delete(topic) if @paused_tpls[topic].empty?
end
alias :each :each_subscribed
# Subscribe to all topics eagerly, even if there's still messages elsewhere. Usually
# that's not needed and Kafka might rebalance if topics are not polled frequently
# enough.
# Eagerly instantiates (and thereby subscribes) a consumer for every
# configured subscription by walking the consumer iterator once around.
def subscribe_all
  @config.subscriptions.each_index do
    current
    select_next_consumer
  end
end
private
# polls a single message from the current consumer, retrying errors with exponential
# backoff. The sleep time is capped by max_wait_time_ms. If there's enough time budget
# left, it will retry before returning. If there isn't, the retry will only occur on
# the next call. It tries up to MAX_POLL_TRIES before passing on the exception.
def poll_with_retries(max_wait_time_ms)
# Resume the retry counter carried over from a previous call that ran out of
# time budget; 0 on a fresh attempt. On `retry`, ||= keeps the bumped value.
try ||= @previous_retries
@previous_retries = 0
# ||= so that `retry` keeps the original start time for budget accounting.
started_at ||= Time.now
remain_ms = remaining_time_ms(max_wait_time_ms, started_at)
wait_ms = try == 0 ? 0 : 50 * (2**try) # 0ms, 100ms, 200ms, 400ms, …
if wait_ms >= max_wait_time_ms && remain_ms > 1
# Backoff would exceed the whole budget: sleep for (almost) the full budget
# instead, then poll with a minimal 1ms timeout.
@logger.debug "Capping #{wait_ms}ms to #{max_wait_time_ms-1}ms."
sleep (max_wait_time_ms-1)/1000.0
remain_ms = 1
elsif try == 0 && remain_ms == 0
# Nothing failed yet, but no budget left either: defer to the next call.
@logger.debug "No time remains for polling messages. Will try on next call."
return nil
elsif wait_ms >= remain_ms
# Not enough budget for the backoff sleep: remember the retry count so the
# next call continues the backoff where we left off.
@logger.warn "Only #{remain_ms}ms left, but want to wait for #{wait_ms}ms before poll. Will retry on next call."
@previous_retries = try
return nil
elsif wait_ms > 0
# The exponential backoff sleep fits into the remaining budget.
sleep wait_ms/1000.0
remain_ms -= wait_ms
end
poll_current_consumer(remain_ms)
rescue Rdkafka::RdkafkaError => e
try += 1
@instrumenter.instrument("poll_retry", try: try, rdkafka_time_limit: remain_ms, exception: e)
@logger.error "(try #{try}/#{MAX_POLL_TRIES}): Error for topic subscription #{current_subscription}: #{e}"
# Give up after MAX_POLL_TRIES attempts; otherwise restart the method body
# (with `try` and `started_at` preserved by the ||= assignments above).
raise if try >= MAX_POLL_TRIES
retry
end
# polls a message for the current consumer, handling any API edge cases.
# Polls one message from the current consumer, handling rdkafka edge cases:
# unrecoverable errors reset the consumer before the error is re-raised, and
# @last_poll_read_nil_message always reflects whether the poll came back empty.
def poll_current_consumer(max_wait_time_ms)
  @last_poll_read_nil_message = false
  msg = current.poll(max_wait_time_ms)
rescue Rdkafka::RdkafkaError => e
  # :max_poll_exceeded (-147), :transport (-195), :not_coordinator (16)
  reset_current_consumer if [:max_poll_exceeded, :transport, :not_coordinator].include?(e.code)
  raise
ensure
  @last_poll_read_nil_message = msg.nil?
end
# Looks up the consumer currently assigned the given topic/partition.
# Returns [consumer, single-partition TPL] on a hit, [nil, nil] otherwise.
def find_consumer_by(topic, partition)
  each do |candidate|
    assigned_partitions = candidate.assignment.to_h[topic]
    match = assigned_partitions&.detect { |part| part.partition == partition }
    next if match.nil?
    single_tpl = Rdkafka::Consumer::TopicPartitionList.new({ topic => [match] })
    return candidate, single_tpl
  end
  [nil, nil]
end
# The subscription belonging to the current consumer position.
def current_subscription
  position = @consumer_id_iterator.peek
  @config.subscriptions[position]
end
# Closes and discards the current consumer so a fresh one is built (and
# re-subscribed) on the next access through #current.
def reset_current_consumer
  current_consumer_id = @consumer_id_iterator.peek
  @logger.info "Resetting consumer with id: #{current_consumer_id}"

  doomed = @consumers[current_consumer_id]
  doomed&.close
  @consumers[current_consumer_id] = nil
end
def maybe_select_next_consumer
case @config.multi_subscription_strategy
when "round-robin"
select_next_consumer
else # "exhaust-topic"
if @last_poll_read_nil_message
select_next_consumer
end
end
end
# Rotates the iterator to the next subscription index.
def select_next_consumer
  @consumer_id_iterator.next
end
# Commits a consumer's stored offsets (async unless synchronous_commits is
# configured), treating "nothing to commit" as a harmless no-op.
def commit_rescue_no_offset(consumer)
  async = !@config.synchronous_commits
  consumer.commit(nil, async)
rescue Rdkafka::RdkafkaError => e
  raise e unless e.code == :no_offset
  @logger.debug "Nothing to commit."
end
# Assembles the rdkafka configuration hash for the given subscription.
# Precedence, lowest to highest: Racecar defaults, @config.rdkafka_consumer
# overrides, per-subscription additional_config.
# https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
def rdkafka_config(subscription)
  defaults = {
    # Manually store offset after messages have been processed successfully
    # to avoid marking failed messages as committed. The call just updates
    # a value within librdkafka and is asynchronously written to proper
    # storage through auto commits.
    "enable.auto.offset.store" => false,
    "auto.commit.interval.ms" => @config.offset_commit_interval * 1000,
    "auto.offset.reset" => subscription.start_from_beginning ? "earliest" : "largest",
    "bootstrap.servers" => @config.brokers.join(","),
    "client.id" => @config.client_id,
    "enable.partition.eof" => false,
    "fetch.max.bytes" => @config.max_bytes,
    "message.max.bytes" => subscription.max_bytes_per_partition,
    "fetch.min.bytes" => @config.fetch_min_bytes,
    "fetch.wait.max.ms" => @config.max_wait_time_ms,
    "group.id" => @config.group_id,
    "heartbeat.interval.ms" => @config.heartbeat_interval * 1000,
    "max.poll.interval.ms" => @config.max_poll_interval * 1000,
    "queued.min.messages" => @config.min_message_queue_size,
    "session.timeout.ms" => @config.session_timeout * 1000,
    "socket.timeout.ms" => @config.socket_timeout * 1000,
    "statistics.interval.ms" => @config.statistics_interval_ms,
    "partition.assignment.strategy" => @config.partition_assignment_strategy,
  }
  defaults.merge(@config.rdkafka_consumer).merge(subscription.additional_config)
end
# Milliseconds of the time budget that remain, clamped so it never goes
# negative.
def remaining_time_ms(limit_ms, started_at_time)
  elapsed_ms = ((Time.now - started_at_time) * 1000).round
  [limit_ms - elapsed_ms, 0].max
end
end
end