Skip to content

Commit

Permalink
Adding benchmark tests for zstd
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Mar 12, 2024
1 parent 2413346 commit c9e1529
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 1 deletion.
2 changes: 1 addition & 1 deletion config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
_ writeCloserReset = (*snappy.Writer)(nil)
snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}}
_ writeCloserReset = (*zstd.Encoder)(nil)
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil); return zw }}}
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}}
_ writeCloserReset = (*zlib.Writer)(nil)
zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}}
)
Expand Down
91 changes: 91 additions & 0 deletions config/confighttp/compressor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package confighttp // import "go.opentelemetry.io/collector/config/confighttp"

import (
"bytes"
"fmt"
"io"
"strings"
"testing"

"github.com/klauspost/compress/zstd"
"go.opentelemetry.io/collector/config/configcompression"
)

func BenchmarkCompression(b *testing.B) {
benchmarks := []struct {
codec configcompression.Type
function func(*testing.B, configcompression.Type, *bytes.Buffer, []byte)
}{
{
codec: configcompression.TypeZstd,
function: benchmarkCompression,
},
{
codec: configcompression.TypeZstd,
function: benchmarkCompressionNoConcurrency,
},
}
payload := make([]byte, 10<<20)

buffer := bytes.Buffer{}
buffer.Grow(len(payload))

ts := &bytes.Buffer{}
defer func() {
fmt.Printf("input => %.2f MB\n", float64(len(payload))/(1024*1024))
fmt.Println(ts)
}()

for i := range benchmarks {
benchmark := &benchmarks[i]
b.Run(fmt.Sprintf(string(benchmark.codec)), func(b *testing.B) {
benchmark.function(b, benchmark.codec, &buffer, payload)
})

}
}

func benchmarkCompression(b *testing.B, codec configcompression.Type, buf *bytes.Buffer, payload []byte) {
// In case only the decompression benchmark are run, we use this flags to
// detect whether we have to compress the payload before the decompression
// benchmarks.

b.Run("compress", func(b *testing.B) {
stringReader := strings.NewReader(string(payload))
stringReadCloser := io.NopCloser(stringReader)
var enc io.Writer
/*compressor, err := newCompressor(codec)
if err != nil {
b.Fatal(err)
}*/
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(payload)))
for i := 0; i < b.N; i++ {
// compressor.compress(buf, stringReadCloser)
enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(5))
enc.(writeCloserReset).Reset(buf)
io.Copy(enc, stringReadCloser)
}
})
}

func benchmarkCompressionNoConcurrency(b *testing.B, codec configcompression.Type, buf *bytes.Buffer, payload []byte) {
// In case only the decompression benchmark are run, we use this flags to
// detect whether we have to compress the payload before the decompression
// benchmarks.

b.Run("compress", func(b *testing.B) {
stringReader := strings.NewReader(string(payload))
stringReadCloser := io.NopCloser(stringReader)
var enc io.Writer
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(payload)))
for i := 0; i < b.N; i++ {
enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
enc.(writeCloserReset).Reset(buf)
io.Copy(enc, stringReadCloser)
}
})
}

0 comments on commit c9e1529

Please sign in to comment.