Stream compression #306

Merged
merged 12 commits into master from stream-compression on Jul 22, 2019

Conversation

@achille-roussel (Contributor) commented Jul 9, 2019

This PR modifies the compression codec API to use io.Reader and io.Writer instead of working with byte slices. The upside is that the amount of memory allocated internally is greatly reduced, and it paves the way for better internal abstractions to build more efficient programs with kafka-go.

This is a breaking change in the API, but I think it's OK since these packages are intended to work with kafka-go, and programs shouldn't use them directly (maybe we'll bump to 0.3.0 on the next release?).
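For illustration, a codec built around streams rather than byte slices roughly takes the following shape. This is a sketch only; the interface and method names are illustrative and may not match the exact kafka-go API.

```go
// Sketch of a reader/writer-based compression codec; names are illustrative.
package compress

import "io"

type CompressionCodec interface {
	// Code returns the codec value carried in the Kafka message attributes.
	Code() int8

	// NewReader wraps r with a reader that decompresses the stream.
	NewReader(r io.Reader) io.ReadCloser

	// NewWriter wraps w with a writer that compresses data written to it.
	NewWriter(w io.Writer) io.WriteCloser
}
```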

Here are comparisons of the compression benchmarks taken before and after:

benchmark                            old ns/op     new ns/op     delta
BenchmarkCompression/GZIP1024        48754         57090         +17.10%
BenchmarkCompression/GZIP4096        116650        110110        -5.61%
BenchmarkCompression/GZIP8192        204592        161450        -21.09%
BenchmarkCompression/GZIP16384       388270        290743        -25.12%
BenchmarkCompression/Snappy1024      533           509           -4.50%
BenchmarkCompression/Snappy4096      1287          1055          -18.03%
BenchmarkCompression/Snappy8192      2361          1732          -26.64%
BenchmarkCompression/Snappy16384     4429          3427          -22.62%
BenchmarkCompression/LZ41024         1033620       2991          -99.71%
BenchmarkCompression/LZ44096         994821        13117         -98.68%
BenchmarkCompression/LZ48192         1063436       28193         -97.35%
BenchmarkCompression/LZ416384        1059942       67983         -93.59%
BenchmarkCompression/zstd1024        11835         12423         +4.97%
BenchmarkCompression/zstd4096        23943         27353         +14.24%
BenchmarkCompression/zstd8192        39247         38990         -0.65%
BenchmarkCompression/zstd16384       70687         69344         -1.90%

benchmark                            old MB/s       new MB/s     speedup
BenchmarkCompression/GZIP1024        16.63          14.17        0.85x
BenchmarkCompression/GZIP4096        26.58          28.19        1.06x
BenchmarkCompression/GZIP8192        30.13          38.20        1.27x
BenchmarkCompression/GZIP16384       31.66          42.27        1.34x
BenchmarkCompression/Snappy1024      1927.83        2057.82      1.07x
BenchmarkCompression/Snappy4096      3185.87        3904.71      1.23x
BenchmarkCompression/Snappy8192      3471.45        4743.12      1.37x
BenchmarkCompression/Snappy16384     3700.07        4788.41      1.29x
BenchmarkCompression/LZ41024         1.01           348.62       345.17x
BenchmarkCompression/LZ44096         4.14           313.71       75.78x
BenchmarkCompression/LZ48192         7.72           291.23       37.72x
BenchmarkCompression/LZ416384        15.48          241.28       15.59x
BenchmarkCompression/zstd1024        68.44          65.20        0.95x
BenchmarkCompression/zstd4096        129.22         113.22       0.88x
BenchmarkCompression/zstd8192        156.60         157.73       1.01x
BenchmarkCompression/zstd16384       173.44         176.77       1.02x

benchmark                            old allocs     new allocs     delta
BenchmarkCompression/GZIP1024        5              4              -20.00%
BenchmarkCompression/GZIP4096        7              4              -42.86%
BenchmarkCompression/GZIP8192        8              4              -50.00%
BenchmarkCompression/GZIP16384       9              4              -55.56%
BenchmarkCompression/Snappy1024      2              2              +0.00%
BenchmarkCompression/Snappy4096      2              2              +0.00%
BenchmarkCompression/Snappy8192      2              2              +0.00%
BenchmarkCompression/Snappy16384     2              2              +0.00%
BenchmarkCompression/LZ41024         18             2              -88.89%
BenchmarkCompression/LZ44096         20             2              -90.00%
BenchmarkCompression/LZ48192         21             2              -90.48%
BenchmarkCompression/LZ416384        22             2              -90.91%
BenchmarkCompression/zstd1024        4              2              -50.00%
BenchmarkCompression/zstd4096        4              2              -50.00%
BenchmarkCompression/zstd8192        4              2              -50.00%
BenchmarkCompression/zstd16384       4              2              -50.00%

benchmark                            old bytes     new bytes     delta
BenchmarkCompression/GZIP1024        4079          110           -97.30%
BenchmarkCompression/GZIP4096        23417         153           -99.35%
BenchmarkCompression/GZIP8192        49863         154           -99.69%
BenchmarkCompression/GZIP16384       101673        243           -99.76%
BenchmarkCompression/Snappy1024      2304          16            -99.31%
BenchmarkCompression/Snappy4096      8960          16            -99.82%
BenchmarkCompression/Snappy8192      17920         16            -99.91%
BenchmarkCompression/Snappy16384     36864         16            -99.96%
BenchmarkCompression/LZ41024         17316671      68            -100.00%
BenchmarkCompression/LZ44096         17341247      278           -100.00%
BenchmarkCompression/LZ48192         17374015      541           -100.00%
BenchmarkCompression/LZ416384        17439551      1331          -99.99%
BenchmarkCompression/zstd1024        2224          65            -97.08%
BenchmarkCompression/zstd4096        9008          66            -99.27%
BenchmarkCompression/zstd8192        17712         66            -99.63%
BenchmarkCompression/zstd16384       34864         71            -99.80%

One aspect of this change that I would like to open the conversation on is framing with snappy. We used to support xerial framing on the reader side only; here I implemented framing on the writer side as well, which means the batches produced to Kafka will be slightly different from what they are today. Since we were already compatible on the reader side this shouldn't be an issue, but I'm thinking we should probably make it an opt-in feature and keep the historical behavior as the default.

Another aspect is that the framing mechanism used by the github.com/golang/snappy package is different from the one typically used in kafka programs (xerial), so the snappy-encoded data that we produce cannot be decoded with a *snappy.Reader. I wonder if we should offer the ability to support both framing formats.
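For reference, the xerial framing produced by snappy-java is, to my understanding, a 16-byte header (an 8-byte magic followed by two big-endian uint32 version fields) followed by big-endian length-prefixed snappy blocks. A minimal sketch of writing it could look like this; writeXerialFrames is a hypothetical helper, not part of kafka-go:

```go
// Sketch of xerial-style snappy framing; assumes the snappy-java stream layout.
package xerial

import (
	"encoding/binary"
	"io"

	"github.com/golang/snappy"
)

// 8-byte magic, then default version (1) and min compatible version (1).
var xerialHeader = []byte{0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0x00, 0, 0, 0, 1, 0, 0, 0, 1}

// writeXerialFrames writes each chunk as one length-prefixed snappy block.
func writeXerialFrames(w io.Writer, chunks [][]byte) error {
	if _, err := w.Write(xerialHeader); err != nil {
		return err
	}
	var size [4]byte
	for _, chunk := range chunks {
		block := snappy.Encode(nil, chunk)
		binary.BigEndian.PutUint32(size[:], uint32(len(block)))
		if _, err := w.Write(size[:]); err != nil {
			return err
		}
		if _, err := w.Write(block); err != nil {
			return err
		}
	}
	return nil
}
```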

Let me know what you think!

@achille-roussel requested review from stevevls, rjenkins and Pryz on Jul 9, 2019
@Pryz (Contributor) commented Jul 9, 2019

Maybe a good opportunity to move to https://github.com/klauspost/compress/tree/master/zstd ?

@achille-roussel (Contributor, author) commented Jul 9, 2019

We had #303 opened recently for this, let's give it a bit of time?

@Pryz (Contributor) commented Jul 9, 2019

> Another aspect is that the framing mechanism used by the github.com/golang/snappy package is different from the one typically used in kafka programs (xerial), so the snappy-encoded data that we produce cannot be decoded with a *snappy.Reader. I wonder if we should offer the ability to support both framing formats.

I believe the xerial framing only matters if we let Kafka do the compression. When using compression.type it may not matter. We should do some testing though.

@achille-roussel (Contributor, author) commented Jul 9, 2019

Cool, probably not worth investigating how to get compatibility with the github.com/golang/snappy reader and writer then 👍

Resolved review threads on gzip/gzip.go (one outdated) and message.go.
@stevevls (Contributor) commented Jul 12, 2019

Re: the xerial framing, I would take a look at the java client, librdkafka, and maybe even sarama to see what they are able to produce/consume. I think we should stick to whatever is supported by the Kafka ecosystem at large. We don't want to run into situations where folks are deploying kafka-go alongside other kafka connectors and find that they can't talk to one another! 😅

@stevevls (Contributor) commented Jul 12, 2019

Re: the breaking change, I think it's totally fine. I don't think there's a strong case for people to be writing their own codecs, since any compression format requires broker support and a predefined constant.
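For context, the numeric codec values carried in the message attributes are fixed by the Kafka protocol; the constant names below are only illustrative:

```go
// Compression codec values defined by the Kafka protocol (attribute bits);
// only the numeric values are fixed, the names here are illustrative.
const (
	CompressionNone   int8 = 0
	CompressionGzip   int8 = 1
	CompressionSnappy int8 = 2
	CompressionLZ4    int8 = 3
	CompressionZstd   int8 = 4
)
```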

@achille-roussel (Contributor, author) commented Jul 12, 2019

@stevevls good point on comparing with other libraries.

kafka-python makes it optional, but ON by default:
https://github.com/dpkp/kafka-python/blob/master/kafka/codec.py#L101-L124

Sarama forces xerial framing: https://github.com/Shopify/sarama/blob/94536b3e82d393e4b5dfa36475b40fc668e9486f/compress.go#L55

librdkafka seems to never output with framing enabled: https://github.com/edenhill/librdkafka/blob/bd74dc31ba6edf431749b4ad5a00aa4fc78882fc/src/snappy.c#L1408-L1484
but supports decoding with the xerial framing: https://github.com/edenhill/librdkafka/blob/bd74dc31ba6edf431749b4ad5a00aa4fc78882fc/src/snappy.c#L1583-L1694

So I guess we're pretty much free to pick whatever approach works for us 🤷‍♂

@stevevls (Contributor) commented Jul 12, 2019

Just to make things more interesting, the java client uses Xerial. That's probably how all this craziness started:

https://github.com/apache/kafka/blob/741cb761c5239297029a446518c332f6c4ed08f6/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java#L210-L214

@stevevls (Contributor) commented Jul 12, 2019

Looking at the above, it seems to me that we want to support decoding both framing formats for sure.

As for encoding...it seems like the best approach for interoperability would be to encode with xerial framing. However, as you point out, that would not be strictly backwards compatible. I think one could make the argument that it shouldn't cause operational issues given that all other known clients would be able to read the xerial-framed records.

I would say that if we're more concerned with strict b/w compatibility, let's default to unframed with an option to enable xerial. Otherwise, unless there's a big advantage to supporting both, I'd suggest we make our lives simple by just supporting xerial. Admittedly, I don't know enough about snappy to know whether one is better than the other. 😄
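A minimal sketch of the "decode both framing formats" idea above, assuming detection by the 8-byte xerial magic header; decodeSnappy is a hypothetical helper, not the actual kafka-go implementation:

```go
// Sketch: decode snappy data that may be raw or xerial-framed.
package xerial

import (
	"bytes"
	"encoding/binary"
	"errors"

	"github.com/golang/snappy"
)

var xerialMagic = []byte{0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0x00}

func decodeSnappy(src []byte) ([]byte, error) {
	if len(src) < 16 || !bytes.HasPrefix(src, xerialMagic) {
		// Raw snappy block (the historical kafka-go writer output).
		return snappy.Decode(nil, src)
	}
	// Xerial stream: skip the 16-byte header, then read
	// big-endian length-prefixed snappy blocks.
	src = src[16:]
	var dst []byte
	for len(src) > 0 {
		if len(src) < 4 {
			return nil, errors.New("xerial: truncated block header")
		}
		n := int(binary.BigEndian.Uint32(src))
		src = src[4:]
		if n > len(src) {
			return nil, errors.New("xerial: truncated block")
		}
		chunk, err := snappy.Decode(nil, src[:n])
		if err != nil {
			return nil, err
		}
		dst = append(dst, chunk...)
		src = src[n:]
	}
	return dst, nil
}
```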

@achille-roussel (Contributor, author) commented Jul 13, 2019

Defaulting to "unframed" means we have to buffer the whole compressed batch in an intermediary compression buffer, so we would be defaulting to the unoptimized approach; on the other hand, it also means we retain the historical behavior.

My understanding is that Kafka will always return a framed snappy-compressed batch, even if it was produced without framing (with a single frame, which is why we had to support it). That said, we can do more testing to validate this behavior.

@achille-roussel (Contributor, author) commented Jul 13, 2019

Alright, I did some cleanup of the code, made framing configurable in the snappy codec (and enabled it by default), and improved the usability of the benchmarks; here's what they look like now:

goos: darwin
goarch: amd64
pkg: github.com/segmentio/kafka-go
BenchmarkCompression/none/compress         	   20000	     74888 ns/op	25911.37 MB/s	      16 B/op	       1 allocs/op
BenchmarkCompression/none/decompress       	   50000	     35377 ns/op	54851.19 MB/s	      16 B/op	       1 allocs/op
BenchmarkCompression/gzip/compress         	     100	  16930136 ns/op	   7.03 MB/s	    8156 B/op	       1 allocs/op
BenchmarkCompression/gzip/decompress       	    1000	   2403485 ns/op	 807.36 MB/s	    7533 B/op	      54 allocs/op
BenchmarkCompression/snappy/compress       	    2000	   1125301 ns/op	 235.03 MB/s	      45 B/op	       1 allocs/op
BenchmarkCompression/snappy/decompress     	    2000	    604146 ns/op	3211.92 MB/s	      40 B/op	       1 allocs/op
BenchmarkCompression/lz4/compress          	     500	   2737591 ns/op	  74.10 MB/s	   17850 B/op	       1 allocs/op
BenchmarkCompression/lz4/decompress        	    2000	   1167942 ns/op	1661.44 MB/s	    4206 B/op	       1 allocs/op
BenchmarkCompression/zstd/compress         	     300	   4920094 ns/op	  24.04 MB/s	   13222 B/op	       1 allocs/op
BenchmarkCompression/zstd/decompress       	    2000	    833343 ns/op	2328.54 MB/s	    2624 B/op	       1 allocs/op
input => 1.85 MB
  none:		0.00%
  gzip:		93.86%
  snappy:	86.37%
  lz4:		89.55%
  zstd:		93.90%

PASS
ok  	github.com/segmentio/kafka-go	20.338s
@stevevls (Contributor) left a comment

Looks great!

@achille-roussel merged commit 59f58f0 into master on Jul 22, 2019
4 checks passed
ci/circleci: kafka-010 Your tests passed on CircleCI!
ci/circleci: kafka-011 Your tests passed on CircleCI!
ci/circleci: kafka-111 Your tests passed on CircleCI!
ci/circleci: kafka-210 Your tests passed on CircleCI!
@achille-roussel deleted the stream-compression branch on Jul 22, 2019