-
Notifications
You must be signed in to change notification settings - Fork 339
/
Copy pathfetch_spec.rb
61 lines (53 loc) · 1.46 KB
/
fetch_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# frozen_string_literal: true
# Functional specs for fetching messages, once directly through
# `fetch_messages` and once through a consumer group.
describe "Fetch API", functional: true do
  # `kafka` is not defined in this file; presumably it is provided by the
  # shared spec setup — verify against the spec helper.
  example "fetching from a non-existing topic when auto-create is enabled" do
    topic = "rand#{SecureRandom.uuid}"
    attempt = 1
    messages = nil

    begin
      messages = kafka.fetch_messages(
        topic: topic,
        partition: 0,
        offset: 0,
        max_wait_time: 0.1
      )
    rescue Kafka::LeaderNotAvailable, Kafka::NotLeaderForPartition
      # Presumably the freshly auto-created topic has no leader yet — TODO
      # confirm. The retry is bounded: at most 10 attempts, 0.1s apart.
      raise "timed out" if attempt >= 10

      attempt += 1
      sleep 0.1
      retry
    end

    expect(messages).to eq []
  end

  example "Number of fetching requests should be small" do
    # NOTE(review): this local client uses a hard-coded broker address and
    # shadows the `kafka` used by the example above — confirm that this is
    # intentional.
    kafka = ::Kafka::Client.new(
      seed_brokers: ['localhost:9092']
    )
    total_messages = 1000
    message_size = 50

    # Create test data: `total_messages` payloads of `message_size` bytes on a
    # fresh topic with 3 partitions.
    producer = kafka.producer
    topic = "topic-#{SecureRandom.uuid}"
    kafka.create_topic(topic, num_partitions: 3)
    total_messages.times do
      producer.produce('a' * message_size, topic: topic)
    end
    producer.deliver_messages

    # Count every `fetch_batch.consumer.kafka` notification. The subscription
    # is global, so keep the handle and remove it once the example finishes;
    # otherwise the callback would keep firing for later examples.
    batch_count = 0
    subscriber = ActiveSupport::Notifications.subscribe('fetch_batch.consumer.kafka') do |*_args|
      batch_count += 1
    end

    begin
      # Test consuming: read until every produced message has been seen.
      consumer = kafka.consumer(group_id: SecureRandom.uuid)
      consumer.subscribe(topic)
      message_count = 0
      consumer.each_message do
        message_count += 1
        break if message_count == total_messages
      end
    ensure
      ActiveSupport::Notifications.unsubscribe(subscriber)
    end

    expect(batch_count).to be <= 4
  end
end