Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions lib/kafka/fetch_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,9 @@ def execute

messages = fetched_partition.messages.map {|message|
FetchedMessage.new(
value: message.value,
key: message.key,
message: message,
topic: fetched_topic.name,
partition: fetched_partition.partition,
offset: message.offset,
create_time: message.create_time,
)
}

Expand Down
38 changes: 21 additions & 17 deletions lib/kafka/fetched_message.rb
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
module Kafka
class FetchedMessage

# @return [String] the value of the message.
attr_reader :value

# @return [String] the key of the message.
attr_reader :key

# @return [String] the name of the topic that the message was written to.
attr_reader :topic

# @return [Integer] the partition number that the message was written to.
attr_reader :partition

def initialize(message:, topic:, partition:)
@message = message
@topic = topic
@partition = partition
end

# @return [String] the value of the message.
def value
@message.value
end

# @return [String] the key of the message.
def key
@message.key
end

# @return [Integer] the offset of the message in the partition.
attr_reader :offset
def offset
@message.offset
end

# @return [Time] the timestamp of the message.
attr_reader :create_time

def initialize(value: nil, key: nil, topic:, partition:, offset:, create_time: nil)
@value = value
@key = key
@topic = topic
@partition = partition
@offset = offset
@create_time = create_time
def create_time
@message.create_time
end
end
end
3 changes: 2 additions & 1 deletion ruby-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "ruby-prof"
spec.add_development_dependency "timecop"
spec.add_development_dependency "rubocop", "~> 0.49.1"
spec.add_development_dependency "gssapi", '>=1.2.0'
spec.add_development_dependency "gssapi", ">= 1.2.0"
spec.add_development_dependency "stackprof"
end
8 changes: 4 additions & 4 deletions spec/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@
describe "#each_message" do
let(:messages) {
[
Kafka::FetchedMessage.new(
double(:message, {
value: "hello",
key: nil,
topic: "greetings",
partition: 0,
offset: 13,
create_time: Time.now,
)
})
]
}

Expand Down Expand Up @@ -190,14 +190,14 @@
describe "#each_batch" do
let(:messages) {
[
Kafka::FetchedMessage.new(
double(:message, {
value: "hello",
key: nil,
topic: "greetings",
partition: 0,
offset: 13,
create_time: Time.now,
)
})
]
}

Expand Down