From a9076b4c99067b153a0ea78f945796425c04cb88 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Thu, 6 Apr 2023 15:00:23 +0200 Subject: [PATCH] feat: kafka compression --- Makefile | 4 + .../client/compression_benchmark_results.txt | 90 +++++++++ .../client/compression_benchmark_test.go | 174 ++++++++++++++++++ .../streammanager/kafka/client/producer.go | 13 +- services/streammanager/kafka/kafkamanager.go | 15 ++ testhelper/destination/kafka/kafka.go | 12 ++ utils/tcpproxy/tcpproxy.go | 94 ++++++++++ 7 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 services/streammanager/kafka/client/compression_benchmark_results.txt create mode 100644 services/streammanager/kafka/client/compression_benchmark_test.go create mode 100644 utils/tcpproxy/tcpproxy.go diff --git a/Makefile b/Makefile index 7177e48fa54..ef6050469ed 100644 --- a/Makefile +++ b/Makefile @@ -116,3 +116,7 @@ run-warehouse-integration: setup-warehouse-integration make cleanup-warehouse-integration; \ exit 1; \ fi + +.PHONY: bench-kafka +bench-kafka: + go test -count 1 -run BenchmarkCompression -bench=. 
-benchmem ./services/streammanager/kafka/client diff --git a/services/streammanager/kafka/client/compression_benchmark_results.txt b/services/streammanager/kafka/client/compression_benchmark_results.txt new file mode 100644 index 00000000000..a860d80b9b8 --- /dev/null +++ b/services/streammanager/kafka/client/compression_benchmark_results.txt @@ -0,0 +1,90 @@ +BenchmarkCompression/none/1.0kB-1-1ns-24 18363 71620 ns/op 295624.13 MB/s 0 errors 1.126 kb/op 3850 B/op 50 allocs/op +BenchmarkCompression/none/1.0kB-1-1ms-24 16392 72852 ns/op 259430.11 MB/s 0 errors 1.126 kb/op 3779 B/op 50 allocs/op +BenchmarkCompression/none/1.0kB-100-1ns-24 5077 218305 ns/op 2419835.83 MB/s 0 errors 101.6 kb/op 58750 B/op 458 allocs/op +BenchmarkCompression/none/1.0kB-100-1ms-24 4585 226962 ns/op 2101982.10 MB/s 0 errors 101.6 kb/op 58867 B/op 458 allocs/op +BenchmarkCompression/none/1.0kB-1000-1ns-24 708 1727235 ns/op 426365.58 MB/s 0 errors 1016 kb/op 681585 B/op 4113 allocs/op +BenchmarkCompression/none/1.0kB-1000-1ms-24 622 1912290 ns/op 338327.23 MB/s 0 errors 1016 kb/op 677768 B/op 4114 allocs/op +BenchmarkCompression/none/10.2kB-1-1ns-24 14065 84065 ns/op 1735179.24 MB/s 0 errors 10.13 kb/op 3785 B/op 50 allocs/op +BenchmarkCompression/none/10.2kB-1-1ms-24 15145 74437 ns/op 2110080.11 MB/s 0 errors 10.13 kb/op 3829 B/op 50 allocs/op +BenchmarkCompression/none/10.2kB-100-1ns-24 1383 853408 ns/op 1662452.74 MB/s 0 errors 1002 kb/op 56731 B/op 459 allocs/op +BenchmarkCompression/none/10.2kB-100-1ms-24 1192 846306 ns/op 1444883.84 MB/s 0 errors 1002 kb/op 57082 B/op 460 allocs/op +BenchmarkCompression/none/10.2kB-1000-1ns-24 133 10124218 ns/op 134764.07 MB/s 0 errors 10018 kb/op 1828180 B/op 4489 allocs/op +BenchmarkCompression/none/10.2kB-1000-1ms-24 124 8846420 ns/op 143793.13 MB/s 0 errors 10018 kb/op 1831709 B/op 4492 allocs/op +BenchmarkCompression/none/102.4kB-1-1ns-24 7374 137306 ns/op 5506407.79 MB/s 0 errors 100.1 kb/op 3768 B/op 50 allocs/op 
+BenchmarkCompression/none/102.4kB-1-1ms-24 7430 134939 ns/op 5645550.61 MB/s 0 errors 100.1 kb/op 3775 B/op 50 allocs/op +BenchmarkCompression/none/102.4kB-100-1ns-24 142 9059868 ns/op 160541.39 MB/s 0 errors 10003 kb/op 221793 B/op 860 allocs/op +BenchmarkCompression/none/102.4kB-100-1ms-24 135 8767735 ns/op 157712.79 MB/s 0 errors 10003 kb/op 222660 B/op 869 allocs/op +BenchmarkCompression/none/102.4kB-1000-1ns-24 14 81213347 ns/op 17657.17 MB/s 0 errors 100028 kb/op 14697012 B/op 8398 allocs/op +BenchmarkCompression/none/102.4kB-1000-1ms-24 14 76716586 ns/op 18692.15 MB/s 0 errors 100028 kb/op 14711957 B/op 8468 allocs/op +BenchmarkCompression/gzip/1.0kB-1-1ns-24 13765 81353 ns/op 199324.06 MB/s 0 errors 1.150 kb/op 7308 B/op 51 allocs/op +BenchmarkCompression/gzip/1.0kB-1-1ms-24 14481 81059 ns/op 210457.94 MB/s 0 errors 1.150 kb/op 6920 B/op 51 allocs/op +BenchmarkCompression/gzip/1.0kB-100-1ns-24 3396 348371 ns/op 18621.08 MB/s 0 errors 1.865 kb/op 103445 B/op 461 allocs/op +BenchmarkCompression/gzip/1.0kB-100-1ms-24 3476 358825 ns/op 18504.75 MB/s 0 errors 1.865 kb/op 99236 B/op 461 allocs/op +BenchmarkCompression/gzip/1.0kB-1000-1ns-24 469 2491430 ns/op 2031.34 MB/s 0 errors 10.54 kb/op 966421 B/op 4121 allocs/op +BenchmarkCompression/gzip/1.0kB-1000-1ms-24 502 2452866 ns/op 2208.68 MB/s 0 errors 10.54 kb/op 1021873 B/op 4127 allocs/op +BenchmarkCompression/gzip/10.2kB-1-1ns-24 13184 97022 ns/op 1412689.83 MB/s 0 errors 10.15 kb/op 6481 B/op 51 allocs/op +BenchmarkCompression/gzip/10.2kB-1-1ms-24 11881 95080 ns/op 1299067.48 MB/s 0 errors 10.15 kb/op 6707 B/op 51 allocs/op +BenchmarkCompression/gzip/10.2kB-100-1ns-24 633 2234347 ns/op 4306.78 MB/s 0 errors 14.85 kb/op 107912 B/op 472 allocs/op +BenchmarkCompression/gzip/10.2kB-100-1ms-24 714 1511212 ns/op 7182.03 MB/s 0 errors 14.84 kb/op 107866 B/op 466 allocs/op +BenchmarkCompression/gzip/10.2kB-1000-1ns-24 67 17227485 ns/op 590.95 MB/s 0 errors 148.4 kb/op 2235967 B/op 4559 allocs/op 
+BenchmarkCompression/gzip/10.2kB-1000-1ms-24 73 16939099 ns/op 654.83 MB/s 0 errors 148.4 kb/op 2264393 B/op 4575 allocs/op +BenchmarkCompression/gzip/102.4kB-1-1ns-24 5578 185513 ns/op 3083797.55 MB/s 0 errors 100.2 kb/op 5801 B/op 51 allocs/op +BenchmarkCompression/gzip/102.4kB-1-1ms-24 7047 177071 ns/op 4081684.27 MB/s 0 errors 100.2 kb/op 5294 B/op 52 allocs/op +BenchmarkCompression/gzip/102.4kB-100-1ns-24 94 13147624 ns/op 73239.23 MB/s 0 errors 10004 kb/op 411992 B/op 910 allocs/op +BenchmarkCompression/gzip/102.4kB-100-1ms-24 98 11490708 ns/op 87366.01 MB/s 0 errors 10004 kb/op 358082 B/op 899 allocs/op +BenchmarkCompression/gzip/102.4kB-1000-1ns-24 9 115049381 ns/op 8013.48 MB/s 0 errors 100038 kb/op 15899084 B/op 8932 allocs/op +BenchmarkCompression/gzip/102.4kB-1000-1ms-24 9 111450807 ns/op 8272.23 MB/s 0 errors 100038 kb/op 16112782 B/op 9003 allocs/op +BenchmarkCompression/snappy/1.0kB-1-1ns-24 17739 67341 ns/op 310325.75 MB/s 0 errors 1.150 kb/op 3891 B/op 51 allocs/op +BenchmarkCompression/snappy/1.0kB-1-1ms-24 17096 68793 ns/op 292762.32 MB/s 0 errors 1.150 kb/op 3776 B/op 51 allocs/op +BenchmarkCompression/snappy/1.0kB-100-1ns-24 4754 255922 ns/op 180507.16 MB/s 0 errors 9.489 kb/op 61871 B/op 460 allocs/op +BenchmarkCompression/snappy/1.0kB-100-1ms-24 4740 249785 ns/op 184398.30 MB/s 0 errors 9.490 kb/op 61145 B/op 461 allocs/op +BenchmarkCompression/snappy/1.0kB-1000-1ns-24 716 1793461 ns/op 36010.05 MB/s 0 errors 88.09 kb/op 727427 B/op 4121 allocs/op +BenchmarkCompression/snappy/1.0kB-1000-1ms-24 600 1947027 ns/op 27796.39 MB/s 0 errors 88.09 kb/op 726621 B/op 4129 allocs/op +BenchmarkCompression/snappy/10.2kB-1-1ns-24 12513 85834 ns/op 1515564.80 MB/s 0 errors 10.15 kb/op 4011 B/op 51 allocs/op +BenchmarkCompression/snappy/10.2kB-1-1ms-24 12354 82170 ns/op 1563008.36 MB/s 0 errors 10.15 kb/op 3920 B/op 51 allocs/op +BenchmarkCompression/snappy/10.2kB-100-1ns-24 1044 1167081 ns/op 325914.71 MB/s 0 errors 355.8 kb/op 58352 B/op 465 allocs/op 
+BenchmarkCompression/snappy/10.2kB-100-1ms-24 974 1110879 ns/op 319445.85 MB/s 0 errors 355.8 kb/op 58961 B/op 467 allocs/op +BenchmarkCompression/snappy/10.2kB-1000-1ns-24 80 12709507 ns/op 22530.51 MB/s 0 errors 3496 kb/op 1857219 B/op 4579 allocs/op +BenchmarkCompression/snappy/10.2kB-1000-1ms-24 93 13124410 ns/op 25363.72 MB/s 0 errors 3496 kb/op 1846332 B/op 4581 allocs/op +BenchmarkCompression/snappy/102.4kB-1-1ns-24 6392 204222 ns/op 3210876.66 MB/s 0 errors 100.2 kb/op 3861 B/op 51 allocs/op +BenchmarkCompression/snappy/102.4kB-1-1ms-24 6447 188104 ns/op 3516006.25 MB/s 0 errors 100.2 kb/op 3950 B/op 52 allocs/op +BenchmarkCompression/snappy/102.4kB-100-1ns-24 92 12884794 ns/op 73159.94 MB/s 0 errors 10006 kb/op 241452 B/op 944 allocs/op +BenchmarkCompression/snappy/102.4kB-100-1ms-24 90 14210870 ns/op 64891.09 MB/s 0 errors 10006 kb/op 240694 B/op 964 allocs/op +BenchmarkCompression/snappy/102.4kB-1000-1ns-24 8 125019549 ns/op 6556.55 MB/s 0 errors 100061 kb/op 14924641 B/op 9228 allocs/op +BenchmarkCompression/snappy/102.4kB-1000-1ms-24 9 136035164 ns/op 6778.83 MB/s 0 errors 100061 kb/op 14898059 B/op 9675 allocs/op +BenchmarkCompression/lz4/1.0kB-1-1ns-24 15775 72391 ns/op 255415.44 MB/s 0 errors 1.145 kb/op 10580 B/op 54 allocs/op +BenchmarkCompression/lz4/1.0kB-1-1ms-24 16063 71881 ns/op 261920.19 MB/s 0 errors 1.145 kb/op 8020 B/op 54 allocs/op +BenchmarkCompression/lz4/1.0kB-100-1ns-24 6225 190480 ns/op 66319.05 MB/s 0 errors 1.982 kb/op 112572 B/op 463 allocs/op +BenchmarkCompression/lz4/1.0kB-100-1ms-24 6112 192383 ns/op 64470.18 MB/s 0 errors 1.982 kb/op 103129 B/op 463 allocs/op +BenchmarkCompression/lz4/1.0kB-1000-1ns-24 878 1380452 ns/op 11150.04 MB/s 0 errors 17.12 kb/op 1323802 B/op 4128 allocs/op +BenchmarkCompression/lz4/1.0kB-1000-1ms-24 552 2158364 ns/op 4483.68 MB/s 0 errors 17.12 kb/op 1260947 B/op 4133 allocs/op +BenchmarkCompression/lz4/10.2kB-1-1ns-24 13184 90086 ns/op 1520591.37 MB/s 0 errors 10.15 kb/op 7394 B/op 54 allocs/op 
+BenchmarkCompression/lz4/10.2kB-1-1ms-24 14508 84872 ns/op 1776093.60 MB/s 0 errors 10.15 kb/op 8494 B/op 54 allocs/op +BenchmarkCompression/lz4/10.2kB-100-1ns-24 1860 648775 ns/op 72157.71 MB/s 0 errors 24.58 kb/op 100525 B/op 467 allocs/op +BenchmarkCompression/lz4/10.2kB-100-1ms-24 1537 811082 ns/op 47695.30 MB/s 0 errors 24.58 kb/op 148374 B/op 469 allocs/op +BenchmarkCompression/lz4/10.2kB-1000-1ns-24 171 7372614 ns/op 5837.79 MB/s 0 errors 245.8 kb/op 3158692 B/op 4606 allocs/op +BenchmarkCompression/lz4/10.2kB-1000-1ms-24 162 7302749 ns/op 5583.36 MB/s 0 errors 245.8 kb/op 3181724 B/op 4579 allocs/op +BenchmarkCompression/lz4/102.4kB-1-1ns-24 7178 157404 ns/op 4676524.13 MB/s 0 errors 100.1 kb/op 11099 B/op 55 allocs/op +BenchmarkCompression/lz4/102.4kB-1-1ms-24 6600 170318 ns/op 3973944.27 MB/s 0 errors 100.1 kb/op 6453 B/op 56 allocs/op +BenchmarkCompression/lz4/102.4kB-100-1ns-24 98 11971963 ns/op 83847.45 MB/s 0 errors 10003 kb/op 620253 B/op 981 allocs/op +BenchmarkCompression/lz4/102.4kB-100-1ms-24 98 11523122 ns/op 87113.46 MB/s 0 errors 10003 kb/op 314306 B/op 986 allocs/op +BenchmarkCompression/lz4/102.4kB-1000-1ns-24 9 115211591 ns/op 8001.58 MB/s 0 errors 100030 kb/op 22132883 B/op 9952 allocs/op +BenchmarkCompression/lz4/102.4kB-1000-1ms-24 9 114272650 ns/op 8067.33 MB/s 0 errors 100030 kb/op 19214946 B/op 10023 allocs/op +BenchmarkCompression/zstd/1.0kB-1-1ns-24 6436 162081 ns/op 36701.88 MB/s 0 errors 0.9026 kb/op 3670 B/op 52 allocs/op +BenchmarkCompression/zstd/1.0kB-1-1ms-24 11448 89854 ns/op 117745.54 MB/s 0 errors 0.9025 kb/op 7834 B/op 52 allocs/op +BenchmarkCompression/zstd/1.0kB-100-1ns-24 4773 248584 ns/op 23720.74 MB/s 0 errors 1.206 kb/op 142631 B/op 463 allocs/op +BenchmarkCompression/zstd/1.0kB-100-1ms-24 5300 232631 ns/op 28146.70 MB/s 0 errors 1.206 kb/op 128500 B/op 463 allocs/op +BenchmarkCompression/zstd/1.0kB-1000-1ns-24 799 1670588 ns/op 2150.67 MB/s 0 errors 4.391 kb/op 2279023 B/op 4134 allocs/op 
+BenchmarkCompression/zstd/1.0kB-1000-1ms-24 476 2153281 ns/op 994.32 MB/s 0 errors 4.393 kb/op 1675134 B/op 4140 allocs/op +BenchmarkCompression/zstd/10.2kB-1-1ns-24 9764 128877 ns/op 573542.37 MB/s 0 errors 7.393 kb/op 9354 B/op 52 allocs/op +BenchmarkCompression/zstd/10.2kB-1-1ms-24 9652 118880 ns/op 614639.16 MB/s 0 errors 7.393 kb/op 7251 B/op 52 allocs/op +BenchmarkCompression/zstd/10.2kB-100-1ns-24 1737 680276 ns/op 20810.71 MB/s 0 errors 7.959 kb/op 119513 B/op 468 allocs/op +BenchmarkCompression/zstd/10.2kB-100-1ms-24 1495 698771 ns/op 17437.76 MB/s 0 errors 7.959 kb/op 142446 B/op 469 allocs/op +BenchmarkCompression/zstd/10.2kB-1000-1ns-24 145 7667982 ns/op 1540.89 MB/s 0 errors 79.58 kb/op 6165529 B/op 4603 allocs/op +BenchmarkCompression/zstd/10.2kB-1000-1ms-24 157 8209581 ns/op 1558.35 MB/s 0 errors 79.58 kb/op 4289406 B/op 4606 allocs/op +BenchmarkCompression/zstd/102.4kB-1-1ns-24 2758 412986 ns/op 495434.27 MB/s 0 errors 72.45 kb/op 3896 B/op 55 allocs/op +BenchmarkCompression/zstd/102.4kB-1-1ms-24 2414 451970 ns/op 396237.04 MB/s 0 errors 72.45 kb/op 7063 B/op 57 allocs/op +BenchmarkCompression/zstd/102.4kB-100-1ns-24 106 11229176 ns/op 8143.01 MB/s 0 errors 842.4 kb/op 572094 B/op 1019 allocs/op +BenchmarkCompression/zstd/102.4kB-100-1ms-24 100 11246398 ns/op 7670.28 MB/s 0 errors 842.4 kb/op 2464017 B/op 994 allocs/op +BenchmarkCompression/zstd/102.4kB-1000-1ns-24 10 110627236 ns/op 779.77 MB/s 0 errors 8424 kb/op 25910839 B/op 9967 allocs/op +BenchmarkCompression/zstd/102.4kB-1000-1ms-24 10 109110192 ns/op 790.61 MB/s 0 errors 8424 kb/op 35261769 B/op 10063 allocs/op diff --git a/services/streammanager/kafka/client/compression_benchmark_test.go b/services/streammanager/kafka/client/compression_benchmark_test.go new file mode 100644 index 00000000000..d8a29ae8682 --- /dev/null +++ b/services/streammanager/kafka/client/compression_benchmark_test.go @@ -0,0 +1,174 @@ +package client + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + 
+ "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-go-kit/testhelper" + "github.com/rudderlabs/rudder-go-kit/testhelper/rand" + "github.com/rudderlabs/rudder-server/services/streammanager/kafka/client/testutil" + "github.com/rudderlabs/rudder-server/testhelper/destination/kafka" + "github.com/rudderlabs/rudder-server/utils/tcpproxy" +) + +func BenchmarkCompression(b *testing.B) { + proxyPort, err := testhelper.GetFreePort() + require.NoError(b, err) + + var ( + ctx = context.Background() + topic = "foo_bar_topic" + proxyHost = "localhost:" + strconv.Itoa(proxyPort) + ) + + setupKafka := func(b *testing.B) string { + pool, err := dockertest.NewPool("") + require.NoError(b, err) + + kafkaContainer, err := kafka.Setup(pool, b, kafka.WithCustomAdvertisedListener(proxyHost)) + require.NoError(b, err) + + return "localhost:" + kafkaContainer.Ports[0] + } + + setupProxy := func(b *testing.B, kafkaAddr string, c Compression, bs int, bt time.Duration) ( + *tcpproxy.Proxy, + *Producer, + ) { + proxy := &tcpproxy.Proxy{ + LocalAddr: proxyHost, + RemoteAddr: kafkaAddr, + } + go proxy.Start(b) + + client, err := New("tcp", []string{proxy.LocalAddr}, Config{}) + require.NoError(b, err) + require.Eventuallyf(b, func() bool { + err = client.Ping(ctx) + return err == nil + }, 30*time.Second, 100*time.Millisecond, "failed to connect to kafka: %v", err) + + producer, err := client.NewProducer(ProducerConfig{ + Compression: c, + BatchSize: bs, + BatchTimeout: bt, + }) + require.NoError(b, err) + + return proxy, producer + } + + run := func(addr string, comp Compression, value string, batchSize int, batchTimeout time.Duration) func(*testing.B) { + return func(b *testing.B) { + proxy, producer := setupProxy(b, addr, comp, batchSize, batchTimeout) + + kafkaCtx, kafkaCtxCancel := context.WithTimeout(context.Background(), 3*time.Minute) + err = waitForKafka(kafkaCtx, topic, addr) + kafkaCtxCancel() + require.NoError(b, err) + + var ( 
+ noOfErrors int + messages = make([]Message, 0, batchSize) + ) + for i := 0; i < batchSize; i++ { + messages = append(messages, Message{ + Key: []byte("my-key"), + Value: []byte(value), + Topic: topic, + }) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := producer.Publish(ctx, messages...); err != nil { + noOfErrors++ + } + } + b.StopTimer() + + _ = producer.Close(ctx) + proxy.Stop() // stopping the proxy here to properly gather the metrics + + b.SetBytes(proxy.BytesReceived.Load() / int64(b.N)) + b.ReportMetric(float64(proxy.BytesReceived.Load())/float64(b.N)/1024, "kb/op") + b.ReportMetric(float64(noOfErrors), "errors") + } + } + + var ( + compressionTypes = []Compression{CompressionNone, CompressionGzip, CompressionSnappy, CompressionLz4, CompressionZstd} + compressionTypesMap = map[Compression]string{ + CompressionNone: "none", CompressionGzip: "gzip", CompressionSnappy: "snappy", CompressionLz4: "lz4", CompressionZstd: "zstd", + } + batchSizes = []int{1, 100, 1000} + batchTimeouts = []time.Duration{time.Nanosecond, time.Millisecond} + values = []string{rand.String(1 << 10), rand.String(10 << 10), rand.String(100 << 10)} + ) + for _, comp := range compressionTypes { + b.Run(compressionTypesMap[comp], func(b *testing.B) { + kafkaAddr := setupKafka(b) // setup kafka only once per compression combination + for _, value := range values { + for _, batchSize := range batchSizes { + for _, batchTimeout := range batchTimeouts { + b.Run( + fmt.Sprintf("%s-%d-%s", byteCount(len(value)), batchSize, batchTimeout), + run(kafkaAddr, comp, value, batchSize, batchTimeout), + ) + } + } + } + }) + } +} + +func byteCount(b int) string { + const unit = 1000 + if b < unit { + return fmt.Sprintf("%dB", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f%cB", + float64(b)/float64(div), "kMGTPE"[exp]) +} + +func waitForKafka(ctx context.Context, topic, addr string) (err error) { + tc := 
testutil.New("tcp", addr) + for { + select { + case <-ctx.Done(): + return fmt.Errorf("kafka not ready within context (%v): %v", ctx.Err(), err) + case <-time.After(50 * time.Millisecond): + var topics []testutil.TopicPartition + topics, err = tc.ListTopics(ctx) + if err != nil { + continue + } + + var found bool + for _, top := range topics { + if top.Topic == topic { + found = true + break + } + } + if found { + return nil + } + + if err = tc.CreateTopic(ctx, topic, 1, 1); err != nil { + continue + } + } + } +} diff --git a/services/streammanager/kafka/client/producer.go b/services/streammanager/kafka/client/producer.go index 81bcc7f88dc..566d97a7733 100644 --- a/services/streammanager/kafka/client/producer.go +++ b/services/streammanager/kafka/client/producer.go @@ -12,12 +12,23 @@ import ( "github.com/segmentio/kafka-go" ) +type Compression = kafka.Compression + +const ( + CompressionNone Compression = 0 + CompressionGzip Compression = kafka.Gzip + CompressionSnappy Compression = kafka.Snappy + CompressionLz4 Compression = kafka.Lz4 + CompressionZstd Compression = kafka.Zstd +) + type ProducerConfig struct { ClientID string WriteTimeout, ReadTimeout, BatchTimeout time.Duration BatchSize int + Compression Compression Logger Logger ErrorLogger Logger } @@ -85,7 +96,7 @@ func (c *Client) NewProducer(producerConf ProducerConfig) (p *Producer, err erro RequiredAcks: kafka.RequireAll, AllowAutoTopicCreation: true, Async: false, - Compression: 0, + Compression: producerConf.Compression, Transport: transport, }, } diff --git a/services/streammanager/kafka/kafkamanager.go b/services/streammanager/kafka/kafkamanager.go index 489871fec61..add9c80c710 100644 --- a/services/streammanager/kafka/kafkamanager.go +++ b/services/streammanager/kafka/kafkamanager.go @@ -181,6 +181,7 @@ var ( kafkaBatchTimeout = defaultBatchTimeout kafkaBatchSize = defaultBatchSize kafkaBatchingEnabled bool + kafkaCompression client.Compression allowReqsWithoutUserIDAndAnonymousID bool kafkaStats 
managerStats @@ -234,6 +235,19 @@ func Init() { ) } + kafkaCompression = client.CompressionZstd + if kc := config.GetInt("Router.kafkaCompression", -1); kc != -1 { + switch client.Compression(kc) { + case client.CompressionNone, + client.CompressionGzip, + client.CompressionSnappy, + client.CompressionLz4, client.CompressionZstd: + kafkaCompression = client.Compression(kc) + default: + pkgLogger.Errorf("Invalid Kafka compression codec: %d", kc) + } + } + kafkaStats = managerStats{ creationTime: stats.Default.NewStat("router.kafka.creation_time", stats.TimerType), creationTimeConfluentCloud: stats.Default.NewStat("router.kafka.creation_time_confluent_cloud", stats.TimerType), @@ -722,6 +736,7 @@ func newProducerConfig() client.ProducerConfig { pc := client.ProducerConfig{ ReadTimeout: kafkaReadTimeout, WriteTimeout: kafkaWriteTimeout, + Compression: kafkaCompression, Logger: &client.KafkaLogger{Logger: pkgLogger}, ErrorLogger: &client.KafkaLogger{Logger: pkgLogger, IsErrorLogger: true}, } diff --git a/testhelper/destination/kafka/kafka.go b/testhelper/destination/kafka/kafka.go index 7162ee32d6e..24d154b6ad7 100644 --- a/testhelper/destination/kafka/kafka.go +++ b/testhelper/destination/kafka/kafka.go @@ -51,6 +51,7 @@ type config struct { saslConfig *SASLConfig network *dc.Network dontUseDockerHostListeners bool + customAdvertisedListener string useSchemaRegistry bool } @@ -113,6 +114,13 @@ func WithoutDockerHostListeners() Option { }} } +// WithCustomAdvertisedListener allows to set a custom advertised listener +func WithCustomAdvertisedListener(listener string) Option { + return withOption{setup: func(c *config) { + c.customAdvertisedListener = listener + }} +} + // WithSchemaRegistry allows to use the schema registry func WithSchemaRegistry() Option { return withOption{setup: func(c *config) { @@ -330,6 +338,10 @@ func Setup(pool *dockertest.Pool, cln destination.Cleaner, opts ...Option) (*Res nodeEnvVars = append(nodeEnvVars, "KAFKA_CFG_ADVERTISED_LISTENERS="+fmt.Sprintf( 
"INTERNAL://%s:9090,CLIENT://%s:%s", hostname, hostname, kafkaClientPort, )) + } else if c.customAdvertisedListener != "" { + nodeEnvVars = append(nodeEnvVars, "KAFKA_CFG_ADVERTISED_LISTENERS="+fmt.Sprintf( + "INTERNAL://%s:9090,CLIENT://%s", hostname, c.customAdvertisedListener, + )) } else { nodeEnvVars = append(nodeEnvVars, "KAFKA_CFG_ADVERTISED_LISTENERS="+fmt.Sprintf( "INTERNAL://%s:9090,CLIENT://localhost:%d", hostname, localhostPortInt, diff --git a/utils/tcpproxy/tcpproxy.go b/utils/tcpproxy/tcpproxy.go new file mode 100644 index 00000000000..0d9fdaf3a9f --- /dev/null +++ b/utils/tcpproxy/tcpproxy.go @@ -0,0 +1,94 @@ +package tcpproxy + +import ( + "io" + "net" + "os" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +type Proxy struct { + LocalAddr string + RemoteAddr string + BytesSent atomic.Int64 + BytesReceived atomic.Int64 + Verbose bool + + wg sync.WaitGroup + stop chan struct{} +} + +func (p *Proxy) Start(t testing.TB) { + p.wg.Add(1) + defer p.wg.Done() + + listener, err := net.Listen("tcp", p.LocalAddr) + require.NoError(t, err) + + p.stop = make(chan struct{}) + p.wg.Add(1) + go func() { + <-p.stop + _ = listener.Close() + p.wg.Done() + }() + + for { + select { + case <-p.stop: + return + + default: + connRcv, err := listener.Accept() + if err != nil { + continue // error accepting connection + } + + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer func() { _ = connRcv.Close() }() + + connSend, err := net.Dial("tcp", p.RemoteAddr) + if err != nil { + t.Logf("Cannot dial remote: %v", err) + return // cannot dial remote, return and listen for new connections + } + + defer func() { _ = connSend.Close() }() + + p.wg.Add(2) + done := make(chan struct{}, 2) + go p.pipe(connRcv, connSend, &p.BytesReceived, done) + go p.pipe(connSend, connRcv, &p.BytesSent, done) + select { + case <-done: // one of the connections got terminated + case <-p.stop: // TCP proxy stopped + } + }() + } + } +} + +func (p *Proxy) Stop() { + 
close(p.stop) + p.wg.Wait() +} + +func (p *Proxy) pipe(src io.Reader, dst io.Writer, bytesMetric *atomic.Int64, done chan struct{}) { + defer p.wg.Done() + + wrt, rdr := dst, src + if p.Verbose { + wrt = os.Stdout + rdr = io.TeeReader(src, dst) + } + n, _ := io.Copy(wrt, rdr) // this is a blocking call, it terminates when the connection is closed + bytesMetric.Add(n) + + done <- struct{}{} // connection is closed, send signal to stop proxy +}