Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Adds E2E tests for evaluating recording rules.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

This commit adds an E2E test that evaluates 2 samples of a constant
timestamp and checks for the recording metric name and the new sample
in the database.
  • Loading branch information
Harkishen-Singh committed Jun 1, 2022
1 parent 3fb6f85 commit 6dc1a15
Show file tree
Hide file tree
Showing 11 changed files with 329 additions and 280 deletions.
24 changes: 12 additions & 12 deletions go.mod
Expand Up @@ -10,7 +10,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/edsrzf/mmap-go v1.1.0
github.com/felixge/fgprof v0.9.2
github.com/go-kit/log v0.2.0
github.com/go-kit/log v0.2.1
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.3.0
Expand All @@ -29,31 +29,31 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/peterbourgon/ff/v3 v3.1.2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.34.0
github.com/prometheus/prometheus v1.8.2-0.20220329205120-3e4bd4d91352
github.com/prometheus/prometheus v1.8.2-0.20220308163432-03831554a519
github.com/sergi/go-diff v1.2.0
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546
github.com/stretchr/testify v1.7.1
github.com/testcontainers/testcontainers-go v0.13.0
github.com/thanos-io/thanos v0.25.2
go.opentelemetry.io/collector/model v0.49.0
go.opentelemetry.io/collector/pdata v0.49.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.31.0
go.opentelemetry.io/otel v1.6.3
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/jaeger v1.6.3
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.6.3
go.opentelemetry.io/otel/sdk v1.6.3
go.opentelemetry.io/otel/trace v1.6.3
go.uber.org/atomic v1.9.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
go.uber.org/atomic v1.9.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/goleak v1.1.12
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150
google.golang.org/grpc v1.46.0
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
google.golang.org/grpc v1.46.2
google.golang.org/protobuf v1.28.0
gopkg.in/yaml.v2 v2.4.0
)

// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20220329205120-3e4bd4d91352
replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.35.1-0.20220525080617-3a56817a3068
390 changes: 167 additions & 223 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/promql/engine.go
Expand Up @@ -1520,7 +1520,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
enh.Ts = ts
// Make the function call.
outVec := call(inArgs, e.Args, enh)
ev.samplesStats.IncrementSamplesAtStep(step, len(points))
ev.samplesStats.IncrementSamplesAtStep(step, int64(len(points)))
enh.Out = outVec[:0]
if len(outVec) > 0 {
ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts})
Expand Down Expand Up @@ -1905,7 +1905,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
}

ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, len(ss.Points))
ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, int64(len(ss.Points)))

if len(ss.Points) > 0 {
matrix = append(matrix, ss)
Expand Down
2 changes: 1 addition & 1 deletion pkg/promql/engine_test.go
Expand Up @@ -776,7 +776,7 @@ load 10s
cases := []struct {
Query string
SkipMaxCheck bool
TotalSamples int
TotalSamples int64
TotalSamplesPerStep stats.TotalSamplesPerStep
PeakSamples int
Start time.Time
Expand Down
8 changes: 8 additions & 0 deletions pkg/promql/functions.go
Expand Up @@ -1037,6 +1037,13 @@ func funcDayOfWeek(vals []parser.Value, args parser.Expressions, enh *EvalNodeHe
})
}

// === day_of_year(v Vector) Scalar ===
func funcDayOfYear(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	// Map each sample's timestamp to its ordinal day within the year (1-365/366).
	toYearDay := func(t time.Time) float64 {
		return float64(t.YearDay())
	}
	return dateWrapper(vals, enh, toYearDay)
}

