diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb index 8b1986d3..7da9b3d5 100644 --- a/lib/rdkafka/admin.rb +++ b/lib/rdkafka/admin.rb @@ -18,11 +18,16 @@ def finalizer # Close this admin instance def close + return if closed? ObjectSpace.undefine_finalizer(self) - @native_kafka.close end + # Whether this admin has closed + def closed? + @native_kafka.closed? + end + # Create a topic with the given partition count and replication factor # # @raise [ConfigError] When the partition count or replication factor are out of valid range @@ -149,7 +154,7 @@ def delete_topic(topic_name) private def closed_admin_check(method) - raise Rdkafka::ClosedAdminError.new(method) if @native_kafka.closed? + raise Rdkafka::ClosedAdminError.new(method) if closed? end end end diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index d51f74e7..379373b6 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -169,7 +169,9 @@ class TopicPartitionList < FFI::Struct ] attach_function :rd_kafka_new, [:kafka_type, :pointer, :pointer, :int], :pointer - attach_function :rd_kafka_destroy, [:pointer], :void + + RD_KAFKA_DESTROY_F_IMMEDIATE = 0x4 + attach_function :rd_kafka_destroy_flags, [:pointer, :int], :void # Consumer diff --git a/lib/rdkafka/config.rb b/lib/rdkafka/config.rb index 47fce820..cad5d0e1 100644 --- a/lib/rdkafka/config.rb +++ b/lib/rdkafka/config.rb @@ -157,13 +157,14 @@ def consumer Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback) end + # Create native client kafka = native_kafka(config, :rd_kafka_consumer) # Redirect the main queue to the consumer Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) # Return consumer with Kafka client - Rdkafka::Consumer.new(kafka) + Rdkafka::Consumer.new(Rdkafka::NativeKafka.new(kafka, run_polling_thread: false)) end # Create a producer with this configuration. @@ -181,7 +182,7 @@ def producer Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction) # Return producer with Kafka client partitioner_name = self[:partitioner] || self["partitioner"] - Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)), partitioner_name).tap do |producer| + Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true), partitioner_name).tap do |producer| opaque.producer = producer end end @@ -196,7 +197,7 @@ def admin opaque = Opaque.new config = native_config(opaque) Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction) - Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer))) + Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true)) end # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index c58d62de..5cd4ac28 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -26,15 +26,14 @@ def finalizer # @return [nil] def close return if closed? - - Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka) - Rdkafka::Bindings.rd_kafka_destroy(@native_kafka) - @native_kafka = nil + ObjectSpace.undefine_finalizer(self) + Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka.inner) + @native_kafka.close end # Whether this consumer has closed def closed? - @native_kafka.nil? + @native_kafka.closed? end # Subscribe to one or more topics letting Kafka handle partition assignments. @@ -55,7 +54,7 @@ def subscribe(*topics) end # Subscribe to topic partition list and check this was successful - response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl) + response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka.inner, tpl) if response != 0 raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'") end @@ -71,7 +70,7 @@ def subscribe(*topics) def unsubscribe closed_consumer_check(__method__) - response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka) + response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka.inner) if response != 0 raise Rdkafka::RdkafkaError.new(response) end @@ -94,7 +93,7 @@ def pause(list) tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl) + response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka.inner, tpl) if response != 0 list = TopicPartitionList.from_native_tpl(tpl) @@ -122,7 +121,7 @@ def resume(list) tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl) + response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka.inner, tpl) if response != 0 raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'") end @@ -140,7 +139,7 @@ def subscription closed_consumer_check(__method__) ptr = FFI::MemoryPointer.new(:pointer) - response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, ptr) + response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka.inner, ptr) if response != 0 raise Rdkafka::RdkafkaError.new(response) @@ -170,7 +169,7 @@ def assign(list) tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl) + response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka.inner, tpl) if response != 0 raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'") end @@ -188,7 +187,7 @@ def assignment closed_consumer_check(__method__) ptr = FFI::MemoryPointer.new(:pointer) - response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, ptr) + response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka.inner, ptr) if response != 0 raise Rdkafka::RdkafkaError.new(response) end @@ -227,7 +226,7 @@ def committed(list=nil, timeout_ms=1200) tpl = list.to_native_tpl begin - response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms) + response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka.inner, tpl, timeout_ms) if response != 0 raise Rdkafka::RdkafkaError.new(response) end @@ -253,7 +252,7 @@ def query_watermark_offsets(topic, partition, timeout_ms=200) high = FFI::MemoryPointer.new(:int64, 1) response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets( - @native_kafka, + @native_kafka.inner, topic, partition, low, @@ -307,7 +306,7 @@ def lag(topic_partition_list, watermark_timeout_ms=100) # @return [String, nil] def cluster_id closed_consumer_check(__method__) - Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka) + Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka.inner) end # Returns this client's broker-assigned group member id @@ -317,7 +316,7 @@ def cluster_id # @return [String, nil] def member_id closed_consumer_check(__method__) - Rdkafka::Bindings.rd_kafka_memberid(@native_kafka) + Rdkafka::Bindings.rd_kafka_memberid(@native_kafka.inner) end # Store offset of a message to be used in the next commit of this consumer @@ -335,7 +334,7 @@ def store_offset(message) # rd_kafka_offset_store is one of the few calls that does not support # a string as the topic, so create a native topic for it. native_topic = Rdkafka::Bindings.rd_kafka_topic_new( - @native_kafka, + @native_kafka.inner, message.topic, nil ) @@ -367,7 +366,7 @@ def seek(message) # rd_kafka_offset_store is one of the few calls that does not support # a string as the topic, so create a native topic for it. native_topic = Rdkafka::Bindings.rd_kafka_topic_new( - @native_kafka, + @native_kafka.inner, message.topic, nil ) @@ -411,7 +410,7 @@ def commit(list=nil, async=false) tpl = list ? list.to_native_tpl : nil begin - response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async) + response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka.inner, tpl, async) if response != 0 raise Rdkafka::RdkafkaError.new(response) end @@ -430,7 +429,7 @@ def commit(list=nil, async=false) def poll(timeout_ms) closed_consumer_check(__method__) - message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms) + message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka.inner, timeout_ms) if message_ptr.null? nil else @@ -445,7 +444,7 @@ def poll(timeout_ms) end ensure # Clean up rdkafka message if there is one - if !message_ptr.nil? && !message_ptr.null? + if message_ptr && !message_ptr.null? Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr) end end diff --git a/lib/rdkafka/native_kafka.rb b/lib/rdkafka/native_kafka.rb index 675778bc..155acfb8 100644 --- a/lib/rdkafka/native_kafka.rb +++ b/lib/rdkafka/native_kafka.rb @@ -4,21 +4,26 @@ module Rdkafka # @private # A wrapper around a native kafka that polls and cleanly exits class NativeKafka - def initialize(inner) + def initialize(inner, run_polling_thread:) @inner = inner - # Start thread to poll client for delivery callbacks - @polling_thread = Thread.new do - loop do - Rdkafka::Bindings.rd_kafka_poll(inner, 250) - # Exit thread if closing and the poll queue is empty - if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(inner) == 0 - break + if run_polling_thread + # Start thread to poll client for delivery callbacks, + # not used in consumer. + @polling_thread = Thread.new do + loop do + Rdkafka::Bindings.rd_kafka_poll(inner, 250) + # Exit thread if closing and the poll queue is empty + if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(inner) == 0 + break + end end end + @polling_thread.abort_on_exception = true + @polling_thread[:closing] = false end - @polling_thread.abort_on_exception = true - @polling_thread[:closing] = false + + @closing = false end def inner @@ -30,22 +35,27 @@ def finalizer end def closed? - @inner.nil? + @closing || @inner.nil? end def close(object_id=nil) return if closed? - # Flush outstanding activity - Rdkafka::Bindings.rd_kafka_flush(@inner, 30 * 1000) - - # Indicate to polling thread that we're closing - @polling_thread[:closing] = true - # Wait for the polling thread to finish up - @polling_thread.join + # Indicate to the outside world that we are closing + @closing = true - Rdkafka::Bindings.rd_kafka_destroy(@inner) + if @polling_thread + # Indicate to polling thread that we're closing + @polling_thread[:closing] = true + # Wait for the polling thread to finish up + @polling_thread.join + end + # Destroy the client + Rdkafka::Bindings.rd_kafka_destroy_flags( + @inner, + Rdkafka::Bindings::RD_KAFKA_DESTROY_F_IMMEDIATE + ) @inner = nil end end diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 01a7a57c..ed85c41a 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -40,11 +40,23 @@ def delivery_callback=(callback) # Close this producer and wait for the internal poll queue to empty. def close + return if closed? ObjectSpace.undefine_finalizer(self) - @native_kafka.close end + # Whether this producer has closed + def closed? + @native_kafka.closed? + end + + # Wait until all outstanding producer requests are completed, with the given timeout + # in seconds. Call this before closing a producer to ensure delivery of all messages. + def flush(timeout_ms=5_000) + closed_producer_check(__method__) + Rdkafka::Bindings.rd_kafka_flush(@native_kafka.inner, timeout_ms) + end + # Partition count for a given topic. # NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil. # @@ -173,7 +185,7 @@ def arity(callback) private def closed_producer_check(method) - raise Rdkafka::ClosedProducerError.new(method) if @native_kafka.closed? + raise Rdkafka::ClosedProducerError.new(method) if closed? end end end diff --git a/spec/rdkafka/consumer/message_spec.rb b/spec/rdkafka/consumer/message_spec.rb index aa184cf7..84007a0a 100644 --- a/spec/rdkafka/consumer/message_spec.rb +++ b/spec/rdkafka/consumer/message_spec.rb @@ -28,7 +28,7 @@ end after(:each) do - Rdkafka::Bindings.rd_kafka_destroy(native_client) + Rdkafka::Bindings.rd_kafka_destroy_flags(native_client, Rdkafka::Bindings::RD_KAFKA_DESTROY_F_IMMEDIATE) end subject { Rdkafka::Consumer::Message.new(native_message) } diff --git a/spec/rdkafka/metadata_spec.rb b/spec/rdkafka/metadata_spec.rb index be6ae749..17aaa29d 100644 --- a/spec/rdkafka/metadata_spec.rb +++ b/spec/rdkafka/metadata_spec.rb @@ -10,7 +10,7 @@ after do Rdkafka::Bindings.rd_kafka_consumer_close(native_kafka) - Rdkafka::Bindings.rd_kafka_destroy(native_kafka) + Rdkafka::Bindings.rd_kafka_destroy_flags(native_kafka, Rdkafka::Bindings::RD_KAFKA_DESTROY_F_IMMEDIATE) end context "passing in a topic name" do diff --git a/spec/rdkafka/native_kafka_spec.rb b/spec/rdkafka/native_kafka_spec.rb index 5ae9b53b..58438035 100644 --- a/spec/rdkafka/native_kafka_spec.rb +++ b/spec/rdkafka/native_kafka_spec.rb @@ -8,12 +8,12 @@ let(:closing) { false } let(:thread) { double(Thread) } - subject(:client) { described_class.new(native) } + subject(:client) { described_class.new(native, run_polling_thread: true) } before do allow(Rdkafka::Bindings).to receive(:rd_kafka_poll).with(instance_of(FFI::Pointer), 250).and_call_original allow(Rdkafka::Bindings).to receive(:rd_kafka_outq_len).with(instance_of(FFI::Pointer)).and_return(0).and_call_original - allow(Rdkafka::Bindings).to receive(:rd_kafka_destroy) + allow(Rdkafka::Bindings).to receive(:rd_kafka_destroy_flags) allow(Thread).to receive(:new).and_return(thread) allow(thread).to receive(:[]=).with(:closing, anything) @@ -53,6 +53,16 @@ expect(Rdkafka::Bindings).to receive(:rd_kafka_outq_len).with(native).at_least(:once) end end + + context "if not enabled" do + subject(:client) { described_class.new(native, run_polling_thread: false) } + + it "is not created" do + expect(Thread).not_to receive(:new) + + client + end + end end def polling_loop_expects(&block) @@ -76,7 +86,7 @@ def polling_loop_expects(&block) context "and attempt to close" do it "calls the `destroy` binding" do - expect(Rdkafka::Bindings).to receive(:rd_kafka_destroy).with(native) + expect(Rdkafka::Bindings).to receive(:rd_kafka_destroy_flags).with(native, Rdkafka::Bindings::RD_KAFKA_DESTROY_F_IMMEDIATE) client.close end @@ -111,7 +121,7 @@ def polling_loop_expects(&block) context "and attempt to close again" do it "does not call the `destroy` binding" do - expect(Rdkafka::Bindings).not_to receive(:rd_kafka_destroy) + expect(Rdkafka::Bindings).not_to receive(:rd_kafka_destroy_flags) client.close end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index cd0d969c..1c61a225 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -185,7 +185,8 @@ def call(_, handle) expect(report.partition).to eq 1 expect(report.offset).to be >= 0 - # Close producer + # Flush and close producer + producer.flush producer.close # Consume message and verify its content @@ -459,10 +460,10 @@ def call(_, handle) # wait for and check the message in the main process. reader, writer = IO.pipe - fork do + pid = fork do reader.close - # Avoids sharing the socket between processes. + # Avoid sharing the client between processes. producer = rdkafka_producer_config.producer handle = producer.produce( @@ -481,8 +482,10 @@ def call(_, handle) writer.write(report_json) writer.close + producer.flush producer.close end + Process.wait(pid) writer.close report_hash = JSON.parse(reader.read)