feat: additional options to allow for batching and asynchronous batch handling for BroadwayAdapter #103

Merged: 28 commits into main, Oct 7, 2024

Conversation

@seungjinstord (Contributor) commented Sep 26, 2024

Related Ticket(s)

SIGNAL-7088


Problem

Using Kafee's BroadwayAdapter with the FirehoseConsumer in wms-service, I ran a comparison test: a dummy handler on ProcessAdapter versus an equivalent dummy handler on BroadwayAdapter.

The ProcessAdapter version took 7 seconds, with a LOT of DBConnection pool errors; the BroadwayAdapter version took 66 seconds (at 12 partitions).

That is roughly a 10x difference, so a crude way to close the gap would be to raise the partition count from 12 to 120.

Naturally, I raised the partition count to 120 while keeping consumer_concurrency at 12, and it did help: the run took 44 seconds.

Then I raised consumer_concurrency to 120 as well, and the DBConnection pool errors came back, because my local database couldn't sustain a pool of 120 connections even after raising the limit in dev.exs. So this "vanilla" approach to scaling won't work.

Details

My first thought was to check whether naively tweaking the batch config values would work: changing the concurrency, the batch size, or the partitioning function. The partitioning function (overriding Kafka's partitioning) is locked down by BroadwayKafka, so that's not an option; the number of concurrent processes is capped at the number of partitions, which keeps us within Kafka's ordering rules. Batching, however, does expose a place where chunked work can happen.

Our events are idempotent and do not depend on strict chronological ordering; this is already battle tested, since the current ProcessAdapter runs the event handlers asynchronously.

With batching, we mimic exactly the pattern that ProcessAdapter goes through:

  • ProcessAdapter records the events as they happen for a given request_id, in chronological order, in its process state. It then runs them through the event handlers asynchronously.
  • The BroadwayAdapter in this PR gains the ability to group messages arriving in chronological order from the partitions into batches, and a new configuration option allows the messages in each batch to be handled asynchronously.

To keep the code paths simple, the pragmatic approach was chosen: messages always go through batching, with a default batch size of 1 unless overriding config options are passed.
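
As a rough sketch (the helper name and default values here are assumed for illustration, not taken from the actual implementation), that fallback could look like this:

    # Sketch only: resolve the `batching` options with a size-1 default so
    # every message flows through the same batcher code path.
    defp batching_options(adapter_options) do
      Keyword.get(adapter_options, :batching,
        concurrency: 1,
        size: 1,
        async_run: false
      )
    end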

Local test result

Using the same 400-threshold automation config trigger (see the description in this PR), running asynchronously with the following settings took 7 seconds (down from 66 seconds) to process the same number of events, with NO DBConnection errors popping up! The batch options used were:

    batching: [
      concurrency: System.schedulers_online() * 2,
      size: 100,
      timeout: 500,
      async_run: true
    ]
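
For context, a consumer wired up with these options might look like the sketch below. The module name, the handler body, and the exact consumer API shape are assumptions for illustration; check Kafee's docs for the real option names.

    # Hypothetical usage sketch; module and handler names are made up.
    defmodule MyApp.FirehoseConsumer do
      use Kafee.Consumer,
        adapter:
          {Kafee.Consumer.BroadwayAdapter,
           batching: [
             concurrency: System.schedulers_online() * 2,
             size: 100,
             timeout: 500,
             async_run: true
           ]}

      # With `async_run: true`, each message in a batch is handled in a
      # fire-and-forget task rather than sequentially.
      def handle_message(message) do
        MyApp.EventHandler.handle(message)
      end
    end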

@seungjinstord seungjinstord added the hold Do not merge this pull request label Sep 26, 2024
@seungjinstord seungjinstord self-assigned this Sep 26, 2024
if batch_config[:async_run] do
  # No need for Task.Supervisor, as this is not running under a GenServer,
  # and Kafee.Consumer.Adapter.push_message already has error handling.
  Enum.each(messages, &Task.async(fn -> do_consumer_work(&1) end))

@seungjinstord (Contributor, Author):

This matches how ProcessAdapter runs its event processing asynchronously: it doesn't wait for the tasks to complete, it just fires off the async operations.

@seungjinstord seungjinstord removed the hold Do not merge this pull request label Sep 27, 2024
@seungjinstord seungjinstord marked this pull request as ready for review September 27, 2024 18:22
@seungjinstord seungjinstord requested a review from a team as a code owner September 27, 2024 18:22
@seungjinstord seungjinstord requested review from btkostner and a team September 27, 2024 18:22
@btkostner (Contributor) left a comment:

Some cleanup comments to hopefully make this easier to use. Otherwise I like the idea and it should allow us to make something super fast.

Review threads:

  • lib/kafee/consumer/broadway_adapter.ex (resolved)
  • lib/kafee/consumer/broadway_adapter.ex (resolved)
  • lib/kafee/consumer/broadway_adapter.ex (outdated, resolved)
  • lib/kafee/consumer/broadway_adapter.ex (resolved)
  • lib/kafee/consumer/broadway_adapter.ex (outdated, resolved)
  • test/kafee/consumer/broadway_adapter_integration_test.exs (outdated, resolved)
@seungjinstord seungjinstord requested a review from a team October 2, 2024 17:31
@seungjinstord (Contributor, Author) commented Oct 3, 2024:

@btkostner reminder for another round of review, thanks 🙏

@@ -100,34 +197,65 @@ defmodule Kafee.Consumer.BroadwayAdapter do

   @doc false
   @impl Broadway
-  def handle_message(:default, %Broadway.Message{data: value, metadata: metadata} = message, %{
+  def handle_message(:default, %Broadway.Message{metadata: metadata} = message, %{

Do we still need this implementation if batching is always going to be on? 🤔

@seungjinstord (Contributor, Author) replied Oct 7, 2024:

Probably still needed, since the Broadway behaviour requires handle_message (it's not defoverridable). But while reading the code again I did find an optimization point: I don't actually need to inject the additional metadata at this point; I can probably pull it from the context at handle_batch!
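
A minimal sketch of that idea, using Broadway's handle_batch/4 callback shape (the context keys and the push_message arity here are assumptions, not the merged code):

    # Sketch only: read the consumer and its options from the Broadway
    # context instead of injecting them into each message's metadata.
    @impl Broadway
    def handle_batch(:default, messages, _batch_info, %{
          consumer: consumer,
          options: options
        }) do
      Enum.each(messages, fn message ->
        Kafee.Consumer.Adapter.push_message(consumer, options, message)
      end)

      # Broadway expects the (possibly updated) messages back.
      messages
    end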

@seungjinstord (Contributor, Author):

Optimization done in a913991

@trideepgogoi previously approved these changes Oct 7, 2024:

Looks safe to me!

… because it's unnecessary work as context already has it

@trideepgogoi previously approved these changes Oct 7, 2024:

LGTM

@trideepgogoi left a comment:

LGTM

@seungjinstord seungjinstord enabled auto-merge (squash) October 7, 2024 16:19
@cdipesa-stord cdipesa-stord requested review from cdipesa-stord and btkostner and removed request for btkostner and cdipesa-stord October 7, 2024 16:52
@seungjinstord seungjinstord merged commit f003f0b into main Oct 7, 2024
12 checks passed
@seungjinstord seungjinstord deleted the SIGNAL-7088-broadway-and-batch-option branch October 7, 2024 16:53
seungjinstord pushed a commit that referenced this pull request Oct 7, 2024
An automated release has been created for you.
---


## [3.3.0](v3.2.0...v3.3.0) (2024-10-07)


### Features

* Additional options to allow for batching and asynchronous batch handling for BroadwayAdapter ([#103](#103)) ([f003f0b](f003f0b))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).