Skip to content

Commit

Permalink
Explicitly set the topics to get metadata from
Browse files Browse the repository at this point in the history
* Limit the response size by only requesting metadata for topics that we
  buffered have messages for.
* By setting the topics in the metadata request Kafka will automatically
  create them if `auto.create.topics.enable` is enabled in the Kafka
  brokers.

The producer adds the topics with buffered messages to the broker pool's
list of target topics. The broker pool takes care of re-fetching
metadata when needed.
  • Loading branch information
dasch committed Feb 11, 2016
1 parent 142ee01 commit ffc1a34
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
11 changes: 10 additions & 1 deletion lib/kafka/broker_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ def initialize(seed_brokers:, client_id:, logger:, connect_timeout: nil, socket_
@brokers = {}
@seed_brokers = seed_brokers
@cluster_info = nil
@target_topics = []
end

def add_target_topic(topic)
unless @target_topics.include?(topic)
@logger.info "New topic added to target list: #{topic}"
@target_topics.push(topic)
mark_as_stale!
end
end

def mark_as_stale!
Expand Down Expand Up @@ -96,7 +105,7 @@ def fetch_cluster_info
logger: @logger,
)

cluster_info = broker.fetch_metadata
cluster_info = broker.fetch_metadata(topics: @target_topics)

@logger.info "Initialized broker pool with brokers: #{cluster_info.brokers.inspect}"

Expand Down
3 changes: 3 additions & 0 deletions lib/kafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
end

# Make sure we get metadata for this topic.
@broker_pool.add_target_topic(topic)

if partition.nil?
# If no explicit partition key is specified we use the message key instead.
partition_key ||= key
Expand Down
1 change: 1 addition & 0 deletions spec/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

before do
allow(broker_pool).to receive(:mark_as_stale!)
allow(broker_pool).to receive(:add_target_topic).with("greetings")

allow(broker_pool).to receive(:get_leader).with("greetings", 0) { broker1 }
allow(broker_pool).to receive(:get_leader).with("greetings", 1) { broker2 }
Expand Down

0 comments on commit ffc1a34

Please sign in to comment.