Merge pull request #10 from sapcc/orphaned-gauge-fix
Orphaned gauge fix
talal committed Sep 23, 2020
2 parents 8945ec4 + 37ca33d commit 3e1fa0f
Showing 8 changed files with 154 additions and 113 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,4 +3,5 @@ build/
dist/

# test artifacts
*.actual
test/bin/
5 changes: 0 additions & 5 deletions .golangci.yml
@@ -29,7 +29,6 @@ linters:
- ineffassign
- interfacer
- lll
- maligned
- misspell
- nakedret
- prealloc
@@ -71,10 +70,6 @@ linters-settings:
lll:
line-length: 140

maligned:
# Print struct with more effective memory layout.
suggest-new: true

unused:
# Treat code as a program (not a library) and report unused exported
# identifiers.
55 changes: 0 additions & 55 deletions internal/controller/cleanup.go
@@ -40,61 +40,6 @@ func getPromRulefromAbsentRuleGroup(group string) string {
return sL[0]
}

// cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts across a
// cluster.
func (c *Controller) cleanUpOrphanedAbsentAlertsCluster() error {
// Get names of all PrometheusRules that exist in the informer's cache:
// map of namespace to map[promRuleName]bool
promRules := make(map[string]map[string]bool)
objs := c.promRuleInformer.GetStore().List()
for _, v := range objs {
pr, ok := v.(*monitoringv1.PrometheusRule)
if !ok {
continue
}
ns := pr.GetNamespace()
if _, ok = promRules[ns]; !ok {
promRules[ns] = make(map[string]bool)
}
promRules[ns][pr.GetName()] = true
}

for namespace, promRuleNames := range promRules {
// Get all absentPrometheusRules for this namespace.
prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
List(context.Background(), metav1.ListOptions{LabelSelector: labelOperatorManagedBy})
if err != nil {
return errors.Wrap(err, "could not list AbsentPrometheusRules")
}

for _, pr := range prList.Items {
// Check if there are any alerts in this absentPrometheusRule that
// don't belong to any PrometheusRule in promRuleNames.
//
// cleanup map is used because there could be multiple RuleGroups
// that contain absent alerts concerning a single PrometheusRule
// therefore we check all the groups before doing any cleanup.
cleanup := make(map[string]struct{})
for _, g := range pr.Spec.Groups {
n := getPromRulefromAbsentRuleGroup(g.Name)
if n != "" && !promRuleNames[n] {
cleanup[n] = struct{}{}
}
}

aPR := &absentPrometheusRule{PrometheusRule: pr}
for n := range cleanup {
if err := c.cleanUpOrphanedAbsentAlerts(n, aPR); err != nil {
return err
}
c.metrics.SuccessfulPrometheusRuleReconcileTime.DeleteLabelValues(namespace, n)
}
}
}

return nil
}

// cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts
// concerning a specific PrometheusRule from a namespace.
//
125 changes: 82 additions & 43 deletions internal/controller/controller.go
@@ -60,25 +60,20 @@ const (
DefaultResyncPeriod = 10 * time.Minute

// reconciliationPeriod is the period after which all the objects in the
// informer's cache will be added to the workqueue so that they can be
// processed by the syncHandler().
// informer's cache and orphaned objects with absent alerts are added to
// the workqueue so that they can be processed by the syncHandler().
//
// The informer calls the event handlers only if the resource state
// changes. We do this additional reconciliation as a liveness check to see
// if the operator is working as intended.
// if the operator is working as intended, and as manual maintenance for
// cases when the informer has missed a PrometheusRule deletion/rename.
reconciliationPeriod = 5 * time.Minute

// maintenancePeriod is the period after which the worker will clean up any
// orphaned absent alerts across the entire cluster.
//
// We do this manual cleanup in case a PrometheusRule is deleted and the
// update of the shared informer's cache has missed it.
maintenancePeriod = 1 * time.Hour
)

// Controller is the controller implementation for acting on PrometheusRule
// resources.
type Controller struct {
isTest bool
logger *log.Logger
metrics *Metrics

@@ -95,34 +90,39 @@ type Controller struct {
workqueue workqueue.RateLimitingInterface
}

// Opts holds the data required for instantiating a Controller.
type Opts struct {
IsTest bool
Logger *log.Logger
KeepLabel map[string]bool
PrometheusRegistry prometheus.Registerer
Config *rest.Config
ResyncPeriod time.Duration
}

// New creates a new Controller.
func New(
cfg *rest.Config,
resyncPeriod time.Duration,
r prometheus.Registerer,
keepLabel map[string]bool,
logger *log.Logger) (*Controller, error) {

kClient, err := kubernetes.NewForConfig(cfg)
func New(opts Opts) (*Controller, error) {
kClient, err := kubernetes.NewForConfig(opts.Config)
if err != nil {
return nil, errors.Wrap(err, "instantiating kubernetes client failed")
}

pClient, err := monitoringclient.NewForConfig(cfg)
pClient, err := monitoringclient.NewForConfig(opts.Config)
if err != nil {
return nil, errors.Wrap(err, "instantiating monitoring client failed")
}

c := &Controller{
logger: logger,
metrics: NewMetrics(r),
keepLabel: keepLabel,
isTest: opts.IsTest,
logger: opts.Logger,
metrics: NewMetrics(opts.PrometheusRegistry),
keepLabel: opts.KeepLabel,
kubeClientset: kClient,
promClientset: pClient,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "prometheusrules"),
}
c.keepTierServiceLabels = c.keepLabel[LabelTier] && c.keepLabel[LabelService]
ruleInf := informers.NewSharedInformerFactory(pClient, resyncPeriod).Monitoring().V1().PrometheusRules()
ruleInf := informers.NewSharedInformerFactory(pClient, opts.ResyncPeriod).Monitoring().V1().PrometheusRules()
c.promRuleLister = ruleInf.Lister()
c.promRuleInformer = ruleInf.Informer()
c.promRuleInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -210,44 +210,76 @@ func (c *Controller) enqueuePromRule(obj interface{}) {
c.workqueue.Add(key)
}

// enqueueAllObjects adds all objects in the shared informer's cache to the
// workqueue.
func (c *Controller) enqueueAllObjects() {
// enqueueAllPrometheusRules adds all PrometheusRules to the workqueue.
func (c *Controller) enqueueAllPrometheusRules() {
// promRules is a map of namespace to map[promRuleName]bool.
promRules := make(map[string]map[string]bool)
// absentPromRules is a map of namespace to a slice of absentPrometheusRule.
absentPromRules := make(map[string][]*monitoringv1.PrometheusRule)

// Add all PrometheusRules in the shared informer's cache to the workqueue
// and get names of all PrometheusRules that exist in the informer's cache.
objs := c.promRuleInformer.GetStore().List()
for _, v := range objs {
if pr, ok := v.(*monitoringv1.PrometheusRule); ok {
pr, ok := v.(*monitoringv1.PrometheusRule)
if !ok {
continue
}

ns := pr.GetNamespace()
n := pr.GetName()

if l := pr.GetLabels(); mustParseBool(l[labelOperatorManagedBy]) {
absentPromRules[ns] = append(absentPromRules[ns], pr)
} else {
if _, ok = promRules[ns]; !ok {
promRules[ns] = make(map[string]bool)
}
promRules[ns][n] = true
c.enqueuePromRule(pr)
}
}

// Enqueue the PrometheusRules that no longer exist but whose corresponding
// absent alerts are still present in an absentPrometheusRule.
// The syncHandler() will perform the appropriate cleanup for them.
for ns, promRuleNames := range promRules {
// Check all absentPrometheusRules for this namespace for alerts that
// don't belong to any PrometheusRule in promRuleNames.
for _, aPR := range absentPromRules[ns] {
// A map is used because there could be multiple RuleGroups that
// contain absent alerts concerning a single PrometheusRule, therefore
// we check all the groups before adding that rule's key to the queue.
shouldAdd := make(map[string]struct{})
for _, g := range aPR.Spec.Groups {
n := getPromRulefromAbsentRuleGroup(g.Name)
if n != "" && !promRuleNames[n] {
shouldAdd[n] = struct{}{}
}
}

for n := range shouldAdd {
key := fmt.Sprintf("%s/%s", ns, n)
c.workqueue.Add(key)
}
}
}
}
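The comment above captures the enqueue side of this fix: a PrometheusRule that has been deleted or renamed is still pushed onto the workqueue under its old namespace/name key, so syncHandler() performs the cleanup instead of a separate maintenance loop. The following is a simplified sketch of that round trip, not the operator's exact code: getRule and cleanup are hypothetical stand-ins for the lister lookup and cleanUpOrphanedAbsentAlertsNamespace, and the use of cache.SplitMetaNamespaceKey is an assumption (it is the standard client-go helper for keys of this form).

    package sketch

    import (
        "fmt"

        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/client-go/tools/cache"
    )

    // enqueueOrphan mirrors the loop above: a rule that only survives as
    // absent-alert groups is re-added to the queue under its former
    // namespace/name key.
    func enqueueOrphan(add func(item interface{}), namespace, name string) {
        add(fmt.Sprintf("%s/%s", namespace, name))
    }

    // handleKey mirrors the syncHandler() side: the key is split back into
    // namespace and name; if the rule lookup reports NotFound, cleanup runs
    // instead of a normal reconcile.
    func handleKey(key string, getRule func(ns, name string) error, cleanup func(ns, name string) error) error {
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return err
        }
        err = getRule(namespace, name)
        switch {
        case err == nil:
            return nil // the rule still exists; a normal reconcile would run here
        case apierrors.IsNotFound(err):
            return cleanup(namespace, name) // orphaned: delete absent alerts and the gauge
        default:
            return err
        }
    }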

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
// Run maintenance at start up.
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}

done := make(chan struct{})
reconcileT := time.NewTicker(reconciliationPeriod)
defer reconcileT.Stop()
maintenanceT := time.NewTicker(maintenancePeriod)
defer maintenanceT.Stop()
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case <-reconcileT.C:
c.enqueueAllObjects()
case <-maintenanceT.C:
if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
"err", err)
}
c.enqueueAllPrometheusRules()
}
}
}()
@@ -318,6 +350,7 @@ func (c *Controller) syncHandler(key string) error {
err = c.cleanUpOrphanedAbsentAlertsNamespace(name, namespace)
if err == nil {
c.metrics.SuccessfulPrometheusRuleReconcileTime.DeleteLabelValues(namespace, name)
return nil
}
default:
// Requeue object for later processing.
@@ -335,7 +368,13 @@
return err
}

c.metrics.SuccessfulPrometheusRuleReconcileTime.WithLabelValues(namespace, name).SetToCurrentTime()
gauge := c.metrics.SuccessfulPrometheusRuleReconcileTime.WithLabelValues(namespace, name)
if c.isTest {
gauge.Set(float64(1))
} else {
gauge.SetToCurrentTime()
}

return nil
}
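The gauge handling in syncHandler() is where orphaned metrics are actually removed: a GaugeVec child created with WithLabelValues() stays in the registry until it is explicitly deleted, so before this change a deleted PrometheusRule kept a stale absent_metrics_operator_successful_reconcile_time series alive indefinitely, and DeleteLabelValues() is what drops it. The IsTest switch pins the gauge value to 1 so that the /metrics output can be compared against a static fixture. A minimal standalone sketch of that gauge lifecycle follows; the metric and label names come from this repository, everything else is illustrative.

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        // The operator's reconcile-time gauge, reduced to a standalone example.
        reconcileTime := prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Name: "absent_metrics_operator_successful_reconcile_time",
            Help: "The time at which a specific PrometheusRule was successfully reconciled by the operator.",
        }, []string{"prometheusrule_namespace", "prometheusrule_name"})

        reg := prometheus.NewRegistry()
        reg.MustRegister(reconcileTime)

        // A successful reconcile creates (or refreshes) a child gauge. The child
        // stays registered until it is explicitly deleted, which is why deleted
        // rules used to leave orphaned time series behind.
        reconcileTime.WithLabelValues("resmgmt", "openstack-limes-api.alerts").SetToCurrentTime()

        // The fix: once the PrometheusRule is gone, drop its child gauge as well.
        deleted := reconcileTime.DeleteLabelValues("resmgmt", "openstack-limes-api.alerts")
        fmt.Println("orphaned gauge removed:", deleted) // prints true if the child existed
    }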

11 changes: 10 additions & 1 deletion main.go
@@ -87,7 +87,16 @@ func main() {
if err != nil {
logger.Fatal("msg", "instantiating cluster config failed", "err", err)
}
c, err := controller.New(cfg, controller.DefaultResyncPeriod, r, keepLabelMap, log.With(logger, "component", "controller"))

opts := controller.Opts{
IsTest: false,
Logger: log.With(logger, "component", "controller"),
KeepLabel: keepLabelMap,
PrometheusRegistry: r,
Config: cfg,
ResyncPeriod: controller.DefaultResyncPeriod,
}
c, err := controller.New(opts)
if err != nil {
logger.Fatal("msg", "could not instantiate controller", "err", err)
}
41 changes: 41 additions & 0 deletions test/controller_test.go
@@ -17,12 +17,20 @@ package test
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus/promhttp"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -216,5 +224,38 @@ var _ = Describe("Controller", func() {
Expect(apierrors.IsNotFound(err)).To(Equal(true))
})
})

It("should cleanup orphaned gauge metrics", func() {
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
method := http.MethodGet
path := "/metrics"
var requestBody io.Reader
request := httptest.NewRequest(method, path, requestBody)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, request)

response := recorder.Result()
Expect(response.StatusCode).To(Equal(200))
responseBytes, err := ioutil.ReadAll(response.Body)
Expect(err).ToNot(HaveOccurred())

// Write actual content to file to make it easy to copy the
// computed result over to the fixture path when a new test is
// added or an existing one is modified.
fixturePath, err := filepath.Abs("fixtures/metrics.prom")
Expect(err).ToNot(HaveOccurred())
actualPath := fixturePath + ".actual"
err = ioutil.WriteFile(actualPath, responseBytes, 0600)
Expect(err).ToNot(HaveOccurred())

cmd := exec.Command("diff", "-u", fixturePath, actualPath)
cmd.Stdin = nil
cmd.Stdout = GinkgoWriter
cmd.Stderr = os.Stderr
if err = cmd.Run(); err != nil {
Fail(fmt.Sprintf("%s %s: body does not match", method, path))
}
})
})
})
4 changes: 4 additions & 0 deletions test/fixtures/metrics.prom
@@ -0,0 +1,4 @@
# HELP absent_metrics_operator_successful_reconcile_time The time at which a specific PrometheusRule was successfully reconciled by the operator.
# TYPE absent_metrics_operator_successful_reconcile_time gauge
absent_metrics_operator_successful_reconcile_time{prometheusrule_name="openstack-limes-api.alerts",prometheusrule_namespace="resmgmt"} 1
absent_metrics_operator_successful_reconcile_time{prometheusrule_name="openstack-swift.alerts",prometheusrule_namespace="swift"} 1