Meta-monitoring based active series limiting #5520

Merged
merged 15 commits into from
Aug 2, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants
- [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency` allowing to configure number of go routines for download blocks during compactions.
- [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write.
- [#5520](https://github.com/thanos-io/thanos/pull/5520) Receive: Meta-monitoring based active series limiting

### Changed

39 changes: 39 additions & 0 deletions cmd/thanos/receive.go
@@ -6,6 +6,7 @@ package main
import (
"context"
"io/ioutil"
"net/url"
"os"
"path"
"strings"
@@ -185,6 +186,9 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

// Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided.
seriesLimitSupported := (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
@@ -214,6 +218,11 @@ func runReceive(
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
SeriesLimitSupported: seriesLimitSupported,
MaxPerTenantLimit: conf.maxPerTenantLimit,
MetaMonitoringUrl: conf.metaMonitoringUrl,
MetaMonitoringHttpClient: conf.metaMonitoringHttpClient,
MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery,
WriteSeriesLimit: conf.writeSeriesLimit,
WriteSamplesLimit: conf.writeSamplesLimit,
WriteRequestSizeLimit: conf.writeRequestSizeLimit,
@@ -297,6 +306,23 @@ func runReceive(
)
}

if seriesLimitSupported {
level.Info(logger).Log("msg", "setting up periodic (every 15s) meta-monitoring query for limiting cache")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(15*time.Second, ctx.Done(), func() error {
if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil {
level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error())
}
return nil
})
}, func(err error) {
cancel()
})
}
}

level.Debug(logger).Log("msg", "setting up periodic tenant pruning")
{
ctx, cancel := context.WithCancel(context.Background())
@@ -733,6 +759,11 @@ type receiveConfig struct {
rwClientServerCA string
rwClientServerName string

maxPerTenantLimit uint64
metaMonitoringLimitQuery string
metaMonitoringUrl *url.URL
metaMonitoringHttpClient *extflag.PathOrContent

dataDir string
labelStrs []string

@@ -831,6 +862,14 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)

cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit)

cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl)

cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery)

rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent(cmd, "receive.tenant-limits.meta-monitoring-client", "YAML file or string with http client configs for meta-monitoring.", extflag.WithHidden())

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())
17 changes: 17 additions & 0 deletions docs/components/receive.md
@@ -115,6 +115,23 @@ The available request gates in Thanos Receive can be configured with the followi

By default all gates are disabled.

## Active Series Limiting (experimental)

Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant active (head) series to maintain the system's stability. It uses any Prometheus Query API compatible meta-monitoring solution that consumes the metrics exposed by all receivers in the Thanos system. Such a query endpoint returns the number of active series per tenant, which is as stale as the meta-monitoring scrape interval. This number is compared with the configured limit before ingesting any tenant's remote write request. If a tenant has gone above the limit, its remote write requests fail fully.

Every Receive Router/RouterIngestor node queries meta-monitoring for the active series of all tenants every 15 seconds and caches the results in a map. This cached result is used to limit all incoming remote write requests.

To use the feature, one should specify the following (hidden) flags:
- `--receive.tenant-limits.max-head-series`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
- `--receive.tenant-limits.meta-monitoring-url`: Specifies Prometheus Query API compatible meta-monitoring endpoint.
- `--receive.tenant-limits.meta-monitoring-query`: Optional flag to specify PromQL query to execute against meta-monitoring.
- `--receive.tenant-limits.meta-monitoring-client`: Optional YAML file/string specifying HTTP client config for meta-monitoring.

NOTE:
- It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not have the latest data for current number of active series of a tenant at all times.
- Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits and will only log errors for meta-monitoring being unreachable. The same applies when one receiver cannot be scraped.
- Support for different limit configuration for different tenants is planned for the future.
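
As an illustration of the flow this section describes (refresh a per-tenant cache periodically, then compare each incoming request against the limit), here is a minimal sketch. The type `seriesCache` and its methods are hypothetical names chosen for this example and are not the types added by this PR. The sketch assumes that a tenant missing from the cache is allowed through, in line with the best-effort note above.

```go
package main

import (
	"fmt"
	"sync"
)

// seriesCache is a hypothetical stand-in for the per-tenant cache that a
// Router/RouterIngestor node keeps between meta-monitoring queries.
type seriesCache struct {
	mtx     sync.RWMutex
	limit   uint64
	current map[string]float64
}

func newSeriesCache(limit uint64) *seriesCache {
	return &seriesCache{limit: limit, current: map[string]float64{}}
}

// update stores the latest per-tenant active series counts, as they would
// be returned by one meta-monitoring query.
func (c *seriesCache) update(result map[string]float64) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for tenant, v := range result {
		c.current[tenant] = v
	}
}

// allow reports whether a remote write request from tenant may be ingested.
func (c *seriesCache) allow(tenant string) bool {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	v, ok := c.current[tenant]
	if !ok {
		// Best effort: no data for this tenant, so no limit is imposed.
		return true
	}
	return v < float64(c.limit)
}

func main() {
	c := newSeriesCache(1000)
	c.update(map[string]float64{"team-a": 250, "team-b": 1200})
	fmt.Println(c.allow("team-a"), c.allow("team-b"), c.allow("team-c"))
	// prints "true false true"
}
```

Because the cache is only as fresh as the last query, a tenant can exceed the limit between two refreshes, which is the first caveat listed in the note.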

## Flags

```$ mdox-exec="thanos receive --help"
4 changes: 2 additions & 2 deletions go.mod
@@ -15,9 +15,9 @@ require (
github.com/chromedp/chromedp v0.8.2
github.com/davecgh/go-spew v1.1.1
github.com/dustin/go-humanize v1.0.0
github.com/efficientgo/e2e v0.12.1
github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.2.0
github.com/felixge/fgprof v0.9.2
8 changes: 5 additions & 3 deletions go.sum
@@ -296,13 +296,14 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/efficientgo/e2e v0.12.1 h1:ZYNTf09ptlba0I3ZStYaF7gCbevWdalriiX7usOSiFM=
github.com/efficientgo/e2e v0.12.1/go.mod h1:xDHUyIqAWyVWU29Lf+BaZoavW7xAbDEvTwHWWI/3bhk=
github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363 h1:Nw7SeBNMBrX3s0BbDlAWuGhEEDcKLteMsMmPThj4sxQ=
github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363/go.mod h1:0Jrqcog5+GlJkbC8ulPkgyRZwq+GsvjUlNt+B2swzJ8=
github.com/efficientgo/tools/core v0.0.0-20210129205121-421d0828c9a6/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b h1:ZHiD4/yE4idlbqvAO6iYCOYRzOMRpxkW+FKasRA3tsQ=
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b h1:rFV4ZGoCKjhOyc4vjrzuCsi9BbrxMJvwmtceN0iR4Zc=
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d h1:WZV/mrUyKS9w9r+Jdw+zq/tdGAb5LwB+H37EkMLhEMA=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4=
github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM=
@@ -1011,6 +1012,7 @@ github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+
github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.35.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.36.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE=
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common/assets v0.2.0/go.mod h1:D17UVUE12bHbim7HzwUvtqm6gwBEaDQ0F+hIGbFbccI=
179 changes: 173 additions & 6 deletions pkg/receive/handler.go
@@ -12,16 +12,20 @@ import (
stdlog "log"
"net"
"net/http"
"net/url"
"sort"
"strconv"
"sync"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/promclient"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -101,12 +105,23 @@ type Options struct {
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
SeriesLimitSupported bool
MaxPerTenantLimit uint64
MetaMonitoringUrl *url.URL
MetaMonitoringHttpClient *extflag.PathOrContent
MetaMonitoringLimitQuery string
WriteSeriesLimit int64
WriteSamplesLimit int64
WriteRequestSizeLimit int64
WriteRequestConcurrencyLimit int
}

// activeSeriesLimiter encompasses active series limiting logic.
type activeSeriesLimiter interface {
QueryMetaMonitoring(context.Context, log.Logger) error
isUnderLimit(string, log.Logger) (bool, error)
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
type Handler struct {
logger log.Logger
@@ -115,12 +130,13 @@ type Handler struct {
options *Options
listener net.Listener

mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
ActiveSeriesLimit activeSeriesLimiter

forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
@@ -219,6 +235,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
h.replicationFactor.Set(1)
}

h.ActiveSeriesLimit = NewNopSeriesLimit()
if h.options.SeriesLimitSupported {
h.ActiveSeriesLimit = NewActiveSeriesLimit(h.options, registerer, h.receiverMode, logger)
}

ins := extpromhttp.NewNopInstrumentationMiddleware()
if o.Registry != nil {
ins = extpromhttp.NewTenantInstrumentationMiddleware(
@@ -431,6 +452,17 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {

defer h.writeGate.Done()

under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger)
if err != nil {
level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error())
}

// Fail request fully if tenant has exceeded set limit.
if !under {
http.Error(w, "tenant is above active series limit", http.StatusTooManyRequests)
return
}

// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
@@ -534,6 +566,141 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

// activeSeriesLimit implements activeSeriesLimiter interface.
type activeSeriesLimit struct {
mtx sync.RWMutex
limit uint64
tenantCurrentSeriesMap map[string]float64

metaMonitoringURL *url.URL
metaMonitoringClient *http.Client
metaMonitoringQuery string

configuredTenantLimit prometheus.Gauge
limitedRequests *prometheus.CounterVec
metaMonitoringErr prometheus.Counter
}

func NewActiveSeriesLimit(o *Options, registerer prometheus.Registerer, r ReceiverMode, logger log.Logger) *activeSeriesLimit {
limit := &activeSeriesLimit{
limit: o.MaxPerTenantLimit,
metaMonitoringURL: o.MetaMonitoringUrl,
metaMonitoringQuery: o.MetaMonitoringLimitQuery,
configuredTenantLimit: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_tenant_head_series_limit",
Help: "The configured limit for active (head) series of tenants.",
},
),
limitedRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_head_series_limited_requests_total",
Help: "The total number of remote write requests that have been dropped due to active series limiting.",
}, []string{"tenant"},
),
metaMonitoringErr: promauto.With(registerer).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_metamonitoring_failed_queries_total",
Help: "The total number of meta-monitoring queries that failed while limiting.",
},
),
}

limit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit))
limit.tenantCurrentSeriesMap = map[string]float64{}

// Use specified HTTPConfig to make requests to meta-monitoring.
httpConfContentYaml, err := o.MetaMonitoringHttpClient.Content()
if err != nil {
level.Error(logger).Log("msg", "getting http client config", "err", err.Error())
}

httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml)
if err != nil {
level.Error(logger).Log("msg", "parsing http config YAML", "err", err.Error())
}

limit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit")
if err != nil {
level.Error(logger).Log("msg", "improper http client config", "err", err.Error())
}

return limit
}

// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring
// solution with the configured query for getting current active (head) series of all tenants.
// It then populates tenantCurrentSeries map with result.
func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error {
c := promclient.NewWithTracingClient(logger, a.metaMonitoringClient, httpconfig.ThanosUserAgent)

vectorRes, _, err := c.QueryInstant(ctx, a.metaMonitoringURL, a.metaMonitoringQuery, time.Now(), promclient.QueryOptions{})
if err != nil {
a.metaMonitoringErr.Inc()
return err
}

level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))

a.mtx.Lock()
defer a.mtx.Unlock()
// Construct map of tenant name and current HEAD series.
for _, e := range vectorRes {
for k, v := range e.Metric {
if k == "tenant" {
a.tenantCurrentSeriesMap[string(v)] = float64(e.Value)
level.Debug(logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value)
}
}
}

return nil
}

// isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit.
// It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits.
// TODO(saswatamcode): Add capability to configure different limits for different tenants.
func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
a.mtx.RLock()
defer a.mtx.RUnlock()
if a.limit == 0 || a.metaMonitoringURL.Host == "" {
return true, nil
}

// In such limiting flow, we ingest the first remote write request
// and then check meta-monitoring metric to ascertain current active
// series. As such metric is updated in intervals, it is possible
// that Receive ingests more series than the limit, before detecting that
// a tenant has exceeded the set limits.
v, ok := a.tenantCurrentSeriesMap[tenant]
if !ok {
Collaborator @matej-g (Jul 22, 2022):

In this case, are we taking into consideration this could be the first request from a new tenant? That should not be considered an error I think.

Member Author:

So this error is not explicit (due to best-effort limiting), we only log this error. There doesn't seem to be a neat way of detecting that this is the first request of a tenant via Handler. Also, in case a tenant sends multiple request before we have had a chance to query meta-monitoring, all those requests would log errors, even if we detect first one.

WDYT would be a good way of handling this? 🙂

Collaborator:

Hm that is true that we could receive multiple requests before the 15 second interval from a new tenant, feels like that's simply shortcoming of the meta-monitoring approach. Either way, are there other cases where a tenant would be missing from the map, apart from the case of new tenant? In that case I'd not consider it an error but simply return true without error and let the tenant have it "for free" before we query meta-monitoring.

One thing that comes to my mind would be using a channel to signal to the querying goroutine to fire off, once we detect a request from new tenant has been ingested? Might be easier said than done.

Member Author:

Will try this out. But another reason why a tenant may not be in the map is that meta-monitoring does not have metrics for one instance of a Receive, thereby completely missing a few tenants. In which case such an error log could be a clue. WDYT?

Collaborator:

Hm right I have not thought about hard tenancy. Could we detect within meta-monitoring if we have metrics for expected number of receiver instances and if not warn about it?

return true, errors.New("tenant not in current series map")
}

if v >= float64(a.limit) {
level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", a.limit)
a.limitedRequests.WithLabelValues(tenant).Inc()
return false, nil
}

return true, nil
}
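
The review thread above floats one idea for tenants that are missing from the map: let the request through without an error and use a channel to ask the querying goroutine for an early refresh. A rough sketch of that idea follows. It is not what this PR implements; `refreshingLimiter` and its `refresh` channel are hypothetical names, and the goroutine that would drain the channel and run the query is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// refreshingLimiter is a hypothetical limiter that requests an early
// meta-monitoring query when it sees a tenant it has no data for.
type refreshingLimiter struct {
	mtx     sync.RWMutex
	limit   uint64
	current map[string]float64
	refresh chan struct{} // buffered with size 1 so signals coalesce
}

func newRefreshingLimiter(limit uint64) *refreshingLimiter {
	return &refreshingLimiter{
		limit:   limit,
		current: map[string]float64{},
		refresh: make(chan struct{}, 1),
	}
}

// isUnderLimit lets unknown tenants through and signals for a refresh.
func (l *refreshingLimiter) isUnderLimit(tenant string) bool {
	l.mtx.RLock()
	v, ok := l.current[tenant]
	l.mtx.RUnlock()
	if !ok {
		// Non-blocking send: if a refresh is already pending, do nothing,
		// so the request hot path never waits on the querying goroutine.
		select {
		case l.refresh <- struct{}{}:
		default:
		}
		return true
	}
	return v < float64(l.limit)
}

func main() {
	l := newRefreshingLimiter(10)
	fmt.Println(l.isUnderLimit("new-tenant"), len(l.refresh))
	fmt.Println(l.isUnderLimit("new-tenant"), len(l.refresh))
}
```

The querying goroutine would then select on both its ticker and `refresh`. As noted in the thread, this does not distinguish a new tenant from a tenant whose receiver is not being scraped.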

// nopSeriesLimit implements activeSeriesLimiter interface as no-op.
type nopSeriesLimit struct{}

func NewNopSeriesLimit() *nopSeriesLimit {
return &nopSeriesLimit{}
}

func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error {
return nil
}

func (a *nopSeriesLimit) isUnderLimit(_ string, _ log.Logger) (bool, error) {
return true, nil
}
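
Since the handler depends only on the limiter interface, the rejection path can be exercised with a fake limiter. The sketch below assumes a trimmed-down interface and a toy handler. `seriesLimiter`, `fixedLimiter`, `writeHandler` and `statusFor` are hypothetical names, and the `THANOS-TENANT` header and `/api/v1/receive` path are used on the assumption that the defaults are in effect.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// seriesLimiter is a hypothetical, trimmed-down version of the limiter
// interface from the diff (no logger argument, no query method).
type seriesLimiter interface {
	isUnderLimit(tenant string) (bool, error)
}

// fixedLimiter always gives the same answer, which is handy in tests.
type fixedLimiter struct{ under bool }

func (f fixedLimiter) isUnderLimit(string) (bool, error) { return f.under, nil }

// writeHandler mimics the check added to receiveHTTP: reject the whole
// request with 429 when the tenant is above its limit.
func writeHandler(l seriesLimiter) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		tenant := r.Header.Get("THANOS-TENANT") // assumed default tenant header
		under, _ := l.isUnderLimit(tenant)      // errors are only logged in the diff
		if !under {
			http.Error(w, "tenant is above active series limit", http.StatusTooManyRequests)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

// statusFor sends one request through the handler and returns the status code.
func statusFor(l seriesLimiter) int {
	req := httptest.NewRequest(http.MethodPost, "/api/v1/receive", nil)
	req.Header.Set("THANOS-TENANT", "team-a")
	rec := httptest.NewRecorder()
	writeHandler(l).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(fixedLimiter{under: true}), statusFor(fixedLimiter{under: false}))
	// prints "200 429"
}
```

The no-op limiter in the diff plays the same role as `fixedLimiter{under: true}`: it lets the handler call the interface unconditionally when limiting is not configured.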

// forward accepts a write request, batches its time series by
// corresponding endpoint, and forwards them in parallel to the
// correct endpoint. Requests destined for the local node are written