// === hour(v Vector) Scalar ===
func funcHour(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return dateWrapper(vals, enh, func(t time.Time) float64 {
Expand Down Expand Up @@ -1088,6 +1095,7 @@ var FunctionCalls = map[string]FunctionCall{
"days_in_month": funcDaysInMonth,
"day_of_month": funcDayOfMonth,
"day_of_week": funcDayOfWeek,
"day_of_year": funcDayOfYear,
"deg": funcDeg,
"delta": funcDelta,
"deriv": funcDeriv,
Expand Down
16 changes: 8 additions & 8 deletions pkg/rules/config.go
Expand Up @@ -15,10 +15,10 @@ import (
)

const (
defaultNotificationQueueCapacity = 10000
defaultOutageTolerance = time.Hour
defaultForGracePeriod = time.Minute * 10
defaultResendDelay = time.Minute
DefaultNotificationQueueCapacity = 10000
DefaultOutageTolerance = time.Hour
DefaultForGracePeriod = time.Minute * 10
DefaultResendDelay = time.Minute
)

type Config struct {
Expand Down Expand Up @@ -48,10 +48,10 @@ func (cfg *Config) ContainsRules() bool {
}

func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.IntVar(&cfg.NotificationQueueCapacity, "metrics.alertmanager.notification-queue-capacity", defaultNotificationQueueCapacity, "The capacity of the queue for pending Alertmanager notifications.")
fs.DurationVar(&cfg.OutageTolerance, "metrics.rules.alert.for-outage-tolerance", defaultOutageTolerance, "Max time to tolerate Promscale outage for restoring \"for\" state of alert.")
fs.DurationVar(&cfg.ForGracePeriod, "metrics.rules.alert.for-grace-period", defaultForGracePeriod, "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period.")
fs.DurationVar(&cfg.ResendDelay, "metrics.rules.alert.resend-delay", defaultResendDelay, "Minimum amount of time to wait before resending an alert to Alertmanager.")
fs.IntVar(&cfg.NotificationQueueCapacity, "metrics.alertmanager.notification-queue-capacity", DefaultNotificationQueueCapacity, "The capacity of the queue for pending Alertmanager notifications.")
fs.DurationVar(&cfg.OutageTolerance, "metrics.rules.alert.for-outage-tolerance", DefaultOutageTolerance, "Max time to tolerate Promscale outage for restoring \"for\" state of alert.")
fs.DurationVar(&cfg.ForGracePeriod, "metrics.rules.alert.for-grace-period", DefaultForGracePeriod, "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period.")
fs.DurationVar(&cfg.ResendDelay, "metrics.rules.alert.resend-delay", DefaultResendDelay, "Minimum amount of time to wait before resending an alert to Alertmanager.")
fs.StringVar(&cfg.PrometheusConfigAddress, "metrics.rules.config-file", "", "Path to configuration file in Prometheus-format, containing `rule_files` and optional `alerting`, `global` fields. "+
"For more details, see https://prometheus.io/docs/prometheus/latest/configuration/configuration/. "+
"Note: If this is flag empty or `rule_files` is empty, Promscale rule-manager will not start. If `alertmanagers` is empty, alerting will not be initialized.")
Expand Down
39 changes: 15 additions & 24 deletions pkg/rules/rules.go
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"fmt"
"net/url"
"sync"
"time"

"github.com/oklog/run"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
prometheus_config "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/notifier"
prom_rules "github.com/prometheus/prometheus/rules"

Expand All @@ -27,11 +25,11 @@ import (
)

type Manager struct {
rulesManager *prom_rules.Manager
notifierManager *notifier.Manager
discoveryManager *discovery.Manager
stop chan struct{}
wg sync.WaitGroup
ctx context.Context
rulesManager *prom_rules.Manager
notifierManager *notifier.Manager
discoveryManager *discovery.Manager
postRulesProcessing prom_rules.RuleGroupPostProcessFunc
}

func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, error) {
Expand Down Expand Up @@ -63,20 +61,27 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
ResendDelay: cfg.ResendDelay,
})
return &Manager{
ctx: ctx,
rulesManager: rulesManager,
notifierManager: notifierManager,
discoveryManager: discoveryManagerNotify,
stop: make(chan struct{}),
}, nil
}

// WithPostRulesProcess registers f as the hook that the underlying rules
// manager invokes after evaluating each rule group. The stored hook is
// forwarded to the rules manager when ApplyConfig calls Update.
func (m *Manager) WithPostRulesProcess(f prom_rules.RuleGroupPostProcessFunc) {
	m.postRulesProcessing = f
}

// ApplyConfig pushes cfg to the discovery manager, the notifier manager and
// finally the rules manager (via Update), in that order. It returns the first
// error encountered.
func (m *Manager) ApplyConfig(cfg *prometheus_config.Config) error {
	// Apply in dependency order: discovery feeds the notifier, and the rules
	// manager is updated last so it evaluates against a fully configured stack.
	applyFuncs := []func(*prometheus_config.Config) error{
		m.applyDiscoveryManagerConfig,
		m.applyNotifierManagerConfig,
	}
	for _, apply := range applyFuncs {
		if err := apply(cfg); err != nil {
			return err
		}
	}
	interval := time.Duration(cfg.GlobalConfig.EvaluationInterval)
	if err := m.rulesManager.Update(interval, cfg.RuleFiles, cfg.GlobalConfig.ExternalLabels, "", m.postRulesProcessing); err != nil {
		return fmt.Errorf("error updating rule-manager: %w", err)
	}
	return nil
}

Expand All @@ -92,10 +97,6 @@ func (m *Manager) applyNotifierManagerConfig(cfg *prometheus_config.Config) erro
return errors.WithMessage(m.notifierManager.ApplyConfig(cfg), "error applying config to notifier manager")
}

func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
return errors.WithMessage(m.rulesManager.Update(interval, files, externalLabels, externalURL, nil), "error updating the rules manager")
}

