Skip to content

Commit

Permalink
controller: do manual maintenance to clean up orphaned alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
talal committed Aug 26, 2020
1 parent 88001ee commit 0d59235
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 20 deletions.
95 changes: 84 additions & 11 deletions internal/controller/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,99 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// getPromRulefromAbsentRuleGroup maps the name of a RuleGroup that holds
// absent alerts back to the name of the origin PrometheusRule that holds the
// corresponding alert definitions. An empty string is returned if the name
// can't be determined.
//
// Absent alert RuleGroup names have the format:
// originPrometheusRuleName/RuleGroupName
func getPromRulefromAbsentRuleGroup(group string) string {
	// Exactly one separator means the name splits into the expected two
	// parts; anything else is not a well-formed absent-alert group name.
	if strings.Count(group, "/") != 1 {
		return ""
	}
	return group[:strings.IndexByte(group, '/')]
}

// cleanUpOrphanedAbsentAlertsCluster deletes orphaned absent alerts across a
// cluster.
//
// It walks all PrometheusRules known to the informer's cache and removes, from
// every operator-managed absentPrometheusRule, the absent-alert rule groups
// whose origin PrometheusRule no longer exists.
func (c *Controller) cleanUpOrphanedAbsentAlertsCluster() error {
	// Get names of all PrometheusRules that exist in the informer's cache:
	// map of namespace to map[promRuleName]bool
	promRules := make(map[string]map[string]bool)
	for _, v := range c.promRuleInformer.GetStore().List() {
		pr, ok := v.(*monitoringv1.PrometheusRule)
		if !ok {
			continue
		}
		ns := pr.GetNamespace()
		if promRules[ns] == nil {
			promRules[ns] = make(map[string]bool)
		}
		promRules[ns][pr.GetName()] = true
	}

	for namespace, promRuleNames := range promRules {
		// Get all absentPrometheusRules for this namespace.
		prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
			List(context.Background(), metav1.ListOptions{LabelSelector: labelOperatorManagedBy})
		if err != nil {
			return errors.Wrap(err, "could not list AbsentPrometheusRules")
		}

		for _, pr := range prList.Items {
			// Check if there are any alerts in this absentPrometheusRule that
			// don't belong to any PrometheusRule in promRuleNames.
			//
			// cleanup map is used because there could be multiple RuleGroups
			// that contain absent alerts concerning a single PrometheusRule
			// therefore we check all the groups before doing any cleanup.
			cleanup := make(map[string]struct{})
			for _, g := range pr.Spec.Groups {
				n := getPromRulefromAbsentRuleGroup(g.Name)
				if n != "" && !promRuleNames[n] {
					cleanup[n] = struct{}{}
				}
			}

			aPR := &absentPrometheusRule{PrometheusRule: pr}
			for n := range cleanup {
				if err := c.cleanUpOrphanedAbsentAlerts(n, aPR); err != nil {
					return err
				}
			}
		}
	}

	return nil
}

// cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts
// concerning a specific PrometheusRule from a namespace.
func (c *Controller) cleanUpOrphanedAbsentAlertsNamespace(namespace, promRuleName string) error {
//
// This is used when we don't know the prometheus server name of the
// PrometheusRule so we list all the AbsentPrometheusRules in a namespace and
// find the one that has the corresponding absent alerts.
func (c *Controller) cleanUpOrphanedAbsentAlertsNamespace(promRuleName, namespace string) error {
prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
List(context.Background(), metav1.ListOptions{LabelSelector: labelOperatorManagedBy})
if err != nil {
return errors.Wrap(err, "could not list AbsentPrometheusRules")
}

for _, pr := range prList.Items {
aPR := &absentPrometheusRule{PrometheusRule: pr}
err := c.cleanUpOrphanedAbsentAlerts(promRuleName, aPR)
if err != nil {
return err
for _, g := range pr.Spec.Groups {
n := getPromRulefromAbsentRuleGroup(g.Name)
if n != "" && n == promRuleName {
aPR := &absentPrometheusRule{PrometheusRule: pr}
err = c.cleanUpOrphanedAbsentAlerts(promRuleName, aPR)
break
}
}
}

return nil
return err
}

// cleanUpOrphanedAbsentAlerts deletes orphaned absent alerts concerning a
Expand All @@ -51,10 +126,8 @@ func (c *Controller) cleanUpOrphanedAbsentAlerts(promRuleName string, absentProm
old := absentPromRule.Spec.Groups
new := make([]monitoringv1.RuleGroup, 0, len(old))
for _, g := range old {
// The rule group names for absentPrometheusRule have the format:
// originPromRuleName/ruleGroupName.
sL := strings.Split(g.Name, "/")
if len(sL) > 0 && sL[0] == promRuleName {
n := getPromRulefromAbsentRuleGroup(g.Name)
if n != "" && n == promRuleName {
continue
}
new = append(new, g)
Expand Down
39 changes: 30 additions & 9 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,21 @@ const (
// refresh its cache.
DefaultResyncPeriod = 10 * time.Minute

// DefaultReconciliationPeriod is the period after which all the objects in
// the informer's cache will be added to the workqueue so that they can be
// reconciliationPeriod is the period after which all the objects in the
// informer's cache will be added to the workqueue so that they can be
// processed by the syncHandler().
//
// The informer calls the event handlers only if the resource state
// changes. We force this additional reconciliation as a liveness check to
// see if the operator is working as intended.
DefaultReconciliationPeriod = 5 * time.Minute
// changes. We do this additional reconciliation as a liveness check to see
// if the operator is working as intended.
reconciliationPeriod = 5 * time.Minute

// maintenancePeriod is the period after which the worker will clean up any
// orphaned absent alerts across the entire cluster.
//
// We do this manual cleanup in case a PrometheusRule is deleted and the
// update of the shared informer's cache has missed it.
maintenancePeriod = 1 * time.Hour
)

// Controller is the controller implementation for acting on PrometheusRule
Expand Down Expand Up @@ -218,16 +225,29 @@ func (c *Controller) enqueueAllObjects() {
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
ticker := time.NewTicker(DefaultReconciliationPeriod)
defer ticker.Stop()
// Run maintenance at start up.
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}

reconcileT := time.NewTicker(reconciliationPeriod)
defer reconcileT.Stop()
maintenanceT := time.NewTicker(maintenancePeriod)
defer maintenanceT.Stop()
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
case <-reconcileT.C:
c.enqueueAllObjects()
case <-maintenanceT.C:
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}
}
}
}()
Expand Down Expand Up @@ -295,7 +315,7 @@ func (c *Controller) syncHandler(key string) error {
// The resource may no longer exist, in which case we clean up any
// orphaned absent alerts.
c.logger.Debug("msg", "PrometheusRule no longer exists", "key", key)
err = c.cleanUpOrphanedAbsentAlertsNamespace(namespace, name)
err = c.cleanUpOrphanedAbsentAlertsNamespace(name, namespace)
default:
// Requeue object for later processing.
return err
Expand All @@ -320,6 +340,7 @@ type errParseRuleGroups struct {
cause error
}

// Error implements the builtin error interface by delegating to the wrapped
// underlying cause.
func (e *errParseRuleGroups) Error() string {
	msg := e.cause.Error()
	return msg
}
Expand Down

0 comments on commit 0d59235

Please sign in to comment.