-
Notifications
You must be signed in to change notification settings - Fork 122
/
kafka.rb
180 lines (171 loc) · 8.9 KB
/
kafka.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
require 'logstash/namespace'
require 'logstash/inputs/base'
require 'jruby-kafka'
# This input will read events from a Kafka topic. It uses the high level consumer API provided
# by Kafka to read messages from the broker. It also maintains the state of what has been
# consumed using Zookeeper. The default input codec is json
#
# You must configure `topic_id`, `white_list` or `black_list`. By default it will connect to a
# Zookeeper running on localhost. All the broker information is read from Zookeeper state
#
# Ideally you should have as many threads as the number of partitions for a perfect balance --
# more threads than partitions means that some threads will be idle
#
# For more information see http://kafka.apache.org/documentation.html#theconsumer
#
# Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs
#
class LogStash::Inputs::Kafka < LogStash::Inputs::Base
  config_name 'kafka'

  # Kafka payloads are JSON by default.
  default :codec, 'json'

  # Specifies the ZooKeeper connection string in the form hostname:port where host and port are
  # the host and port of a ZooKeeper server. You can also specify multiple hosts in the form
  # `hostname1:port1,hostname2:port2,hostname3:port3`.
  #
  # The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string
  # which puts its data under some path in the global ZooKeeper namespace. If so the consumer
  # should use the same chroot path in its connection string. For example to give a chroot path of
  # `/chroot/path` you would give the connection string as
  # `hostname1:port1,hostname2:port2,hostname3:port3/chroot/path`.
  config :zk_connect, :validate => :string, :default => 'localhost:2181'
  # A string that uniquely identifies the group of consumer processes to which this consumer
  # belongs. By setting the same group id multiple processes indicate that they are all part of
  # the same consumer group.
  config :group_id, :validate => :string, :default => 'logstash'
  # The topic to consume messages from. Exactly one of `topic_id`, `white_list` or
  # `black_list` must be set (enforced in `register`).
  config :topic_id, :validate => :string, :default => nil
  # Whitelist of topics to include for consumption.
  config :white_list, :validate => :string, :default => nil
  # Blacklist of topics to exclude from consumption.
  config :black_list, :validate => :string, :default => nil
  # Reset the consumer group to start at the earliest message present in the log by clearing any
  # offsets for the group stored in Zookeeper. This is destructive! Must be used in conjunction
  # with auto_offset_reset => 'smallest'
  config :reset_beginning, :validate => :boolean, :default => false
  # `smallest` or `largest` - (optional, default `largest`) If the consumer does not already
  # have an established offset or offset is invalid, start with the earliest message present in the
  # log (`smallest`) or after the last message in the log (`largest`).
  config :auto_offset_reset, :validate => %w( largest smallest ), :default => 'largest'
  # Number of threads to read from the partitions. Ideally you should have as many threads as the
  # number of partitions for a perfect balance. More threads than partitions means that some
  # threads will be idle. Less threads means a single thread could be consuming from more than
  # one partition
  config :consumer_threads, :validate => :number, :default => 1
  # Internal Logstash queue size used to hold events in memory after it has been read from Kafka
  config :queue_size, :validate => :number, :default => 20
  # When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the
  # load to assign partitions to each consumer. If the set of consumers changes while this
  # assignment is taking place the rebalance will fail and retry. This setting controls the
  # maximum number of attempts before giving up.
  config :rebalance_max_retries, :validate => :number, :default => 4
  # Backoff time between retries during rebalance.
  config :rebalance_backoff_ms, :validate => :number, :default => 2000
  # Throw a timeout exception to the consumer if no message is available for consumption after
  # the specified interval
  config :consumer_timeout_ms, :validate => :number, :default => -1
  # Option to restart the consumer loop on error. When false, a consumer error aborts the
  # input (the exception is re-raised) instead of retrying forever.
  config :consumer_restart_on_error, :validate => :boolean, :default => true
  # Time in millis to wait for consumer to restart after an error
  config :consumer_restart_sleep_ms, :validate => :number, :default => 0
  # Option to add Kafka metadata like topic, message size to the event
  config :decorate_events, :validate => :boolean, :default => false
  # A unique id for the consumer; generated automatically if not set.
  config :consumer_id, :validate => :string, :default => nil
  # The number of bytes of messages to attempt to fetch for each topic-partition in each fetch
  # request. These bytes will be read into memory for each partition, so this helps control
  # the memory used by the consumer. The fetch request size must be at least as large as the
  # maximum message size the server allows or else it is possible for the producer to send
  # messages larger than the consumer can fetch.
  config :fetch_message_max_bytes, :validate => :number, :default => 1048576
  # The serializer class for messages. The default decoder takes a byte[] and returns the same byte[]
  config :decoder_class, :validate => :string, :default => 'kafka.serializer.DefaultDecoder'
  # The serializer class for keys (defaults to the same default as for messages)
  config :key_decoder_class, :validate => :string, :default => 'kafka.serializer.DefaultDecoder'

  public
  # Validates the topic configuration, builds the jruby-kafka consumer-group options and
  # allocates the bounded in-memory queue that decouples Kafka reads from the pipeline.
  #
  # @raise [LogStash::ConfigurationError] when none or more than one of `topic_id`,
  #   `white_list`, `black_list` is set.
  def register
    LogStash::Logger.setup_log4j(@logger)
    options = {
      :zk_connect => @zk_connect,
      :group_id => @group_id,
      :topic_id => @topic_id,
      :auto_offset_reset => @auto_offset_reset,
      :rebalance_max_retries => @rebalance_max_retries,
      :rebalance_backoff_ms => @rebalance_backoff_ms,
      :consumer_timeout_ms => @consumer_timeout_ms,
      :consumer_restart_on_error => @consumer_restart_on_error,
      :consumer_restart_sleep_ms => @consumer_restart_sleep_ms,
      :consumer_id => @consumer_id,
      :fetch_message_max_bytes => @fetch_message_max_bytes,
      :allow_topics => @white_list,
      :filter_topics => @black_list,
      :value_decoder_class => @decoder_class,
      :key_decoder_class => @key_decoder_class
    }
    if @reset_beginning
      options[:reset_beginning] = 'from-beginning'
    end # if :reset_beginning
    # Exactly one of the three topic selectors must be configured.
    topic_or_filter = [@topic_id, @white_list, @black_list].compact
    if topic_or_filter.count == 0
      raise LogStash::ConfigurationError, 'topic_id, white_list or black_list required.'
    elsif topic_or_filter.count > 1
      raise LogStash::ConfigurationError, 'Invalid combination of topic_id, white_list or black_list. Use only one.'
    end
    @kafka_client_queue = SizedQueue.new(@queue_size)
    @consumer_group = create_consumer_group(options)
    @logger.info('Registering kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
  end # def register

  public
  # Starts the consumer threads and pumps decoded events from the internal queue into the
  # pipeline queue until shutdown. On a consumer error the group is shut down and, when
  # `consumer_restart_on_error` is true, restarted after `consumer_restart_sleep_ms`;
  # otherwise the exception propagates and the input stops.
  #
  # @param logstash_queue [Queue] the pipeline queue events are pushed onto
  def run(logstash_queue)
    # noinspection JRubyStringImportInspection
    java_import 'kafka.common.ConsumerRebalanceFailedException'
    @logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
    begin
      @consumer_group.run(@consumer_threads,@kafka_client_queue)
      begin
        while true
          event = @kafka_client_queue.pop
          queue_event(event, logstash_queue)
        end
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka got shutdown signal')
        @consumer_group.shutdown
      end
      # Drain anything the consumer threads enqueued before shutdown completed so no
      # already-fetched message is dropped.
      until @kafka_client_queue.empty?
        queue_event(@kafka_client_queue.pop,logstash_queue)
      end
      @logger.info('Done running kafka input')
    rescue => e
      @logger.warn('kafka client threw exception, restarting',
                   :exception => e)
      if @consumer_group.running?
        @consumer_group.shutdown
      end
      # Previously the retry was unconditional, which made `consumer_restart_on_error`
      # dead configuration: honor it and abort the input when restarts are disabled.
      raise e unless @consumer_restart_on_error
      sleep(@consumer_restart_sleep_ms / 1000.0)
      retry
    end
    finished
  end # def run

  private
  # Factory for the jruby-kafka high-level consumer group; isolated so tests can stub it.
  def create_consumer_group(options)
    Kafka::Group.new(options)
  end

  private
  # Decodes a single Kafka message with the configured codec, optionally decorates the event
  # with Kafka metadata, and pushes it onto the pipeline queue. Decode/creation errors are
  # logged (with the raw message) and swallowed so one bad message cannot kill the input.
  #
  # @param message_and_metadata [Object] jruby-kafka message wrapper (responds to
  #   `message`, `topic`, `partition`, `key`)
  # @param output_queue [Queue] the pipeline queue
  def queue_event(message_and_metadata, output_queue)
    begin
      @codec.decode("#{message_and_metadata.message}") do |event|
        decorate(event)
        if @decorate_events
          event['kafka'] = {'msg_size' => event['message'].bytesize,
                            'topic' => message_and_metadata.topic,
                            'consumer_group' => @group_id,
                            'partition' => message_and_metadata.partition,
                            'key' => message_and_metadata.key}
        end
        output_queue << event
      end # @codec.decode
    rescue => e # parse or event creation error
      @logger.error('Failed to create event', :message => "#{message_and_metadata.message}", :exception => e,
                    :backtrace => e.backtrace)
    end # begin
  end # def queue_event
end #class LogStash::Inputs::Kafka