Skip to content

Commit

Permalink
receive: Improved efficiency of multitsdb appends, upgraded Prometheu…
Browse files Browse the repository at this point in the history
…s deps. (thanos-io#4078)

* receive: Improved efficiency of multitsdb appends.

Release vs current main looks the same:

```
benchstat -delta-test=none ../_dev/thanos/2021/receive/5.txt ../_dev/thanos/2021/receive2/main-go1.15.txt
name                                                                                old time/op    new time/op    delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.56ms ± 0%    1.45ms ± 0%  -7.12%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           6.49ms ± 0%    7.14ms ± 0%  +9.92%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       16.0ms ± 0%    16.4ms ± 0%  +2.79%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          71.7ms ± 0%    69.4ms ± 0%  -3.20%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  138ms ± 0%     131ms ± 0%  -4.79%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     1.58s ± 0%     1.68s ± 0%  +6.11%

name                                                                                old alloc/op   new alloc/op   delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.70MB ± 0%    1.70MB ± 0%  +0.12%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           4.84MB ± 0%    4.84MB ± 0%  +0.04%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       18.3MB ± 0%    18.2MB ± 0%  -0.19%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          49.6MB ± 0%    49.6MB ± 0%  +0.00%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  331MB ± 0%     331MB ± 0%  -0.00%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     804MB ± 0%     804MB ± 0%  +0.00%

name                                                                                old allocs/op  new allocs/op  delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         15.6k ± 0%     15.6k ± 0%  +0.04%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12            35.6k ± 0%     35.6k ± 0%  +0.01%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                         155k ± 0%      155k ± 0%  -0.08%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12            355k ± 0%      355k ± 0%  +0.00%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                    147 ± 0%       145 ± 0%  -1.36%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12       417 ± 0%       421 ± 0%  +0.96%
```

Unfortunately go1.16 introduces more allocs overall (not that much more):

```
benchstat -delta-test=none ../_dev/thanos/2021/receive2/main-go1.15.txt ../_dev/thanos/2021/receive2/main-go1.16.3.txt
name                                                                                old time/op    new time/op    delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.45ms ± 0%    1.62ms ± 0%  +11.87%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           7.14ms ± 0%    6.47ms ± 0%   -9.40%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       16.4ms ± 0%    15.8ms ± 0%   -3.87%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          69.4ms ± 0%    66.4ms ± 0%   -4.35%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  131ms ± 0%     141ms ± 0%   +7.59%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     1.68s ± 0%     1.67s ± 0%   -0.49%

name                                                                                old alloc/op   new alloc/op   delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.70MB ± 0%    1.75MB ± 0%   +2.50%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           4.84MB ± 0%    4.89MB ± 0%   +0.88%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       18.2MB ± 0%    18.8MB ± 0%   +3.07%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          49.6MB ± 0%    50.1MB ± 0%   +1.09%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  331MB ± 0%     343MB ± 0%   +3.63%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     804MB ± 0%     816MB ± 0%   +1.50%

name                                                                                old allocs/op  new allocs/op  delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         15.6k ± 0%     15.6k ± 0%   -0.01%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12            35.6k ± 0%     35.6k ± 0%   +0.01%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                         155k ± 0%      155k ± 0%   +0.08%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12            355k ± 0%      355k ± 0%   +0.00%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                    145 ± 0%       166 ± 0%  +14.48%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12       421 ± 0%       440 ± 0%   +4.51%
```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Prometheus upgrade.

No difference.

```
 benchstat -delta-test=none ../_dev/thanos/2021/receive2/main-go1.16.3.txt ../_dev/thanos/2021/receive2/impr-go1.16.3-promup.txt
name                                                                                old time/op    new time/op    delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.62ms ± 0%    1.77ms ± 0%   +9.57%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           6.47ms ± 0%    5.71ms ± 0%  -11.76%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       15.8ms ± 0%    15.2ms ± 0%   -3.83%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          66.4ms ± 0%    59.5ms ± 0%  -10.37%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  141ms ± 0%     129ms ± 0%   -8.60%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     1.67s ± 0%     1.41s ± 0%  -15.58%

name                                                                                old alloc/op   new alloc/op   delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.75MB ± 0%    1.75MB ± 0%   +0.04%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           4.89MB ± 0%    4.89MB ± 0%   +0.02%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       18.8MB ± 0%    18.8MB ± 0%   -0.05%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          50.1MB ± 0%    50.1MB ± 0%   +0.00%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  343MB ± 0%     344MB ± 0%   +0.00%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     816MB ± 0%     816MB ± 0%   -0.00%

name                                                                                old allocs/op  new allocs/op  delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         15.6k ± 0%     15.6k ± 0%   +0.01%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12            35.6k ± 0%     35.6k ± 0%   +0.00%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                         155k ± 0%      155k ± 0%   -0.06%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12            355k ± 0%      355k ± 0%   +0.00%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                    166 ± 0%       169 ± 0%   +1.81%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12       440 ± 0%       435 ± 0%   -1.14%
```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* ReadAll to Grow + Copy.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Moved hashring to optimized hash function.

```
benchstat -delta-test=none ../_dev/thanos/2021/receive2/impr1-go1.16.3.txt ../_dev/thanos/2021/receive2/impr3-go1.16.3.txt
name                                                                                old time/op    new time/op    delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.54ms ± 0%    1.64ms ± 0%   +6.54%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           6.96ms ± 0%    8.02ms ± 0%  +15.23%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       16.1ms ± 0%    16.5ms ± 0%   +2.77%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          65.4ms ± 0%    65.1ms ± 0%   -0.49%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  168ms ± 0%     119ms ± 0%  -29.49%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     1.69s ± 0%     1.37s ± 0%  -19.05%

name                                                                                old alloc/op   new alloc/op   delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.75MB ± 0%    1.63MB ± 0%   -6.43%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           4.89MB ± 0%    4.77MB ± 0%   -2.50%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       18.8MB ± 0%    17.6MB ± 0%   -6.55%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          50.1MB ± 0%    48.9MB ± 0%   -2.55%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  344MB ± 0%     225MB ± 0%  -34.63%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     816MB ± 0%     697MB ± 0%  -14.59%

name                                                                                old allocs/op  new allocs/op  delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         15.6k ± 0%     13.6k ± 0%  -12.85%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12            35.6k ± 0%     33.6k ± 0%   -5.64%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                         155k ± 0%      135k ± 0%  -12.95%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12            355k ± 0%      335k ± 0%   -5.64%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                    170 ± 0%       101 ± 0%  -40.59%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12       439 ± 0%       372 ± 0%  -15.26%
```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Used Prometheus GetRef to avoid reallocating the same series.

```
 benchstat -delta-test=none ../_dev/thanos/2021/receive2/impr3-go1.16.3.txt ../_dev/thanos/2021/receive2/impr4-go1.16.3.txt
name                                                                                old time/op    new time/op    delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.64ms ± 0%    1.15ms ± 0%  -30.02%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           8.02ms ± 0%    5.57ms ± 0%  -30.53%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       16.5ms ± 0%    11.5ms ± 0%  -30.28%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          65.1ms ± 0%    58.8ms ± 0%   -9.66%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  119ms ± 0%     114ms ± 0%   -3.56%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     1.37s ± 0%     1.43s ± 0%   +4.58%

name                                                                                old alloc/op   new alloc/op   delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.63MB ± 0%    1.15MB ± 0%  -29.48%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           4.77MB ± 0%    4.29MB ± 0%  -10.07%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       17.6MB ± 0%    12.8MB ± 0%  -27.20%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          48.9MB ± 0%    44.1MB ± 0%   -9.82%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  225MB ± 0%     120MB ± 0%  -46.70%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     697MB ± 0%     592MB ± 0%  -15.05%

name                                                                                old allocs/op  new allocs/op  delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         13.6k ± 0%      3.6k ± 0%  -73.58%
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12            33.6k ± 0%     23.6k ± 0%  -29.75%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                         135k ± 0%       35k ± 0%  -73.84%
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12            335k ± 0%      235k ± 0%  -29.84%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                    101 ± 0%        79 ± 0%  -21.78%
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12       372 ± 0%       360 ± 0%   -3.23%
```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Build fixes.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixes.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka authored and clyang82 committed Apr 27, 2021
1 parent b0bca31 commit 265b768
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 216 deletions.
23 changes: 12 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/thanos-io/thanos

require (
cloud.google.com/go v0.74.0
cloud.google.com/go v0.79.0
cloud.google.com/go/storage v1.10.0
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.8.0
Expand All @@ -10,6 +10,7 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.1
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
github.com/chromedp/chromedp v0.5.3
github.com/cortexproject/cortex v1.7.1-0.20210224085859-66d6fb5b0d42
Expand All @@ -32,7 +33,7 @@ require (
github.com/leanovate/gopter v0.2.4
github.com/lightstep/lightstep-tracer-go v0.18.1
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/miekg/dns v1.1.38
github.com/miekg/dns v1.1.41
github.com/minio/minio-go/v7 v7.0.10
github.com/mozillazg/go-cos v0.13.0
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
Expand All @@ -47,8 +48,8 @@ require (
github.com/prometheus/alertmanager v0.21.1-0.20201106142418-c39b78780054
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.15.0
github.com/prometheus/prometheus v1.8.2-0.20210215121130-6f488061dfb4
github.com/prometheus/common v0.20.0
github.com/prometheus/prometheus v1.8.2-0.20210413124018-62afcabd01ea
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
Expand All @@ -57,13 +58,13 @@ require (
go.uber.org/atomic v1.7.0
go.uber.org/automaxprocs v1.2.0
go.uber.org/goleak v1.1.10
golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9
golang.org/x/oauth2 v0.0.0-20210210192628-66670185b0cd
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/oauth2 v0.0.0-20210323180902-22b0adad7558
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/text v0.3.5
google.golang.org/api v0.39.0
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d
google.golang.org/grpc v1.34.0
google.golang.org/api v0.42.0
google.golang.org/genproto v0.0.0-20210312152112-fc591d9ea70f
google.golang.org/grpc v1.36.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -80,7 +81,7 @@ replace (
// TODO: Remove this: https://github.com/thanos-io/thanos/issues/3967.
github.com/minio/minio-go/v7 => github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210215121130-6f488061dfb4
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210413124018-62afcabd01ea
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.29.1

Expand Down
142 changes: 109 additions & 33 deletions go.sum

Large diffs are not rendered by default.

32 changes: 20 additions & 12 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package receive

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"io"
stdlog "log"
"net"
"net/http"
Expand Down Expand Up @@ -262,7 +263,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
replicated: rep != 0,
}

// on-the-wire format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
// On the wire, format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
if r.replicated {
r.n--
}
Expand All @@ -277,17 +278,24 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
defer span.Finish()

// TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files.
compressed, err := ioutil.ReadAll(r.Body)
// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
if r.ContentLength >= 0 {
compressed.Grow(int(r.ContentLength))
} else {
compressed.Grow(512)
}
_, err := io.Copy(&compressed, r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
return
}

reqBuf, err := snappy.Decode(nil, compressed)
reqBuf, err := snappy.Decode(nil, compressed.Bytes())
if err != nil {
level.Error(h.logger).Log("msg", "snappy decode error", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
return
}

Expand Down Expand Up @@ -410,9 +418,9 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
}
}()

logger := log.With(h.logger, "tenant", tenant)
logTags := []interface{}{"tenant", tenant}
if id, ok := middleware.RequestIDFromContext(pctx); ok {
logger = log.With(logger, "request-id", id)
logTags = append(logTags, "request-id", id)
}

ec := make(chan error)
Expand Down Expand Up @@ -462,7 +470,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(h.logger).Log("msg", "local tsdb write failed", "err", err.Error())
level.Debug(h.logger).Log(append(logTags, "msg", "local tsdb write failed", "err", err.Error()))
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
return
}
Expand Down Expand Up @@ -525,7 +533,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
b.attempt++
dur := h.expBackoff.ForAttempt(b.attempt)
b.nextAllowed = time.Now().Add(dur)
level.Debug(h.logger).Log("msg", "target unavailable backing off", "for", dur)
level.Debug(h.logger).Log(append(logTags, "msg", "target unavailable backing off", "for", dur))
} else {
h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
}
Expand Down Expand Up @@ -554,7 +562,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
go func() {
for err := range ec {
if err != nil {
level.Debug(logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err))
}
}
}()
Expand Down
116 changes: 113 additions & 3 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -185,6 +186,108 @@ func TestDetermineWriteErrorCause(t *testing.T) {
}
}

