v0.19.0 patch: Added receive benchmark; Fixed Receiver excessive mem usage introduced in 0.17 (#3943)

* Added receive benchmark, baseline.

```
goos: linux
goarch: amd64
pkg: github.com/thanos-io/thanos/pkg/receive
BenchmarkHandlerReceiveHTTP
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them.
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them./OK
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them./OK-12      	   22260	   1550152 ns/op	 1380340 B/op	    6093 allocs/op
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them./conflict_errors
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them./conflict_errors-12         	    6619	   6430408 ns/op	 4522487 B/op	   26118 allocs/op
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them.
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them./OK
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them./OK-12                     	    2695	  17208794 ns/op	15072963 B/op	   60441 allocs/op
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them./conflict_errors
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them./conflict_errors-12        	     474	  72533286 ns/op	46396932 B/op	  260141 allocs/op
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                	     270	 137050518 ns/op	226595379 B/op	     132 allocs/op
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12   	      21	1616025443 ns/op	698724321 B/op	     408 allocs/op
PASS

Process finished with exit code 0
```


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Copy labels.

```
GOROOT=/home/bwplotka/.gvm/gos/go1.15 #gosetup
GOPATH=/home/bwplotka/Repos/thanosgopath #gosetup
/home/bwplotka/.gvm/gos/go1.15/bin/go test -c -o /tmp/___BenchmarkHandlerReceiveHTTP_in_github_com_thanos_io_thanos_pkg_receive github.com/thanos-io/thanos/pkg/receive #gosetup
/tmp/___BenchmarkHandlerReceiveHTTP_in_github_com_thanos_io_thanos_pkg_receive -test.v -test.bench ^\QBenchmarkHandlerReceiveHTTP\E$ -test.run ^$ -test.benchmem -test.benchtime=30s
goos: linux
goarch: amd64
pkg: github.com/thanos-io/thanos/pkg/receive
BenchmarkHandlerReceiveHTTP
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12      	   25887	   1537262 ns/op	 1380023 B/op	    6092 allocs/op
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12         	    4237	   7547968 ns/op	 4522583 B/op	   26118 allocs/op
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                     	    2205	  16513380 ns/op	15071092 B/op	   60420 allocs/op
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors
BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12        	     525	  67278233 ns/op	46396645 B/op	  260141 allocs/op
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12               	     285	 148049189 ns/op	226596168 B/op	     132 allocs/op
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12  	      20	1731361499 ns/op	698722550 B/op	     401 allocs/op
PASS

Process finished with exit code 0

```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Added bench.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fix.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Improved API.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Changelog.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed Lucas's comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Mar 20, 2021
1 parent 38d78b5 commit b446fae
Showing 6 changed files with 308 additions and 19 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan

We use _breaking :warning:_ to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## [v0.19.0-rc.1](https://github.com/thanos-io/thanos/releases/tag/v0.19.0-rc.1) - 2021.03.09
## [v0.19.0-rc.2](https://github.com/thanos-io/thanos/releases/tag/v0.19.0-rc.2) - 2021.03.22

### Added

@@ -36,6 +36,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3815](https://github.com/thanos-io/thanos/pull/3815) Receive: Improve handling of empty time series from clients
- [#3795](https://github.com/thanos-io/thanos/pull/3795) s3: A truncated "get object" response is reported as error.
- [#3899](https://github.com/thanos-io/thanos/pull/3899) Receive: Correct the inference of client gRPC configuration.
- [#3943](https://github.com/thanos-io/thanos/pull/3943) Receive: Fixed memory regression introduced in v0.17.0.

### Changed

6 changes: 5 additions & 1 deletion pkg/receive/handler.go
@@ -277,6 +277,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
defer span.Finish()

// TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files.
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -290,6 +291,9 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// NOTE: Due to zero-copy ZLabels, labels taken from WriteRequests keep memory
// from the whole request alive. Ensure that we always copy them when we want
// to store them for a longer time.
var wreq prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &wreq); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -310,7 +314,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tenant = h.options.DefaultTenantID
}

// exit early if the request contained no data
// Exit early if the request contained no data.
if len(wreq.Timeseries) == 0 {
level.Info(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant)
return
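
The NOTE above is the heart of the bug: with zero-copy ZLabels, every label string produced by unmarshaling points into the decompressed request buffer, so even a tiny retained label pins the entire buffer. A minimal, self-contained sketch of that aliasing hazard (illustrative only; this is not Thanos's actual `labelpb` code):

```
package main

import (
	"fmt"
	"unsafe"
)

// zeroCopyString reinterprets b as a string without copying. The string
// header points into b's backing array, so while the string is reachable
// the ENTIRE allocation behind b stays alive — the zero-copy ZLabel
// behavior in miniature.
func zeroCopyString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	buf := make([]byte, 10<<20) // stand-in for a decompressed 10MB write request
	copy(buf, "up")

	aliased := zeroCopyString(buf[:2]) // 2-byte label value that pins all 10MB
	owned := string(buf[:2])           // explicit copy: owns just its own 2 bytes

	fmt.Println(aliased, owned)
}
```
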
256 changes: 247 additions & 9 deletions pkg/receive/handler_test.go
@@ -7,9 +7,16 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"sync"
"testing"
"time"
@@ -18,9 +25,12 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -175,13 +185,11 @@ func TestDetermineWriteErrorCause(t *testing.T) {
}
}

func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
cfg := []HashringConfig{
{
Hashring: "test",
},
}
var handlers []*Handler
func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
var (
cfg = []HashringConfig{{Hashring: "test"}}
handlers []*Handler
)
// Create a fake peer group where we manually fill the cache with fake addresses pointing to our handlers.
// This removes the network from the tests and creates a more consistent testing harness.
peers := &peerGroup{
@@ -511,7 +519,7 @@ func TestReceiveQuorum(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor)
handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
tenant := "test"
// Test from the point of view of every node
// so that we know status code does not depend
@@ -850,7 +858,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) {
// to see all requests completing all the time, since we're using local
// network we are not expecting anything to go wrong with these.
t.Run(tc.name, func(t *testing.T) {
handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor)
handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor)
tenant := "test"
// Test from the point of view of every node
// so that we know status code does not depend
@@ -957,3 +965,233 @@ type fakeRemoteWriteGRPCServer struct {
func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
return f.h.RemoteWrite(ctx, in)
}

func BenchmarkHandlerReceiveHTTP(b *testing.B) {
benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b))
}

func TestHandlerReceiveHTTP(t *testing.T) {
benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t))
}

// tsOverrideTenantStorage is a storage that overrides timestamps so that samples are appended at a consistent interval.
type tsOverrideTenantStorage struct {
TenantStorage

interval int64
}

func (s *tsOverrideTenantStorage) TenantAppendable(tenant string) (Appendable, error) {
a, err := s.TenantStorage.TenantAppendable(tenant)
return &tsOverrideAppendable{Appendable: a, interval: s.interval}, err
}

type tsOverrideAppendable struct {
Appendable

interval int64
}

func (a *tsOverrideAppendable) Appender(ctx context.Context) (storage.Appender, error) {
ret, err := a.Appendable.Appender(ctx)
return &tsOverrideAppender{Appender: ret, interval: a.interval}, err
}

type tsOverrideAppender struct {
storage.Appender

interval int64
}

var cnt int64

func (a *tsOverrideAppender) Add(l labels.Labels, _ int64, v float64) (uint64, error) {
cnt += a.interval
return a.Appender.Add(l, cnt, v)
}

func (a *tsOverrideAppender) AddFast(ref uint64, _ int64, v float64) error {
cnt += a.interval
return a.Appender.AddFast(ref, cnt, v)
}

// serializeSeriesWithOneSample returns a marshaled and compressed remote write request, as it would
// be sent to Thanos receive.
// Each series carries one sample; multiple series may be passed, in the same manner a typical
// Prometheus would batch them.
func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byte {
r := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(series))}

for _, s := range series {
r.Timeseries = append(r.Timeseries, prompb.TimeSeries{
Labels: s,
// Timestamp does not matter, it will be overridden.
Samples: []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}},
})
}
body, err := proto.Marshal(r)
testutil.Ok(t, err)
return snappy.Encode(nil, body)
}

func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
dir, err := ioutil.TempDir("", "test_receive")
testutil.Ok(b, err)
defer func() { testutil.Ok(b, os.RemoveAll(dir)) }()

handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1)
handler := handlers[0]

reg := prometheus.NewRegistry()

logger := log.NewNopLogger()
m := NewMultiTSDB(
dir, logger, reg, &tsdb.Options{
MinBlockDuration: int64(2 * time.Hour / time.Millisecond),
MaxBlockDuration: int64(2 * time.Hour / time.Millisecond),
RetentionDuration: int64(6 * time.Hour / time.Millisecond),
NoLockfile: true,
StripeSize: 1, // Disable stripe pre-allocation so we can have clear profiles.
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m)

testutil.Ok(b, m.Flush())
testutil.Ok(b, m.Open())

for _, tcase := range []struct {
name string
writeRequest []byte
}{
{
name: "typical labels under 1KB, 500 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
series := make([][]labelpb.ZLabel, 500)
for s := 0; s < len(series); s++ {
lbls := make([]labelpb.ZLabel, 10)
for i := 0; i < len(lbls); i++ {
// Label ~20B name, 50B value.
lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)}
}
series[s] = lbls
}
return series
}()),
},
{
name: "typical labels under 1KB, 5000 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
series := make([][]labelpb.ZLabel, 5000)
for s := 0; s < len(series); s++ {
lbls := make([]labelpb.ZLabel, 10)
for i := 0; i < len(lbls); i++ {
// Label ~20B name, 50B value.
lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)}
}
series[s] = lbls
}
return series
}()),
},
{
name: "extremely large label value 10MB, 10 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
series := make([][]labelpb.ZLabel, 10)
for s := 0; s < len(series); s++ {
lbl := &strings.Builder{}
lbl.Grow(1024 * 1024 * 10) // 10MB.
word := "abcdefghij"
for i := 0; i < lbl.Cap()/len(word); i++ {
_, _ = lbl.WriteString(word)
}
series[s] = []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}}
}
return series
}()),
},
} {
b.Run(tcase.name, func(b testutil.TB) {
handler.options.DefaultTenantID = fmt.Sprintf("%v-ok", tcase.name)
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: 1}

// It takes time to create a new tenant; wait for it.
{
app, err := m.TenantAppendable(handler.options.DefaultTenantID)
testutil.Ok(b, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err = app.Appender(ctx)
return err
}))
}

b.Run("OK", func(b testutil.TB) {
n := b.N()
b.ResetTimer()
for i := 0; i < n; i++ {
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String())
}
})

handler.options.DefaultTenantID = fmt.Sprintf("%v-conflicting", tcase.name)
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: -1} // Timestamps can't go down, so subsequent writes will cause conflict errors.

// It takes time to create a new tenant; wait for it.
{
app, err := m.TenantAppendable(handler.options.DefaultTenantID)
testutil.Ok(b, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err = app.Appender(ctx)
return err
}))
}

// The first request should be fine, since the timestamp hasn't changed yet; the rest will conflict.
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String())

b.Run("conflict errors", func(b testutil.TB) {
n := b.N()
b.ResetTimer()
for i := 0; i < n; i++ {
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusConflict, r.Code, "%v", i)
}
})
})
}

runtime.GC()
// Take snapshot at the end to reveal how much memory we keep in TSDB.
testutil.Ok(b, Heap("../../"))
}

func Heap(dir string) (err error) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return err
}

f, err := os.Create(filepath.Join(dir, "mem.pprof"))
if err != nil {
return err
}
defer runutil.CloseWithErrCapture(&err, f, "close")
return pprof.WriteHeapProfile(f)
}
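
The snapshot written by `Heap` can be inspected with `go tool pprof mem.pprof`. For a quick programmatic read of retained bytes, a sketch along these lines (assuming the `github.com/google/pprof` module is available) works too:

```
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/google/pprof/profile"
)

func main() {
	f, err := os.Open("mem.pprof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	p, err := profile.Parse(f)
	if err != nil {
		log.Fatal(err)
	}

	// Heap profiles carry several sample types (alloc_objects, alloc_space,
	// inuse_objects, inuse_space); sum inuse_space for total retained bytes.
	idx := -1
	for i, st := range p.SampleType {
		if st.Type == "inuse_space" {
			idx = i
		}
	}
	if idx < 0 {
		log.Fatal("no inuse_space sample type found")
	}
	var total int64
	for _, s := range p.Sample {
		total += s.Value[idx]
	}
	fmt.Printf("retained heap: %d bytes\n", total)
}
```
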
14 changes: 7 additions & 7 deletions pkg/receive/writer.go
@@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
@@ -61,13 +62,12 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR

var errs errutil.MultiError
for _, t := range wreq.Timeseries {
lset := make(labels.Labels, len(t.Labels))
for j := range t.Labels {
lset[j] = labels.Label{
Name: t.Labels[j].Name,
Value: t.Labels[j].Value,
}
}
// Copy labels so we allocate memory only for labels, nothing else.
labelpb.ReAllocZLabelsStrings(&t.Labels)

// TODO(bwplotka): Use improvement https://github.com/prometheus/prometheus/pull/8600, so we do that only when
// we need it (when we store labels for longer).
lset := labelpb.ZLabelsToPromLabels(t.Labels)

// Append as many valid samples as possible, but keep track of the errors.
for _, s := range t.Samples {
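
The fix replaces the per-label copy loop with `labelpb.ReAllocZLabelsStrings`, which re-allocates each label's strings so they own their memory and stop aliasing the request buffer. Conceptually it amounts to something like this sketch (stand-in type, simplified; see `labelpb` for the real implementation):

```
package main

import "fmt"

// zlabel mirrors the shape of labelpb.ZLabel for illustration only.
type zlabel struct {
	Name, Value string
}

// reallocStrings rebuilds each Name and Value from their bytes. The new
// strings own their backing memory, so the (possibly huge) request buffer
// they previously aliased becomes collectible.
func reallocStrings(lbls []zlabel) {
	for i := range lbls {
		lbls[i].Name = string([]byte(lbls[i].Name))
		lbls[i].Value = string([]byte(lbls[i].Value))
	}
}

func main() {
	lbls := []zlabel{{Name: "__name__", Value: "up"}}
	reallocStrings(lbls)
	fmt.Println(lbls) // [{__name__ up}]
}
```
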
