Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Fix panic in rules-manager on reload.
Browse files: browse the repository at this point in the history.
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Jun 17, 2022
1 parent 90289d8 commit 1fe3bb8
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 64 deletions.
1 change: 1 addition & 0 deletions pkg/rules/config.go
Expand Up @@ -51,6 +51,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {

func Validate(cfg *Config) error {
if cfg.PrometheusConfigAddress == "" {
cfg.PrometheusConfig = &prometheus_config.DefaultConfig
return nil
}
promCfg, err := prometheus_config.LoadFile(cfg.PrometheusConfigAddress, false, true, log.GetLogger())
Expand Down
11 changes: 5 additions & 6 deletions pkg/rules/config_test.go
Expand Up @@ -24,9 +24,11 @@ func TestValidate(t *testing.T) {
shouldError bool
}{
{
name: "no prometheus config",
config: Config{},
expectedConfig: Config{},
name: "no prometheus config",
config: Config{},
expectedConfig: Config{
PrometheusConfig: &prometheus_config.DefaultConfig,
},
},
{
name: "healthy config with no rules",
Expand Down Expand Up @@ -118,9 +120,6 @@ func TestValidate(t *testing.T) {
require.Equal(t, c.containsRules, c.config.ContainsRules(), c.name)

if c.config.PrometheusConfig != nil {
if c.expectedConfig.PrometheusConfig.RuleFiles == nil {
c.expectedConfig.PrometheusConfig.RuleFiles = []string{}
}
require.Equal(t, c.expectedConfig, c.config, c.name)
}
}
Expand Down
31 changes: 29 additions & 2 deletions pkg/rules/rules.go
Expand Up @@ -22,17 +22,19 @@ import (
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/rules/adapters"
"github.com/timescale/promscale/pkg/telemetry"
)

// Manager groups the rules manager, the notifier manager, the notifier's
// discovery manager and the telemetry engine that NewManager wires together.
type Manager struct {
// NOTE(review): the assignment of ctx is not visible in this view; presumably
// it is the context handed to NewManager — confirm.
ctx context.Context
// rulesManager is the *prom_rules.Manager stored by NewManager.
rulesManager *prom_rules.Manager
// notifierManager is the manager built with notifier.NewManager in NewManager.
notifierManager *notifier.Manager
// discoveryManager is the discovery manager that NewManager creates under
// the name "notify".
discoveryManager *discovery.Manager
// telemetryEngine is the engine passed to NewManager; performTelemetry writes
// the "rules_enabled" and "alerting_enabled" entries through it.
telemetryEngine telemetry.Engine
// NOTE(review): postRulesProcessing is not set anywhere in the visible code;
// presumably a hook invoked after rule-group evaluation — confirm against callers.
postRulesProcessing prom_rules.RuleGroupPostProcessFunc
}

func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config) (*Manager, func() error, error) {
func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.Client, cfg *Config, t telemetry.Engine) (*Manager, func() error, error) {
discoveryManagerNotify := discovery.NewManager(ctx, log.GetLogger(), discovery.Name("notify"))

notifierManager := notifier.NewManager(&notifier.Options{
Expand Down Expand Up @@ -66,6 +68,7 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
rulesManager: rulesManager,
notifierManager: notifierManager,
discoveryManager: discoveryManagerNotify,
telemetryEngine: t,
}
return manager, manager.getReloader(cfg), nil
}
Expand All @@ -76,7 +79,31 @@ func (m *Manager) getReloader(cfg *Config) func() error {
if err != nil {
return fmt.Errorf("error validating rules-config: %w", err)
}
return errors.WithMessage(m.ApplyConfig(cfg.PrometheusConfig), "error applying config")
if err = m.ApplyConfig(cfg.PrometheusConfig); err != nil {
return fmt.Errorf("error applying config: %w", err)
}
go m.performTelemetry(cfg) // Update the telemetry async after ensuring everything is fine in the rule files.
return nil
}
}

// performTelemetry records, through the manager's telemetry engine, whether
// the given configuration enables rules and alerting.
//
// Two entries are always written: "rules_enabled" and "alerting_enabled"
// (stored in the telemetry table as `promscale_rules_enabled` and
// `promscale_alerting_enabled`). Both default to "false". "alerting_enabled"
// can only become "true" when rules are present as well.
func (m *Manager) performTelemetry(cfg *Config) {
	rulesEnabled, alertingEnabled := "false", "false"

	switch {
	case !cfg.ContainsRules():
		// Without rule files neither feature is active; the alerting
		// configuration is not even inspected.
		log.Debug("msg", "Rules files not found. Rules and alerting configuration will not be initialized")
	case cfg.ContainsAlertingConfig():
		rulesEnabled, alertingEnabled = "true", "true"
	default:
		rulesEnabled = "true"
		log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
	}

	m.telemetryEngine.Write("rules_enabled", rulesEnabled)
	m.telemetryEngine.Write("alerting_enabled", alertingEnabled)
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/rules/testdata/no_rules.good.config.yaml
@@ -1,5 +1,3 @@
global:
scrape_interval: 1m
evaluation_interval: 1m

rule_files: []
83 changes: 31 additions & 52 deletions pkg/runner/runner.go
Expand Up @@ -137,48 +137,33 @@ func Run(cfg *Config) error {
}
}

var (
group run.Group
reloadRules func() error
)
pendingTelemetry := map[string]string{
"rules_enabled": "false", // Will be written in telemetry table as `promscale_rules_enabled: false`
"alerting_enabled": "false", // `promscale_alerting_enabled: false`
}
if cfg.RulesCfg.ContainsRules() {
pendingTelemetry["rules_enabled"] = "true"
if cfg.RulesCfg.ContainsAlertingConfig() {
pendingTelemetry["alerting_enabled"] = "true"
} else {
log.Debug("msg", "Alerting configuration not present in the given Prometheus configuration file. Alerting will not be initialized")
}

rulesCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

var manager *rules.Manager
manager, reloadRules, err = rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg)
if err != nil {
return fmt.Errorf("error creating rules manager: %w", err)
}
cfg.APICfg.Rules = manager

if err := reloadRules(); err != nil {
return err
}
telemetryEngine := initTelemetryEngine(client)
telemetryEngine.Start()
defer telemetryEngine.Stop()

group.Add(
func() error {
log.Info("msg", "Started Rule-Manager")
return manager.Run()
}, func(error) {
log.Info("msg", "Stopping Rule-Manager")
stopRuler()
},
)
} else {
log.Debug("msg", "Rules files not found in the given Prometheus configuration file. Both rule-manager and alerting will not be initialized")
rulesCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()
manager, reloadRules, err := rules.NewManager(rulesCtx, prometheus.DefaultRegisterer, client, &cfg.RulesCfg, telemetryEngine)
if err != nil {
return fmt.Errorf("error creating rules manager: %w", err)
}
cfg.APICfg.Rules = manager

var group run.Group
group.Add(
func() error {
// Reload the rules before starting the rules-manager to ensure all rules are healthy.
// Otherwise, we block the startup.
if err = reloadRules(); err != nil {
return fmt.Errorf("error reloading rules: %w", err)
}
log.Info("msg", "Started Rule-Manager")
return manager.Run()
}, func(error) {
log.Info("msg", "Stopping Rule-Manager")
stopRuler()
},
)

jaegerQuery := query.New(client.QuerierConnection, &cfg.TracingCfg)

Expand All @@ -188,10 +173,6 @@ func Run(cfg *Config) error {
return fmt.Errorf("generate router: %w", err)
}

telemetryEngine := initTelemetryEngine(client)
telemetryEngine.Start()
defer telemetryEngine.Stop()

if len(cfg.ThanosStoreAPIListenAddr) > 0 {
srv := thanos.NewStorage(client.Queryable())
options := make([]grpc.ServerOption, 0)
Expand Down Expand Up @@ -264,13 +245,6 @@ func Run(cfg *Config) error {
},
)

// Asynchronously update the telemetry information.
go func() {
for k, v := range pendingTelemetry {
telemetryEngine.Write(k, v)
}
}()

mux := http.NewServeMux()
mux.Handle("/", router)

Expand Down Expand Up @@ -304,7 +278,12 @@ func Run(cfg *Config) error {
group.Add(
func() error {
for {
switch <-c {
sig, open := <-c
if !open {
// Channel closed from error function. Let's shutdown.
return nil
}
switch sig {
case syscall.SIGINT:
return nil
case syscall.SIGHUP:
Expand Down
3 changes: 2 additions & 1 deletion pkg/tests/end_to_end_tests/alerts_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/rules"
"github.com/timescale/promscale/pkg/telemetry"
"github.com/timescale/promscale/pkg/tenancy"
)

Expand Down Expand Up @@ -75,7 +76,7 @@ func TestAlerts(t *testing.T) {
rulesCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

manager, _, err := rules.NewManager(rulesCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
manager, _, err := rules.NewManager(rulesCtx, prometheus.NewRegistry(), pgClient, rulesCfg, telemetry.NewNoopEngine())
require.NoError(t, err)

require.NotNil(t, rulesCfg.PrometheusConfig)
Expand Down
3 changes: 2 additions & 1 deletion pkg/tests/end_to_end_tests/rules_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/rules"
"github.com/timescale/promscale/pkg/telemetry"
"github.com/timescale/promscale/pkg/tenancy"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ func TestRecordingRulesEval(t *testing.T) {
ruleCtx, stopRuler := context.WithCancel(context.Background())
defer stopRuler()

manager, reloadRules, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg)
manager, reloadRules, err := rules.NewManager(ruleCtx, prometheus.NewRegistry(), pgClient, rulesCfg, telemetry.NewNoopEngine())
require.NoError(t, err)

require.NotNil(t, rulesCfg.PrometheusConfig)
Expand Down

0 comments on commit 1fe3bb8

Please sign in to comment.