type fakeTenantAppendable struct {
f *fakeAppendable
}

func newFakeTenantAppendable(f *fakeAppendable) *fakeTenantAppendable {
return &fakeTenantAppendable{f: f}
}

func (t *fakeTenantAppendable) TenantAppendable(_ string) (Appendable, error) {
return t.f, nil
}

type fakeAppendable struct {
appender storage.Appender
appenderErr func() error
}

var _ Appendable = &fakeAppendable{}

func nilErrFn() error {
return nil
}

func (f *fakeAppendable) Appender(_ context.Context) (storage.Appender, error) {
errf := f.appenderErr
if errf == nil {
errf = nilErrFn
}
return f.appender, errf()
}

type fakeAppender struct {
sync.Mutex
samples map[uint64][]prompb.Sample
exemplars map[uint64][]exemplar.Exemplar
appendErr func() error
commitErr func() error
rollbackErr func() error
}

var _ storage.Appender = &fakeAppender{}
var _ storage.GetRef = &fakeAppender{}

func newFakeAppender(appendErr, commitErr, rollbackErr func() error) *fakeAppender { //nolint:unparam
if appendErr == nil {
appendErr = nilErrFn
}
if commitErr == nil {
commitErr = nilErrFn
}
if rollbackErr == nil {
rollbackErr = nilErrFn
}
return &fakeAppender{
samples: make(map[uint64][]prompb.Sample),
appendErr: appendErr,
commitErr: commitErr,
rollbackErr: rollbackErr,
}
}

