diff --git a/lib/kafka/fetch_operation.rb b/lib/kafka/fetch_operation.rb index 517b55a50..5b2130c9f 100644 --- a/lib/kafka/fetch_operation.rb +++ b/lib/kafka/fetch_operation.rb @@ -91,12 +91,9 @@ def execute messages = fetched_partition.messages.map {|message| FetchedMessage.new( - value: message.value, - key: message.key, + message: message, topic: fetched_topic.name, partition: fetched_partition.partition, - offset: message.offset, - create_time: message.create_time, ) } diff --git a/lib/kafka/fetched_message.rb b/lib/kafka/fetched_message.rb index fb1af5a0c..2f6e91fbc 100644 --- a/lib/kafka/fetched_message.rb +++ b/lib/kafka/fetched_message.rb @@ -1,31 +1,35 @@ module Kafka class FetchedMessage - - # @return [String] the value of the message. - attr_reader :value - - # @return [String] the key of the message. - attr_reader :key - # @return [String] the name of the topic that the message was written to. attr_reader :topic # @return [Integer] the partition number that the message was written to. attr_reader :partition + def initialize(message:, topic:, partition:) + @message = message + @topic = topic + @partition = partition + end + + # @return [String] the value of the message. + def value + @message.value + end + + # @return [String] the key of the message. + def key + @message.key + end + # @return [Integer] the offset of the message in the partition. - attr_reader :offset + def offset + @message.offset + end # @return [Time] the timestamp of the message. - attr_reader :create_time - - def initialize(value: nil, key: nil, topic:, partition:, offset:, create_time: nil) - @value = value - @key = key - @topic = topic - @partition = partition - @offset = offset - @create_time = create_time + def create_time + @message.create_time end end end diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec index e5221485a..b8b42475f 100644 --- a/ruby-kafka.gemspec +++ b/ruby-kafka.gemspec @@ -43,5 +43,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency "ruby-prof" spec.add_development_dependency "timecop" spec.add_development_dependency "rubocop", "~> 0.49.1" - spec.add_development_dependency "gssapi", '>=1.2.0' + spec.add_development_dependency "gssapi", ">= 1.2.0" + spec.add_development_dependency "stackprof" end diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index fea327749..58d86b638 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -53,14 +53,14 @@ describe "#each_message" do let(:messages) { [ - Kafka::FetchedMessage.new( + double(:message, { value: "hello", key: nil, topic: "greetings", partition: 0, offset: 13, create_time: Time.now, - ) + }) ] } @@ -190,14 +190,14 @@ describe "#each_batch" do let(:messages) { [ - Kafka::FetchedMessage.new( + double(:message, { value: "hello", key: nil, topic: "greetings", partition: 0, offset: 13, create_time: Time.now, - ) + }) ] }