-
Notifications
You must be signed in to change notification settings - Fork 297
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
401 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
90 changes: 90 additions & 0 deletions
90
services/streammanager/kafka/client/compression_benchmark_results.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
BenchmarkCompression/none/1.0kB-1-1ns-24 18363 71620 ns/op 295624.13 MB/s 0 errors 1.126 kb/op 3850 B/op 50 allocs/op | ||
BenchmarkCompression/none/1.0kB-1-1ms-24 16392 72852 ns/op 259430.11 MB/s 0 errors 1.126 kb/op 3779 B/op 50 allocs/op | ||
BenchmarkCompression/none/1.0kB-100-1ns-24 5077 218305 ns/op 2419835.83 MB/s 0 errors 101.6 kb/op 58750 B/op 458 allocs/op | ||
BenchmarkCompression/none/1.0kB-100-1ms-24 4585 226962 ns/op 2101982.10 MB/s 0 errors 101.6 kb/op 58867 B/op 458 allocs/op | ||
BenchmarkCompression/none/1.0kB-1000-1ns-24 708 1727235 ns/op 426365.58 MB/s 0 errors 1016 kb/op 681585 B/op 4113 allocs/op | ||
BenchmarkCompression/none/1.0kB-1000-1ms-24 622 1912290 ns/op 338327.23 MB/s 0 errors 1016 kb/op 677768 B/op 4114 allocs/op | ||
BenchmarkCompression/none/10.2kB-1-1ns-24 14065 84065 ns/op 1735179.24 MB/s 0 errors 10.13 kb/op 3785 B/op 50 allocs/op | ||
BenchmarkCompression/none/10.2kB-1-1ms-24 15145 74437 ns/op 2110080.11 MB/s 0 errors 10.13 kb/op 3829 B/op 50 allocs/op | ||
BenchmarkCompression/none/10.2kB-100-1ns-24 1383 853408 ns/op 1662452.74 MB/s 0 errors 1002 kb/op 56731 B/op 459 allocs/op | ||
BenchmarkCompression/none/10.2kB-100-1ms-24 1192 846306 ns/op 1444883.84 MB/s 0 errors 1002 kb/op 57082 B/op 460 allocs/op | ||
BenchmarkCompression/none/10.2kB-1000-1ns-24 133 10124218 ns/op 134764.07 MB/s 0 errors 10018 kb/op 1828180 B/op 4489 allocs/op | ||
BenchmarkCompression/none/10.2kB-1000-1ms-24 124 8846420 ns/op 143793.13 MB/s 0 errors 10018 kb/op 1831709 B/op 4492 allocs/op | ||
BenchmarkCompression/none/102.4kB-1-1ns-24 7374 137306 ns/op 5506407.79 MB/s 0 errors 100.1 kb/op 3768 B/op 50 allocs/op | ||
BenchmarkCompression/none/102.4kB-1-1ms-24 7430 134939 ns/op 5645550.61 MB/s 0 errors 100.1 kb/op 3775 B/op 50 allocs/op | ||
BenchmarkCompression/none/102.4kB-100-1ns-24 142 9059868 ns/op 160541.39 MB/s 0 errors 10003 kb/op 221793 B/op 860 allocs/op | ||
BenchmarkCompression/none/102.4kB-100-1ms-24 135 8767735 ns/op 157712.79 MB/s 0 errors 10003 kb/op 222660 B/op 869 allocs/op | ||
BenchmarkCompression/none/102.4kB-1000-1ns-24 14 81213347 ns/op 17657.17 MB/s 0 errors 100028 kb/op 14697012 B/op 8398 allocs/op | ||
BenchmarkCompression/none/102.4kB-1000-1ms-24 14 76716586 ns/op 18692.15 MB/s 0 errors 100028 kb/op 14711957 B/op 8468 allocs/op | ||
BenchmarkCompression/gzip/1.0kB-1-1ns-24 13765 81353 ns/op 199324.06 MB/s 0 errors 1.150 kb/op 7308 B/op 51 allocs/op | ||
BenchmarkCompression/gzip/1.0kB-1-1ms-24 14481 81059 ns/op 210457.94 MB/s 0 errors 1.150 kb/op 6920 B/op 51 allocs/op | ||
BenchmarkCompression/gzip/1.0kB-100-1ns-24 3396 348371 ns/op 18621.08 MB/s 0 errors 1.865 kb/op 103445 B/op 461 allocs/op | ||
BenchmarkCompression/gzip/1.0kB-100-1ms-24 3476 358825 ns/op 18504.75 MB/s 0 errors 1.865 kb/op 99236 B/op 461 allocs/op | ||
BenchmarkCompression/gzip/1.0kB-1000-1ns-24 469 2491430 ns/op 2031.34 MB/s 0 errors 10.54 kb/op 966421 B/op 4121 allocs/op | ||
BenchmarkCompression/gzip/1.0kB-1000-1ms-24 502 2452866 ns/op 2208.68 MB/s 0 errors 10.54 kb/op 1021873 B/op 4127 allocs/op | ||
BenchmarkCompression/gzip/10.2kB-1-1ns-24 13184 97022 ns/op 1412689.83 MB/s 0 errors 10.15 kb/op 6481 B/op 51 allocs/op | ||
BenchmarkCompression/gzip/10.2kB-1-1ms-24 11881 95080 ns/op 1299067.48 MB/s 0 errors 10.15 kb/op 6707 B/op 51 allocs/op | ||
BenchmarkCompression/gzip/10.2kB-100-1ns-24 633 2234347 ns/op 4306.78 MB/s 0 errors 14.85 kb/op 107912 B/op 472 allocs/op | ||
BenchmarkCompression/gzip/10.2kB-100-1ms-24 714 1511212 ns/op 7182.03 MB/s 0 errors 14.84 kb/op 107866 B/op 466 allocs/op | ||
BenchmarkCompression/gzip/10.2kB-1000-1ns-24 67 17227485 ns/op 590.95 MB/s 0 errors 148.4 kb/op 2235967 B/op 4559 allocs/op | ||
BenchmarkCompression/gzip/10.2kB-1000-1ms-24 73 16939099 ns/op 654.83 MB/s 0 errors 148.4 kb/op 2264393 B/op 4575 allocs/op | ||
BenchmarkCompression/gzip/102.4kB-1-1ns-24 5578 185513 ns/op 3083797.55 MB/s 0 errors 100.2 kb/op 5801 B/op 51 allocs/op | ||
BenchmarkCompression/gzip/102.4kB-1-1ms-24 7047 177071 ns/op 4081684.27 MB/s 0 errors 100.2 kb/op 5294 B/op 52 allocs/op | ||
BenchmarkCompression/gzip/102.4kB-100-1ns-24 94 13147624 ns/op 73239.23 MB/s 0 errors 10004 kb/op 411992 B/op 910 allocs/op | ||
BenchmarkCompression/gzip/102.4kB-100-1ms-24 98 11490708 ns/op 87366.01 MB/s 0 errors 10004 kb/op 358082 B/op 899 allocs/op | ||
BenchmarkCompression/gzip/102.4kB-1000-1ns-24 9 115049381 ns/op 8013.48 MB/s 0 errors 100038 kb/op 15899084 B/op 8932 allocs/op | ||
BenchmarkCompression/gzip/102.4kB-1000-1ms-24 9 111450807 ns/op 8272.23 MB/s 0 errors 100038 kb/op 16112782 B/op 9003 allocs/op | ||
BenchmarkCompression/snappy/1.0kB-1-1ns-24 17739 67341 ns/op 310325.75 MB/s 0 errors 1.150 kb/op 3891 B/op 51 allocs/op | ||
BenchmarkCompression/snappy/1.0kB-1-1ms-24 17096 68793 ns/op 292762.32 MB/s 0 errors 1.150 kb/op 3776 B/op 51 allocs/op | ||
BenchmarkCompression/snappy/1.0kB-100-1ns-24 4754 255922 ns/op 180507.16 MB/s 0 errors 9.489 kb/op 61871 B/op 460 allocs/op | ||
BenchmarkCompression/snappy/1.0kB-100-1ms-24 4740 249785 ns/op 184398.30 MB/s 0 errors 9.490 kb/op 61145 B/op 461 allocs/op | ||
BenchmarkCompression/snappy/1.0kB-1000-1ns-24 716 1793461 ns/op 36010.05 MB/s 0 errors 88.09 kb/op 727427 B/op 4121 allocs/op | ||
BenchmarkCompression/snappy/1.0kB-1000-1ms-24 600 1947027 ns/op 27796.39 MB/s 0 errors 88.09 kb/op 726621 B/op 4129 allocs/op | ||
BenchmarkCompression/snappy/10.2kB-1-1ns-24 12513 85834 ns/op 1515564.80 MB/s 0 errors 10.15 kb/op 4011 B/op 51 allocs/op | ||
BenchmarkCompression/snappy/10.2kB-1-1ms-24 12354 82170 ns/op 1563008.36 MB/s 0 errors 10.15 kb/op 3920 B/op 51 allocs/op | ||
BenchmarkCompression/snappy/10.2kB-100-1ns-24 1044 1167081 ns/op 325914.71 MB/s 0 errors 355.8 kb/op 58352 B/op 465 allocs/op | ||
BenchmarkCompression/snappy/10.2kB-100-1ms-24 974 1110879 ns/op 319445.85 MB/s 0 errors 355.8 kb/op 58961 B/op 467 allocs/op | ||
BenchmarkCompression/snappy/10.2kB-1000-1ns-24 80 12709507 ns/op 22530.51 MB/s 0 errors 3496 kb/op 1857219 B/op 4579 allocs/op | ||
BenchmarkCompression/snappy/10.2kB-1000-1ms-24 93 13124410 ns/op 25363.72 MB/s 0 errors 3496 kb/op 1846332 B/op 4581 allocs/op | ||
BenchmarkCompression/snappy/102.4kB-1-1ns-24 6392 204222 ns/op 3210876.66 MB/s 0 errors 100.2 kb/op 3861 B/op 51 allocs/op | ||
BenchmarkCompression/snappy/102.4kB-1-1ms-24 6447 188104 ns/op 3516006.25 MB/s 0 errors 100.2 kb/op 3950 B/op 52 allocs/op | ||
BenchmarkCompression/snappy/102.4kB-100-1ns-24 92 12884794 ns/op 73159.94 MB/s 0 errors 10006 kb/op 241452 B/op 944 allocs/op | ||
BenchmarkCompression/snappy/102.4kB-100-1ms-24 90 14210870 ns/op 64891.09 MB/s 0 errors 10006 kb/op 240694 B/op 964 allocs/op | ||
BenchmarkCompression/snappy/102.4kB-1000-1ns-24 8 125019549 ns/op 6556.55 MB/s 0 errors 100061 kb/op 14924641 B/op 9228 allocs/op | ||
BenchmarkCompression/snappy/102.4kB-1000-1ms-24 9 136035164 ns/op 6778.83 MB/s 0 errors 100061 kb/op 14898059 B/op 9675 allocs/op | ||
BenchmarkCompression/lz4/1.0kB-1-1ns-24 15775 72391 ns/op 255415.44 MB/s 0 errors 1.145 kb/op 10580 B/op 54 allocs/op | ||
BenchmarkCompression/lz4/1.0kB-1-1ms-24 16063 71881 ns/op 261920.19 MB/s 0 errors 1.145 kb/op 8020 B/op 54 allocs/op | ||
BenchmarkCompression/lz4/1.0kB-100-1ns-24 6225 190480 ns/op 66319.05 MB/s 0 errors 1.982 kb/op 112572 B/op 463 allocs/op | ||
BenchmarkCompression/lz4/1.0kB-100-1ms-24 6112 192383 ns/op 64470.18 MB/s 0 errors 1.982 kb/op 103129 B/op 463 allocs/op | ||
BenchmarkCompression/lz4/1.0kB-1000-1ns-24 878 1380452 ns/op 11150.04 MB/s 0 errors 17.12 kb/op 1323802 B/op 4128 allocs/op | ||
BenchmarkCompression/lz4/1.0kB-1000-1ms-24 552 2158364 ns/op 4483.68 MB/s 0 errors 17.12 kb/op 1260947 B/op 4133 allocs/op | ||
BenchmarkCompression/lz4/10.2kB-1-1ns-24 13184 90086 ns/op 1520591.37 MB/s 0 errors 10.15 kb/op 7394 B/op 54 allocs/op | ||
BenchmarkCompression/lz4/10.2kB-1-1ms-24 14508 84872 ns/op 1776093.60 MB/s 0 errors 10.15 kb/op 8494 B/op 54 allocs/op | ||
BenchmarkCompression/lz4/10.2kB-100-1ns-24 1860 648775 ns/op 72157.71 MB/s 0 errors 24.58 kb/op 100525 B/op 467 allocs/op | ||
BenchmarkCompression/lz4/10.2kB-100-1ms-24 1537 811082 ns/op 47695.30 MB/s 0 errors 24.58 kb/op 148374 B/op 469 allocs/op | ||
BenchmarkCompression/lz4/10.2kB-1000-1ns-24 171 7372614 ns/op 5837.79 MB/s 0 errors 245.8 kb/op 3158692 B/op 4606 allocs/op | ||
BenchmarkCompression/lz4/10.2kB-1000-1ms-24 162 7302749 ns/op 5583.36 MB/s 0 errors 245.8 kb/op 3181724 B/op 4579 allocs/op | ||
BenchmarkCompression/lz4/102.4kB-1-1ns-24 7178 157404 ns/op 4676524.13 MB/s 0 errors 100.1 kb/op 11099 B/op 55 allocs/op | ||
BenchmarkCompression/lz4/102.4kB-1-1ms-24 6600 170318 ns/op 3973944.27 MB/s 0 errors 100.1 kb/op 6453 B/op 56 allocs/op | ||
BenchmarkCompression/lz4/102.4kB-100-1ns-24 98 11971963 ns/op 83847.45 MB/s 0 errors 10003 kb/op 620253 B/op 981 allocs/op | ||
BenchmarkCompression/lz4/102.4kB-100-1ms-24 98 11523122 ns/op 87113.46 MB/s 0 errors 10003 kb/op 314306 B/op 986 allocs/op | ||
BenchmarkCompression/lz4/102.4kB-1000-1ns-24 9 115211591 ns/op 8001.58 MB/s 0 errors 100030 kb/op 22132883 B/op 9952 allocs/op | ||
BenchmarkCompression/lz4/102.4kB-1000-1ms-24 9 114272650 ns/op 8067.33 MB/s 0 errors 100030 kb/op 19214946 B/op 10023 allocs/op | ||
BenchmarkCompression/zstd/1.0kB-1-1ns-24 6436 162081 ns/op 36701.88 MB/s 0 errors 0.9026 kb/op 3670 B/op 52 allocs/op | ||
BenchmarkCompression/zstd/1.0kB-1-1ms-24 11448 89854 ns/op 117745.54 MB/s 0 errors 0.9025 kb/op 7834 B/op 52 allocs/op | ||
BenchmarkCompression/zstd/1.0kB-100-1ns-24 4773 248584 ns/op 23720.74 MB/s 0 errors 1.206 kb/op 142631 B/op 463 allocs/op | ||
BenchmarkCompression/zstd/1.0kB-100-1ms-24 5300 232631 ns/op 28146.70 MB/s 0 errors 1.206 kb/op 128500 B/op 463 allocs/op | ||
BenchmarkCompression/zstd/1.0kB-1000-1ns-24 799 1670588 ns/op 2150.67 MB/s 0 errors 4.391 kb/op 2279023 B/op 4134 allocs/op | ||
BenchmarkCompression/zstd/1.0kB-1000-1ms-24 476 2153281 ns/op 994.32 MB/s 0 errors 4.393 kb/op 1675134 B/op 4140 allocs/op | ||
BenchmarkCompression/zstd/10.2kB-1-1ns-24 9764 128877 ns/op 573542.37 MB/s 0 errors 7.393 kb/op 9354 B/op 52 allocs/op | ||
BenchmarkCompression/zstd/10.2kB-1-1ms-24 9652 118880 ns/op 614639.16 MB/s 0 errors 7.393 kb/op 7251 B/op 52 allocs/op | ||
BenchmarkCompression/zstd/10.2kB-100-1ns-24 1737 680276 ns/op 20810.71 MB/s 0 errors 7.959 kb/op 119513 B/op 468 allocs/op | ||
BenchmarkCompression/zstd/10.2kB-100-1ms-24 1495 698771 ns/op 17437.76 MB/s 0 errors 7.959 kb/op 142446 B/op 469 allocs/op | ||
BenchmarkCompression/zstd/10.2kB-1000-1ns-24 145 7667982 ns/op 1540.89 MB/s 0 errors 79.58 kb/op 6165529 B/op 4603 allocs/op | ||
BenchmarkCompression/zstd/10.2kB-1000-1ms-24 157 8209581 ns/op 1558.35 MB/s 0 errors 79.58 kb/op 4289406 B/op 4606 allocs/op | ||
BenchmarkCompression/zstd/102.4kB-1-1ns-24 2758 412986 ns/op 495434.27 MB/s 0 errors 72.45 kb/op 3896 B/op 55 allocs/op | ||
BenchmarkCompression/zstd/102.4kB-1-1ms-24 2414 451970 ns/op 396237.04 MB/s 0 errors 72.45 kb/op 7063 B/op 57 allocs/op | ||
BenchmarkCompression/zstd/102.4kB-100-1ns-24 106 11229176 ns/op 8143.01 MB/s 0 errors 842.4 kb/op 572094 B/op 1019 allocs/op | ||
BenchmarkCompression/zstd/102.4kB-100-1ms-24 100 11246398 ns/op 7670.28 MB/s 0 errors 842.4 kb/op 2464017 B/op 994 allocs/op | ||
BenchmarkCompression/zstd/102.4kB-1000-1ns-24 10 110627236 ns/op 779.77 MB/s 0 errors 8424 kb/op 25910839 B/op 9967 allocs/op | ||
BenchmarkCompression/zstd/102.4kB-1000-1ms-24 10 109110192 ns/op 790.61 MB/s 0 errors 8424 kb/op 35261769 B/op 10063 allocs/op |
174 changes: 174 additions & 0 deletions
174
services/streammanager/kafka/client/compression_benchmark_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
package client | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ory/dockertest/v3" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/rudderlabs/rudder-go-kit/testhelper" | ||
"github.com/rudderlabs/rudder-go-kit/testhelper/rand" | ||
"github.com/rudderlabs/rudder-server/services/streammanager/kafka/client/testutil" | ||
"github.com/rudderlabs/rudder-server/testhelper/destination/kafka" | ||
"github.com/rudderlabs/rudder-server/utils/tcpproxy" | ||
) | ||
|
||
func BenchmarkCompression(b *testing.B) { | ||
proxyPort, err := testhelper.GetFreePort() | ||
require.NoError(b, err) | ||
|
||
var ( | ||
ctx = context.Background() | ||
topic = "foo_bar_topic" | ||
proxyHost = "localhost:" + strconv.Itoa(proxyPort) | ||
) | ||
|
||
setupKafka := func(b *testing.B) string { | ||
pool, err := dockertest.NewPool("") | ||
require.NoError(b, err) | ||
|
||
kafkaContainer, err := kafka.Setup(pool, b, kafka.WithCustomAdvertisedListener(proxyHost)) | ||
require.NoError(b, err) | ||
|
||
return "localhost:" + kafkaContainer.Ports[0] | ||
} | ||
|
||
setupProxy := func(b *testing.B, kafkaAddr string, c Compression, bs int, bt time.Duration) ( | ||
*tcpproxy.Proxy, | ||
*Producer, | ||
) { | ||
proxy := &tcpproxy.Proxy{ | ||
LocalAddr: proxyHost, | ||
RemoteAddr: kafkaAddr, | ||
} | ||
go proxy.Start(b) | ||
|
||
client, err := New("tcp", []string{proxy.LocalAddr}, Config{}) | ||
require.NoError(b, err) | ||
require.Eventuallyf(b, func() bool { | ||
err = client.Ping(ctx) | ||
return err == nil | ||
}, 30*time.Second, 100*time.Millisecond, "failed to connect to kafka: %v", err) | ||
|
||
producer, err := client.NewProducer(ProducerConfig{ | ||
Compression: c, | ||
BatchSize: bs, | ||
BatchTimeout: bt, | ||
}) | ||
require.NoError(b, err) | ||
|
||
return proxy, producer | ||
} | ||
|
||
run := func(addr string, comp Compression, value string, batchSize int, batchTimeout time.Duration) func(*testing.B) { | ||
return func(b *testing.B) { | ||
proxy, producer := setupProxy(b, addr, comp, batchSize, batchTimeout) | ||
|
||
kafkaCtx, kafkaCtxCancel := context.WithTimeout(context.Background(), 3*time.Minute) | ||
err = waitForKafka(kafkaCtx, topic, addr) | ||
kafkaCtxCancel() | ||
require.NoError(b, err) | ||
|
||
var ( | ||
noOfErrors int | ||
messages = make([]Message, 0, batchSize) | ||
) | ||
for i := 0; i < batchSize; i++ { | ||
messages = append(messages, Message{ | ||
Key: []byte("my-key"), | ||
Value: []byte(value), | ||
Topic: topic, | ||
}) | ||
} | ||
|
||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
if err := producer.Publish(ctx, messages...); err != nil { | ||
noOfErrors++ | ||
} | ||
} | ||
b.StopTimer() | ||
|
||
_ = producer.Close(ctx) | ||
proxy.Stop() // stopping the proxy here to properly gather the metrics | ||
|
||
b.SetBytes(proxy.BytesReceived.Load()) | ||
b.ReportMetric(float64(proxy.BytesReceived.Load())/float64(b.N)/1024, "kb/op") | ||
b.ReportMetric(float64(noOfErrors), "errors") | ||
} | ||
} | ||
|
||
var ( | ||
compressionTypes = []Compression{CompressionNone, CompressionGzip, CompressionSnappy, CompressionLz4, CompressionZstd} | ||
compressionTypesMap = map[Compression]string{ | ||
CompressionNone: "none", CompressionGzip: "gzip", CompressionSnappy: "snappy", CompressionLz4: "lz4", CompressionZstd: "zstd", | ||
} | ||
batchSizes = []int{1, 100, 1000} | ||
batchTimeouts = []time.Duration{time.Nanosecond, time.Millisecond} | ||
values = []string{rand.String(1 << 10), rand.String(10 << 10), rand.String(100 << 10)} | ||
) | ||
for _, comp := range compressionTypes { | ||
b.Run(compressionTypesMap[comp], func(b *testing.B) { | ||
kafkaAddr := setupKafka(b) // setup kafka only once per compression combination | ||
for _, value := range values { | ||
for _, batchSize := range batchSizes { | ||
for _, batchTimeout := range batchTimeouts { | ||
b.Run( | ||
fmt.Sprintf("%s-%d-%s", byteCount(len(value)), batchSize, batchTimeout), | ||
run(kafkaAddr, comp, value, batchSize, batchTimeout), | ||
) | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func byteCount(b int) string { | ||
const unit = 1000 | ||
if b < unit { | ||
return fmt.Sprintf("%dB", b) | ||
} | ||
div, exp := int64(unit), 0 | ||
for n := b / unit; n >= unit; n /= unit { | ||
div *= unit | ||
exp++ | ||
} | ||
return fmt.Sprintf("%.1f%cB", | ||
float64(b)/float64(div), "kMGTPE"[exp]) | ||
} | ||
|
||
func waitForKafka(ctx context.Context, topic, addr string) (err error) { | ||
tc := testutil.New("tcp", addr) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return fmt.Errorf("kafka not ready within context (%v): %v", ctx.Err(), err) | ||
case <-time.After(50 * time.Millisecond): | ||
var topics []testutil.TopicPartition | ||
topics, err = tc.ListTopics(ctx) | ||
if err != nil { | ||
continue | ||
} | ||
|
||
var found bool | ||
for _, top := range topics { | ||
if top.Topic == topic { | ||
found = true | ||
break | ||
} | ||
} | ||
if found { | ||
return nil | ||
} | ||
|
||
if err = tc.CreateTopic(ctx, topic, 1, 1); err != nil { | ||
continue | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.