func (f *fakeAppender) Get(l labels.Labels) []prompb.Sample {
f.Lock()
defer f.Unlock()
s := f.samples[l.Hash()]
res := make([]prompb.Sample, len(s))
copy(res, s)
return res
}

func (f *fakeAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
f.Lock()
defer f.Unlock()
if ref == 0 {
ref = l.Hash()
}
f.samples[ref] = append(f.samples[ref], prompb.Sample{Timestamp: t, Value: v})
return ref, f.appendErr()
}

func (f *fakeAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
f.Lock()
defer f.Unlock()
if ref == 0 {
ref = l.Hash()
}
f.exemplars[ref] = append(f.exemplars[ref], e)
return ref, f.appendErr()
}

func (f *fakeAppender) GetRef(l labels.Labels) (uint64, labels.Labels) {
return l.Hash(), l
}

func (f *fakeAppender) Commit() error {
return f.commitErr()
}

func (f *fakeAppender) Rollback() error {
return f.rollbackErr()
}

func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
var (
cfg = []HashringConfig{{Hashring: "test"}}
Expand Down Expand Up @@ -1015,6 +1118,10 @@ func (a *tsOverrideAppender) AddFast(ref uint64, _ int64, v float64) error {
return a.Appender.AddFast(ref, cnt, v)
}

func (a *tsOverrideAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {
return a.Appender.(storage.GetRef).GetRef(lset)
}

// serializeSeriesWithOneSample returns marshaled and compressed remote write requests like it would
// be send to Thanos receive.
// It has one sample and allow passing multiple series, in same manner as typical Prometheus would batch it.
Expand Down Expand Up @@ -1171,15 +1278,18 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
for i := 0; i < n; i++ {
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
testutil.Equals(b, http.StatusConflict, r.Code, "%v", i)
testutil.Equals(b, http.StatusConflict, r.Code, "%v-%s", i, func() string {
b, _ := ioutil.ReadAll(r.Body)
return string(b)
}())
}
})
})
}

