diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 3740e1e3..0d662c26 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -73,6 +73,9 @@ defmodule KafkaEx.Protocol.Fetch do defp parse_message_set([last|_] = list, _) do {:ok, Enum.reverse(list), last.offset} end + defp parse_message_set(_, << offset :: 64, msg_size :: 32, partial_message_data :: binary >>) when byte_size(partial_message_data) < msg_size do + raise RuntimeError, "Insufficient data fetched at offset #{offset}. Message size is #{msg_size} but only received #{byte_size(partial_message_data)} bytes. Try increasing max_bytes." + end # handles the single message case and the batch (compression) case defp append_messages([], list) do