-
Notifications
You must be signed in to change notification settings - Fork 339
/
Copy pathcompression_spec.rb
40 lines (31 loc) · 1.16 KB
/
compression_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# frozen_string_literal: true
describe "Compression", functional: true do
let!(:topic) { create_random_topic(num_partitions: 3) }
Kafka::Compression.codecs.each do |codec_name|
example "producing and consuming #{codec_name}-compressed messages" do
codec = Kafka::Compression.find_codec(codec_name)
unless kafka.supports_api?(Kafka::Protocol::PRODUCE_API, codec.produce_api_min_version)
skip("This Kafka version does not support #{codec_name}")
end
producer = kafka.producer(
compression_codec: codec_name,
max_retries: 0,
retry_backoff: 0
)
last_offset = fetch_last_offset
producer.produce("message1", topic: topic, partition: 0)
producer.produce("message2", topic: topic, partition: 0)
producer.deliver_messages
messages = kafka.fetch_messages(
topic: topic,
partition: 0,
offset: last_offset + 1,
)
expect(messages.last(2).map(&:value)).to eq ["message1", "message2"]
end
end
def fetch_last_offset
last_message = kafka.fetch_messages(topic: topic, partition: 0, offset: 0).last
last_message ? last_message.offset : -1
end
end