From 13becbb6b8fecdc3b30be023ab4199d2ed262e77 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 7 May 2024 18:14:22 +0200 Subject: [PATCH] promql.Engine: Add Close method Signed-off-by: Arve Knudsen --- cmd/prometheus/main.go | 3 ++ promql/bench_test.go | 8 ++++ promql/engine.go | 9 ++++ promql/engine_test.go | 94 ++++++++++++++++++++++++++------------- promql/functions_test.go | 3 ++ promql/promql_test.go | 14 ++++-- promql/query_logger.go | 1 + promql/test.go | 6 ++- promql/test_test.go | 2 +- rules/alerting_test.go | 70 +++++++++++++++++++---------- rules/manager_test.go | 36 ++++++++++++--- rules/recording_test.go | 13 ++++-- web/api/v1/api_test.go | 47 +++++++++++++------- web/api/v1/errors_test.go | 9 +++- 14 files changed, 229 insertions(+), 86 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 8218ffb18d4..b1d7e07ddd7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -942,12 +942,14 @@ func main() { listener, err := webHandler.Listener() if err != nil { level.Error(logger).Log("msg", "Unable to start web listener", "err", err) + _ = queryEngine.Close() os.Exit(1) } err = toolkit_web.Validate(*webConfig) if err != nil { level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err) + _ = queryEngine.Close() os.Exit(1) } @@ -969,6 +971,7 @@ func main() { case <-cancel: reloadReady.Close() } + _ = queryEngine.Close() return nil }, func(err error) { diff --git a/promql/bench_test.go b/promql/bench_test.go index 516b0d74823..2b78a090adc 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" @@ -256,6 +258,9 @@ func BenchmarkRangeQuery(b *testing.B) { Timeout: 100 * time.Second, } engine := NewEngine(opts) + b.Cleanup(func() { + require.NoError(b, engine.Close()) + }) const interval = 10000 // 10s interval. // A day of data plus 10k steps. @@ -339,6 +344,9 @@ func BenchmarkNativeHistograms(b *testing.B) { for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { ng := NewEngine(opts) + b.Cleanup(func() { + require.NoError(b, ng.Close()) + }) for i := 0; i < b.N; i++ { qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step) if err != nil { diff --git a/promql/engine.go b/promql/engine.go index 4bd9d25d7a9..08b85ef1b4d 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "io" "math" "reflect" "runtime" @@ -423,6 +424,14 @@ func NewEngine(opts EngineOpts) *Engine { } } +// Close closes ng. +func (ng *Engine) Close() error { + if closer, ok := ng.activeQueryTracker.(io.Closer); ok { + return closer.Close() + } + return nil +} + // SetQueryLogger sets the query logger. func (ng *Engine) SetQueryLogger(l QueryLogger) { ng.queryLoggerLock.Lock() diff --git a/promql/engine_test.go b/promql/engine_test.go index cfffea2c3a0..fb1ead0b1eb 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -18,8 +18,8 @@ import ( "errors" "fmt" "math" - "os" "sort" + "sync" "testing" "time" @@ -48,14 +48,7 @@ func TestMain(m *testing.M) { func TestQueryConcurrency(t *testing.T) { maxConcurrency := 10 - dir, err := os.MkdirTemp("", "test_concurrency") - require.NoError(t, err) - defer os.RemoveAll(dir) - queryTracker := NewActiveQueryTracker(dir, maxConcurrency, nil) - t.Cleanup(func() { - require.NoError(t, queryTracker.Close()) - }) - + queryTracker := NewActiveQueryTracker(t.TempDir(), maxConcurrency, nil) opts := EngineOpts{ Logger: nil, Reg: nil, @@ -65,13 +58,18 @@ func TestQueryConcurrency(t *testing.T) { } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) - defer cancelCtx() + t.Cleanup(cancelCtx) block := make(chan struct{}) processing := make(chan struct{}) done := make(chan int) - defer close(done) + t.Cleanup(func() { + close(done) + }) f := func(context.Context) error { select { @@ -86,9 +84,15 @@ func TestQueryConcurrency(t *testing.T) { return nil } + var wg sync.WaitGroup + for i := 0; i < maxConcurrency; i++ { q := engine.newTestQuery(f) - go q.Exec(ctx) + wg.Add(1) + go func() { + q.Exec(ctx) + wg.Done() + }() select { case <-processing: // Expected. @@ -98,7 +102,11 @@ func TestQueryConcurrency(t *testing.T) { } q := engine.newTestQuery(f) - go q.Exec(ctx) + wg.Add(1) + go func() { + q.Exec(ctx) + wg.Done() + }() select { case <-processing: @@ -121,6 +129,8 @@ func TestQueryConcurrency(t *testing.T) { for i := 0; i < maxConcurrency; i++ { block <- struct{}{} } + + wg.Wait() } func TestQueryTimeout(t *testing.T) { @@ -131,6 +141,9 @@ func TestQueryTimeout(t *testing.T) { Timeout: 5 * time.Millisecond, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -230,6 +243,9 @@ func TestQueryError(t *testing.T) { Timeout: 10 * time.Second, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) errStorage := ErrStorage{errors.New("storage error")} queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { return &errQuerier{err: errStorage}, nil @@ -564,6 +580,9 @@ func TestSelectHintsSetCorrectly(t *testing.T) { } { t.Run(tc.query, func(t *testing.T) { engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) hintsRecorder := &noopHintRecordingQueryable{} var ( @@ -595,6 +614,9 @@ func TestEngineShutdown(t *testing.T) { Timeout: 10 * time.Second, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) block := make(chan struct{}) @@ -730,7 +752,7 @@ load 10s t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) { var err error var qry Query - engine := newTestEngine() + engine := newTestEngine(t) if c.Interval == 0 { qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start) } else { @@ -1269,7 +1291,7 @@ load 10s }, } - engine := newTestEngine() + engine := newTestEngine(t) engine.enablePerStepStats = true origMaxSamples := engine.maxSamplesPerQuery for _, c := range cases { @@ -1455,7 +1477,7 @@ load 10s for _, c := range cases { t.Run(c.Query, func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) testFunc := func(expError error) { var err error var qry Query @@ -1487,7 +1509,7 @@ load 10s } func TestAtModifier(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := LoadedStorage(t, ` load 10s metric{job="1"} 0+1x1000 @@ -2022,7 +2044,7 @@ func TestSubquerySelector(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := LoadedStorage(t, tst.loadString) t.Cleanup(func() { storage.Close() }) @@ -2043,7 +2065,7 @@ func TestSubquerySelector(t *testing.T) { } func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := LoadedStorage(t, ` load 1m metric 0+1x1000 @@ -2113,6 +2135,9 @@ func TestQueryLogger_basic(t *testing.T) { Timeout: 10 * time.Second, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) queryExec := func() { ctx, cancelCtx := context.WithCancel(context.Background()) @@ -2164,6 +2189,9 @@ func TestQueryLogger_fields(t *testing.T) { Timeout: 10 * time.Second, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) f1 := NewFakeQueryLogger() engine.SetQueryLogger(f1) @@ -2193,6 +2221,9 @@ func TestQueryLogger_error(t *testing.T) { Timeout: 10 * time.Second, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) f1 := NewFakeQueryLogger() engine.SetQueryLogger(f1) @@ -3076,6 +3107,9 @@ func TestEngineOptsValidation(t *testing.T) { for _, c := range cases { eng := NewEngine(c.opts) + t.Cleanup(func() { + require.NoError(t, eng.Close()) + }) _, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0)) _, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) if c.fail { @@ -3235,7 +3269,7 @@ func TestRangeQuery(t *testing.T) { } for _, c := range cases { t.Run(c.Name, func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := LoadedStorage(t, c.Load) t.Cleanup(func() { storage.Close() }) @@ -3252,7 +3286,7 @@ func TestRangeQuery(t *testing.T) { func TestNativeHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -3340,7 +3374,7 @@ func TestNativeHistogramRate(t *testing.T) { func TestNativeFloatHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -3400,7 +3434,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { } for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -3598,7 +3632,7 @@ func TestNativeHistogram_HistogramStdDevVar(t *testing.T) { for _, tc := range testCases { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("%s floatHistogram=%t", tc.name, floatHisto), func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -3841,7 +3875,7 @@ func TestNativeHistogram_HistogramQuantile(t *testing.T) { }, } - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) idx := int64(0) @@ -4274,7 +4308,7 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) { for _, floatHisto := range []bool{true, false} { for _, c := range cases { t.Run(fmt.Sprintf("%s floatHistogram=%t", c.text, floatHisto), func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -4439,7 +4473,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { seriesName := "sparse_histogram_series" seriesNameOverTime := "sparse_histogram_series_over_time" - engine := newTestEngine() + engine := newTestEngine(t) ts := idx0 * int64(10*time.Minute/time.Millisecond) app := storage.Appender(context.Background()) @@ -4709,7 +4743,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { for _, c := range cases { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -4870,7 +4904,7 @@ func TestNativeHistogram_MulDivOperator(t *testing.T) { seriesName := "sparse_histogram_series" floatSeriesName := "float_series" - engine := newTestEngine() + engine := newTestEngine(t) ts := idx0 * int64(10*time.Minute/time.Millisecond) app := storage.Appender(context.Background()) @@ -4993,7 +5027,7 @@ metric 0 1 2 for _, c := range cases { c := c t.Run(c.name, func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := LoadedStorage(t, load) t.Cleanup(func() { storage.Close() }) diff --git a/promql/functions_test.go b/promql/functions_test.go index 6d5c3784ea6..bf97d8a4cd1 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -40,6 +40,9 @@ func TestDeriv(t *testing.T) { Timeout: 10 * time.Second, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) a := storage.Appender(context.Background()) diff --git a/promql/promql_test.go b/promql/promql_test.go index 05821b1c11f..9834be6960f 100644 --- a/promql/promql_test.go +++ b/promql/promql_test.go @@ -25,8 +25,9 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) -func newTestEngine() *Engine { - return NewEngine(EngineOpts{ +func newTestEngine(t *testing.T) *Engine { + t.Helper() + ng := NewEngine(EngineOpts{ Logger: nil, Reg: nil, MaxSamples: 10000, @@ -36,10 +37,14 @@ func newTestEngine() *Engine { EnableNegativeOffset: true, EnablePerStepStats: true, }) + t.Cleanup(func() { + require.NoError(t, ng.Close()) + }) + return ng } func TestEvaluations(t *testing.T) { - RunBuiltinTests(t, newTestEngine()) + RunBuiltinTests(t, newTestEngine(t)) } // Run a lot of queries at the same time, to check for race conditions. @@ -53,6 +58,9 @@ func TestConcurrentRangeQueries(t *testing.T) { Timeout: 100 * time.Second, } engine := NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) const interval = 10000 // 10s interval. // A day of data plus 10k steps. diff --git a/promql/query_logger.go b/promql/query_logger.go index b9bd105eafb..01fed6720c6 100644 --- a/promql/query_logger.go +++ b/promql/query_logger.go @@ -230,6 +230,7 @@ func (tracker *ActiveQueryTracker) Close() error { return nil } if err := tracker.closer.Close(); err != nil { + fmt.Printf("Closing %T\n", tracker.closer) return fmt.Errorf("close ActiveQueryTracker.closer: %w", err) } return nil diff --git a/promql/test.go b/promql/test.go index 1cdfe8d311c..9b94157ffc8 100644 --- a/promql/test.go +++ b/promql/test.go @@ -1072,5 +1072,9 @@ func (ll *LazyLoader) Storage() storage.Storage { // Close closes resources associated with the LazyLoader. func (ll *LazyLoader) Close() error { ll.cancelCtx() - return ll.storage.Close() + err := ll.queryEngine.Close() + if sErr := ll.storage.Close(); err == nil && sErr != nil { + return sErr + } + return err } diff --git a/promql/test_test.go b/promql/test_test.go index a5b24ac6986..37b9eacf85d 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -450,7 +450,7 @@ eval range from 0 to 5m step 5m testmetric for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - err := runTest(t, testCase.input, newTestEngine()) + err := runTest(t, testCase.input, newTestEngine(t)) if testCase.expectedError == "" { require.NoError(t, err) diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 5fae3edd18c..5a2e64da794 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -35,16 +35,23 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) -var testEngine = promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, - NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, - EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: true, -}) +func testEngine(tb testing.TB) *promql.Engine { + tb.Helper() + e := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: true, + }) + tb.Cleanup(func() { + require.NoError(tb, e.Close()) + }) + return e +} func TestAlertingRuleState(t *testing.T) { tests := []struct { @@ -224,12 +231,14 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { }, } + ng := testEngine(t) + baseTime := time.Unix(0, 0) for i, result := range results { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -246,7 +255,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { testutil.RequireEqual(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } @@ -308,13 +317,15 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { }, } + ng := testEngine(t) + evalTime := time.Unix(0, 0) result[0].T = timestamp.FromTime(evalTime) result[1].T = timestamp.FromTime(evalTime) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalLabels.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -328,7 +339,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { } res, err = ruleWithExternalLabels.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -405,9 +416,11 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { result[0].T = timestamp.FromTime(evalTime) result[1].T = timestamp.FromTime(evalTime) + ng := testEngine(t) + var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalURL.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -421,7 +434,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { } res, err = ruleWithExternalURL.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -474,9 +487,11 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) { evalTime := time.Unix(0, 0) result[0].T = timestamp.FromTime(evalTime) + ng := testEngine(t) + var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := rule.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -519,6 +534,8 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }}; ) evalTime := time.Unix(0, 0) + ng := testEngine(t) + startQueryCh := make(chan struct{}) getDoneCh := make(chan struct{}) slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { @@ -532,7 +549,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }}; require.Fail(t, "unexpected blocking when template expanding.") } } - return EngineQueryFunc(testEngine, storage)(ctx, q, ts) + return EngineQueryFunc(ng, storage)(ctx, q, ts) } go func() { <-startQueryCh @@ -578,6 +595,9 @@ func TestAlertingRuleDuplicate(t *testing.T) { } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -641,9 +661,9 @@ func TestAlertingRuleLimit(t *testing.T) { ) evalTime := time.Unix(0, 0) - + ng := testEngine(t) for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, test.limit); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": @@ -865,12 +885,13 @@ func TestKeepFiringFor(t *testing.T) { }, } + ng := testEngine(t) baseTime := time.Unix(0, 0) for i, result := range results { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -887,7 +908,7 @@ func TestKeepFiringFor(t *testing.T) { testutil.RequireEqual(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } @@ -922,9 +943,10 @@ func TestPendingAndKeepFiringFor(t *testing.T) { F: 1, } + ng := testEngine(t) baseTime := time.Unix(0, 0) result.T = timestamp.FromTime(baseTime) - res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Len(t, res, 2) @@ -939,7 +961,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) { } evalTime := baseTime.Add(time.Minute) - res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } diff --git a/rules/manager_test.go b/rules/manager_test.go index c45569befd1..cebaa17677a 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -155,12 +155,13 @@ func TestAlertingRule(t *testing.T) { }, } + ng := testEngine(t) for i, test := range tests { t.Logf("case %d", i) evalTime := baseTime.Add(test.time) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -294,6 +295,7 @@ func TestForStateAddSamples(t *testing.T) { }, } + ng := testEngine(t) var forState float64 for i, test := range tests { t.Logf("case %d", i) @@ -306,7 +308,7 @@ func TestForStateAddSamples(t *testing.T) { forState = float64(value.StaleNaN) } - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS' samples. @@ -357,8 +359,9 @@ func TestForStateRestore(t *testing.T) { expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) require.NoError(t, err) + ng := testEngine(t) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), @@ -526,6 +529,9 @@ func TestStaleness(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, st), Appendable: st, @@ -718,6 +724,9 @@ func TestUpdate(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: st, Queryable: st, @@ -856,6 +865,9 @@ func TestNotify(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) var lastNotified []*Alert notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) { lastNotified = alerts @@ -931,6 +943,9 @@ func TestMetricsUpdate(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1005,6 +1020,9 @@ func TestGroupStalenessOnRemoval(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1082,6 +1100,9 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1184,6 +1205,9 @@ func TestRuleHealthUpdates(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, st), Appendable: st, @@ -1280,9 +1304,10 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) { }, } + ng := testEngine(t) testFunc := func(tst testInput) { opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), @@ -1366,8 +1391,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { } require.NoError(t, app.Commit()) + ng := testEngine(t) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), diff --git a/rules/recording_test.go b/rules/recording_test.go index 24b7d653902..3bfeb04353d 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -122,10 +122,11 @@ func TestRuleEval(t *testing.T) { storage := setUpRuleEvalTest(t) t.Cleanup(func() { storage.Close() }) + ng := testEngine(t) for _, scenario := range ruleEvalTestScenarios { t.Run(scenario.name, func(t *testing.T) { rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) - result, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + result, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) testutil.RequireEqual(t, scenario.expected, result) }) @@ -136,6 +137,7 @@ func BenchmarkRuleEval(b *testing.B) { storage := setUpRuleEvalTest(b) b.Cleanup(func() { storage.Close() }) + ng := testEngine(b) for _, scenario := range ruleEvalTestScenarios { b.Run(scenario.name, func(b *testing.B) { rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) @@ -143,7 +145,7 @@ func BenchmarkRuleEval(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + _, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0) if err != nil { require.NoError(b, err) } @@ -165,6 +167,9 @@ func TestRuleEvalDuplicate(t *testing.T) { } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -211,10 +216,10 @@ func TestRecordingRuleLimit(t *testing.T) { labels.FromStrings("test", "test"), ) + ng := testEngine(t) evalTime := time.Unix(0, 0) - for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, test.limit); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index bb2a73f6dbc..4abd558a8d7 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -57,16 +57,25 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) -var testEngine = promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, - NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, - EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: true, -}) +func testEngine(t *testing.T) *promql.Engine { + t.Helper() + + ng := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: true, + }) + t.Cleanup(func() { + require.NoError(t, ng.Close()) + }) + + return ng +} // testMetaStore satisfies the scrape.MetricMetadataStore interface. // It is used to inject specific metadata as part of a test case. @@ -210,6 +219,7 @@ func (t testAlertmanagerRetriever) toFactory() func(context.Context) Alertmanage } type rulesRetrieverMock struct { + t *testing.T alertingRules []*rules.AlertingRule ruleGroups []*rules.Group testing *testing.T @@ -281,6 +291,9 @@ func (m *rulesRetrieverMock) CreateRuleGroups() { } engine := promql.NewEngine(engineOpts) + m.t.Cleanup(func() { + require.NoError(m.t, engine.Close()) + }) opts := &rules.ManagerOptions{ QueryFunc: rules.EngineQueryFunc(engine, storage), Appendable: storage, @@ -401,8 +414,10 @@ func TestEndpoints(t *testing.T) { now := time.Now() + ng := testEngine(t) + t.Run("local", func(t *testing.T) { - algr := rulesRetrieverMock{} + algr := rulesRetrieverMock{t: t} algr.testing = t algr.CreateAlertingRules() @@ -415,7 +430,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: ng, ExemplarQueryable: storage.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), @@ -466,7 +481,7 @@ func TestEndpoints(t *testing.T) { }) require.NoError(t, err) - algr := rulesRetrieverMock{} + algr := rulesRetrieverMock{t: t} algr.testing = t algr.CreateAlertingRules() @@ -479,7 +494,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: remote, - QueryEngine: testEngine, + QueryEngine: ng, ExemplarQueryable: storage.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), @@ -621,7 +636,7 @@ func TestQueryExemplars(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: testEngine(t), ExemplarQueryable: storage.ExemplarQueryable(), } @@ -829,7 +844,7 @@ func TestStats(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: testEngine(t), now: func() time.Time { return time.Unix(123, 0) }, diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index e76a1a3d35a..5ddd47e1b27 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -86,7 +86,7 @@ func TestApiStatusCodes(t *testing.T) { "error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}}, } { t.Run(fmt.Sprintf("%s/%s", name, k), func(t *testing.T) { - r := createPrometheusAPI(q) + r := createPrometheusAPI(t, q) rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=up", nil) @@ -100,7 +100,9 @@ func TestApiStatusCodes(t *testing.T) { } } -func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { +func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route.Router { + t.Helper() + engine := promql.NewEngine(promql.EngineOpts{ Logger: log.NewNopLogger(), Reg: nil, @@ -108,6 +110,9 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { MaxSamples: 100, Timeout: 5 * time.Second, }) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) api := NewAPI( engine,