-
Notifications
You must be signed in to change notification settings - Fork 53
/
rules.go
115 lines (97 loc) · 2.68 KB
/
rules.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package agent
import (
"context"
"time"
healthpkg "github.com/rancher/opni/pkg/health"
"github.com/rancher/opni/pkg/util"
"github.com/rancher/opni/plugins/alerting/pkg/agent/drivers"
"github.com/rancher/opni/plugins/alerting/pkg/apis/node"
"github.com/rancher/opni/plugins/alerting/pkg/apis/rules"
"go.uber.org/zap"
)
var RuleSyncInterval = time.Minute * 2
type RuleStreamer struct {
util.Initializer
parentCtx context.Context
lg *zap.SugaredLogger
ruleStreamCtx context.Context
stopRuleStream context.CancelFunc
ruleSyncClient rules.RuleSyncClient
conditions healthpkg.ConditionTracker
nodeDriver drivers.NodeDriver
}
var _ drivers.ConfigPropagator = (*RuleStreamer)(nil)
func NewRuleStreamer(
ctx context.Context,
lg *zap.SugaredLogger,
ct healthpkg.ConditionTracker,
nodeDriver drivers.NodeDriver,
) *RuleStreamer {
return &RuleStreamer{
parentCtx: ctx,
lg: lg,
conditions: ct,
nodeDriver: nodeDriver,
}
}
func (r *RuleStreamer) Initialize(ruleSyncClient rules.RuleSyncClient) {
r.InitOnce(func() {
r.ruleSyncClient = ruleSyncClient
})
}
func (r *RuleStreamer) ConfigureNode(nodeId string, cfg *node.AlertingCapabilityConfig) error {
return r.configureRuleStreamer(nodeId, cfg)
}
func (r *RuleStreamer) configureRuleStreamer(nodeId string, cfg *node.AlertingCapabilityConfig) error {
lg := r.lg.With("nodeId", nodeId)
lg.Debug("alerting capability updated")
currentlyRunning := r.stopRuleStream != nil
shouldRun := cfg.GetEnabled()
startRuleStreamer := func() {
ctx, ca := context.WithCancel(r.parentCtx)
r.stopRuleStream = ca
go r.run(ctx)
}
switch {
case currentlyRunning && shouldRun:
lg.Debug("restarting rule stream")
r.stopRuleStream()
startRuleStreamer()
case currentlyRunning && !shouldRun:
lg.Debug("stopping rule stream")
r.stopRuleStream()
case !currentlyRunning && shouldRun:
lg.Debug("starting rule stream")
startRuleStreamer()
case !currentlyRunning && !shouldRun:
lg.Debug("rule sync is disabled")
}
return nil
}
func (r *RuleStreamer) sync(ctx context.Context) {
ruleManifest, err := r.nodeDriver.DiscoverRules(ctx)
if err != nil {
r.lg.Warnf("failed to discover rules %s", err)
}
r.lg.Infof("discovered %d rules", len(ruleManifest.Rules))
if _, err := r.ruleSyncClient.SyncRules(ctx, ruleManifest); err != nil {
r.lg.Warnf("failed to sync rules %s", err)
}
}
func (r *RuleStreamer) run(ctx context.Context) {
r.lg.Info("waiting for rule sync client...")
r.WaitForInitContext(ctx)
r.lg.Info("rule sync client acquired")
r.sync(ctx)
t := time.NewTicker(RuleSyncInterval)
defer t.Stop()
for {
select {
case <-t.C:
r.sync(ctx)
case <-ctx.Done():
r.lg.Info("Exiting rule sync loop")
return
}
}
}