
feat: kafka compression #3179

Merged
3 commits merged into master on Apr 11, 2023

Conversation

@fracasula (Collaborator) commented Apr 6, 2023

Description

These changes add the ability to use compression in the Kafka manager.

Compression in Kafka can happen in two ways:

  1. Compression enabled producer-side
  2. Compression enabled broker-side

Here we use producer-side compression. The advantage is that it doesn't require any configuration changes to the brokers or the consumers.
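For illustration, here is a minimal sketch of producer-side compression with segmentio/kafka-go (the library this client package builds on); the broker address and topic are placeholders:

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Compression is set on the writer only: brokers and consumers
	// need no configuration changes to handle compressed batches.
	w := &kafka.Writer{
		Addr:        kafka.TCP("localhost:9092"), // placeholder broker address
		Topic:       "events",                    // placeholder topic
		Compression: kafka.Zstd,                  // alternatives: kafka.Gzip, kafka.Snappy, kafka.Lz4
	}
	defer w.Close()

	err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("user-1"), Value: []byte(`{"event":"signup"}`)},
	)
	if err != nil {
		log.Fatal(err)
	}
}
```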

This is why I believe we can enable it by default for everybody, while still giving the ability to change it as the need arises. Another approach is to default to None, enable it for one customer at a time, and then change the default once everybody is already running with it.

To understand which algorithm we should enable by default, I built a benchmark to compare them. The main objective was to find an algorithm that gives decent compression without being too heavy on CPU/memory.

How I built the benchmark

  • One Kafka setup is created from scratch every time a compression type benchmark is created (i.e. one for gzip, one for snappy, etc.).
  • The payloads are the same for all benchmarks: even though they are randomly generated, they are generated only once at the beginning so that all algorithms work on the same sample.
  • To measure the data transferred, I'm connecting through a local proxy that counts the bytes received from the producer (see the sketch after this list).
  • For the proxy to work properly, Kafka has to advertise the proxy address as the broker address (thus the Kafka destination changes).
  • To measure the real amount of transferred data, I'm closing the proxy to make sure that all connections (thus all io.Copy calls) are terminated. This is why I start and stop the proxy for each benchmark.
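A minimal sketch of such a byte-counting TCP proxy (names and structure are illustrative, not the actual utils/tcpproxy implementation):

```go
package tcpproxy

import (
	"io"
	"net"
	"sync/atomic"
)

// Proxy forwards TCP traffic to a backend and counts the bytes
// received from clients (i.e. from the producer).
type Proxy struct {
	BytesReceived atomic.Int64
	listener      net.Listener
}

// Start listens on listenAddr and forwards every connection to backendAddr.
func (p *Proxy) Start(listenAddr, backendAddr string) error {
	ln, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}
	p.listener = ln
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			go p.handle(conn, backendAddr)
		}
	}()
	return nil
}

func (p *Proxy) handle(client net.Conn, backendAddr string) {
	defer client.Close()
	backend, err := net.Dial("tcp", backendAddr)
	if err != nil {
		return
	}
	defer backend.Close()

	// Count the bytes flowing client -> backend while copying them through.
	go func() {
		n, _ := io.Copy(backend, client)
		p.BytesReceived.Add(n)
	}()
	_, _ = io.Copy(client, backend) // backend -> client, not counted
}

// Stop closes the listener. A full implementation would also track and
// close open connections so that all io.Copy calls terminate before
// BytesReceived is read.
func (p *Proxy) Stop() error { return p.listener.Close() }
```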

Why Zstd as the default algorithm?

If you look at the benchmark results you might notice that the algorithm that gives the best compression is indeed Zstd.

At the same time, it is one of the most efficient as well.

If we compare it with not having compression at all, we can see for example that pushing a single message with no compression leads to 18k ops vs only 6k ops with Zstd (None runs at roughly 285% of Zstd's throughput). However, with no compression we would have to transfer 300GB of data vs 110GB with Zstd. So it depends on whether the bottleneck is CPU or network. Either way, not a huge win, but that's to be expected without batching.

If we look at the other results where we are sending batches of messages, the data looks a lot more promising.

A batch of 100 messages of 1kb each with no compression, once repeated enough times to get the benchmark results, leads to 2.4TB of data transferred and a total of 5077 ops (i.e. 101.6 kb/op).

With Zstd we run only 4773 ops (i.e. 248584 ns/op vs the faster no-compression counterpart at 218305 ns/op).
This time None runs at only ~106% of Zstd's throughput instead of ~285%, while the data transferred with Zstd caps at 1.206 kb/op (vs 101.6 kb/op). It beats even Gzip with its 1.865 kb/op. However, Gzip had to spend a sizeable 348371 ns/op to get there, basically 100k ns more than Zstd. One thing to consider, though, is that Gzip is a little easier on memory.

TL;DR: 4773 ops vs 5077 in the same amount of time is surely a bit less, but the amount of data transferred is outstandingly lower (23.7GB vs 2.4TB).

All considered, looking at the benchmark results I'd say that Zstd wins in most scenarios and that it is our best candidate for the default compression algorithm.

Sidenotes

  • If you think it might be useful, we can move the TCP proxy into the go-kit.
  • I'll change the base branch myself once this PR gets merged.

Notion Ticket

< Notion Link >

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

codecov bot commented Apr 6, 2023

Codecov Report

Patch coverage: 5.40% and project coverage change: -0.05% ⚠️

Comparison is base (db9fdc5) 53.30% compared to head (c8325e5) 53.26%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3179      +/-   ##
==========================================
- Coverage   53.30%   53.26%   -0.05%     
==========================================
  Files         315      316       +1     
  Lines       51777    51850      +73     
==========================================
+ Hits        27602    27618      +16     
- Misses      22511    22568      +57     
  Partials     1664     1664              
Impacted Files Coverage Δ
testhelper/destination/kafka/kafka.go 77.86% <0.00%> (-2.85%) ⬇️
utils/tcpproxy/tcpproxy.go 0.00% <0.00%> (ø)
services/streammanager/kafka/kafkamanager.go 75.76% <27.27%> (-1.05%) ⬇️
services/streammanager/kafka/client/producer.go 84.03% <100.00%> (ø)

... and 5 files with indirect coverage changes


@fracasula marked this pull request as ready for review April 6, 2023 14:05
@@ -234,6 +235,19 @@ func Init() {
)
}

kafkaCompression = client.CompressionZstd
if kc := config.GetInt("Router.kafkaCompression", -1); kc != -1 {
Contributor

maybe we should use the "standard" naming pattern, e.g. Router.KAFKA.compression

Collaborator Author

I had exactly the same thought when I was coming up with the name, but then I realized that the batching parameter is the only one following that pattern 😅

The majority of the parameters use this other one, so I went with the majority. I'm not strongly attached to this though, I can change it if you guys prefer 👍
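Since the diff anchor above is truncated at the if statement, here is a hypothetical sketch of how the integer Router.kafkaCompression value could map onto the client's compression constants. Apart from client.CompressionZstd (visible in the diff), the constant names and the numeric mapping (which mirrors Kafka's wire-protocol codec IDs) are assumptions, not the actual PR code:

```go
// Hypothetical helper: map the Router.kafkaCompression integer onto the
// client package's compression constants. Only client.CompressionZstd is
// confirmed by the diff above; the other names are illustrative.
func compressionFromConfig(kc int) (client.Compression, bool) {
	switch kc {
	case 0:
		return client.CompressionNone, true // 0 = none (Kafka codec ID)
	case 1:
		return client.CompressionGzip, true // 1 = gzip
	case 2:
		return client.CompressionSnappy, true // 2 = snappy
	case 3:
		return client.CompressionLz4, true // 3 = lz4
	case 4:
		return client.CompressionZstd, true // 4 = zstd
	default:
		return client.CompressionZstd, false // unknown value: keep the default
	}
}
```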

services/streammanager/kafka/kafkamanager.go (outdated review thread, resolved)
Base automatically changed from test.kafkaBatching to master April 9, 2023 05:48
@fracasula merged commit acb3918 into master Apr 11, 2023
16 checks passed
@fracasula deleted the test.kafkaCompression branch April 11, 2023 08:19
Sidddddarth pushed a commit that referenced this pull request Apr 12, 2023
* feat: kafka compression

* chore: enabling compression by default only if batching is on