// RuleGroups returns the rule groups currently loaded by the underlying
// Prometheus rules manager.
func (m *Manager) RuleGroups() []*prom_rules.Group {
	return m.rulesManager.RuleGroups()
}
Expand Down Expand Up @@ -134,20 +135,10 @@ func (m *Manager) Run() error {
})

g.Add(func() error {
// This stops all actors in the group on receiving request from manager.Stop()
<-m.stop
// This stops all actors in the group on context done.
<-m.ctx.Done()
return nil
}, func(err error) {})

m.wg.Add(1)
defer func() {
m.wg.Done()
}()
return errors.WithMessage(g.Run(), "error running the rule manager groups")
}

func (m *Manager) Stop() {
close(m.stop)
// we need to wait for all group actor to shutdown which happens after g.Run() returns
m.wg.Wait()
}
11 changes: 1 addition & 10 deletions pkg/runner/runner.go
Expand Up @@ -238,20 +238,11 @@ func Run(cfg *Config) error {
if err = manager.ApplyConfig(promCfg); err != nil {
return fmt.Errorf("error applying Prometheus configuration to rules manager: %w", err)
}
if err = manager.Update(
time.Duration(promCfg.GlobalConfig.EvaluationInterval),
promCfg.RuleFiles,
promCfg.GlobalConfig.ExternalLabels,
"",
); err != nil {
return fmt.Errorf("error updating rules manager: %w", err)
}
log.Info("msg", "Started Rule-Manager")
return manager.Run()
}, func(err error) {
log.Info("msg", "Stopping Rule-Manager")
stopRuler() // Stops the discovery manager.
manager.Stop() // Stops the internal group of rule-manager.
stopRuler()
},
)
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/tests/end_to_end_tests/rules_test.go
@@ -0,0 +1,103 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package end_to_end_tests

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/client_golang/prometheus"
prom_rules "github.com/prometheus/prometheus/rules"
"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/prompb"

"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/rules"
"github.com/timescale/promscale/pkg/tenancy"
)

const RecordingRulesEvalConfigPath = "../testdata/rules/config.recording_rules_eval.yaml"

