Add metrics for requests rejected due to per-instance limits (grafana#5551)

Add metrics to distributors and ingesters that are incremented when we
reject requests due to hitting per-instance limits.

Related grafana#5494

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters committed Jul 20, 2023
1 parent d1fed9c commit 59ede00
Showing 7 changed files with 67 additions and 19 deletions.
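In outline, the change follows a common client_golang pattern: each component keeps a `CounterVec` keyed by a `reason` label, pre-creates the label values it expects so every series is exported at zero from startup, and increments the matching series whenever a per-instance limit causes a rejection. A minimal, self-contained sketch of that pattern (the metric name and reason values here are illustrative, not the ones added by this commit):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Counter keyed by the reason a request was rejected.
	rejected := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "example_instance_rejected_requests_total",
		Help: "Requests rejected for hitting per-instance limits",
	}, []string{"reason"})

	// Pre-create the expected label values so the series exist at zero
	// even before the first rejection, which keeps rate()/increase()
	// queries well-behaved.
	for _, reason := range []string{"max_ingestion_rate", "max_inflight_push_requests"} {
		rejected.WithLabelValues(reason)
	}

	// On a rejection, increment the series for that reason.
	rejected.WithLabelValues("max_ingestion_rate").Inc()

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), len(mf.GetMetric())) // example_instance_rejected_requests_total 2
	}
}
```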
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -52,6 +52,7 @@
* [ENHANCEMENT] Ingester: Add two metrics tracking resource utilization calculated by utilization based limiter: #5496
* `cortex_ingester_utilization_limiter_current_cpu_load`: The current exponential weighted moving average of the ingester's CPU load
* `cortex_ingester_utilization_limiter_current_memory_usage_bytes`: The current ingester memory utilization
* [ENHANCEMENT] Distributor, Ingester: Add metrics to count the number of requests rejected for hitting per-instance limits, `cortex_distributor_instance_rejected_requests_total` and `cortex_ingester_instance_rejected_requests_total` respectively. #5551
* [ENHANCEMENT] Distributor: add support for ingesting exponential histograms that are over the native histogram scale limit of 8 in OpenTelemetry format by downscaling them. #5532
* [ENHANCEMENT] General: buffered logging: #5506
* `-log.buffered`: Enable buffered logging
22 changes: 22 additions & 0 deletions pkg/distributor/distributor.go
@@ -44,6 +44,7 @@ import (
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/push"
@@ -53,6 +54,10 @@ import (
var (
// Validation errors.
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than or equal to zero")

reasonDistributorMaxIngestionRate = globalerror.DistributorMaxIngestionRate.LabelValue()
reasonDistributorMaxInflightPushRequests = globalerror.DistributorMaxInflightPushRequests.LabelValue()
reasonDistributorMaxInflightPushRequestsBytes = globalerror.DistributorMaxInflightPushRequestsBytes.LabelValue()
)

const (
@@ -126,12 +131,16 @@ type Distributor struct {
replicationFactor prometheus.Gauge
latestSeenSampleTimestampPerUser *prometheus.GaugeVec

// Metrics for data rejected for hitting per-tenant limits
discardedSamplesTooManyHaClusters *prometheus.CounterVec
discardedSamplesRateLimited *prometheus.CounterVec
discardedRequestsRateLimited *prometheus.CounterVec
discardedExemplarsRateLimited *prometheus.CounterVec
discardedMetadataRateLimited *prometheus.CounterVec

// Metrics for data rejected for hitting per-instance limits
rejectedRequests *prometheus.CounterVec

sampleValidationMetrics *validation.SampleValidationMetrics
exemplarValidationMetrics *validation.ExemplarValidationMetrics
metadataValidationMetrics *validation.MetadataValidationMetrics
@@ -335,11 +344,21 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
discardedExemplarsRateLimited: validation.DiscardedExemplarsCounter(reg, validation.ReasonRateLimited),
discardedMetadataRateLimited: validation.DiscardedMetadataCounter(reg, validation.ReasonRateLimited),

rejectedRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_instance_rejected_requests_total",
Help: "Requests discarded for hitting per-instance limits",
}, []string{"reason"}),

sampleValidationMetrics: validation.NewSampleValidationMetrics(reg),
exemplarValidationMetrics: validation.NewExemplarValidationMetrics(reg),
metadataValidationMetrics: validation.NewMetadataValidationMetrics(reg),
}

// Initialize expected rejected request labels
d.rejectedRequests.WithLabelValues(reasonDistributorMaxIngestionRate)
d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequests)
d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequestsBytes)

promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
@@ -1014,11 +1033,13 @@ func (d *Distributor) limitsMiddleware(next push.Func) push.Func {

il := d.getInstanceLimits()
if il.MaxInflightPushRequests > 0 && inflight > int64(il.MaxInflightPushRequests) {
d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequests).Inc()
return nil, middleware.DoNotLogError{Err: errMaxInflightRequestsReached}
}

if il.MaxIngestionRate > 0 {
if rate := d.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
d.rejectedRequests.WithLabelValues(reasonDistributorMaxIngestionRate).Inc()
return nil, errMaxIngestionRateReached
}
}
@@ -1049,6 +1070,7 @@ func (d *Distributor) limitsMiddleware(next push.Func) push.Func {
})

if il.MaxInflightPushRequestsBytes > 0 && inflightBytes > int64(il.MaxInflightPushRequestsBytes) {
d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequestsBytes).Inc()
return nil, errMaxInflightRequestsBytesReached
}

11 changes: 11 additions & 0 deletions pkg/ingester/ingester.go
@@ -119,6 +119,13 @@ const (
maxTSDBOpenWithoutConcurrency = 10
)

var (
reasonIngesterMaxIngestionRate = globalerror.IngesterMaxIngestionRate.LabelValue()
reasonIngesterMaxTenants = globalerror.IngesterMaxTenants.LabelValue()
reasonIngesterMaxInMemorySeries = globalerror.IngesterMaxInMemorySeries.LabelValue()
reasonIngesterMaxInflightPushRequests = globalerror.IngesterMaxInflightPushRequests.LabelValue()
)

// BlocksUploader interface is used to have an easy way to mock it in tests.
type BlocksUploader interface {
Sync(ctx context.Context) (uploaded int, err error)
@@ -718,6 +725,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
il := i.getInstanceLimits()
if il != nil && il.MaxInflightPushRequests > 0 {
if inflight > il.MaxInflightPushRequests {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequests).Inc()
return nil, errMaxInflightRequestsReached
}
}
@@ -729,6 +737,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (

if il != nil && il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxIngestionRate).Inc()
return nil, errMaxIngestionRateReached
}
}
@@ -2024,6 +2033,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
gl := i.getInstanceLimits()
if gl != nil && gl.MaxInMemoryTenants > 0 {
if users := int64(len(i.tsdbs)); users >= gl.MaxInMemoryTenants {
i.metrics.rejected.WithLabelValues(reasonIngesterMaxTenants).Inc()
return nil, errMaxTenantsReached
}
}
@@ -2058,6 +2068,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.seriesCount,
instanceErrors: i.metrics.rejected,
blockMinRetention: i.cfg.BlocksStorageConfig.TSDB.Retention,
}

11 changes: 11 additions & 0 deletions pkg/ingester/metrics.go
@@ -61,6 +61,7 @@ type ingesterMetrics struct {
openExistingTSDB prometheus.Counter

discarded *discardedMetrics
rejected *prometheus.CounterVec

// Discarded metadata
discardedMetadataPerUserMetadataLimit *prometheus.CounterVec
@@ -310,6 +311,10 @@ func newIngesterMetrics(
}),

discarded: newDiscardedMetrics(r),
rejected: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_instance_rejected_requests_total",
Help: "Requests rejected for hitting per-instance limits",
}, []string{"reason"}),

discardedMetadataPerUserMetadataLimit: validation.DiscardedMetadataCounter(r, perUserMetadataLimit),
discardedMetadataPerMetricMetadataLimit: validation.DiscardedMetadataCounter(r, perMetricMetadataLimit),
@@ -320,6 +325,12 @@
}),
}

// Initialize expected rejected request labels
m.rejected.WithLabelValues(reasonIngesterMaxIngestionRate)
m.rejected.WithLabelValues(reasonIngesterMaxTenants)
m.rejected.WithLabelValues(reasonIngesterMaxInMemorySeries)
m.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequests)

return m
}

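Counters like these are straightforward to exercise in unit tests with client_golang's `testutil` package, which reads the current value of a single series directly from a collector. A stand-alone sketch, not Mimir's actual test code; the reason label values are assumed to be the underscore form that the new LabelValue helper would produce:

```go
package limits_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestRejectedCounterIncrements(t *testing.T) {
	rejected := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ingester_instance_rejected_requests_total",
		Help: "Requests rejected for hitting per-instance limits",
	}, []string{"reason"})

	// Simulate two rejections for one reason and none for another.
	rejected.WithLabelValues("ingester_max_inflight_push_requests").Add(2)
	rejected.WithLabelValues("ingester_max_tenants")

	// testutil.ToFloat64 reads the current value of a single series.
	if got := testutil.ToFloat64(rejected.WithLabelValues("ingester_max_inflight_push_requests")); got != 2 {
		t.Fatalf("expected 2 rejections, got %v", got)
	}
	if got := testutil.ToFloat64(rejected.WithLabelValues("ingester_max_tenants")); got != 0 {
		t.Fatalf("expected 0 rejections, got %v", got)
	}
}
```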
3 changes: 3 additions & 0 deletions pkg/ingester/user_tsdb.go
@@ -14,6 +14,7 @@ import (

"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
@@ -88,6 +89,7 @@ type userTSDB struct {

instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester.
instanceLimitsFn func() *InstanceLimits
instanceErrors *prometheus.CounterVec

stateMtx sync.RWMutex
state tsdbState
@@ -264,6 +266,7 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
gl := u.instanceLimitsFn()
if gl != nil && gl.MaxInMemorySeries > 0 {
if series := u.instanceSeriesCount.Load(); series >= gl.MaxInMemorySeries {
u.instanceErrors.WithLabelValues(reasonIngesterMaxInMemorySeries).Inc()
return errMaxInMemorySeriesReached
}
}
5 changes: 5 additions & 0 deletions pkg/util/globalerror/errors.go
@@ -96,6 +96,11 @@ func (id ID) MessageWithStrategyAndPerTenantLimitConfig(msg, strategy, flag stri
msg, errPrefix, id, strategy, plural, flagsList)
}

// LabelValue returns the error ID converted to a form suitable for use as a Prometheus label value.
func (id ID) LabelValue() string {
return strings.ReplaceAll(string(id), "-", "_")
}

func buildFlagsList(flag string, addFlags ...string) (string, string) {
var sb strings.Builder
sb.WriteString("-")
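The LabelValue helper above only normalizes an error ID for use as a Prometheus label value by replacing hyphens with underscores. A quick stand-alone illustration (the ID string used here is an assumption, chosen only to show the conversion):

```go
package main

import (
	"fmt"
	"strings"
)

// ID mirrors the globalerror.ID string type for the sake of the example.
type ID string

// LabelValue converts the hyphenated error ID into a metrics-friendly
// label value, matching the behaviour of the helper added in this commit.
func (id ID) LabelValue() string {
	return strings.ReplaceAll(string(id), "-", "_")
}

func main() {
	const distributorMaxIngestionRate ID = "distributor-max-ingestion-rate" // assumed ID string
	fmt.Println(distributorMaxIngestionRate.LabelValue())
	// Output: distributor_max_ingestion_rate
}
```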
33 changes: 14 additions & 19 deletions pkg/util/validation/validate.go
@@ -6,7 +6,6 @@
package validation

import (
"strings"
"time"
"unicode/utf8"

@@ -29,26 +28,26 @@

var (
// Discarded series / samples reasons.
reasonMissingMetricName = metricReasonFromErrorID(globalerror.MissingMetricName)
reasonInvalidMetricName = metricReasonFromErrorID(globalerror.InvalidMetricName)
reasonMaxLabelNamesPerSeries = metricReasonFromErrorID(globalerror.MaxLabelNamesPerSeries)
reasonInvalidLabel = metricReasonFromErrorID(globalerror.SeriesInvalidLabel)
reasonLabelNameTooLong = metricReasonFromErrorID(globalerror.SeriesLabelNameTooLong)
reasonLabelValueTooLong = metricReasonFromErrorID(globalerror.SeriesLabelValueTooLong)
reasonMaxNativeHistogramBuckets = metricReasonFromErrorID(globalerror.MaxNativeHistogramBuckets)
reasonDuplicateLabelNames = metricReasonFromErrorID(globalerror.SeriesWithDuplicateLabelNames)
reasonTooFarInFuture = metricReasonFromErrorID(globalerror.SampleTooFarInFuture)
reasonMissingMetricName = globalerror.MissingMetricName.LabelValue()
reasonInvalidMetricName = globalerror.InvalidMetricName.LabelValue()
reasonMaxLabelNamesPerSeries = globalerror.MaxLabelNamesPerSeries.LabelValue()
reasonInvalidLabel = globalerror.SeriesInvalidLabel.LabelValue()
reasonLabelNameTooLong = globalerror.SeriesLabelNameTooLong.LabelValue()
reasonLabelValueTooLong = globalerror.SeriesLabelValueTooLong.LabelValue()
reasonMaxNativeHistogramBuckets = globalerror.MaxNativeHistogramBuckets.LabelValue()
reasonDuplicateLabelNames = globalerror.SeriesWithDuplicateLabelNames.LabelValue()
reasonTooFarInFuture = globalerror.SampleTooFarInFuture.LabelValue()

// Discarded exemplars reasons.
reasonExemplarLabelsMissing = metricReasonFromErrorID(globalerror.ExemplarLabelsMissing)
reasonExemplarLabelsTooLong = metricReasonFromErrorID(globalerror.ExemplarLabelsTooLong)
reasonExemplarTimestampInvalid = metricReasonFromErrorID(globalerror.ExemplarTimestampInvalid)
reasonExemplarLabelsMissing = globalerror.ExemplarLabelsMissing.LabelValue()
reasonExemplarLabelsTooLong = globalerror.ExemplarLabelsTooLong.LabelValue()
reasonExemplarTimestampInvalid = globalerror.ExemplarTimestampInvalid.LabelValue()
reasonExemplarLabelsBlank = "exemplar_labels_blank"
reasonExemplarTooOld = "exemplar_too_old"

// Discarded metadata reasons.
reasonMetadataMetricNameTooLong = metricReasonFromErrorID(globalerror.MetricMetadataMetricNameTooLong)
reasonMetadataUnitTooLong = metricReasonFromErrorID(globalerror.MetricMetadataUnitTooLong)
reasonMetadataMetricNameTooLong = globalerror.MetricMetadataMetricNameTooLong.LabelValue()
reasonMetadataUnitTooLong = globalerror.MetricMetadataUnitTooLong.LabelValue()

// ReasonRateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
@@ -58,10 +57,6 @@ var (
ReasonTooManyHAClusters = "too_many_ha_clusters"
)

func metricReasonFromErrorID(id globalerror.ID) string {
return strings.ReplaceAll(string(id), "-", "_")
}

// DiscardedRequestsCounter creates per-user counter vector for requests discarded for a given reason.
func DiscardedRequestsCounter(reg prometheus.Registerer, reason string) *prometheus.CounterVec {
return promauto.With(reg).NewCounterVec(