-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_consumer.rb
85 lines (71 loc) · 1.87 KB
/
kafka_consumer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
require "statsd"
# Shared statsd client pointed at localhost:9125. KafkaConsumer#process_next
# uses it to time how long each message takes to process.
STATSD = Statsd.new("localhost", 9125)
# Abstract class meant to be subclassed by concrete Kafka consumer
# implementations. Subclasses declare their topic with {.set_topic} and
# implement {#process}. In return every consumer gets statsd timing of its
# message processing, and other integrations (Sentry, Honeycomb etc.) can be
# added here in the future.
#
# @abstract Subclass, call {.set_topic} and override {#process}.
#
# @see {SampleConsumer}
class KafkaConsumer
  attr_reader :group, :id, :client

  @topic = nil

  class << self
    # Topic consumed by this class. The topic is stored per class, so when a
    # class has not called {.set_topic} itself the lookup falls back to its
    # superclass; subclasses of a concrete consumer therefore share its topic.
    #
    # @return [Object, nil] the configured topic, or nil if none was set
    def topic
      return @topic unless blank_topic?(@topic)

      superclass.topic if superclass.respond_to?(:topic)
    end

    # Declares the topic consumed by instances of this class.
    #
    # @param t [String] topic name
    #
    # @return [String] the topic that was set
    #
    # @raise [RuntimeError] if topic is blank
    def set_topic(t)
      raise "Topic #{t.inspect} is invalid" if blank_topic?(t)

      @topic = t
    end

    private

    # Whether +value+ is unusable as a topic: nil/false, whitespace-only
    # strings, or anything that reports itself as empty. Implemented here so
    # the class does not depend on ActiveSupport's `blank?` being loaded.
    #
    # @param value [Object]
    #
    # @return [Boolean]
    def blank_topic?(value)
      return value.match?(/\A[[:space:]]*\z/) if value.is_a?(String)

      value.respond_to?(:empty?) ? value.empty? : !value
    end
  end

  # @param group [String] kafka consumer group
  # @param id [String] consumer id
  # @param rafka_opts [Hash] options passed to `Rafka::Consumer.new`; the
  #   `:topic`, `:group` and `:id` keys are overridden by this class. The
  #   given hash is not modified.
  #
  # @raise [RuntimeError] if {.set_topic} has not been called
  def initialize(group, id, rafka_opts)
    # .topic only ever returns nil or a non-blank value, so nil means "unset".
    if self.class.topic.nil?
      raise "Topic for #{self.class} should be set using `set_topic`"
    end

    @group = group
    @id = id
    @client = Rafka::Consumer.new(
      rafka_opts.merge(topic: self.class.topic, group: @group, id: @id)
    )
  end

  # Handles a single consumed message. Must be overridden by subclasses.
  #
  # @abstract
  #
  # @param msg [Rafka::Message]
  #
  # @raise [RuntimeError] unless overridden
  def process(msg)
    raise "Implement me!"
  end

  # @return [String] the topic configured on the consumer's class
  def topic
    self.class.topic
  end

  # @return [String] "<group>:<id>"
  def to_s
    "#{group}:#{id}"
  end

  # Consumes a message and processes it, timing {#process} via statsd under
  # the key "kafka.consumer.process.<ClassName>". Errors raised by {#process}
  # are re-raised to the caller.
  #
  # @param timeout [Integer] number of seconds to wait for a message before
  #   retrying
  #
  # @return [Rafka::Message, nil] whatever `client.consume` returns
  #   (NOTE(review): presumably the message, or nil on timeout -- confirm
  #   against the rafka client)
  def process_next(timeout)
    client.consume(timeout) do |msg|
      begin
        # record timing statistics to some backend (eg. statsd)
        STATSD.time("kafka.consumer.process.#{self.class}") { process(msg) }
      rescue StandardError
        # ...report the exception to a backend (eg. Sentry)
        raise
      end
    end
  end

  # Terminates the consumer and the connection to Rafka.
  def shutdown
    client.close
  end
end