Skip to content

Commit

Permalink
promql.Engine: Add Close method
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed May 8, 2024
1 parent f2b4f76 commit 13becbb
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 86 deletions.
3 changes: 3 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,12 +942,14 @@ func main() {
listener, err := webHandler.Listener()
if err != nil {
level.Error(logger).Log("msg", "Unable to start web listener", "err", err)
_ = queryEngine.Close()
os.Exit(1)
}

err = toolkit_web.Validate(*webConfig)
if err != nil {
level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err)
_ = queryEngine.Close()
os.Exit(1)
}

Expand All @@ -969,6 +971,7 @@ func main() {
case <-cancel:
reloadReady.Close()
}
_ = queryEngine.Close()
return nil
},
func(err error) {
Expand Down
8 changes: 8 additions & 0 deletions promql/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
Expand Down Expand Up @@ -256,6 +258,9 @@ func BenchmarkRangeQuery(b *testing.B) {
Timeout: 100 * time.Second,
}
engine := NewEngine(opts)
b.Cleanup(func() {
require.NoError(b, engine.Close())
})

const interval = 10000 // 10s interval.
// A day of data plus 10k steps.
Expand Down Expand Up @@ -339,6 +344,9 @@ func BenchmarkNativeHistograms(b *testing.B) {
for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
ng := NewEngine(opts)
b.Cleanup(func() {
require.NoError(b, ng.Close())
})
for i := 0; i < b.N; i++ {
qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math"
"reflect"
"runtime"
Expand Down Expand Up @@ -423,6 +424,14 @@ func NewEngine(opts EngineOpts) *Engine {
}
}

// Close closes ng.
func (ng *Engine) Close() error {
if closer, ok := ng.activeQueryTracker.(io.Closer); ok {
return closer.Close()
}
return nil
}

// SetQueryLogger sets the query logger.
func (ng *Engine) SetQueryLogger(l QueryLogger) {
ng.queryLoggerLock.Lock()
Expand Down
94 changes: 64 additions & 30 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"errors"
"fmt"
"math"
"os"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -48,14 +48,7 @@ func TestMain(m *testing.M) {
func TestQueryConcurrency(t *testing.T) {
maxConcurrency := 10

dir, err := os.MkdirTemp("", "test_concurrency")
require.NoError(t, err)
defer os.RemoveAll(dir)
queryTracker := NewActiveQueryTracker(dir, maxConcurrency, nil)
t.Cleanup(func() {
require.NoError(t, queryTracker.Close())
})

queryTracker := NewActiveQueryTracker(t.TempDir(), maxConcurrency, nil)
opts := EngineOpts{
Logger: nil,
Reg: nil,
Expand All @@ -65,13 +58,18 @@ func TestQueryConcurrency(t *testing.T) {
}

engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
t.Cleanup(cancelCtx)

block := make(chan struct{})
processing := make(chan struct{})
done := make(chan int)
defer close(done)
t.Cleanup(func() {
close(done)
})

f := func(context.Context) error {
select {
Expand All @@ -86,9 +84,15 @@ func TestQueryConcurrency(t *testing.T) {
return nil
}

var wg sync.WaitGroup

for i := 0; i < maxConcurrency; i++ {
q := engine.newTestQuery(f)
go q.Exec(ctx)
wg.Add(1)
go func() {
q.Exec(ctx)
wg.Done()
}()
select {
case <-processing:
// Expected.
Expand All @@ -98,7 +102,11 @@ func TestQueryConcurrency(t *testing.T) {
}

q := engine.newTestQuery(f)
go q.Exec(ctx)
wg.Add(1)
go func() {
q.Exec(ctx)
wg.Done()
}()

select {
case <-processing:
Expand All @@ -121,6 +129,8 @@ func TestQueryConcurrency(t *testing.T) {
for i := 0; i < maxConcurrency; i++ {
block <- struct{}{}
}

wg.Wait()
}

func TestQueryTimeout(t *testing.T) {
Expand All @@ -131,6 +141,9 @@ func TestQueryTimeout(t *testing.T) {
Timeout: 5 * time.Millisecond,
}
engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

Expand Down Expand Up @@ -230,6 +243,9 @@ func TestQueryError(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})
errStorage := ErrStorage{errors.New("storage error")}
queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
return &errQuerier{err: errStorage}, nil
Expand Down Expand Up @@ -564,6 +580,9 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
} {
t.Run(tc.query, func(t *testing.T) {
engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})
hintsRecorder := &noopHintRecordingQueryable{}

var (
Expand Down Expand Up @@ -595,6 +614,9 @@ func TestEngineShutdown(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})
ctx, cancelCtx := context.WithCancel(context.Background())

block := make(chan struct{})
Expand Down Expand Up @@ -730,7 +752,7 @@ load 10s
t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) {
var err error
var qry Query
engine := newTestEngine()
engine := newTestEngine(t)
if c.Interval == 0 {
qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start)
} else {
Expand Down Expand Up @@ -1269,7 +1291,7 @@ load 10s
},
}

engine := newTestEngine()
engine := newTestEngine(t)
engine.enablePerStepStats = true
origMaxSamples := engine.maxSamplesPerQuery
for _, c := range cases {
Expand Down Expand Up @@ -1455,7 +1477,7 @@ load 10s

for _, c := range cases {
t.Run(c.Query, func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
testFunc := func(expError error) {
var err error
var qry Query
Expand Down Expand Up @@ -1487,7 +1509,7 @@ load 10s
}

func TestAtModifier(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := LoadedStorage(t, `
load 10s
metric{job="1"} 0+1x1000
Expand Down Expand Up @@ -2022,7 +2044,7 @@ func TestSubquerySelector(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := LoadedStorage(t, tst.loadString)
t.Cleanup(func() { storage.Close() })

Expand All @@ -2043,7 +2065,7 @@ func TestSubquerySelector(t *testing.T) {
}

func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := LoadedStorage(t, `
load 1m
metric 0+1x1000
Expand Down Expand Up @@ -2113,6 +2135,9 @@ func TestQueryLogger_basic(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})

queryExec := func() {
ctx, cancelCtx := context.WithCancel(context.Background())
Expand Down Expand Up @@ -2164,6 +2189,9 @@ func TestQueryLogger_fields(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})

f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
Expand Down Expand Up @@ -2193,6 +2221,9 @@ func TestQueryLogger_error(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
t.Cleanup(func() {
require.NoError(t, engine.Close())
})

f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
Expand Down Expand Up @@ -3076,6 +3107,9 @@ func TestEngineOptsValidation(t *testing.T) {

for _, c := range cases {
eng := NewEngine(c.opts)
t.Cleanup(func() {
require.NoError(t, eng.Close())
})
_, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0))
_, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
if c.fail {
Expand Down Expand Up @@ -3235,7 +3269,7 @@ func TestRangeQuery(t *testing.T) {
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := LoadedStorage(t, c.Load)
t.Cleanup(func() { storage.Close() })

Expand All @@ -3252,7 +3286,7 @@ func TestRangeQuery(t *testing.T) {
func TestNativeHistogramRate(t *testing.T) {
// TODO(beorn7): Integrate histograms into the PromQL testing framework
// and write more tests there.
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

Expand Down Expand Up @@ -3340,7 +3374,7 @@ func TestNativeHistogramRate(t *testing.T) {
func TestNativeFloatHistogramRate(t *testing.T) {
// TODO(beorn7): Integrate histograms into the PromQL testing framework
// and write more tests there.
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

Expand Down Expand Up @@ -3400,7 +3434,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
}
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

Expand Down Expand Up @@ -3598,7 +3632,7 @@ func TestNativeHistogram_HistogramStdDevVar(t *testing.T) {
for _, tc := range testCases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("%s floatHistogram=%t", tc.name, floatHisto), func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

Expand Down Expand Up @@ -3841,7 +3875,7 @@ func TestNativeHistogram_HistogramQuantile(t *testing.T) {
},
}

engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
idx := int64(0)
Expand Down Expand Up @@ -4274,7 +4308,7 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) {
for _, floatHisto := range []bool{true, false} {
for _, c := range cases {
t.Run(fmt.Sprintf("%s floatHistogram=%t", c.text, floatHisto), func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

Expand Down Expand Up @@ -4439,7 +4473,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) {
seriesName := "sparse_histogram_series"
seriesNameOverTime := "sparse_histogram_series_over_time"

engine := newTestEngine()
engine := newTestEngine(t)

ts := idx0 * int64(10*time.Minute/time.Millisecond)
app := storage.Appender(context.Background())
Expand Down Expand Up @@ -4709,7 +4743,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) {
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

Expand Down Expand Up @@ -4870,7 +4904,7 @@ func TestNativeHistogram_MulDivOperator(t *testing.T) {
seriesName := "sparse_histogram_series"
floatSeriesName := "float_series"

engine := newTestEngine()
engine := newTestEngine(t)

ts := idx0 * int64(10*time.Minute/time.Millisecond)
app := storage.Appender(context.Background())
Expand Down Expand Up @@ -4993,7 +5027,7 @@ metric 0 1 2
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := LoadedStorage(t, load)
t.Cleanup(func() { storage.Close() })

Expand Down

0 comments on commit 13becbb

Please sign in to comment.