Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## 0.15.1 (Unreleased)
- [Enhancement] Replace `rd_kafka_offset_store` with `rd_kafka_offsets_store` (mensfeld)
- [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld)
- [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld)
- [Enhancement] Include the error when invoking `create_result` on producer handle (mensfeld)
- [Fix] Fix return type on `#rd_kafka_poll` (mensfeld)
- [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld)
- [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld)
Expand Down
11 changes: 10 additions & 1 deletion lib/rdkafka/callbacks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,16 @@ def self.call(_, message_ptr, opaque_ptr)

# Call delivery callback on opaque
if opaque = Rdkafka::Config.opaques[opaque_ptr.to_i]
opaque.call_delivery_callback(Rdkafka::Producer::DeliveryReport.new(message[:partition], message[:offset], topic_name, message[:err]), delivery_handle)
opaque.call_delivery_callback(
Rdkafka::Producer::DeliveryReport.new(
message[:partition],
message[:offset],
topic_name,
message[:err],
delivery_handle.label
),
delivery_handle
)
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,12 @@ def partition_count(topic)
# @param partition_key [String, nil] Optional partition key based on which partition assignment can happen
# @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.
# @param headers [Hash<String,String>] Optional message headers
# @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report
#
# @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message
#
# @raise [RdkafkaError] When adding the message to rdkafka's queue failed
def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil)
def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil)
closed_producer_check(__method__)

# Start by checking and converting the input
Expand Down Expand Up @@ -210,6 +211,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
end

delivery_handle = DeliveryHandle.new
delivery_handle.label = label
delivery_handle[:pending] = true
delivery_handle[:response] = -1
delivery_handle[:partition] = -1
Expand Down
11 changes: 10 additions & 1 deletion lib/rdkafka/producer/delivery_handle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,23 @@ class DeliveryHandle < Rdkafka::AbstractHandle
:offset, :int64,
:topic_name, :pointer

# @return [Object, nil] label set during message production or nil by default
attr_accessor :label

# @return [String] the name of the operation (e.g. "delivery")
def operation_name
"delivery"
end

# @return [DeliveryReport] a report on the delivery of the message
def create_result
DeliveryReport.new(self[:partition], self[:offset], self[:topic_name].read_string)
DeliveryReport.new(
self[:partition],
self[:offset],
self[:topic_name].read_string,
self[:response] != 0 ? RdkafkaError.new(self[:response]) : nil,
label
)
end
end
end
Expand Down
6 changes: 5 additions & 1 deletion lib/rdkafka/producer/delivery_report.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class DeliveryReport
# @return [Integer]
attr_reader :error

# @return [Object, nil] label set during message production or nil by default
attr_reader :label

# We alias the `#topic_name` under `#topic` to make this consistent with `Consumer::Message`
# where the topic name is under `#topic` method. That way we have a consistent name that
# is present in both places
Expand All @@ -29,11 +32,12 @@ class DeliveryReport

private

def initialize(partition, offset, topic_name = nil, error = nil)
def initialize(partition, offset, topic_name = nil, error = nil, label = nil)
@partition = partition
@offset = offset
@topic_name = topic_name
@error = error
@label = label
end
end
end
Expand Down
28 changes: 26 additions & 2 deletions spec/rdkafka/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

producer.delivery_callback = lambda do |report|
expect(report).not_to be_nil
expect(report.label).to eq "label"
expect(report.partition).to eq 1
expect(report.offset).to be >= 0
expect(report.topic_name).to eq "produce_test_topic"
Expand All @@ -44,9 +45,12 @@
handle = producer.produce(
topic: "produce_test_topic",
payload: "payload",
key: "key"
key: "key",
label: "label"
)

expect(handle.label).to eq "label"

# Wait for it to be delivered
handle.wait(max_wait_timeout: 15)

Expand Down Expand Up @@ -175,18 +179,21 @@ def call(_, handle)
handle = producer.produce(
topic: "produce_test_topic",
payload: "payload",
key: "key"
key: "key",
label: "label"
)

# Should be pending at first
expect(handle.pending?).to be true
expect(handle.label).to eq "label"

# Check delivery handle and report
report = handle.wait(max_wait_timeout: 5)
expect(handle.pending?).to be false
expect(report).not_to be_nil
expect(report.partition).to eq 1
expect(report.offset).to be >= 0
expect(report.label).to eq "label"

# Flush and close producer
producer.flush
Expand Down Expand Up @@ -558,6 +565,23 @@ def call(_, handle)
end
end

context "when not being able to deliver the message" do
let(:producer) do
rdkafka_producer_config(
"bootstrap.servers": "localhost:9093",
"message.timeout.ms": 100
).producer
end

it "should contain the error in the response when not deliverable" do
handler = producer.produce(topic: 'produce_test_topic', payload: nil, label: 'na')
# Wait for the async callbacks and delivery registry to update
sleep(2)
expect(handler.create_result.error).to be_a(Rdkafka::RdkafkaError)
expect(handler.create_result.label).to eq('na')
end
end

describe '#partition_count' do
it { expect(producer.partition_count('consume_test_topic')).to eq(3) }

Expand Down