Skip to content

Commit

Permalink
Refactor a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
dasch committed Aug 26, 2020
1 parent cefeeea commit e367b1e
Showing 1 changed file with 38 additions and 40 deletions.
78 changes: 38 additions & 40 deletions spec/integration/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,43 @@
require "racecar/cli"
require "racecar/ctl"

def generate_token
SecureRandom.hex(8)
end

class EchoConsumer < Racecar::Consumer
subscribes_to "input"

self.group_id = "test-consumer-#{SecureRandom.hex(8)}"
self.group_id = "echo-consumer-#{generate_token}"

def process(message)
produce message.value, key: message.key, topic: "output"
end
end

module IntegrationSupport
INCOMING_MESSAGES = []

CONSUMER = Thread.new do
consumer = Rdkafka::Config.new({
"bootstrap.servers": Racecar.config.brokers.join(","),
"client.id": Racecar.config.client_id,
"group.id": "racecar-tests",
}.merge(Racecar.config.rdkafka_consumer)).consumer
class ResultsConsumer < Racecar::Consumer
subscribes_to "output"

consumer.subscribe("output")
self.group_id = "results-consumer-#{generate_token}"

consumer.each do |message|
puts "Received message #{message}"
INCOMING_MESSAGES << message
end
end
MESSAGES = []

def self.incoming_messages
INCOMING_MESSAGES
def process(message)
puts "Got result #{message.inspect}"
MESSAGES << message
end
end

RSpec.context "Integrating with a real Kafka cluster" do
before :all do
Thread.new do
Racecar::Cli.main(["ResultsConsumer"])
end
end

describe "Single-consumer groups" do
it "can consume and produce messages" do
token = SecureRandom.hex(8)
token = generate_token

worker = Thread.new do
Racecar::Cli.main(["EchoConsumer"])
Expand All @@ -48,26 +48,18 @@ def self.incoming_messages
produce -t input -v #{token} -k greetings
)

message = nil
attempt = 1

while message.nil? && attempt <= 10
puts "Waiting for message..."
sleep 2 ** attempt
message = IntegrationSupport.incoming_messages.last
attempt += 1
end
message = wait_for_token(token)

expect(message).not_to be_nil
expect(message.topic).to eq "output"
expect(message.payload).to eq token
expect(message.value).to eq token
expect(message.key).to eq "greetings"
end
end

describe "Multi-consumer groups" do
it "can consume and produce messages" do
token = SecureRandom.hex(8)
token = generate_token

worker1 = Thread.new do
Racecar::Cli.main(["EchoConsumer"])
Expand All @@ -85,20 +77,26 @@ def self.incoming_messages
produce -t input -v #{token} -k greetings
)

message = nil
attempt = 1

while message.nil? && attempt <= 10
puts "Waiting for message..."
sleep 2 ** attempt
message = IntegrationSupport.incoming_messages.last
attempt += 1
end
message = wait_for_token(token)

expect(message).not_to be_nil
expect(message.topic).to eq "output"
expect(message.payload).to eq token
expect(message.value).to eq token
expect(message.key).to eq "greetings"
end
end

def wait_for_token(token)
message = nil
attempt = 1

while message.nil? && attempt <= 10
puts "Waiting for token #{token}..."
sleep 2 ** attempt
message = ResultsConsumer::MESSAGES.find {|m| m.value == token }
attempt += 1
end

message
end
end

0 comments on commit e367b1e

Please sign in to comment.