Skip to content

Commit

Permalink
Add seek_by method (#462)
Browse files Browse the repository at this point in the history
* Add seek_by method

This enables callers to pass only the needed parameters (topic, partition, offset) and removes the need to construct an Rdkafka Message, which is an implementation wrapper that should not be required by the client-facing API.

* Use topic variable

* Use seek_by method

* Add seek_by to changelog
  • Loading branch information
zinahia committed Jun 19, 2024
1 parent 075d963 commit c33ee20
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.17.0 (Unreleased)
- [Enhancement] Update `librdkafka` to `2.4.0`
- [Feature] Add `#seek_by` to be able to seek for a message by topic, partition and offset (zinahia)

## 0.16.0 (2024-06-13)
- **[Breaking]** Retire support for Ruby 2.7.
Expand Down
19 changes: 16 additions & 3 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -423,21 +423,34 @@ def store_offset(message)
# @return [nil]
# @raise [RdkafkaError] When seeking fails
def seek(message)
  # Unpack the message fields and delegate to the explicit
  # topic/partition/offset variant, which performs the actual seek.
  topic = message.topic
  partition = message.partition
  offset = message.offset
  seek_by(topic, partition, offset)
end

# Seek to a particular message by providing the topic, partition and offset.
# The next poll on the topic/partition will return the
# message at the given offset.
#
# @param topic [String] The topic in which to seek
# @param partition [Integer] The partition number to seek to
# @param offset [Integer] The partition offset to seek to
# @return [nil]
# @raise [RdkafkaError] When seeking fails
def seek_by(topic, partition, offset)
closed_consumer_check(__method__)

# rd_kafka_offset_store is one of the few calls that does not support
# a string as the topic, so create a native topic for it.
native_topic = @native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_topic_new(
inner,
message.topic,
topic,
nil
)
end
response = Rdkafka::Bindings.rd_kafka_seek(
native_topic,
message.partition,
message.offset,
partition,
offset,
0 # timeout
)
if response != 0
Expand Down
89 changes: 89 additions & 0 deletions spec/rdkafka/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,95 @@ def send_one_message(val)
end
end

# Integration coverage for Consumer#seek_by, the topic/partition/offset
# variant of #seek. Uses `consumer`/`producer` helpers defined earlier in
# this spec file — NOTE(review): assumes a reachable Kafka broker, like the
# surrounding integration specs.
describe "#seek_by" do
let(:topic) { "consume_test_topic" }
let(:partition) { 0 }
let(:offset) { 0 }

# Error propagation: any non-zero return from the native rd_kafka_seek
# binding must surface as an RdkafkaError (20 is an arbitrary non-zero code).
it "should raise an error when seeking fails" do
expect(Rdkafka::Bindings).to receive(:rd_kafka_seek).and_return(20)
expect {
consumer.seek_by(topic, partition, offset)
}.to raise_error Rdkafka::RdkafkaError
end

context "subscription" do
let(:timeout) { 1000 }

before do
consumer.subscribe(topic)

# 1. partitions are assigned
wait_for_assignment(consumer)
expect(consumer.assignment).not_to be_empty

# 2. eat unrelated messages
# Drain anything left over from earlier examples so polls below only
# see messages produced by this example.
while(consumer.poll(timeout)) do; end
end
after { consumer.unsubscribe }

# Produces one keyed message to partition 0 of the test topic and blocks
# until delivery is confirmed, so the message is pollable afterwards.
def send_one_message(val)
producer.produce(
topic: topic,
payload: "payload #{val}",
key: "key 1",
partition: 0
).wait
end

# Seeking back to an already-consumed offset while the partition is
# paused must still take effect once the partition is resumed.
it "works when a partition is paused" do
# 3. get reference message
send_one_message(:a)
message1 = consumer.poll(timeout)
expect(message1&.payload).to eq "payload a"

# 4. pause the subscription
tpl = Rdkafka::Consumer::TopicPartitionList.new
tpl.add_topic(topic, 1)
consumer.pause(tpl)

# 5. seek by the previous message fields
consumer.seek_by(message1.topic, message1.partition, message1.offset)

# 6. resume the subscription
tpl = Rdkafka::Consumer::TopicPartitionList.new
tpl.add_topic(topic, 1)
consumer.resume(tpl)

# 7. ensure same message is read again
message2 = consumer.poll(timeout)

# This is needed because `enable.auto.offset.store` is true but when running in CI that
# is overloaded, offset store lags
sleep(2)

consumer.commit
expect(message1.offset).to eq message2.offset
expect(message1.payload).to eq message2.payload
end

# Seeking forward past intermediate offsets must skip the messages in
# between (here :b is skipped by seeking to offset + 2).
it "allows skipping messages" do
# 3. send messages
send_one_message(:a)
send_one_message(:b)
send_one_message(:c)

# 4. get reference message
message = consumer.poll(timeout)
expect(message&.payload).to eq "payload a"

# 5. seek over one message
consumer.seek_by(message.topic, message.partition, message.offset + 2)

# 6. ensure that only one message is available
records = consumer.poll(timeout)
expect(records&.payload).to eq "payload c"
records = consumer.poll(timeout)
expect(records).to be_nil
end
end
end

describe "#assign and #assignment" do
it "should return an empty assignment if nothing is assigned" do
expect(consumer.assignment).to be_empty
Expand Down

0 comments on commit c33ee20

Please sign in to comment.