diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d140b30..2e9dc61c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ## 0.15.1 (Unreleased) - [Enhancement] Replace `rd_kafka_offset_store` with `rd_kafka_offsets_store` (mensfeld) - [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld) +- [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld) +- [Enhancement] Include the error when invoking `create_result` on producer handle (mensfeld) - [Fix] Fix return type on `#rd_kafka_poll` (mensfeld) - [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld) - [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld) diff --git a/lib/rdkafka/callbacks.rb b/lib/rdkafka/callbacks.rb index 055a961f..460a5636 100644 --- a/lib/rdkafka/callbacks.rb +++ b/lib/rdkafka/callbacks.rb @@ -289,7 +289,16 @@ def self.call(_, message_ptr, opaque_ptr) # Call delivery callback on opaque if opaque = Rdkafka::Config.opaques[opaque_ptr.to_i] - opaque.call_delivery_callback(Rdkafka::Producer::DeliveryReport.new(message[:partition], message[:offset], topic_name, message[:err]), delivery_handle) + opaque.call_delivery_callback( + Rdkafka::Producer::DeliveryReport.new( + message[:partition], + message[:offset], + topic_name, + message[:err], + delivery_handle.label + ), + delivery_handle + ) end end end diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 01e49bc0..5a83499d 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -164,11 +164,12 @@ def partition_count(topic) # @param partition_key [String, nil] Optional partition key based on which partition assignment can happen # @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970. # @param headers [Hash] Optional message headers + # @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report # # @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message # # @raise [RdkafkaError] When adding the message to rdkafka's queue failed - def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) + def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil) closed_producer_check(__method__) # Start by checking and converting the input @@ -210,6 +211,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, end delivery_handle = DeliveryHandle.new + delivery_handle.label = label delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 diff --git a/lib/rdkafka/producer/delivery_handle.rb b/lib/rdkafka/producer/delivery_handle.rb index 09a557fa..6c80f820 100644 --- a/lib/rdkafka/producer/delivery_handle.rb +++ b/lib/rdkafka/producer/delivery_handle.rb @@ -11,6 +11,9 @@ class DeliveryHandle < Rdkafka::AbstractHandle :offset, :int64, :topic_name, :pointer + # @return [Object, nil] label set during message production or nil by default + attr_accessor :label + # @return [String] the name of the operation (e.g. "delivery") def operation_name "delivery" @@ -18,7 +21,13 @@ def operation_name # @return [DeliveryReport] a report on the delivery of the message def create_result - DeliveryReport.new(self[:partition], self[:offset], self[:topic_name].read_string) + DeliveryReport.new( + self[:partition], + self[:offset], + self[:topic_name].read_string, + self[:response] != 0 ? RdkafkaError.new(self[:response]) : nil, + label + ) end end end diff --git a/lib/rdkafka/producer/delivery_report.rb b/lib/rdkafka/producer/delivery_report.rb index 9b35c0ff..5d422643 100644 --- a/lib/rdkafka/producer/delivery_report.rb +++ b/lib/rdkafka/producer/delivery_report.rb @@ -20,6 +20,9 @@ class DeliveryReport # @return [Integer] attr_reader :error + # @return [Object, nil] label set during message production or nil by default + attr_reader :label + # We alias the `#topic_name` under `#topic` to make this consistent with `Consumer::Message` # where the topic name is under `#topic` method. That way we have a consistent name that # is present in both places @@ -29,11 +32,12 @@ class DeliveryReport private - def initialize(partition, offset, topic_name = nil, error = nil) + def initialize(partition, offset, topic_name = nil, error = nil, label = nil) @partition = partition @offset = offset @topic_name = topic_name @error = error + @label = label end end end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index c8c0ccd0..a856463f 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -34,6 +34,7 @@ producer.delivery_callback = lambda do |report| expect(report).not_to be_nil + expect(report.label).to eq "label" expect(report.partition).to eq 1 expect(report.offset).to be >= 0 expect(report.topic_name).to eq "produce_test_topic" @@ -44,9 +45,12 @@ handle = producer.produce( topic: "produce_test_topic", payload: "payload", - key: "key" + key: "key", + label: "label" ) + expect(handle.label).to eq "label" + # Wait for it to be delivered handle.wait(max_wait_timeout: 15) @@ -175,11 +179,13 @@ def call(_, handle) handle = producer.produce( topic: "produce_test_topic", payload: "payload", - key: "key" + key: "key", + label: "label" ) # Should be pending at first expect(handle.pending?).to be true + expect(handle.label).to eq "label" # Check delivery handle and report report = handle.wait(max_wait_timeout: 5) @@ -187,6 +193,7 @@ def call(_, handle) expect(report).not_to be_nil expect(report.partition).to eq 1 expect(report.offset).to be >= 0 + expect(report.label).to eq "label" # Flush and close producer producer.flush @@ -558,6 +565,23 @@ def call(_, handle) end end + context "when not being able to deliver the message" do + let(:producer) do + rdkafka_producer_config( + "bootstrap.servers": "localhost:9093", + "message.timeout.ms": 100 + ).producer + end + + it "should contain the error in the response when not deliverable" do + handler = producer.produce(topic: 'produce_test_topic', payload: nil, label: 'na') + # Wait for the async callbacks and delivery registry to update + sleep(2) + expect(handler.create_result.error).to be_a(Rdkafka::RdkafkaError) + expect(handler.create_result.label).to eq('na') + end + end + describe '#partition_count' do it { expect(producer.partition_count('consume_test_topic')).to eq(3) }