Skip to content

Commit

Permalink
controller: queue objects for maintenance cleanup
Browse files Browse the repository at this point in the history
We queue the objects for cleanup and let syncHandler take care of the
cleanup. This means that syncHandler() is the only place that updates
absent PrometheusRules.
  • Loading branch information
talal committed Sep 21, 2020
1 parent ec5324f commit a17cfc4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 84 deletions.
55 changes: 0 additions & 55 deletions internal/controller/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,61 +40,6 @@ func getPromRulefromAbsentRuleGroup(group string) string {
return sL[0]
}

// cleanUpOrphanedAbsentAlertsCluster deletes orphaned absent alerts across a
// cluster.
//
// It first collects, per namespace, the names of all PrometheusRules found in
// the informer's cache. Then, for every namespace collected, it lists the
// PrometheusRules matching the labelOperatorManagedBy selector through the
// clientset and calls cleanUpOrphanedAbsentAlerts for each PrometheusRule name
// that is referenced by a rule group but is not present in the cache.
//
// The first error encountered (from listing or from cleanup) is returned
// immediately; remaining namespaces and rules are not processed.
func (c *Controller) cleanUpOrphanedAbsentAlertsCluster() error {
// Get names of all PrometheusRules that exist in the informer's cache:
// map of namespace to map[promRuleName]bool
promRules := make(map[string]map[string]bool)
objs := c.promRuleInformer.GetStore().List()
for _, v := range objs {
pr, ok := v.(*monitoringv1.PrometheusRule)
if !ok {
// Not a PrometheusRule; ignore it.
continue
}
ns := pr.GetNamespace()
// Lazily create the inner map for this namespace.
if _, ok = promRules[ns]; !ok {
promRules[ns] = make(map[string]bool)
}
promRules[ns][pr.GetName()] = true
}

// Only namespaces that have at least one PrometheusRule in the cache are
// visited here, because the loop ranges over promRules.
for namespace, promRuleNames := range promRules {
// Get all absentPrometheusRules for this namespace.
prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
List(context.Background(), metav1.ListOptions{LabelSelector: labelOperatorManagedBy})
if err != nil {
return errors.Wrap(err, "could not list AbsentPrometheusRules")
}

for _, pr := range prList.Items {
// Check if there are any alerts in this absentPrometheusRule that
// don't belong to any PrometheusRule in promRuleNames.
//
// cleanup map is used because there could be multiple RuleGroups
// that contain absent alerts concerning a single PrometheusRule
// therefore we check all the groups before doing any cleanup.
cleanup := make(map[string]struct{})
for _, g := range pr.Spec.Groups {
// An empty name is skipped; otherwise the name is queued for
// cleanup when it is not a known PrometheusRule in this namespace.
n := getPromRulefromAbsentRuleGroup(g.Name)
if n != "" && !promRuleNames[n] {
cleanup[n] = struct{}{}
}
}

aPR := &absentPrometheusRule{PrometheusRule: pr}
for n := range cleanup {
if err := c.cleanUpOrphanedAbsentAlerts(n, aPR); err != nil {
return err
}
// Drop the reconcile-time metric series for the PrometheusRule
// name that was just cleaned up.
c.metrics.SuccessfulPrometheusRuleReconcileTime.DeleteLabelValues(namespace, n)
}
}
}

return nil
}

// cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts
// concerning a specific PrometheusRule from a namespace.
//
Expand Down
84 changes: 55 additions & 29 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,14 @@ const (
DefaultResyncPeriod = 10 * time.Minute

// reconciliationPeriod is the period after which all the objects in the
// informer's cache will be added to the workqueue so that they can be
// processed by the syncHandler().
// informer's cache and orphaned objects with absent alerts are added to
// the workqueue so that they can be processed by the syncHandler().
//
// The informer calls the event handlers only if the resource state
// changes. We do this additional reconciliation as a liveness check to see
// if the operator is working as intended.
// if the operator is working as intended, and as manual maintenance for
// cases when the informer has missed a PrometheusRule deletion/rename.
reconciliationPeriod = 5 * time.Minute

// maintenancePeriod is the period after which the worker will clean up any
// orphaned absent alerts across the entire cluster.
//
// We do this manual cleanup in case a PrometheusRule is deleted and the
// update of the shared informer's cache has missed it.
maintenancePeriod = 1 * time.Hour
)

