Skip to content

Commit

Permalink
Ensure sockets are closed in specs
Browse files Browse the repository at this point in the history
  • Loading branch information
Adithya Pentela committed Feb 16, 2020
1 parent 3e5331a commit 6d7b472
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 23 deletions.
3 changes: 3 additions & 0 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ def delivery_callback=(callback)

# Close this producer and wait for the internal poll queue to empty.
def close
return if @closed

# Indicate to polling thread that we're closing
@closing = true
# Wait for the polling thread to finish up
@polling_thread.join
Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
@closed = true
end

# Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered.
Expand Down
8 changes: 6 additions & 2 deletions spec/rdkafka/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
end

it "should create a consumer with valid config" do
expect(rdkafka_config.consumer).to be_a Rdkafka::Consumer
consumer = rdkafka_config.consumer
expect(consumer).to be_a Rdkafka::Consumer
consumer.close
end

it "should raise an error when creating a consumer with invalid config" do
Expand All @@ -76,7 +78,9 @@
end

it "should create a producer with valid config" do
expect(rdkafka_config.producer).to be_a Rdkafka::Producer
producer = rdkafka_config.producer
expect(producer).to be_a Rdkafka::Producer
producer.close
end

it "should raise an error when creating a producer with invalid config" do
Expand Down
7 changes: 6 additions & 1 deletion spec/rdkafka/consumer/message_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
require "spec_helper"

describe Rdkafka::Consumer::Message do
let(:native_topic) { new_native_topic }
let(:native_client) { new_native_client }
let(:native_topic) { new_native_topic(native_client: native_client) }
let(:payload) { nil }
let(:key) { nil }
let(:native_message) do
Expand All @@ -24,6 +25,10 @@
end
end

after(:each) do
Rdkafka::Bindings.rd_kafka_destroy(native_client)
end

subject { Rdkafka::Consumer::Message.new(native_message) }

before do
Expand Down
23 changes: 15 additions & 8 deletions spec/rdkafka/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,9 @@ def send_one_message(val)
}.to raise_error TypeError
end

context "with a commited consumer" do
context "with a committed consumer" do
before :all do
# Make sure there are some message
# Make sure there are some messages.
handles = []
producer = rdkafka_config.producer
10.times do
Expand All @@ -329,6 +329,7 @@ def send_one_message(val)
end
end
handles.each(&:wait)
producer.close
end

before do
Expand Down Expand Up @@ -389,28 +390,34 @@ def send_one_message(val)

describe "#store_offset" do
before do
config = {}
config[:'enable.auto.offset.store'] = false
config[:'enable.auto.commit'] = false
consumer.subscribe("consume_test_topic")
wait_for_assignment(consumer)
@new_consumer = rdkafka_config(config).consumer
@new_consumer.subscribe("consume_test_topic")
wait_for_assignment(@new_consumer)
end

after do
@new_consumer.close
end

it "should store the offset for a message" do
consumer.store_offset(message)
consumer.commit
@new_consumer.store_offset(message)
@new_consumer.commit

list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
list.add_topic("consume_test_topic", [0, 1, 2])
end
partitions = consumer.committed(list).to_h["consume_test_topic"]
partitions = @new_consumer.committed(list).to_h["consume_test_topic"]
expect(partitions).not_to be_nil
expect(partitions[message.partition].offset).to eq(message.offset + 1)
end

it "should raise an error with invalid input" do
allow(message).to receive(:partition).and_return(9999)
expect {
consumer.store_offset(message)
@new_consumer.store_offset(message)
}.to raise_error Rdkafka::RdkafkaError
end
end
Expand Down
33 changes: 23 additions & 10 deletions spec/rdkafka/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

describe Rdkafka::Producer do
let(:producer) { rdkafka_config.producer }
let(:consumer) { rdkafka_config.consumer }

after do
# Registry should always end up being empty
expect(Rdkafka::Producer::DeliveryHandle::REGISTRY).to be_empty
producer.close
consumer.close
end

context "delivery callback" do
Expand Down Expand Up @@ -82,7 +85,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)
expect(message.partition).to eq 1
expect(message.payload).to eq "payload"
Expand All @@ -105,7 +109,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)
expect(message.partition).to eq 1
expect(message.key).to eq "key partition"
Expand All @@ -122,7 +127,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)

expect(message.partition).to eq 1
Expand Down Expand Up @@ -154,7 +160,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)

expect(message.partition).to eq 2
Expand All @@ -174,7 +181,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)

expect(message.partition).to eq 2
Expand All @@ -193,7 +201,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)

expect(message.key).to be_nil
Expand All @@ -210,7 +219,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)

expect(message.key).to eq "key no payload"
Expand All @@ -229,7 +239,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)

expect(message.payload).to eq "payload headers"
Expand All @@ -251,7 +262,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)

expect(message.payload).to eq "payload headers"
Expand Down Expand Up @@ -325,7 +337,8 @@
# Consume message and verify it's content
message = wait_for_message(
topic: "produce_test_topic",
delivery_report: report
delivery_report: report,
consumer: consumer
)
expect(message.partition).to eq 0
expect(message.payload).to eq "payload-forked"
Expand Down
7 changes: 5 additions & 2 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def rdkafka_config(config_overrides={})
Rdkafka::Config.new(config)
end

def native_client
def new_native_client
config = rdkafka_config
config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer)
end

def new_native_topic(topic_name="topic_name")
def new_native_topic(topic_name="topic_name", native_client: )
Rdkafka::Bindings.rd_kafka_topic_new(
native_client,
topic_name,
Expand All @@ -46,6 +46,7 @@ def new_native_topic(topic_name="topic_name")
end

def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30, consumer: nil)
new_consumer = !!consumer
consumer = rdkafka_config.consumer if consumer.nil?
consumer.subscribe(topic)
timeout = Time.now.to_i + timeout_in_seconds
Expand All @@ -60,6 +61,8 @@ def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30, consumer:
return message
end
end
ensure
consumer.close if new_consumer
end

def wait_for_assignment(consumer)
Expand Down

0 comments on commit 6d7b472

Please sign in to comment.