feat: SIGNAL-7060 preemptively drop large messages from queue (#101)
## Related Ticket(s)
SIGNAL-7060


## Checklist


- [x] Code conforms to the [Elixir
Styleguide](https://github.com/christopheradams/elixir_style_guide)

## Problem

We added logic to drop large messages during termination, but we still
observed raw `:message_too_large` errors bubbling up from brod code. Due
to the sheer volume of traffic on the `wms-service--firehose` topic, these
errors show up once or twice every other day.


## Details

Preemptively drop large messages before a send is ever attempted. Prior
to this PR, `AsyncWorker.build_message_batch()` would try to push a
message out to Kafka without checking its size.

This PR adds the check in the `AsyncWorker.queue()` lifecycle: when a
request comes into this GenServer to put a new message on the queue, the
message is checked and dropped if it exceeds `state.max_request_bytes`.

The dropped messages go through the same logging code path, which should
push the message details out to DataDog.
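The queue-time check described above can be sketched as follows. This is an illustrative sketch only: `estimated_size/1`, the example messages, and the 1_000-byte limit are hypothetical, and the real code relies on its own `message_within_max_bytes?/2`, whose exact size calculation is not shown in this diff.

```elixir
# Illustrative sketch only. `estimated_size/1` and the example messages are
# hypothetical; the real code uses `message_within_max_bytes?/2`, whose exact
# size calculation is not shown in this diff.
defmodule DropSketch do
  # Rough size estimate: key bytes plus value bytes.
  def estimated_size(%{key: key, value: value}),
    do: byte_size(key) + byte_size(value)

  # Partition incoming messages before they ever reach the queue.
  def partition_by_size(messages, max_request_bytes) do
    Enum.split_with(messages, &(estimated_size(&1) <= max_request_bytes))
  end
end

{to_queue, dropped} =
  DropSketch.partition_by_size(
    [
      %{key: "ok", value: "small payload"},
      %{key: "big", value: String.duplicate("x", 2_000)}
    ],
    1_000
  )

# to_queue holds the small message; dropped holds the oversized one,
# which would be logged instead of enqueued.
```

In the actual change, each dropped message is logged so it can be picked up and triaged from DataDog.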

---------

Co-authored-by: Sam Hunter <1526888+kinson@users.noreply.github.com>
seungjinstord and kinson authored Oct 1, 2024
1 parent 267bd67 commit 0baeae8
Showing 3 changed files with 143 additions and 48 deletions.
55 changes: 38 additions & 17 deletions lib/kafee/producer/async_worker.ex
@@ -1,10 +1,28 @@
defmodule Kafee.Producer.AsyncWorker do
# A simple GenServer for every topic * partition in Kafka. It holds an
# erlang `:queue` and sends messages every so often. On process close, we
# attempt to send all messages to Kafka, and in the unlikely event we can't
# we write all messages to the logs.
@moduledoc """
A simple GenServer for every topic * partition in Kafka. It holds an
erlang `:queue` and sends messages every so often.
@moduledoc false
## Note on termination
On process close, we attempt to send all messages to Kafka,
and in the unlikely event we can't, we write all messages to the logs.
## When a message is larger than what Kafka allows
### Configuring the `max_request_bytes` option in `Kafee.Producer.AsyncAdapter`
Although the maximum message size is set in your Kafka cluster settings,
it is actually `state.max_request_bytes` that must be configured correctly for any
size-based triaging to happen, so setting it is critical for large message handling.
### During message queuing
Note that in `handle_cast({:queue, messages}, state)`, the code drops large messages so they never get into the queue.
These dropped messages will show up in the logs - see the private function `queue_without_large_messages/2`.
The messages should be picked up from the logs and triaged accordingly.
"""

use GenServer,
shutdown: :timer.seconds(25)
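For context, this behavior hinges on `max_request_bytes` being set on the adapter. A hypothetical configuration sketch - the `use Kafee.Producer` option shape shown here is an assumption about Kafee's API, not confirmed by this diff:

```elixir
# Hypothetical sketch: the adapter tuple shape and option placement are
# assumptions, not confirmed by this diff. 1_040_384 matches the value
# exercised in the tests below.
defmodule MyApp.Producer do
  use Kafee.Producer,
    adapter: {Kafee.Producer.AsyncAdapter, max_request_bytes: 1_040_384},
    topic: "wms-service--firehose",
    partition_fun: :random
end
```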
@@ -226,10 +244,13 @@ defmodule Kafee.Producer.AsyncWorker do
@doc false
def handle_info(_, state), do: {:noreply, state}

# A simple request to add more messages to the queue. Nothing fancy here.
# A simple request to add more messages to the queue.
  # Note: large messages are dropped and never added to the queue.
@doc false
def handle_cast({:queue, messages}, state) do
new_queue = :queue.join(state.queue, :queue.from_list(messages))
new_messages_queue = messages |> :queue.from_list() |> queue_without_large_messages(state.max_request_bytes)
new_queue = :queue.join(state.queue, new_messages_queue)

emit_queue_telemetry(state, :queue.len(new_queue))

Process.send_after(self(), :send, state.throttle_ms)
@@ -252,7 +273,13 @@ defmodule Kafee.Producer.AsyncWorker do
def terminate(_reason, %{send_task: nil} = state) do
# We only focus on triaging the queue in state. If there are messages too big, we log and don't send.
# Update state with queue just with messages that are acceptable
state = %{state | queue: state_queue_without_large_messages(state)}
state = %{state | queue: queue_without_large_messages(state.queue, state.max_request_bytes)}

count = :queue.len(state.queue)

if count > 0 do
Logger.info("Attempting to send #{count} messages to Kafka before terminate")
end

terminate_send(state)
end
@@ -316,21 +343,21 @@ defmodule Kafee.Producer.AsyncWorker do
:ok
end

defp state_queue_without_large_messages(state) do
defp queue_without_large_messages(queue, max_request_bytes) do
# messages_beyond_max_bytes are going to be logged and not processed,
# as they are individually already over max_request_bytes in size.

{messages_within_max_bytes_queue, messages_beyond_max_bytes_reversed} =
:queue.fold(
fn message, {acc_queue_messages_within_limit, acc_messages_beyond_limit} ->
if message_within_max_bytes?(message, state.max_request_bytes) do
if message_within_max_bytes?(message, max_request_bytes) do
{:queue.in(message, acc_queue_messages_within_limit), acc_messages_beyond_limit}
else
{acc_queue_messages_within_limit, [message | acc_messages_beyond_limit]}
end
end,
{:queue.new(), []},
state.queue
queue
)

messages_beyond_max_bytes = Enum.reverse(messages_beyond_max_bytes_reversed)
@@ -339,12 +366,6 @@ defmodule Kafee.Producer.AsyncWorker do
Logger.error("Message in queue is too large, will not push to Kafka", data: message)
end)

count = :queue.len(messages_within_max_bytes_queue)

if count > 0 do
Logger.info("Attempting to send #{count} messages to Kafka before terminate")
end

messages_within_max_bytes_queue
end

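The `queue_without_large_messages/2` helper above partitions an Erlang `:queue` with `:queue.fold/3` (available since OTP 24). A standalone sketch of that pattern, using integers in place of messages and 100 in place of `max_request_bytes`:

```elixir
# Integers stand in for messages; 100 stands in for max_request_bytes.
# Items over the limit are collected in reverse (cheap list prepend)
# and reversed once at the end, just like the helper in the diff.
partition = fn item, {keep, too_big} ->
  if item <= 100 do
    {:queue.in(item, keep), too_big}
  else
    {keep, [item | too_big]}
  end
end

{keep, too_big_reversed} =
  :queue.fold(partition, {:queue.new(), []}, :queue.from_list([1, 250, 42, 999]))

IO.inspect(:queue.to_list(keep))            # [1, 42]
IO.inspect(Enum.reverse(too_big_reversed))  # [250, 999]
```

Folding into a fresh queue preserves the relative order of the kept messages, which matters because the worker sends batches from the front of the queue.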
135 changes: 105 additions & 30 deletions test/kafee/producer/async_worker_test.exs
@@ -59,11 +59,86 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

describe "queue/2" do
setup(%{topic: topic}) do
[small_message] = BrodApi.generate_producer_message_list(topic, 1)
message_fixture = File.read!("test/support/example/large_message.json")
large_message_fixture = String.duplicate(message_fixture, 10)

# This message will skip being sent to Kafka, and only be logged
large_message_1 =
topic
|> BrodApi.generate_producer_message()
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_1")

large_message_2 =
topic
|> BrodApi.generate_producer_message()
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_2")

[small_message: small_message, large_message_1: large_message_1, large_message_2: large_message_2]
end

test "queue a list of messages will send them", %{pid: pid, topic: topic} do
messages = BrodApi.generate_producer_message_list(topic, 2)
assert :ok = AsyncWorker.queue(pid, messages)
assert_receive {^topic, {GenServer, :cast, {:queue, ^messages}}}
end

@tag capture_log: true
    test "any messages too large get logged and dropped from the queue when a small message is first in the list to enqueue", %{
pid: pid,
topic: topic,
small_message: small_message,
large_message_1: large_message_1,
large_message_2: large_message_2
} do
messages = [small_message, large_message_1, large_message_2]

log =
capture_log(fn ->
assert :ok = AsyncWorker.queue(pid, messages)
Process.sleep(@wait_timeout)
end)

expected_large_message_error_log = "Message in queue is too large, will not push to Kafka"
brod_message = BrodApi.to_kafka_message(small_message)
assert_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))

assert 2 == (log |> String.split(expected_large_message_error_log) |> length()) - 1
async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()

# all of the messages in queue are processed or dropped
assert 0 == :queue.len(async_worker_state.queue)
end

@tag capture_log: true
    test "any messages too large get logged and dropped from the queue when a large message is first in the list to enqueue", %{
pid: pid,
topic: topic,
small_message: small_message,
large_message_1: large_message_1,
large_message_2: large_message_2
} do
messages = [large_message_1, small_message, large_message_2]

log =
capture_log(fn ->
assert :ok = AsyncWorker.queue(pid, messages)
Process.sleep(@wait_timeout)
end)

expected_large_message_error_log = "Message in queue is too large, will not push to Kafka"
brod_message = BrodApi.to_kafka_message(large_message_1)
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
brod_message = BrodApi.to_kafka_message(large_message_2)
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert 2 == (log |> String.split(expected_large_message_error_log) |> length()) - 1

async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
assert 0 == :queue.len(async_worker_state.queue)
end
end

describe "handle_info :send" do
@@ -100,18 +175,18 @@ defmodule Kafee.Producer.AsyncWorkerTest do
test ":ok removes sent messages from the queue", %{state: state, topic: topic} do
task = make_fake_task()
send_messages = BrodApi.generate_producer_message_list(topic, 4)
remaining_messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ remaining_messages)}
messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ messages)}

assert {:noreply, new_state} = AsyncWorker.handle_info({task.ref, {:ok, 4, 0}}, state)
assert ^remaining_messages = :queue.to_list(new_state.queue)
assert ^messages = :queue.to_list(new_state.queue)
end

test ":ok emits telemetry of remaining messages", %{state: state, topic: topic} do
task = make_fake_task()
send_messages = BrodApi.generate_producer_message_list(topic, 4)
remaining_messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ remaining_messages)}
messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ messages)}

assert {:noreply, _new_state} = AsyncWorker.handle_info({task.ref, {:ok, 4, 0}}, state)
assert_receive {:telemetry_event, [:kafee, :queue], %{count: 3}, %{partition: 0, topic: ^topic}}
@@ -133,7 +208,7 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

@tag capture_log: true
test "any single message too large gets logged and dropped from queue", %{pid: pid, topic: topic} do
test "any single message too large gets logged and not added to queue", %{pid: pid, topic: topic} do
message_fixture = File.read!("test/support/example/large_message.json")
large_message = String.duplicate(message_fixture, 10)

@@ -149,8 +224,8 @@
end)

brod_message = BrodApi.to_kafka_message(message)
assert_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert log =~ "Message in queue is too large"
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert log =~ "Message in queue is too large, will not push to Kafka"

async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
assert 0 == :queue.len(async_worker_state.queue)
@@ -271,21 +346,21 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

test "waits for in flight tasks to complete", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

state = %{state | queue: :queue.from_list(remaining_messages)}
state = %{state | queue: :queue.from_list(messages)}

assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "waits for in flight send and sends remaining messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: :infinity}

Process.send_after(self(), {task.ref, {:ok, 0, 0}}, 10)
assert :ok = AsyncWorker.terminate(:normal, state)
@@ -294,31 +369,31 @@

@tag capture_log: true
test "waits for in flight error and retries sending messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: :infinity}

Process.send_after(self(), {task.ref, {:error, :internal_error}}, 10)
assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "waits for timeout and retries sending messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: 10}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: 10}

assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "any brod errors are logged before terminate", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

patch(:brod, :sync_produce_request_offset, fn _ref, _timeout ->
{:error, :timeout}
@@ -335,8 +410,8 @@
end

test "any raised errors are logged before terminate", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

patch(:brod, :sync_produce_request_offset, fn _ref, _timeout ->
raise RuntimeError, message: "test"
@@ -369,8 +444,8 @@
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_1")

remaining_messages = [small_message_1, large_message_1, small_message_2]
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = [small_message_1, large_message_1, small_message_2]
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

log =
capture_log(fn ->
@@ -401,9 +476,9 @@
small_message_unit_size = kafka_message_size_bytes(small_message)

small_message_total = Kernel.ceil(max_request_bytes / small_message_unit_size) * 2
remaining_messages = BrodApi.generate_producer_message_list(topic, small_message_total)
messages = BrodApi.generate_producer_message_list(topic, small_message_total)

state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

log =
capture_log(fn ->
@@ -440,8 +515,8 @@
batch = private(AsyncWorker.build_message_batch(messages, 1_040_384))
assert length(batch) in 10_000..15_000

{_batched_messages, remaining_messages} = batch |> length() |> :queue.split(messages)
remaining_batch = private(AsyncWorker.build_message_batch(remaining_messages, 1_040_384))
{_batched_messages, messages} = batch |> length() |> :queue.split(messages)
remaining_batch = private(AsyncWorker.build_message_batch(messages, 1_040_384))
assert length(remaining_batch) in 5_000..10_000

assert 20_000 = length(batch) + length(remaining_batch)
1 change: 0 additions & 1 deletion test/kafee/producer_integration_test.exs
@@ -66,7 +66,6 @@ defmodule Kafee.ProducerIntegrationTest do
end)

refute log =~ "Message in queue is too large"
assert log =~ "brod producer process is currently down"
assert log =~ "Successfully sent messages to Kafka"
end
end