// Controller is the controller implementation for acting on PrometheusRule
Expand Down Expand Up @@ -216,44 +210,76 @@ func (c *Controller) enqueuePromRule(obj interface{}) {
c.workqueue.Add(key)
}

// enqueueAllObjects adds all objects in the shared informer's cache to the
// workqueue.
func (c *Controller) enqueueAllObjects() {
// enqueueAllPrometheusRules adds all PrometheusRules to the workqueue.
func (c *Controller) enqueueAllPrometheusRules() {
// promRules is a map of namespace to map[promRuleName]bool.
promRules := make(map[string]map[string]bool)
// absentPromRules is a map of namespace to a slice of absentPrometheusRule.
absentPromRules := make(map[string][]*monitoringv1.PrometheusRule)

// Add all PrometheusRules in the shared informer's cache to the workqueue
// and get names of all PrometheusRules that exist in the informer's cache.
objs := c.promRuleInformer.GetStore().List()
for _, v := range objs {
if pr, ok := v.(*monitoringv1.PrometheusRule); ok {
pr, ok := v.(*monitoringv1.PrometheusRule)
if !ok {
continue
}

ns := pr.GetNamespace()
n := pr.GetName()

// Rules whose labelOperatorManagedBy label parses as true are collected
// as absentPrometheusRules and are not enqueued themselves; every other
// rule is recorded by name and enqueued.
if l := pr.GetLabels(); mustParseBool(l[labelOperatorManagedBy]) {
absentPromRules[ns] = append(absentPromRules[ns], pr)
} else {
if _, ok = promRules[ns]; !ok {
promRules[ns] = make(map[string]bool)
}
promRules[ns][n] = true
c.enqueuePromRule(pr)
}
}

// Add the PrometheusRules which no longer exist but their corresponding
// absent alerts have been added to an absentPrometheusRule.
// The syncHandler() will perform the appropriate cleanup for them.
//
// NOTE(review): this loop ranges over promRules, which only has entries for
// namespaces containing at least one PrometheusRule for which
// mustParseBool(l[labelOperatorManagedBy]) is false (see the else branch
// above). absentPrometheusRules in a namespace with no such PrometheusRule
// are not inspected here, so their orphaned entries are not enqueued.
// Confirm whether that is intended.
for ns, promRuleNames := range promRules {
// Check all absentPrometheusRules for this namespace for alerts that
// don't belong to any PrometheusRule in promRuleNames.
for _, aPR := range absentPromRules[ns] {
// A map is used because there could be multiple RuleGroups that
// contain absent alerts concerning a single PrometheusRule therefore
// we check all the groups before adding it to the queue.
shouldAdd := make(map[string]struct{})
for _, g := range aPR.Spec.Groups {
n := getPromRulefromAbsentRuleGroup(g.Name)
if n != "" && !promRuleNames[n] {
shouldAdd[n] = struct{}{}
}
}

// Keys are built as "<namespace>/<name>" and added to the workqueue
// directly, since there is no PrometheusRule object to pass to
// enqueuePromRule for a rule that no longer exists.
for n := range shouldAdd {
key := fmt.Sprintf("%s/%s", ns, n)
c.workqueue.Add(key)
}
}
}
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
// Run maintenance at start up.
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}

done := make(chan struct{})
reconcileT := time.NewTicker(reconciliationPeriod)
defer reconcileT.Stop()
maintenanceT := time.NewTicker(maintenancePeriod)
defer maintenanceT.Stop()
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case <-reconcileT.C:
c.enqueueAllObjects()
case <-maintenanceT.C:
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}
c.enqueueAllPrometheusRules()
}
}
}()
Expand Down

0 comments on commit a17cfc4

Please sign in to comment.