// TestRecordingRulesEval is an end-to-end test for recording-rule evaluation:
// it ingests a small fixed series, runs the rule-manager with a recording rule
// named "test_rule", and verifies (from a post-rules-process hook) that the
// rule's output metric and its sample landed in the database.
func TestRecordingRulesEval(t *testing.T) {
	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
		conf := &pgclient.Config{
			CacheConfig:             cache.DefaultConfig,
			WriteConnectionsPerProc: 4,
			MaxConnections:          -1,
		}

		pgClient, err := pgclient.NewClientWithPool(conf, 1, db, tenancy.NewNoopAuthorizer(), false)
		require.NoError(t, err)
		defer pgClient.Close()
		// The rule expression uses the "@" modifier, so that PromQL feature
		// must be enabled in the engine.
		err = pgClient.InitPromQLEngine(&query.Config{
			MaxQueryTimeout:      query.DefaultQueryTimeout,
			EnabledFeatureMap:    map[string]struct{}{"promql-at-modifier": {}},
			SubQueryStepInterval: query.DefaultSubqueryStepInterval,
			LookBackDelta:        query.DefaultLookBackDelta,
			MaxSamples:           query.DefaultMaxSamples,
			MaxPointsPerTs:       11000,
		})
		require.NoError(t, err)

		ingestor := pgClient.Ingestor()
		// Scale the fixture timestamps so the rule's "@ 4" / "@ 5" (seconds)
		// selectors line up with ingested samples.
		ts := tsToSeconds(generateSmallTimeseries(), time.Second) // Converts ts of samples into seconds.
		_, _, err = ingestor.Ingest(context.Background(), newWriteRequestWithTs(ts))
		require.NoError(t, err)

		rulesCfg := &rules.Config{
			NotificationQueueCapacity: rules.DefaultNotificationQueueCapacity,
			OutageTolerance:           rules.DefaultOutageTolerance,
			ForGracePeriod:            rules.DefaultForGracePeriod,
			ResendDelay:               rules.DefaultResendDelay,
			PrometheusConfigAddress:   RecordingRulesEvalConfigPath,
		}
		// Validate parses the Prometheus-format config file; ContainsRules
		// confirms rule_files is non-empty so the manager will actually run.
		require.NoError(t, rules.Validate(rulesCfg))
		require.True(t, rulesCfg.ContainsRules())

		ruleCtx, stopRuler := context.WithCancel(context.Background())
		defer stopRuler()

		manager, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
		require.NoError(t, err)

		require.NotNil(t, rulesCfg.PrometheusConfig)

		// The hook runs after the first rule-group evaluation; assertions live
		// here because that is the earliest point the recorded sample can exist.
		manager.WithPostRulesProcess(func(*prom_rules.Group, time.Time, log.Logger) error {
			defer func() {
				stopRuler() // Cancels the context so that the blocking manager.Run() is released when the test finishes.
			}()
			// Check if recording rule as a metric exists in metric catalog table.
			var exists bool
			err := db.QueryRow(context.Background(), "select count(*)>0 from _prom_catalog.metric where metric_name = 'test_rule'").Scan(&exists)
			require.NoError(t, err)
			require.True(t, exists)

			// Check if the sum is right.
			// NOTE(review): 0.9 is presumably the sum of the fixture's samples
			// at t=4s and t=5s — confirm against generateSmallTimeseries().
			expected := 0.9
			var value float64
			err = db.QueryRow(context.Background(), "select value from prom_data.test_rule order by time limit 1").Scan(&value)
			require.NoError(t, err)
			require.Equal(t, expected, value)

			return nil
		})
		require.NoError(t, manager.ApplyConfig(rulesCfg.PrometheusConfig))
		require.NoError(t, manager.Run(), "error running rules manager") // This is blocking. It will be released after stopRuler() in defer func.
	})
}

// tsToSeconds rescales every sample timestamp in ts by multiplier expressed in
// milliseconds (e.g. multiplier = time.Second multiplies each timestamp by
// 1000, turning second-valued fixtures into millisecond timestamps). The slice
// is mutated in place and returned for convenience.
func tsToSeconds(ts []prompb.TimeSeries, multiplier time.Duration) []prompb.TimeSeries {
	factor := multiplier.Milliseconds()
	for i := range ts {
		samples := ts[i].Samples
		for j := range samples {
			samples[j].Timestamp *= factor
		}
	}
	return ts
}
5 changes: 5 additions & 0 deletions pkg/tests/testdata/rules/config.recording_rules_eval.yaml
@@ -0,0 +1,5 @@
# Minimal Prometheus-format configuration used by the recording-rules E2E test.
global:
  # Evaluate rule groups frequently so the test's post-process hook fires fast.
  evaluation_interval: 100ms

rule_files:
  - rules.yaml
7 changes: 7 additions & 0 deletions pkg/tests/testdata/rules/rules.yaml
@@ -0,0 +1,7 @@
# Recording rule evaluated by the E2E test: sums two fixed-time samples of
# firstMetric (via the "@" modifier, at t=4s and t=5s) into metric test_rule.
groups:
  - name: e2e
    rules:
      - record: test_rule
        expr: firstMetric @ 4 + firstMetric @ 5
        labels:
          kind: recording

0 comments on commit 6dc1a15

Please sign in to comment.