Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#92 Allow specifying target consumer_group when creating a test consumer #93

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Karafka Test gem changelog

## 2.0.2 (Unreleased)
- Provide multi-consumer group testing support (#92)
- Fail fast if requested topic is present in multiple consumer groups but consumer group is not specified.

## 2.0.1 (2022-08-05)
- Require non rc version of Karafka.

Expand Down
5 changes: 2 additions & 3 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
PATH
remote: .
specs:
karafka-testing (2.0.1)
karafka-testing (2.0.2)
karafka (>= 2.0, < 3.0.0)

GEM
remote: https://rubygems.org/
specs:
concurrent-ruby (1.1.10)
ffi (1.15.5)
karafka (2.0.8)
karafka (2.0.10)
karafka-core (>= 2.0.2, < 3.0.0)
rdkafka (>= 0.12)
thor (>= 0.20)
Expand All @@ -31,7 +31,6 @@ GEM
zeitwerk (2.6.0)

PLATFORMS
arm64-darwin
x86_64-linux

DEPENDENCIES
Expand Down
26 changes: 0 additions & 26 deletions certs/cert_chain.pem

This file was deleted.

4 changes: 4 additions & 0 deletions lib/karafka/testing/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ module Testing
module Errors
# Base error for all the internal errors
BaseError = Class.new(StandardError)

# Raised when we want to build a consumer for a topic that does not exist
TopicNotFoundError = Class.new(BaseError)

# Raised when topic is in many consumer groups and not limited by consumer group expectation
TopicInManyConsumerGroupsError = Class.new(BaseError)
end
end
end
44 changes: 29 additions & 15 deletions lib/karafka/testing/rspec/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def included(base)
#
# @param requested_topic [String, Symbol] name of the topic for which we want to
# create a consumer instance
# @param requested_consumer_group [String, Symbol, nil] optional name of the consumer group
# if we have multiple consumer groups listening on the same topic
# @return [Object] described_class instance
# @raise [Karafka::Testing::Errors::TopicNotFoundError] raised when we're unable to find
# topic that was requested
Expand All @@ -36,26 +38,23 @@ def included(base)
# RSpec.describe MyConsumer do
# subject(:consumer) { karafka.consumer_for(:my_requested_topic) }
# end
def karafka_consumer_for(requested_topic)
selected_topic = nil
def karafka_consumer_for(requested_topic, requested_consumer_group = nil)
all_topics = ::Karafka::App.consumer_groups.map(&:topics).flat_map(&:to_a)

# @note Remove in 2.1. This won't work without the global state
::Karafka::App.consumer_groups.each do |consumer_group|
consumer_group.topics.each do |topic|
selected_topic = topic if topic.name == requested_topic.to_s
end
# First select topics that match what we are looking for
selected_topics = all_topics.select do |topic|
topic.name == requested_topic.to_s
end

raise Karafka::Testing::Errors::TopicNotFoundError, requested_topic unless selected_topic
# And then narrow it down based on the consumer group criteria (if present)
selected_topics.delete_if do |topic|
requested_consumer_group && topic.consumer_group.name != requested_consumer_group.to_s
end

coordinators = Karafka::Processing::CoordinatorsBuffer.new
raise Errors::TopicInManyConsumerGroupsError, requested_topic if selected_topics.size > 1
raise Errors::TopicNotFoundError, requested_topic if selected_topics.empty?

consumer = described_class.new
consumer.topic = selected_topic
consumer.producer = Karafka::App.producer
consumer.client = Karafka::Testing::DummyClient.new
consumer.coordinator = coordinators.find_or_create(requested_topic, 0)
consumer
karafka_build_consumer_for(selected_topics.first)
end

# Adds a new Karafka message instance with given payload and options into an internal
Expand Down Expand Up @@ -108,6 +107,21 @@ def karafka_message_metadata_defaults
topic: subject.topic.name
}
end

# Builds the consumer instance based on the provided topic
#
# @param topic [Karafka::Routing::Topic] topic for which we want to build the consumer
# @return [Object] karafka consumer
def karafka_build_consumer_for(topic)
coordinators = Karafka::Processing::CoordinatorsBuffer.new

consumer = described_class.new
consumer.topic = topic
consumer.producer = Karafka::App.producer
consumer.client = Karafka::Testing::DummyClient.new
consumer.coordinator = coordinators.find_or_create(topic.name, 0)
consumer
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/testing/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
module Karafka
module Testing
# Current version of gem. It should match Karafka framework version
VERSION = '2.0.1'
VERSION = '2.0.2'
end
end