runtime.GC()
// Take snapshot at the end to reveal how much memory we keep in TSDB.
testutil.Ok(b, Heap("../../"))
testutil.Ok(b, Heap("../../../_dev/thanos/2021/receive2"))

}

Expand All @@ -1188,7 +1298,7 @@ func Heap(dir string) (err error) {
return err
}

f, err := os.Create(filepath.Join(dir, "mem.pprof"))
f, err := os.Create(filepath.Join(dir, "impr5-go1.16.3.pprof"))
if err != nil {
return err
}
Expand Down
27 changes: 6 additions & 21 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import (
"sort"
"sync"

"github.com/cespare/xxhash"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/labelpb"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

const sep = '\xff'

// insufficientNodesError is returned when a hashring does not
// have enough nodes to satisfy a request for a node.
type insufficientNodesError struct {
Expand All @@ -39,23 +37,6 @@ type Hashring interface {
GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error)
}

// hash returns a hash for the given tenant and time series.
func hash(tenant string, ts *prompb.TimeSeries) uint64 {
// Sort labelset to ensure a stable hash.
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })

b := make([]byte, 0, 1024)
b = append(b, []byte(tenant)...)
b = append(b, sep)
for _, v := range ts.Labels {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}

// SingleNodeHashring always returns the same node.
type SingleNodeHashring string

Expand Down Expand Up @@ -85,7 +66,11 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
if n >= uint64(len(s)) {
return "", &insufficientNodesError{have: uint64(len(s)), want: n + 1}
}
return s[(hash(tenant, ts)+n)%uint64(len(s))], nil

// TODO(bwplotka): This might be not needed, double check.
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })

return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil
}

// multiHashring represents a set of hashrings.
Expand Down
23 changes: 0 additions & 23 deletions pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

func TestHash(t *testing.T) {
ts := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{
{
Name: "foo",
Value: "bar",
},
{
Name: "baz",
Value: "qux",
},
},
}

ts2 := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{ts.Labels[1], ts.Labels[0]},
}

if hash("", ts) != hash("", ts2) {
t.Errorf("expected hashes to be independent of label order")
}
}

func TestHashringGet(t *testing.T) {
ts := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{
Expand Down

0 comments on commit 265b768

Please sign in to comment.