From 0f8be33c2a497e1cbe2fcaea2a890aac0d7ff312 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 26 Dec 2023 19:55:34 +0100 Subject: [PATCH 1/4] allow for reports and handles labeling --- CHANGELOG.md | 1 + lib/rdkafka/callbacks.rb | 10 ++++++++- lib/rdkafka/producer.rb | 4 +++- lib/rdkafka/producer/delivery_handle.rb | 11 +++++++++- lib/rdkafka/producer/delivery_report.rb | 6 +++++- spec/rdkafka/producer_spec.rb | 28 +++++++++++++++++++++++-- 6 files changed, 54 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d140b30..7d8b0350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 0.15.1 (Unreleased) - [Enhancement] Replace `rd_kafka_offset_store` with `rd_kafka_offsets_store` (mensfeld) - [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld) +- [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld) - [Fix] Fix return type on `#rd_kafka_poll` (mensfeld) - [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld) - [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld) diff --git a/lib/rdkafka/callbacks.rb b/lib/rdkafka/callbacks.rb index 055a961f..d4767835 100644 --- a/lib/rdkafka/callbacks.rb +++ b/lib/rdkafka/callbacks.rb @@ -289,7 +289,15 @@ def self.call(_, message_ptr, opaque_ptr) # Call delivery callback on opaque if opaque = Rdkafka::Config.opaques[opaque_ptr.to_i] - opaque.call_delivery_callback(Rdkafka::Producer::DeliveryReport.new(message[:partition], message[:offset], topic_name, message[:err]), delivery_handle) + opaque.call_delivery_callback( + Rdkafka::Producer::DeliveryReport.new( + message[:partition], + message[:offset], + topic_name, message[:err], + delivery_handle.label + ), + delivery_handle + ) end end end diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 01e49bc0..5a83499d 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -164,11 +164,12 @@ def partition_count(topic) # @param partition_key [String, nil] Optional partition key based on which partition assignment can happen # @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970. # @param headers [Hash] Optional message headers + # @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report # # @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message # # @raise [RdkafkaError] When adding the message to rdkafka's queue failed - def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) + def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil) closed_producer_check(__method__) # Start by checking and converting the input @@ -210,6 +211,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, end delivery_handle = DeliveryHandle.new + delivery_handle.label = label delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 diff --git a/lib/rdkafka/producer/delivery_handle.rb b/lib/rdkafka/producer/delivery_handle.rb index 09a557fa..87956709 100644 --- a/lib/rdkafka/producer/delivery_handle.rb +++ b/lib/rdkafka/producer/delivery_handle.rb @@ -11,6 +11,9 @@ class DeliveryHandle < Rdkafka::AbstractHandle :offset, :int64, :topic_name, :pointer + # @return [Object, nil] label set during message production or nil by default + attr_accessor :label + # @return [String] the name of the operation (e.g. "delivery") def operation_name "delivery" @@ -18,7 +21,13 @@ def operation_name # @return [DeliveryReport] a report on the delivery of the message def create_result - DeliveryReport.new(self[:partition], self[:offset], self[:topic_name].read_string) + DeliveryReport.new( + self[:partition], + self[:offset], + self[:topic_name].read_string, + nil, + label + ) end end end diff --git a/lib/rdkafka/producer/delivery_report.rb b/lib/rdkafka/producer/delivery_report.rb index 9b35c0ff..5d422643 100644 --- a/lib/rdkafka/producer/delivery_report.rb +++ b/lib/rdkafka/producer/delivery_report.rb @@ -20,6 +20,9 @@ class DeliveryReport # @return [Integer] attr_reader :error + # @return [Object, nil] label set during message production or nil by default + attr_reader :label + # We alias the `#topic_name` under `#topic` to make this consistent with `Consumer::Message` # where the topic name is under `#topic` method. That way we have a consistent name that # is present in both places @@ -29,11 +32,12 @@ class DeliveryReport private - def initialize(partition, offset, topic_name = nil, error = nil) + def initialize(partition, offset, topic_name = nil, error = nil, label = nil) @partition = partition @offset = offset @topic_name = topic_name @error = error + @label = label end end end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index c8c0ccd0..4ab38798 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -34,6 +34,7 @@ producer.delivery_callback = lambda do |report| expect(report).not_to be_nil + expect(report.label).to eq "label" expect(report.partition).to eq 1 expect(report.offset).to be >= 0 expect(report.topic_name).to eq "produce_test_topic" @@ -44,9 +45,12 @@ handle = producer.produce( topic: "produce_test_topic", payload: "payload", - key: "key" + key: "key", + label: "label" ) + expect(handle.label).to eq "label" + # Wait for it to be delivered handle.wait(max_wait_timeout: 15) @@ -175,11 +179,13 @@ def call(_, handle) handle = producer.produce( topic: "produce_test_topic", payload: "payload", - key: "key" + key: "key", + label: "label" ) # Should be pending at first expect(handle.pending?).to be true + expect(handle.label).to eq "label" # Check delivery handle and report report = handle.wait(max_wait_timeout: 5) @@ -187,6 +193,7 @@ def call(_, handle) expect(report).not_to be_nil expect(report.partition).to eq 1 expect(report.offset).to be >= 0 + expect(report.label).to eq "label" # Flush and close producer producer.flush @@ -556,6 +563,23 @@ def call(_, handle) }.to raise_exception(Rdkafka::ClosedProducerError, /#{method.to_s}/) end end + + context "when not being able to deliver the message" do + let(:producer) do + rdkafka_producer_config( + "bootstrap.servers": "localhost:9093", + "message.timeout.ms": 100 + ).producer + end + + it "should contain the error in the response when not deliverable" do + handler = producer.produce(topic: 'produce_test_topic', payload: nil, label: 'na') + # Wait for the async callbacks and delivery registry to update + sleep(2) + expect(handler.create_result.error).to be_a(Rdkafka::RdkafkaError) + expect(handler.create_result.label).to eq('na') + end + end end describe '#partition_count' do From 162ff8aaae30849fe91b19228fbba2090f408827 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 26 Dec 2023 19:59:10 +0100 Subject: [PATCH 2/4] remarks --- lib/rdkafka/callbacks.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/rdkafka/callbacks.rb b/lib/rdkafka/callbacks.rb index d4767835..460a5636 100644 --- a/lib/rdkafka/callbacks.rb +++ b/lib/rdkafka/callbacks.rb @@ -293,7 +293,8 @@ def self.call(_, message_ptr, opaque_ptr) Rdkafka::Producer::DeliveryReport.new( message[:partition], message[:offset], - topic_name, message[:err], + topic_name, + message[:err], delivery_handle.label ), delivery_handle From a1abbe7faa59cbdb788a1863215e9f5f0e2e0dd4 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 26 Dec 2023 20:04:31 +0100 Subject: [PATCH 3/4] remarks --- CHANGELOG.md | 1 + lib/rdkafka/producer/delivery_handle.rb | 2 +- spec/rdkafka/producer_spec.rb | 28 ++++++++++++------------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d8b0350..f4e073ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [Enhancement] Replace `rd_kafka_offset_store` with `rd_kafka_offsets_store` (mensfeld) - [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld) - [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld) +- [Enhancement] Include the error when invoking `create_result` on producer handle (menseld) - [Fix] Fix return type on `#rd_kafka_poll` (mensfeld) - [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld) - [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld) diff --git a/lib/rdkafka/producer/delivery_handle.rb b/lib/rdkafka/producer/delivery_handle.rb index 87956709..6c80f820 100644 --- a/lib/rdkafka/producer/delivery_handle.rb +++ b/lib/rdkafka/producer/delivery_handle.rb @@ -25,7 +25,7 @@ def create_result self[:partition], self[:offset], self[:topic_name].read_string, - nil, + self[:response] != 0 ? RdkafkaError.new(self[:response]) : nil, label ) end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index 4ab38798..a856463f 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -563,22 +563,22 @@ def call(_, handle) }.to raise_exception(Rdkafka::ClosedProducerError, /#{method.to_s}/) end end + end - context "when not being able to deliver the message" do - let(:producer) do - rdkafka_producer_config( - "bootstrap.servers": "localhost:9093", - "message.timeout.ms": 100 - ).producer - end + context "when not being able to deliver the message" do + let(:producer) do + rdkafka_producer_config( + "bootstrap.servers": "localhost:9093", + "message.timeout.ms": 100 + ).producer + end - it "should contain the error in the response when not deliverable" do - handler = producer.produce(topic: 'produce_test_topic', payload: nil, label: 'na') - # Wait for the async callbacks and delivery registry to update - sleep(2) - expect(handler.create_result.error).to be_a(Rdkafka::RdkafkaError) - expect(handler.create_result.label).to eq('na') - end + it "should contain the error in the response when not deliverable" do + handler = producer.produce(topic: 'produce_test_topic', payload: nil, label: 'na') + # Wait for the async callbacks and delivery registry to update + sleep(2) + expect(handler.create_result.error).to be_a(Rdkafka::RdkafkaError) + expect(handler.create_result.label).to eq('na') end end From 3dde16f8c5c1644050eaca85a402949f6b093afa Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Tue, 26 Dec 2023 20:06:09 +0100 Subject: [PATCH 4/4] fix typo in my own name --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f4e073ef..2e9dc61c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - [Enhancement] Replace `rd_kafka_offset_store` with `rd_kafka_offsets_store` (mensfeld) - [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld) - [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld) -- [Enhancement] Include the error when invoking `create_result` on producer handle (menseld) +- [Enhancement] Include the error when invoking `create_result` on producer handle (mensfeld) - [Fix] Fix return type on `#rd_kafka_poll` (mensfeld) - [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld) - [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld)