Feature: Allow configuration of a rule evaluation delay #14061

Merged 19 commits on May 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## unreleased

* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
* [FEATURE] Rules: Add new per-rule-group option `query_offset` (set in the rule group configuration file) and global option `rule_query_offset` to improve resilience against remote write delays. #14061
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
3 changes: 3 additions & 0 deletions cmd/prometheus/main.go
@@ -772,6 +772,9 @@ func main() {
ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals,
ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
DefaultRuleQueryOffset: func() time.Duration {
return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
},
})
}

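The default offset is wired in as a function rather than a fixed value, so each evaluation reads whatever the global configuration currently holds, and a live reload of `rule_query_offset` takes effect without recreating the rule manager. A minimal, self-contained sketch of that pattern (the atomic variable below is an illustrative stand-in for the reloadable config value, not Prometheus code):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// globalRuleQueryOffset stands in for config.GlobalConfig.RuleQueryOffset,
// which is swapped out when the configuration file is reloaded.
var globalRuleQueryOffset atomic.Int64

func main() {
	// The manager holds a closure, not a snapshot of the value.
	defaultRuleQueryOffset := func() time.Duration {
		return time.Duration(globalRuleQueryOffset.Load())
	}

	globalRuleQueryOffset.Store(int64(30 * time.Second))
	fmt.Println(defaultRuleQueryOffset()) // 30s

	// Simulate a config reload; the next evaluation picks up the new default.
	globalRuleQueryOffset.Store(int64(time.Minute))
	fmt.Println(defaultRuleQueryOffset()) // 1m0s
}
```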
4 changes: 4 additions & 0 deletions config/config.go
@@ -145,6 +145,7 @@
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(10 * time.Second),
EvaluationInterval: model.Duration(1 * time.Minute),
RuleQueryOffset: model.Duration(0 * time.Minute),
// When native histogram feature flag is enabled, ScrapeProtocols default
// changes to DefaultNativeHistogramScrapeProtocols.
ScrapeProtocols: DefaultScrapeProtocols,
@@ -397,6 +398,8 @@ type GlobalConfig struct {
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
// How frequently to evaluate rules by default.
EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"`
// Offset the rule evaluation timestamp by the specified duration into the past to ensure the underlying metrics have been received. This serves as the default query_offset for rule groups.
RuleQueryOffset model.Duration `yaml:"rule_query_offset"`
// File to which PromQL queries are logged.
QueryLogFile string `yaml:"query_log_file,omitempty"`
// The labels to add to any timeseries that this Prometheus instance scrapes.
@@ -556,6 +559,7 @@ func (c *GlobalConfig) isZero() bool {
c.ScrapeInterval == 0 &&
c.ScrapeTimeout == 0 &&
c.EvaluationInterval == 0 &&
c.RuleQueryOffset == 0 &&
c.QueryLogFile == "" &&
c.ScrapeProtocols == nil
}
4 changes: 4 additions & 0 deletions docs/configuration/configuration.md
@@ -70,6 +70,10 @@ global:

# How frequently to evaluate rules.
[ evaluation_interval: <duration> | default = 1m ]

# Offset the rule evaluation timestamp by the specified duration into the past to ensure the underlying metrics have been received. This serves as the default query_offset for all rule groups.
# Metric availability delays are more likely to occur when Prometheus is running as a remote write target, but can also occur when there are anomalies with scraping.
[ rule_query_offset: <duration> | default = 0s ]

# The labels to add to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager).
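For illustration only (the durations below are arbitrary examples, not defaults), a global configuration that makes every rule group evaluate one minute in the past unless the group overrides it:

```yaml
global:
  scrape_interval: 1m
  evaluation_interval: 1m
  # Every rule group without its own query_offset evaluates queries
  # with a timestamp shifted 1m into the past.
  rule_query_offset: 1m
```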
6 changes: 6 additions & 0 deletions docs/configuration/recording_rules.md
@@ -86,6 +86,9 @@ name: <string>
# rule can produce. 0 is no limit.
[ limit: <int> | default = 0 ]

# Offset the rule evaluation timestamp of this particular group by the specified duration into the past.
[ query_offset: <duration> | default = global.rule_query_offset ]

rules:
[ - <rule> ... ]
```
@@ -148,6 +151,9 @@ the rule, active, pending, or inactive, are cleared as well. The event will be
recorded as an error in the evaluation, and as such no stale markers are
written.

# Rule query offset
Setting a query offset shifts a rule group's evaluation timestamp into the past, which is useful to ensure the underlying metrics have been received and stored in Prometheus before the rules query them. Metric availability delays are more likely to occur when Prometheus is running as a remote write target due to the nature of distributed systems, but they can also occur when there are anomalies with scraping and/or short evaluation intervals.
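For example (group, metric, and duration values below are illustrative), a rule group that overrides the global `rule_query_offset`; setting `query_offset: 0s` instead would explicitly disable the offset for this group:

```yaml
groups:
  - name: remote_write_rules
    interval: 1m
    # Evaluate 5m in the past because samples arrive via remote write.
    query_offset: 5m
    rules:
      - record: job:http_requests:rate5m
        expr: sum by (job) (rate(http_requests_total[5m]))
```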

# Failed rule evaluations due to slow evaluation

If a rule group hasn't finished evaluating before its next evaluation is supposed to start (as defined by the `evaluation_interval`), the next evaluation will be skipped. Subsequent evaluations of the rule group will continue to be skipped until the initial evaluation either completes or times out. When this happens, there will be a gap in the metric produced by the recording rule. The `rule_group_iterations_missed_total` metric will be incremented for each missed iteration of the rule group.
9 changes: 5 additions & 4 deletions model/rulefmt/rulefmt.go
@@ -136,10 +136,11 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {

// RuleGroup is a list of sequentially evaluated recording and alerting rules.
type RuleGroup struct {
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
QueryOffset *model.Duration `yaml:"query_offset,omitempty"`
Review thread:

Member: Does this need to be a pointer? For the Interval above, we just use the zero value to indicate that it's unset (and the omitempty ensures that it's not serialized then).

gotjosh (Member, Author) — May 29, 2024: Because this can potentially override the configuration value, we need to differentiate between a zero value (which means override whatever you have in the global config) and no value (which means use the global configuration option). What do you think?

Member: I see, right. So for example if you have a 5m offset in the global config, but you want a 0m offset in a specific rule group. Yeah, that makes sense, thanks for the explanation :) 👍

Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
}

// Rule describes an alerting or recording rule.
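The pointer discussed above is what lets a group distinguish "not set" from "explicitly 0s". A minimal sketch of that precedence, using plain time.Duration and hypothetical names rather than the actual rulefmt types:

```go
package main

import (
	"fmt"
	"time"
)

// effectiveOffset mirrors the precedence discussed above: a non-nil pointer
// means the group set query_offset explicitly (even to 0s); nil means
// "fall back to the global rule_query_offset".
func effectiveOffset(groupOffset *time.Duration, globalDefault time.Duration) time.Duration {
	if groupOffset != nil {
		return *groupOffset
	}
	return globalDefault
}

func main() {
	globalDefault := 5 * time.Minute
	zero := time.Duration(0)

	fmt.Println(effectiveOffset(nil, globalDefault))   // 5m0s: group inherits the global default
	fmt.Println(effectiveOffset(&zero, globalDefault)) // 0s: group explicitly disables the offset
}
```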
9 changes: 4 additions & 5 deletions rules/alerting.go
@@ -338,10 +338,9 @@ const resolvedRetention = 15 * time.Minute

// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
func (r *AlertingRule) Eval(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
ctx = NewOriginContext(ctx, NewRuleDetail(r))

res, err := query(ctx, r.vector.String(), ts)
res, err := query(ctx, r.vector.String(), ts.Add(-queryOffset))
if err != nil {
return nil, err
}
@@ -484,8 +483,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
}

if r.restored.Load() {
vec = append(vec, r.sample(a, ts))
vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
vec = append(vec, r.sample(a, ts.Add(-queryOffset)))
vec = append(vec, r.forStateSample(a, ts.Add(-queryOffset), float64(a.ActiveAt.Unix())))
}
}

32 changes: 16 additions & 16 deletions rules/alerting_test.go
@@ -123,7 +123,7 @@ func TestAlertingRuleTemplateWithHistogram(t *testing.T) {
)

evalTime := time.Now()
res, err := rule.Eval(context.TODO(), evalTime, q, nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, q, nil, 0)
require.NoError(t, err)

require.Len(t, res, 2)
@@ -230,7 +230,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -247,7 +247,7 @@
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -315,7 +315,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -329,7 +329,7 @@
}

res, err = ruleWithExternalLabels.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -408,7 +408,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -422,7 +422,7 @@
}

res, err = ruleWithExternalURL.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -477,7 +477,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -544,7 +544,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
close(getDoneCh)
}()
_, err = ruleWithQueryInTemplate.Eval(
context.TODO(), evalTime, slowQueryFunc, nil, 0,
context.TODO(), 0, evalTime, slowQueryFunc, nil, 0,
)
require.NoError(t, err)
}
@@ -596,7 +596,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
"",
true, log.NewNopLogger(),
)
_, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0)
_, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels")
}
@@ -644,7 +644,7 @@ func TestAlertingRuleLimit(t *testing.T) {
evalTime := time.Unix(0, 0)

for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":
@@ -871,7 +871,7 @@ func TestKeepFiringFor(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -888,7 +888,7 @@
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -925,7 +925,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {

baseTime := time.Unix(0, 0)
result.T = timestamp.FromTime(baseTime)
res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)

require.Len(t, res, 2)
@@ -940,7 +940,7 @@
}

evalTime := baseTime.Add(time.Minute)
res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -974,7 +974,7 @@ func TestAlertingEvalWithOrigin(t *testing.T) {
true, log.NewNopLogger(),
)

_, err = rule.Eval(ctx, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
_, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
detail = FromOriginContext(ctx)
return nil, nil
}, nil, 0)
24 changes: 21 additions & 3 deletions rules/group.go
@@ -47,6 +47,7 @@ type Group struct {
name string
file string
interval time.Duration
queryOffset *time.Duration
limit int
rules []Rule
seriesInPreviousEval []map[string]labels.Labels // One per Rule.
@@ -90,6 +91,7 @@ type GroupOptions struct {
Rules []Rule
ShouldRestore bool
Opts *ManagerOptions
QueryOffset *time.Duration
done chan struct{}
EvalIterationFunc GroupEvalIterationFunc
}
@@ -126,6 +128,7 @@ func NewGroup(o GroupOptions) *Group {
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
@@ -443,6 +446,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
wg sync.WaitGroup
)

ruleQueryOffset := g.QueryOffset()

for i, rule := range g.rules {
select {
case <-g.done:
@@ -473,7 +478,7 @@

g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
@@ -562,7 +567,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
_, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN))
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
@@ -601,14 +606,27 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.cleanupStaleSeries(ctx, ts)
}

func (g *Group) QueryOffset() time.Duration {
if g.queryOffset != nil {
return *g.queryOffset
}

if g.opts.DefaultRuleQueryOffset != nil {
Review thread:

machine424 (Collaborator): Shouldn't we add some pre-checks to ensure that we'll be able to append that "old" data? And some pre-check regarding the eval_interval? (delay < eval_interval, maybe?)

gotjosh (Member, Author): I'm not sure I follow, can you please elaborate?

machine424 (Collaborator) — May 28, 2024: Adding some validation to make sure that the "append in the past" will be possible (with regard to minValidTime and the OOO config as well), otherwise the samples would be rejected. Also, we may want to prevent setting the delay > eval_interval.

gotjosh (Member, Author):

> Also we may want to prevent setting the delay > eval_interval.

I don't think I get this. Why does this matter? Assuming we have a 5m delay and a 1m interval - this is perfectly fine.

groups:
  - name: offset
    query_offset: 5m
    interval: 1m
    rules:
      - record: offset
        expr: vector(12)

Your initial rule evaluation would be delayed by 5m, but afterwards it'll continue to be evaluated every 1m, just for a timestamp 5m in the past.

> Adding some validation to make sure that the "append in the past" will be possible (with regard to minValidTime and OOO config as well) otherwise they would be rejected.

I'm not entirely sure this is needed; you'd have 3 signals to indicate that something went wrong:

  1. The rule health would be set to bad and it would show up in the UI.
  2. The rule evaluation would fail - and we highly encourage you to have meta-monitoring on those.
  3. We would log the reason for the failure.

machine424 (Collaborator):

> I don't quite understand this. Why does it matter? Let's assume we have a 5-minute delay and a 1-minute interval - this seems perfectly fine.

Yes, I'm aware that everything will be shifted. I was just considering that perhaps we might want to "guarantee" that when the delay is enabled and Prometheus is shut down, the window of data that won't be evaluated is < delay + eval_interval, i.e. < 2 * eval_interval when delay < eval_interval (an extra eval_interval compared to when the delay is disabled). With an unlimited delay, the gap may be larger upon a Prometheus shutdown. But if we assume users know what they're doing, we can leave it as is.

> I'm not entirely sure this is necessary; you'd have 3 signals to indicate that something went wrong:

Yes, I just thought that "failing early" would be better, but some of the parameters that control the "append in the past" may change dynamically, which could make spotting that more difficult.

gotjosh (Member, Author) — May 30, 2024: Your feedback is very valid. Would you mind opening an issue for your first point, in case it becomes a wider problem? (I can't think of a world where it makes sense to use offsets so long that this matters.) I think we could fix it by looking at the last evaluation as we restart, but we've never had this feedback before, which makes me think it's not as relevant as it seems, and it's not a straightforward piece of work.

return g.opts.DefaultRuleQueryOffset()
}

return time.Duration(0)
}

func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
if len(g.staleSeries) == 0 {
return
}
app := g.opts.Appendable.Appender(ctx)
queryOffset := g.QueryOffset()
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
_, err := app.Append(0, s, timestamp.FromTime(ts.Add(-queryOffset)), math.Float64frombits(value.StaleNaN))
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
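To make the timestamp handling above concrete: the group keeps its normal evaluation schedule, but the query, the recorded samples, and any stale markers all use a timestamp shifted queryOffset into the past. A small sketch with made-up values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Date(2024, 5, 30, 12, 0, 0, 0, time.UTC) // scheduled evaluation time
	queryOffset := 5 * time.Minute

	// The rule's expression is evaluated "as of" this earlier timestamp,
	// and its output samples (and stale markers) are written at it too.
	evalTS := ts.Add(-queryOffset)

	fmt.Println("scheduled at:   ", ts.Format(time.RFC3339))     // 2024-05-30T12:00:00Z
	fmt.Println("evaluated as of:", evalTS.Format(time.RFC3339)) // 2024-05-30T11:55:00Z
}
```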
2 changes: 2 additions & 0 deletions rules/manager.go
@@ -116,6 +116,7 @@ type ManagerOptions struct {
ForGracePeriod time.Duration
ResendDelay time.Duration
GroupLoader GroupLoader
DefaultRuleQueryOffset func() time.Duration
MaxConcurrentEvals int64
ConcurrentEvalsEnabled bool
RuleConcurrencyController RuleConcurrencyController
@@ -336,6 +337,7 @@ func (m *Manager) LoadGroups(
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
QueryOffset: (*time.Duration)(rg.QueryOffset),
done: m.done,
EvalIterationFunc: groupEvalIterationFunc,
})
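The cast in LoadGroups works because model.Duration's underlying type is time.Duration, so the pointer conversion is legal and a nil pointer (query_offset not set in the group file) stays nil. A small sketch using a local stand-in type rather than the real model.Duration:

```go
package main

import (
	"fmt"
	"time"
)

// modelDuration is a stand-in for model.Duration; its underlying type is time.Duration.
type modelDuration time.Duration

func main() {
	d := modelDuration(5 * time.Minute)

	// Pointer conversion between types with identical underlying types.
	p := (*time.Duration)(&d)
	fmt.Println(*p) // 5m0s

	// An unset query_offset remains nil after conversion, preserving
	// the "use the global default" meaning.
	var unset *modelDuration
	fmt.Println((*time.Duration)(unset) == nil) // true
}
```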