-
Notifications
You must be signed in to change notification settings - Fork 339
/
Copy pathasync_producer_spec.rb
93 lines (68 loc) · 2.33 KB
/
async_producer_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# frozen_string_literal: true
describe "Producer API", functional: true do
let(:producer) { kafka.async_producer(max_retries: 1, retry_backoff: 0) }
let!(:topic) { create_random_topic(num_partitions: 3) }
after do
producer.shutdown
end
example "writing messages using async producer" do
value1 = rand(10_000).to_s
value2 = rand(10_000).to_s
producer.produce(value1, key: "x", topic: topic, partition: 0)
producer.produce(value2, key: "y", topic: topic, partition: 1)
producer.deliver_messages
# Wait for everything to be delivered.
producer.shutdown
message1 = kafka.fetch_messages(topic: topic, partition: 0, offset: :earliest).last
message2 = kafka.fetch_messages(topic: topic, partition: 1, offset: :earliest).last
expect(message1.value).to eq value1
expect(message2.value).to eq value2
end
example "automatically delivering messages with a fixed time interval" do
producer = kafka.async_producer(delivery_interval: 0.1)
value = rand(10_000).to_s
producer.produce(value, topic: topic, partition: 0)
sleep 1
messages = kafka.fetch_messages(
topic: topic,
partition: 0,
offset: 0,
max_wait_time: 0.1,
)
expect(messages.last.value).to eq value
producer.shutdown
end
example "automatically delivering messages when a buffer threshold is reached" do
producer = kafka.async_producer(delivery_threshold: 5)
values = 5.times.map { rand(10_000).to_s }
values.each do |value|
producer.produce(value, topic: topic, partition: 0)
end
sleep 0.2
messages = kafka.fetch_messages(
topic: topic,
partition: 0,
offset: 0,
max_wait_time: 0,
)
expect(messages.last(5).map(&:value)).to eq values
producer.shutdown
end
example 'support record headers' do
topic = create_random_topic(num_partitions: 1)
producer = kafka.async_producer(delivery_threshold: 1)
producer.produce(
"hello", topic: topic,
headers: { hello: 'World', 'greeting' => 'is great', bye: 1, love: nil }
)
sleep 0.2
messages = kafka.fetch_messages(topic: topic, partition: 0, offset: 0)
expect(messages[0].value).to eq "hello"
expect(messages[0].headers).to eql(
'hello' => 'World',
'greeting' => 'is great',
'bye' => '1',
'love' => ''
)
end
end