diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex
index d646b4cd..8c7823b0 100644
--- a/lib/kafka_ex/protocol/produce.ex
+++ b/lib/kafka_ex/protocol/produce.ex
@@ -31,11 +31,14 @@ defmodule KafkaEx.Protocol.Produce do
     @type t :: %Response{topic: binary, partitions: list}
   end
 
-  def create_request(correlation_id, client_id, %Request{topic: topic, partition: partition, required_acks: required_acks, timeout: timeout, compression: compression, messages: messages}) do
+  def create_request(
+        correlation_id,
+        client_id,
+        request = %Request{compression: compression, messages: messages}
+      ) do
     message_set = create_message_set(messages, compression)
-    KafkaEx.Protocol.create_request(:produce, correlation_id, client_id) <>
-      << required_acks :: 16-signed, timeout :: 32-signed, 1 :: 32-signed >> <>
-      << byte_size(topic) :: 16-signed, topic :: binary, 1 :: 32-signed, partition :: 32-signed, byte_size(message_set) :: 32-signed >> <> message_set
+    header = produce_header(correlation_id, client_id, request, message_set)
+    header <> message_set
   end
 
   def parse_response(<< _correlation_id :: 32-signed, num_topics :: 32-signed, rest :: binary >>), do: parse_topics(num_topics, rest, __MODULE__)
@@ -44,7 +47,7 @@ defmodule KafkaEx.Protocol.Produce do
   defp create_message_set([], _compression_type), do: ""
   defp create_message_set([%Message{key: key, value: value}|messages], :none) do
     message = create_message(value, key)
-    message_set = << 0 :: 64-signed >> <> << byte_size(message) :: 32-signed >> <> message
+    message_set = message_set_header(message) <> message
     message_set <> create_message_set(messages, :none)
   end
   defp create_message_set(messages, compression_type) do
@@ -53,7 +56,29 @@ defmodule KafkaEx.Protocol.Produce do
       Compression.compress(compression_type, message_set)
 
     message = create_message(compressed_message_set, nil, attribute)
-    << 0 :: 64-signed >> <> << byte_size(message) :: 32-signed >> <> message
+    message_set_header(message) <> message
+  end
+
+  defp produce_header(
+        correlation_id,
+        client_id,
+        %Request{
+          topic: topic,
+          partition: partition,
+          required_acks: required_acks,
+          timeout: timeout
+        },
+        message_set
+      ) do
+    KafkaEx.Protocol.create_request(:produce, correlation_id, client_id) <>
+      << required_acks :: 16-signed, timeout :: 32-signed, 1 :: 32-signed >> <>
+      << byte_size(topic) :: 16-signed, topic :: binary >> <>
+      << 1 :: 32-signed, partition :: 32-signed >> <>
+      << byte_size(message_set) :: 32-signed >>
+  end
+
+  defp message_set_header(message) do
+    << 0 :: 64-signed, byte_size(message) :: 32-signed >>
   end
 
   defp create_message(value, key, attributes \\ 0) do
diff --git a/test/protocol/produce_test.exs b/test/protocol/produce_test.exs
index 03158a67..e396a666 100644
--- a/test/protocol/produce_test.exs
+++ b/test/protocol/produce_test.exs
@@ -30,21 +30,55 @@ defmodule KafkaEx.Protocol.Produce.Test do
   test "create_request correctly encodes messages with gzip" do
     expected_request = <<0, 0, 0, 0, 0, 0, 0, 1, 0, 23, 99, 111, 109, 112, 114, 101, 115, 115, 105, 111, 110, 95, 99, 108, 105, 101, 110, 116, 95, 116, 101, 115, 116, 0, 1, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100, 95, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 74, 79, 44, 46, 209, 0, 1, 255, 255, 255, 255, 0, 0, 0, 60, 31, 139, 8, 0, 0, 0, 0, 0, 0, 3, 99, 96, 128, 3, 153, 135, 115, 4, 255, 131, 89, 172, 217, 169, 149, 10, 137, 64, 6, 103, 110, 106, 113, 113, 98, 122, 42, 152, 3, 87, 199, 242, 37, 117, 30, 66, 93, 18, 178, 186, 36, 0, 127, 205, 212, 97, 80, 0, 0, 0>>
 
+    client_id = "compression_client_test"
+    topic = "compressed_topic"
+    messages = [
+      %KafkaEx.Protocol.Produce.Message{key: "key a", value: "message a"},
+      %KafkaEx.Protocol.Produce.Message{key: "key b", value: "message b"}
+    ]
+
     produce = %KafkaEx.Protocol.Produce.Request{
-      topic: "compressed_topic",
+      topic: topic,
       partition: 0,
       required_acks: 1,
       timeout: 10,
       compression: :gzip,
-      messages: [
-        %KafkaEx.Protocol.Produce.Message{key: "key a", value: "message a"},
-        %KafkaEx.Protocol.Produce.Message{key: "key b", value: "message b"}
-      ]
+      messages: messages
     }
 
-    request = KafkaEx.Protocol.Produce.create_request(1, "compression_client_test", produce)
-
-    assert expected_request == request
+    request = KafkaEx.Protocol.Produce.create_request(1, client_id, produce)
+
+    # The exact binary contents of the message can change as zlib changes,
+    # but they should remain compatible. We test this by splitting the binary
+    # up into the parts that should be the same and the parts that may differ -
+    # which are the crc checksum and the compressed part of the message.
+    #
+    # The byte sizes here are determined by looking at the construction of the
+    # messages and headers in produce.ex
+    pre_crc_header_size = 46 + byte_size(topic) + byte_size(client_id)
+    crc_size = 4
+    # Note this includes the size of the compressed part of the binary, which
+    # should be the same.
+    post_crc_header_size = 10
+
+    << pre_crc_header :: binary-size(pre_crc_header_size),
+       _crc :: binary-size(crc_size),
+       post_crc_header :: binary-size(post_crc_header_size),
+       compressed_message_set :: binary >> = request
+
+    << expect_pre_crc_header :: binary-size(pre_crc_header_size),
+       _expect_crc :: binary-size(crc_size),
+       expect_post_crc_header :: binary-size(post_crc_header_size),
+       expect_compressed_message_set :: binary >> = expected_request
+
+    assert pre_crc_header == expect_pre_crc_header
+    assert post_crc_header == expect_post_crc_header
+
+    decompressed_message_set = :zlib.gunzip(compressed_message_set)
+    expect_decompressed_message_set =
+      :zlib.gunzip(expect_compressed_message_set)
+
+    assert decompressed_message_set == expect_decompressed_message_set
   end
 
   test "create_request correctly encodes messages with snappy" do