Cache meta-monitoring query result
Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
saswatamcode committed Jul 20, 2022
1 parent 8062bb3 commit 96a821e
Showing 3 changed files with 256 additions and 33 deletions.
17 changes: 17 additions & 0 deletions cmd/thanos/receive.go
@@ -298,6 +298,23 @@ func runReceive(
)
}

if (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0 {
level.Debug(logger).Log("msg", "setting up periodic meta-monitoring query for limiting cache")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(15*time.Second, ctx.Done(), func() error {
if err := webHandler.QueryMetaMonitoring(ctx); err != nil {
level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error())
}
return nil
})
}, func(err error) {
cancel()
})
}
}

level.Debug(logger).Log("msg", "setting up periodic tenant pruning")
{
ctx, cancel := context.WithCancel(context.Background())
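
The receive.go hunk above registers a 15-second refresh loop in the command's run group, so the meta-monitoring query runs in the background instead of on every remote-write request. A minimal standalone sketch of that cache-and-refresh pattern, using only the Go standard library (the seriesCache type, its RWMutex, and the stub query function are illustrative, not code from this commit):

package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// seriesCache holds the most recent per-tenant head-series counts fetched by
// a background refresher; the request path only does a cheap map lookup.
type seriesCache struct {
	mu     sync.RWMutex
	series map[string]float64
}

// refresh re-runs the (expensive) query on every tick until ctx is cancelled,
// mirroring the runutil.Repeat loop added above.
func (c *seriesCache) refresh(ctx context.Context, interval time.Duration, query func(context.Context) (map[string]float64, error)) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			res, err := query(ctx)
			if err != nil {
				// Best effort: log and keep serving the previous result.
				log.Printf("failed to query meta-monitoring: %v", err)
				continue
			}
			c.mu.Lock()
			c.series = res
			c.mu.Unlock()
		}
	}
}

// underLimit is the hot-path check: no network call, just a map read.
// A tenant that is not in the map yet is allowed through, matching the
// commit's best-effort behaviour.
func (c *seriesCache) underLimit(tenant string, limit float64) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.series[tenant]
	return !ok || v < limit
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := &seriesCache{series: map[string]float64{}}
	go c.refresh(ctx, 15*time.Second, func(context.Context) (map[string]float64, error) {
		// Stand-in for Handler.QueryMetaMonitoring.
		return map[string]float64{"tenant-a": 12}, nil
	})

	log.Println("tenant-a under a limit of 10:", c.underLimit("tenant-a", 10))
}
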
83 changes: 50 additions & 33 deletions pkg/receive/handler.go
@@ -117,20 +117,21 @@ type Handler struct {
options *Options
listener net.Listener

mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
metaMonitoringClient *http.Client
mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
metaMonitoringClient *http.Client
tenantCurrentSeriesMap map[string]float64

forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge
configuredTenantLimit prometheus.Gauge
limitedRequests *prometheus.CounterVec
metaMonitoringErr *prometheus.CounterVec
metaMonitoringErr prometheus.Counter

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec
@@ -189,11 +190,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Help: "The total number of remote write requests that have been dropped due to head series limiting.",
}, []string{"tenant"},
),
metaMonitoringErr: promauto.With(registerer).NewCounterVec(
metaMonitoringErr: promauto.With(registerer).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_metamonitoring_failed_queries_total",
Help: "The total number of meta-monitoring queries that failed while limiting.",
}, []string{"tenant"},
},
),
writeTimeseriesTotal: promauto.With(registerer).NewHistogramVec(
prometheus.HistogramOpts{
@@ -227,8 +228,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
}

h.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit))
h.tenantCurrentSeriesMap = map[string]float64{}

if h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor {
if (h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor) && h.options.MaxPerTenantLimit != 0 {
// Use specified HTTPConfig to make requests to meta-monitoring.
httpConfContentYaml, err := h.options.MetaMonitoringHttpClient.Content()
if err != nil {
@@ -546,6 +548,33 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring
// solution with the configured query for getting current HEAD series of all tenants.
// It then populates tenantCurrentSeries map with result.
func (h *Handler) QueryMetaMonitoring(ctx context.Context) error {
c := promclient.NewWithTracingClient(h.logger, h.metaMonitoringClient, httpconfig.ThanosUserAgent)

vectorRes, _, err := c.QueryInstant(ctx, h.options.MetaMonitoringUrl, h.options.MetaMonitoringLimitQuery, time.Now(), promclient.QueryOptions{})
if err != nil {
h.metaMonitoringErr.Inc()
return err
}

level.Debug(h.logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))

// Construct map of tenant name and current HEAD series.
for _, e := range vectorRes {
for k, v := range e.Metric {
if k == "tenant" {
h.tenantCurrentSeriesMap[string(v)] = float64(e.Value)
level.Debug(h.logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value)
}
}
}

return nil
}

// isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit.
// It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits.
// TODO(saswatamcode): Add capability to configure diff limits for diff tenants.
@@ -554,36 +583,24 @@ func (h *Handler) isUnderLimit(ctx context.Context, tenant string, logger log.Lo
return true, nil
}

c := promclient.NewWithTracingClient(logger, h.metaMonitoringClient, httpconfig.ThanosUserAgent)

vectorRes, _, err := c.QueryInstant(ctx, h.options.MetaMonitoringUrl, h.options.MetaMonitoringLimitQuery, time.Now(), promclient.QueryOptions{})
if err != nil {
h.metaMonitoringErr.WithLabelValues(tenant).Inc()
return true, errors.Wrap(err, "failed to query meta-monitoring")
}

level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))

// In such limiting flow, we ingest the first remote write request
// and then check meta-monitoring metric to ascertain current active
// series. As such metric is updated in intervals, it is possible
// that Receive ingests more series than the limit, before detecting that
// a tenant has exceeded the set limits.
for _, e := range vectorRes {
for k, v := range e.Metric {
// Search for metric which has tenant label for a particular tenant.
if k == "tenant" && string(v) == tenant {
if float64(e.Value) >= float64(h.options.MaxPerTenantLimit) {
level.Error(logger).Log("msg", "tenant above limit", "currentSeries", float64(e.Value), "limit", h.options.MaxPerTenantLimit)
h.limitedRequests.WithLabelValues(tenant).Inc()
return false, nil
}
v, ok := h.tenantCurrentSeriesMap[tenant]
if !ok {
return true, errors.New("tenant not in current series map")
}

level.Debug(logger).Log("msg", "tenant is under limit", "currentSeries", float64(e.Value))
}
}
if v >= float64(h.options.MaxPerTenantLimit) {
level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", h.options.MaxPerTenantLimit)
h.limitedRequests.WithLabelValues(tenant).Inc()
return false, nil
}

level.Debug(logger).Log("msg", "tenant is under limit", "currentSeries", v)

return true, nil
}

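QueryMetaMonitoring above keys the instant-query result by each sample's tenant label before caching it. A small standalone sketch of just that mapping step, using the prometheus/common/model types the e2e test below also works with (the tenantSeries helper and the sample data are illustrative):

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// tenantSeries turns an instant-query result into a tenant -> head-series map,
// mirroring the loop in QueryMetaMonitoring above.
func tenantSeries(vec model.Vector) map[string]float64 {
	out := make(map[string]float64, len(vec))
	for _, s := range vec {
		if tenant, ok := s.Metric["tenant"]; ok {
			out[string(tenant)] = float64(s.Value)
		}
	}
	return out
}

func main() {
	// Shape of a per-tenant head-series result; the actual query is whatever
	// MetaMonitoringLimitQuery is configured to.
	vec := model.Vector{
		&model.Sample{Metric: model.Metric{"tenant": "exceed-tenant"}, Value: 40},
		&model.Sample{Metric: model.Metric{"tenant": "under-tenant"}, Value: 5},
	}
	fmt.Println(tenantSeries(vec)) // map[exceed-tenant:40 under-tenant:5]
}
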
189 changes: 189 additions & 0 deletions test/e2e/receive_test.go
@@ -1114,4 +1114,193 @@ func TestReceive(t *testing.T) {
},
})
})

t.Run("out_of_sync_active_series_limiting", func(t *testing.T) {

/*
The out_of_sync_active_series_limiting suite configures a hashring with
two avalanche writers and dedicated meta-monitoring. But it starts Receive
instances at different times, so that meta-monitoring queries occur at different
intervals for each.
┌──────────┐ ┌──────────┐
│ │ │ │
│Avalanche │ │Avalanche │
│ │ │ │
│ │ │ │
└──────────┴──────────┐ ┌──────────┴──────────┘
│ │
┌─▼─────▼──┐
│ │
│Router ├────────────────► Meta-monitoring
│Ingestor │
│ │
└──▲─┬──▲──┘
│ │ │
┌──────────┐ │ │ │ ┌──────────┐
│ │ │ │ │ │ │
│Router ◄───────┘ │ └────────►Router │
│Ingestor │ │ │Ingestor │
│ ◄─────────┼───────────► │
└────┬─────┘ │ └────┬─────┘
│ │ │
│ ┌────▼─────┐ │
│ │ │ │
└──────────► Query ◄──────────┘
│ │
│ │
└──────────┘
NB: Made with asciiflow.com - you can copy & paste the above there to modify.
*/

t.Parallel()
e, err := e2e.NewDockerEnvironment("e2e_out_of_sync_active_series_limiting")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// This can be treated as the meta-monitoring service.
meta, err := e2emonitoring.Start(e)
testutil.Ok(t, err)

// Setup 3 RouterIngestors with a limit of 10 active series.
ingestor1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled()
ingestor2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled()
ingestor3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled()

h := receive.HashringConfig{
Endpoints: []string{
ingestor1.InternalEndpoint("grpc"),
ingestor2.InternalEndpoint("grpc"),
ingestor3.InternalEndpoint("grpc"),
},
}

i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()

testutil.Ok(t, e2e.StartAndWaitReady(i1Runnable))

time.Sleep(7 * time.Second)

testutil.Ok(t, e2e.StartAndWaitReady(i2Runnable))

time.Sleep(12 * time.Second)

testutil.Ok(t, e2e.StartAndWaitReady(i3Runnable))

querier := e2ethanos.NewQuerierBuilder(e, "1", ingestor1.InternalEndpoint("grpc"), ingestor2.InternalEndpoint("grpc"), ingestor3.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(querier))

testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

// We run two avalanches, one tenant which exceeds the limit, and one tenant which remains under it.

// Avalanche in this configuration, would send 5 requests each with 10 new timeseries.
// One request always fails due to TSDB not being ready for new tenant.
// So without limiting we end up with 40 timeseries and 40 samples.
avalanche1 := e2ethanos.NewAvalanche(e, "avalanche-1",
e2ethanos.AvalancheOptions{
MetricCount: "10",
SeriesCount: "1",
MetricInterval: "30",
SeriesInterval: "3600",
ValueInterval: "3600",

RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")),
RemoteWriteInterval: "30s",
RemoteBatchSize: "10",
RemoteRequestCount: "5",

TenantID: "exceed-tenant",
})

// Avalanche in this configuration, would send 5 requests each with 5 of the same timeseries.
// One request always fails due to TSDB not being ready for new tenant.
// So we end up with 5 timeseries, 20 samples.
avalanche2 := e2ethanos.NewAvalanche(e, "avalanche-2",
e2ethanos.AvalancheOptions{
MetricCount: "5",
SeriesCount: "1",
MetricInterval: "3600",
SeriesInterval: "3600",
ValueInterval: "3600",

RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")),
RemoteWriteInterval: "30s",
RemoteBatchSize: "5",
RemoteRequestCount: "5",

TenantID: "under-tenant",
})

testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2))

// Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request.
// 3 limited requests belong to the exceed-tenant.
testutil.Ok(t, i1Runnable.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_receive_head_series_limited_requests_total"}, e2e.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2e.WaitMissingMetrics()))

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

// Here for exceed-tenant we go above limit by 10, which results in 0 value.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_out_of_sync_active_series_limiting-receive-i1:8080\", job=\"receive-i1\"}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{},
Value: model.SampleValue(0),
},
})

// For under-tenant we stay at -5, as we have only pushed 5 series.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_out_of_sync_active_series_limiting-receive-i1:8080\", job=\"receive-i1\"}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{},
Value: model.SampleValue(-5),
},
})

// Query meta-monitoring solution to assert that only 10 timeseries have been ingested for exceed-tenant.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"})" }, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{},
Value: model.SampleValue(10),
},
})

// Query meta-monitoring solution to assert that only 5 timeseries have been ingested for under-tenant.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"})" }, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{},
Value: model.SampleValue(5),
},
})

// Query meta-monitoring solution to assert that 3 requests were limited for exceed-tenant and none for under-tenant.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "thanos_receive_head_series_limited_requests_total" }, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{
"__name__": "thanos_receive_head_series_limited_requests_total",
"instance": "e2e_out_of_sync_active_series_limiting-receive-i1:8080",
"job": "receive-i1",
"tenant": "exceed-tenant",
},
Value: model.SampleValue(3),
},
})
})
}
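
The values asserted against meta-monitoring in this test follow from the avalanche comments above (5 requests per tenant, one lost while the tenant's TSDB opens, a 10-series limit). A tiny arithmetic check of those expectations; the constants simply restate the test's own numbers:

package main

import "fmt"

func main() {
	const (
		requests = 5  // RemoteRequestCount for both tenants
		failed   = 1  // the first request per tenant fails while its TSDB opens
		limit    = 10 // per-tenant head-series limit on the receivers
	)

	// exceed-tenant: each successful request carries 10 brand-new series.
	fmt.Println("without limiting:", (requests-failed)*10) // 40 series
	fmt.Println("ingested:", limit)                        // 10 series: only the first successful request lands
	fmt.Println("limited requests:", requests-failed-1)    // 3

	// under-tenant: every request carries the same 5 series, so the tenant
	// never crosses the limit; 4 successful requests of 5 samples each.
	fmt.Println("series:", 5, "samples:", (requests-failed)*5) // 5 and 20
}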
