-
Notifications
You must be signed in to change notification settings - Fork 339
/
Copy pathbatch_consumer_spec.rb
92 lines (74 loc) · 2.32 KB
/
batch_consumer_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# frozen_string_literal: true
describe "Batch Consumer API", functional: true do
  # Produces `message_count` integer-valued messages to a freshly created
  # multi-partition topic and checks that two consumers sharing one group id
  # hand every message to `each_batch` exactly once between them.
  example "consuming messages using the batch API" do
    num_partitions = 15
    message_count = 1_000
    # Inclusive range, so that exactly `message_count` messages are produced.
    messages = (1..message_count).to_set
    message_queue = Queue.new
    offset_retention_time = 30
    consumers = []

    topic = create_random_topic(num_partitions: num_partitions)

    begin
      # Produce in the background so that consumption can overlap delivery.
      Thread.new do
        # Surface producer failures in the main thread; otherwise it would
        # stay blocked on `message_queue.pop` with nothing left to arrive.
        Thread.current.abort_on_exception = true

        kafka = Kafka.new(kafka_brokers, client_id: "test")
        producer = kafka.producer

        messages.each do |i|
          producer.produce(i.to_s, topic: topic, partition_key: i.to_s)
        end

        producer.deliver_messages
      end

      # Same group-id scheme as the record-headers example below.
      group_id = "test-#{SecureRandom.uuid}"

      2.times do
        kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
        consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
        consumer.subscribe(topic)
        # Keep a handle so the consumer can be stopped once the example is done.
        consumers << consumer

        Thread.new do
          Thread.current.abort_on_exception = true

          consumer.each_batch do |batch|
            batch.messages.each do |message|
              message_queue << Integer(message.value)
            end
          end
        end
      end

      received_messages = Set.new
      duplicates = Set.new

      # Drain the queue until every distinct value has been seen at least once.
      loop do
        message = message_queue.pop

        # Set#add? returns nil when the value is already present.
        duplicates.add(message) unless received_messages.add?(message)

        break if received_messages.size == messages.size
      end

      expect(received_messages).to eq messages
      expect(duplicates).to eq Set.new
    ensure
      # Ask the consumers to leave their `each_batch` loops so that their
      # threads do not keep polling after this example has finished.
      consumers.each(&:stop)
    end
  end

  # Produces two messages carrying headers to a single-partition topic and
  # checks that the consumer sees the same headers, in order.
  example 'support record headers' do
    expected_headers = [
      { 'TracingID' => 'a1', 'SpanID' => 'b2' },
      { 'TracingID' => 'c3', 'SpanID' => 'd4' }
    ]

    topic = create_random_topic(num_partitions: 1)
    kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test")

    producer = kafka.producer
    producer.produce('hello', topic: topic, headers: expected_headers.first)
    producer.produce('hello2', topic: topic, headers: expected_headers.last)
    producer.deliver_messages

    consumer = kafka.consumer(group_id: SecureRandom.uuid)
    consumer.subscribe(topic)

    headers = []
    consumer.each_batch do |batch|
      batch.messages.each do |message|
        headers << message.headers
      end

      # Do not assume both messages arrive in the first batch.
      break if headers.size >= expected_headers.size
    end

    expect(headers).to eql(expected_headers)
  end
end