Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions modules/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

const (
errMaxSnapshotsPerUserLimitExceeded = "per-user snapshots limit (local: %d global: %d actual local: %d) exceeded"
errMaxSnapshotsPerTenantLimitExceeded = "per-tenant snapshots limit (local: %d global: %d actual local: %d) exceeded"
)

// RingCount is the interface exposed by a ring implementation which allows
Expand All @@ -51,26 +51,26 @@ func NewLimiter(limits *overrides.Overrides, ring RingCount, replicationFactor i
}
}

// AssertMaxSnapshotsPerUser ensures limit has not been reached compared to the current
// AssertMaxSnapshotsPerTenant ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxSnapshotsPerUser(userID string, snapshots int) error {
actualLimit := l.maxSnapshotsPerUser(userID)
func (l *Limiter) AssertMaxSnapshotsPerTenant(tenantID string, snapshots int) error {
actualLimit := l.maxSnapshotsPerTenant(tenantID)
if snapshots < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalSnapshotsPerUser(userID)
globalLimit := l.limits.MaxGlobalSnapshotsPerUser(userID)
localLimit := l.limits.MaxLocalSnapshotsPerTenant(tenantID)
globalLimit := l.limits.MaxGlobalSnapshotsPerTenant(tenantID)

return fmt.Errorf(errMaxSnapshotsPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
return fmt.Errorf(errMaxSnapshotsPerTenantLimitExceeded, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) maxSnapshotsPerUser(userID string) int {
localLimit := l.limits.MaxLocalSnapshotsPerUser(userID)
func (l *Limiter) maxSnapshotsPerTenant(tenantID string) int {
localLimit := l.limits.MaxLocalSnapshotsPerTenant(tenantID)

// We can assume that snapshots are evenly distributed across ingesters
// so we do convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalSnapshotsPerUser(userID)
globalLimit := l.limits.MaxGlobalSnapshotsPerTenant(tenantID)
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))

// If both the local and global limits are disabled, we just
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/tenantBlockManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (i *tenantBlockManager) PushBytes(ctx context.Context, id []byte, snapshotB
}

// check for max snapshots before grabbing the lock to better load shed
err := i.limiter.AssertMaxSnapshotsPerUser(i.tenantID, int(i.snapshotCount.Load()))
err := i.limiter.AssertMaxSnapshotsPerTenant(i.tenantID, int(i.snapshotCount.Load()))
if err != nil {
return status.Errorf(codes.FailedPrecondition, "%s max live snapshots exceeded for tenant %s: %v", overrides.ErrorPrefixLiveSnapshotsExceeded, i.tenantID, err)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/tenantBlockManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ package ingester
//func TestInstanceLimits(t *testing.T) {
// limits, err := overrides.NewOverrides(overrides.Limits{
// MaxBytesPerSnapshot: 1000,
// MaxLocalSnapshotsPerUser: 4,
// MaxLocalSnapshotsPerTenant: 4,
// })
// require.NoError(t, err, "unexpected error creating limits")
// limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
Expand Down
33 changes: 17 additions & 16 deletions modules/overrides/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ const (
ErrorPrefixLiveSnapshotsExceeded = "LIVE_SNAPSHOTS_EXCEEDED:"
// ErrorPrefixSnapshotTooLarge is used to flag batches from the ingester that were rejected b/c they exceeded the single trace limit
ErrorPrefixSnapshotTooLarge = "SNAPSHOT_TOO_LARGE:"
// ErrorPrefixRateLimited is used to flag batches that have exceeded the spans/second of the tenant
// ErrorPrefixRateLimited is used to flag batches that have exceeded the snapshots/second of the tenant
ErrorPrefixRateLimited = "RATE_LIMITED:"

// metrics
MetricMaxLocalTracesPerUser = "max_local_traces_per_user"
MetricMaxGlobalTracesPerUser = "max_global_traces_per_user"
MetricMaxBytesPerTrace = "max_bytes_per_snapshot"
MetricMaxLocalSnapshotsPerTenant = "max_local_snapshots_per_tenant"
MetricMaxGlobalSnapshotsPerTenant = "max_global_snapshots_per_tenant"
MetricMaxBytesPerSnapshot = "max_bytes_per_snapshot"
MetricMaxBytesPerTagValuesQuery = "max_bytes_per_tag_values_query"
MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
Expand All @@ -67,8 +67,8 @@ type Limits struct {
IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"`

// Ingester enforced limits.
MaxLocalTracesPerUser int `yaml:"max_traces_per_user" json:"max_traces_per_user"`
MaxGlobalTracesPerUser int `yaml:"max_global_traces_per_user" json:"max_global_traces_per_user"`
MaxLocalSnapshotsPerTenant int `yaml:"max_snapshots_per_tenant" json:"max_snapshots_per_tenant"`
MaxGlobalSnapshotsPerTenant int `yaml:"max_global_snapshots_per_tenant" json:"max_global_snapshots_per_tenant"`

// Forwarders
Forwarders []string `yaml:"forwarders" json:"forwarders"`
Expand Down Expand Up @@ -100,8 +100,9 @@ type Limits struct {
// is not used when doing a trace by id lookup.
MaxBytesPerSnapshot int `yaml:"max_bytes_per_snapshot" json:"max_bytes_per_snapshot"`

// Configuration for overrides, convenient if it goes here.
PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"`
// PerTenantOverrideConfig is the path to the per-tenant config
PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"`
// PerTenantOverridePeriod is the time between reloads of the override file.
PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"`
}

Expand All @@ -113,26 +114,26 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.IngestionBurstSizeBytes, "distributor.ingestion-burst-size-bytes", 20e6, "Per-user ingestion burst size in bytes. Should be set to the expected size (in bytes) of a single push request.")

// Ingester limits
f.IntVar(&l.MaxLocalTracesPerUser, "ingester.max-traces-per-user", 10e3, "Maximum number of active traces per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalTracesPerUser, "ingester.max-global-traces-per-user", 0, "Maximum number of active traces per user, across the cluster. 0 to disable.")
f.IntVar(&l.MaxBytesPerSnapshot, "ingester.max-bytes-per-trace", 50e5, "Maximum size of a trace in bytes. 0 to disable.")
f.IntVar(&l.MaxLocalSnapshotsPerTenant, "ingester.max-snapshots-per-tenant", 10e3, "Maximum number of active snapshots per tenant, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalSnapshotsPerTenant, "ingester.max-global-snapshots-per-tenant", 0, "Maximum number of active snapshots per tenant, across the cluster. 0 to disable.")
f.IntVar(&l.MaxBytesPerSnapshot, "ingester.max-bytes-per-snapshot", 50e5, "Maximum size of a snapshot in bytes. 0 to disable.")

// Querier limits
f.IntVar(&l.MaxBytesPerTagValuesQuery, "querier.max-bytes-per-tag-values-query", 50e5, "Maximum size of response for a tag-values query. Used mainly to limit the number of values associated with a particular tag")

f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.")
f.StringVar(&l.PerTenantOverrideConfig, "limits.per-tenant-override-config", "", "File name of per tenant overrides.")
_ = l.PerTenantOverridePeriod.Set("10s")
f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.")
f.Var(&l.PerTenantOverridePeriod, "limits.per-tenant-override-period", "Period at which to reload the per-tenant overrides file.")
}

func (l *Limits) Describe(ch chan<- *prometheus.Desc) {
ch <- metricLimitsDesc
}

func (l *Limits) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxLocalTracesPerUser), MetricMaxLocalTracesPerUser)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxGlobalTracesPerUser), MetricMaxGlobalTracesPerUser)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerSnapshot), MetricMaxBytesPerTrace)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxLocalSnapshotsPerTenant), MetricMaxLocalSnapshotsPerTenant)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxGlobalSnapshotsPerTenant), MetricMaxGlobalSnapshotsPerTenant)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerSnapshot), MetricMaxBytesPerSnapshot)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTagValuesQuery), MetricMaxBytesPerTagValuesQuery)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionRateLimitBytes), MetricIngestionRateLimitBytes)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionBurstSizeBytes), MetricIngestionBurstSizeBytes)
Expand Down
8 changes: 4 additions & 4 deletions modules/overrides/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ ingestion_rate_strategy: global
ingestion_rate_limit_bytes: 100_000
ingestion_burst_size_bytes: 100_000

max_traces_per_user: 1000
max_global_traces_per_user: 1000
max_snapshots_per_tenant: 1000
max_global_snapshots_per_tenant: 1000
max_bytes_per_snapshot: 100_000

block_retention: 24h
Expand All @@ -76,8 +76,8 @@ max_search_duration: 5m
"ingestion_rate_limit_bytes": 100000,
"ingestion_burst_size_bytes": 100000,

"max_traces_per_user": 1000,
"max_global_traces_per_user": 1000,
"max_snapshots_per_tenant": 1000,
"max_global_snapshots_per_tenant": 1000,
"max_bytes_per_snapshot": 100000,

"block_retention": "24h",
Expand Down
Loading