Skip to content

Commit

Permalink
controller: do manual maintenance to cleanup orphaned alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
talal committed Aug 25, 2020
1 parent 88001ee commit f7d5cba
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 15 deletions.
84 changes: 78 additions & 6 deletions internal/controller/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,82 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// getPromRulefromAbsentRuleGroup takes the name of a RuleGroup name that holds
// absent alerts and returns the name of the origin PrometheusRule that holds
// the corresponding alert definitions, and a bool which is false if the name
// can't be determined.
//
// Absent alert RuleGroups names have the format:
// originPrometheusRuleName/RuleGroupName
func getPromRulefromAbsentRuleGroup(group string) (string, bool) {
sL := strings.Split(group, "/")
if len(sL) != 2 {
return "", false
}
return sL[0], true
}

//nolint:gocognit
// cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts across a
// cluster.
func (c *Controller) cleanUpOrphanedAbsentAlertsCluster() error {
// Get names of all PrometheusRules that exist in the informer's cache:
// map of namespace to map[promRuleName]bool
promRules := make(map[string]map[string]bool)
objs := c.promRuleInformer.GetStore().List()
for _, v := range objs {
pr, ok := v.(*monitoringv1.PrometheusRule)
if !ok {
continue
}
ns := pr.GetNamespace()
if _, ok = promRules[ns]; !ok {
promRules[ns] = make(map[string]bool)
}
promRules[ns][pr.GetName()] = true
}

cleanup := make(map[string]map[string]struct{})
for namespace, promRuleNames := range promRules {
// Get all absentPrometheusRules for this namespace.
prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
List(context.Background(), metav1.ListOptions{LabelSelector: labelOperatorManagedBy})
if err != nil {
return errors.Wrap(err, "could not list AbsentPrometheusRules")
}

// Check if there are any alerts in those absentPrometheusRules that
// don't belong to any PrometheusRule in promRuleNames.
for _, pr := range prList.Items {
for _, g := range pr.Spec.Groups {
n, ok := getPromRulefromAbsentRuleGroup(g.Name)
if !ok {
continue
}
if promRuleNames[n] {
continue
}
if _, ok = cleanup[namespace]; !ok {
cleanup[namespace] = make(map[string]struct{})
}
cleanup[namespace][n] = struct{}{}
}
}
}

for ns, m := range cleanup {
for pr := range m {
if err := c.cleanUpOrphanedAbsentAlertsNamespace(pr, ns); err != nil {
return err
}
}
}
return nil
}

// cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts
// concerning a specific PrometheusRule from a namespace.
func (c *Controller) cleanUpOrphanedAbsentAlertsNamespace(namespace, promRuleName string) error {
func (c *Controller) cleanUpOrphanedAbsentAlertsNamespace(promRuleName, namespace string) error {
prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
List(context.Background(), metav1.ListOptions{LabelSelector: labelOperatorManagedBy})
if err != nil {
Expand All @@ -51,11 +124,10 @@ func (c *Controller) cleanUpOrphanedAbsentAlerts(promRuleName string, absentProm
old := absentPromRule.Spec.Groups
new := make([]monitoringv1.RuleGroup, 0, len(old))
for _, g := range old {
// The rule group names for absentPrometheusRule have the format:
// originPromRuleName/ruleGroupName.
sL := strings.Split(g.Name, "/")
if len(sL) > 0 && sL[0] == promRuleName {
continue
if n, ok := getPromRulefromAbsentRuleGroup(g.Name); ok {
if n == promRuleName {
continue
}
}
new = append(new, g)
}
Expand Down
39 changes: 30 additions & 9 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,21 @@ const (
// refresh its cache.
DefaultResyncPeriod = 10 * time.Minute

// DefaultReconciliationPeriod is the period after which all the objects in
// the informer's cache will be added to the workqueue so that they can be
// reconciliationPeriod is the period after which all the objects in the
// informer's cache will be added to the workqueue so that they can be
// processed by the syncHandler().
//
// The informer calls the event handlers only if the resource state
// changes. We force this additional reconciliation as a liveness check to
// see if the operator is working as intended.
DefaultReconciliationPeriod = 5 * time.Minute
// changes. We do this additional reconciliation as a liveness check to see
// if the operator is working as intended.
reconciliationPeriod = 5 * time.Minute

// maintenancePeriod is the period after which the worker will clean up any
// orphaned absent alerts across the entire cluster.
//
// We do this manual cleanup in case a PrometheusRule is deleted and the
// update of the shared informer's cache has missed it.
maintenancePeriod = 6 * time.Hour
)

// Controller is the controller implementation for acting on PrometheusRule
Expand Down Expand Up @@ -218,16 +225,29 @@ func (c *Controller) enqueueAllObjects() {
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
ticker := time.NewTicker(DefaultReconciliationPeriod)
defer ticker.Stop()
// Run maintenance at start up.
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}

reconcileT := time.NewTicker(reconciliationPeriod)
defer reconcileT.Stop()
maintenanceT := time.NewTicker(maintenancePeriod)
defer maintenanceT.Stop()
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
case <-reconcileT.C:
c.enqueueAllObjects()
case <-maintenanceT.C:
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}
}
}
}()
Expand Down Expand Up @@ -295,7 +315,7 @@ func (c *Controller) syncHandler(key string) error {
// The resource may no longer exist, in which case we clean up any
// orphaned absent alerts.
c.logger.Debug("msg", "PrometheusRule no longer exists", "key", key)
err = c.cleanUpOrphanedAbsentAlertsNamespace(namespace, name)
err = c.cleanUpOrphanedAbsentAlertsNamespace(name, namespace)
default:
// Requeue object for later processing.
return err
Expand All @@ -320,6 +340,7 @@ type errParseRuleGroups struct {
cause error
}

// Error implements the error interface.
func (e *errParseRuleGroups) Error() string {
return e.cause.Error()
}
Expand Down

0 comments on commit f7d5cba

Please sign in to comment.