diff --git a/.gitignore b/.gitignore
index 5af960bd..e0cc597c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@ build/
 dist/
 
 # test artifacts
+*.actual
 test/bin/
diff --git a/.golangci.yml b/.golangci.yml
index 112c4fc0..e8819eef 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -29,7 +29,6 @@ linters:
     - ineffassign
     - interfacer
     - lll
-    - maligned
     - misspell
     - nakedret
     - prealloc
@@ -71,10 +70,6 @@ linters-settings:
   lll:
     line-length: 140
 
-  maligned:
-    # Print struct with more effective memory layout.
-    suggest-new: true
-
   unused:
     # Treat code as a program (not a library) and report unused exported
     # identifiers.
diff --git a/internal/controller/cleanup.go b/internal/controller/cleanup.go
index 54ac268a..39f8f7c8 100644
--- a/internal/controller/cleanup.go
+++ b/internal/controller/cleanup.go
@@ -40,61 +40,6 @@ func getPromRulefromAbsentRuleGroup(group string) string {
 	return sL[0]
 }
 
-// cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts across a
-// cluster.
-func (c *Controller) cleanUpOrphanedAbsentAlertsCluster() error {
-	// Get names of all PrometheusRules that exist in the informer's cache:
-	// map of namespace to map[promRuleName]bool
-	promRules := make(map[string]map[string]bool)
-	objs := c.promRuleInformer.GetStore().List()
-	for _, v := range objs {
-		pr, ok := v.(*monitoringv1.PrometheusRule)
-		if !ok {
-			continue
-		}
-		ns := pr.GetNamespace()
-		if _, ok = promRules[ns]; !ok {
-			promRules[ns] = make(map[string]bool)
-		}
-		promRules[ns][pr.GetName()] = true
-	}
-
-	for namespace, promRuleNames := range promRules {
-		// Get all absentPrometheusRules for this namespace.
-		prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
-			List(context.Background(), metav1.ListOptions{LabelSelector: labelOperatorManagedBy})
-		if err != nil {
-			return errors.Wrap(err, "could not list AbsentPrometheusRules")
-		}
-
-		for _, pr := range prList.Items {
-			// Check if there are any alerts in this absentPrometheusRule that
-			// don't belong to any PrometheusRule in promRuleNames.
-			//
-			// cleanup map is used because there could be multiple RuleGroups
-			// that contain absent alerts concerning a single PrometheusRule
-			// therefore we check all the groups before doing any cleanup.
-			cleanup := make(map[string]struct{})
-			for _, g := range pr.Spec.Groups {
-				n := getPromRulefromAbsentRuleGroup(g.Name)
-				if n != "" && !promRuleNames[n] {
-					cleanup[n] = struct{}{}
-				}
-			}
-
-			aPR := &absentPrometheusRule{PrometheusRule: pr}
-			for n := range cleanup {
-				if err := c.cleanUpOrphanedAbsentAlerts(n, aPR); err != nil {
-					return err
-				}
-				c.metrics.SuccessfulPrometheusRuleReconcileTime.DeleteLabelValues(namespace, n)
-			}
-		}
-	}
-
-	return nil
-}
-
 // cleanUpOrphanedAbsentAlertsNamespace deletes orphaned absent alerts
 // concerning a specific PrometheusRule from a namespace.
 //
diff --git a/internal/controller/controller.go b/internal/controller/controller.go
index ff97de53..52551069 100644
--- a/internal/controller/controller.go
+++ b/internal/controller/controller.go
@@ -60,25 +60,20 @@ const (
 	DefaultResyncPeriod = 10 * time.Minute
 
 	// reconciliationPeriod is the period after which all the objects in the
-	// informer's cache will be added to the workqueue so that they can be
-	// processed by the syncHandler().
+	// informer's cache and orphaned objects with absent alerts are added to
+	// the workqueue so that they can be processed by the syncHandler().
 	//
 	// The informer calls the event handlers only if the resource state
 	// changes. We do this additional reconciliation as a liveness check to see
-	// if the operator is working as intended.
+	// if the operator is working as intended, and as manual maintenance for
+	// cases when the informer has missed a PrometheusRule deletion/rename.
 	reconciliationPeriod = 5 * time.Minute
-
-	// maintenancePeriod is the period after which the worker will clean up any
-	// orphaned absent alerts across the entire cluster.
-	//
-	// We do this manual cleanup in case a PrometheusRule is deleted and the
-	// update of the shared informer's cache has missed it.
-	maintenancePeriod = 1 * time.Hour
 )
 
 // Controller is the controller implementation for acting on PrometheusRule
 // resources.
 type Controller struct {
+	isTest  bool
 	logger  *log.Logger
 	metrics *Metrics
 
@@ -95,34 +90,39 @@ type Controller struct {
 	workqueue workqueue.RateLimitingInterface
 }
 
+// Opts holds the data required for instantiating a Controller.
+type Opts struct {
+	IsTest             bool
+	Logger             *log.Logger
+	KeepLabel          map[string]bool
+	PrometheusRegistry prometheus.Registerer
+	Config             *rest.Config
+	ResyncPeriod       time.Duration
+}
+
 // New creates a new Controller.
-func New(
-	cfg *rest.Config,
-	resyncPeriod time.Duration,
-	r prometheus.Registerer,
-	keepLabel map[string]bool,
-	logger *log.Logger) (*Controller, error) {
-
-	kClient, err := kubernetes.NewForConfig(cfg)
+func New(opts Opts) (*Controller, error) {
+	kClient, err := kubernetes.NewForConfig(opts.Config)
 	if err != nil {
 		return nil, errors.Wrap(err, "instantiating kubernetes client failed")
 	}
-	pClient, err := monitoringclient.NewForConfig(cfg)
+	pClient, err := monitoringclient.NewForConfig(opts.Config)
 	if err != nil {
 		return nil, errors.Wrap(err, "instantiating monitoring client failed")
 	}
 
 	c := &Controller{
-		logger:        logger,
-		metrics:       NewMetrics(r),
-		keepLabel:     keepLabel,
+		isTest:        opts.IsTest,
+		logger:        opts.Logger,
+		metrics:       NewMetrics(opts.PrometheusRegistry),
+		keepLabel:     opts.KeepLabel,
 		kubeClientset: kClient,
 		promClientset: pClient,
 		workqueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "prometheusrules"),
 	}
 	c.keepTierServiceLabels = c.keepLabel[LabelTier] && c.keepLabel[LabelService]
 
-	ruleInf := informers.NewSharedInformerFactory(pClient, resyncPeriod).Monitoring().V1().PrometheusRules()
+	ruleInf := informers.NewSharedInformerFactory(pClient, opts.ResyncPeriod).Monitoring().V1().PrometheusRules()
 	c.promRuleLister = ruleInf.Lister()
 	c.promRuleInformer = ruleInf.Informer()
 	c.promRuleInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -210,44 +210,76 @@ func (c *Controller) enqueuePromRule(obj interface{}) {
 	c.workqueue.Add(key)
 }
 
-// enqueueAllObjects adds all objects in the shared informer's cache to the
-// workqueue.
-func (c *Controller) enqueueAllObjects() {
+// enqueueAllPrometheusRules enqueues every cached PrometheusRule and the keys of vanished ones that still have absent alerts.
+func (c *Controller) enqueueAllPrometheusRules() {
+	// promRules is a map of namespace to map[promRuleName]bool.
+	promRules := make(map[string]map[string]bool)
+	// absentPromRules is a map of namespace to a slice of absentPrometheusRule.
+	absentPromRules := make(map[string][]*monitoringv1.PrometheusRule)
+
+	// Add all PrometheusRules in the shared informer's cache to the workqueue
+	// and get names of all PrometheusRules that exist in the informer's cache.
 	objs := c.promRuleInformer.GetStore().List()
 	for _, v := range objs {
-		if pr, ok := v.(*monitoringv1.PrometheusRule); ok {
+		pr, ok := v.(*monitoringv1.PrometheusRule)
+		if !ok {
+			continue
+		}
+
+		ns := pr.GetNamespace()
+		n := pr.GetName()
+
+		if l := pr.GetLabels(); mustParseBool(l[labelOperatorManagedBy]) {
+			absentPromRules[ns] = append(absentPromRules[ns], pr)
+		} else {
+			if _, ok = promRules[ns]; !ok {
+				promRules[ns] = make(map[string]bool)
+			}
+			promRules[ns][n] = true
 			c.enqueuePromRule(pr)
 		}
 	}
+
+	// Add the PrometheusRules which no longer exist but whose corresponding
+	// absent alerts are still present in an absentPrometheusRule.
+	// The syncHandler() will perform the appropriate cleanup for them.
+	for ns, aPRs := range absentPromRules {
+		// Range over absentPromRules (not promRules) so that a namespace whose
+		// PrometheusRules are all gone is still checked; nil map reads are safe.
+		for _, aPR := range aPRs {
+			// A map is used because there could be multiple RuleGroups that
+			// contain absent alerts concerning a single PrometheusRule, so
+			// we check all the groups before adding it to the queue.
+			shouldAdd := make(map[string]struct{})
+			for _, g := range aPR.Spec.Groups {
+				n := getPromRulefromAbsentRuleGroup(g.Name)
+				if n != "" && !promRules[ns][n] {
+					shouldAdd[n] = struct{}{}
+				}
+			}
+
+			for n := range shouldAdd {
+				key := fmt.Sprintf("%s/%s", ns, n)
+				c.workqueue.Add(key)
+			}
+		}
+	}
 }
 
 // runWorker is a long-running function that will continually call the
 // processNextWorkItem function in order to read and process a message on the
 // workqueue.
 func (c *Controller) runWorker() {
-	// Run maintenance at start up.
-	if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
-		c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
-			"err", err)
-	}
-
+	done := make(chan struct{})
 	reconcileT := time.NewTicker(reconciliationPeriod)
 	defer reconcileT.Stop()
-	maintenanceT := time.NewTicker(maintenancePeriod)
-	defer maintenanceT.Stop()
-	done := make(chan struct{})
 	go func() {
 		for {
 			select {
 			case <-done:
 				return
 			case <-reconcileT.C:
-				c.enqueueAllObjects()
-			case <-maintenanceT.C:
-				if err := c.cleanUpOrphanedAbsentAlertsCluster(); err != nil {
-					c.logger.ErrorWithBackoff("msg", "could not cleanup orphaned absent alerts from cluster",
-						"err", err)
-				}
+				c.enqueueAllPrometheusRules()
 			}
 		}
 	}()
@@ -318,6 +350,7 @@ func (c *Controller) syncHandler(key string) error {
 		err = c.cleanUpOrphanedAbsentAlertsNamespace(name, namespace)
 		if err == nil {
 			c.metrics.SuccessfulPrometheusRuleReconcileTime.DeleteLabelValues(namespace, name)
+			return nil
 		}
 	default:
 		// Requeue object for later processing.
@@ -335,7 +368,13 @@ func (c *Controller) syncHandler(key string) error {
 		return err
 	}
 
-	c.metrics.SuccessfulPrometheusRuleReconcileTime.WithLabelValues(namespace, name).SetToCurrentTime()
+	gauge := c.metrics.SuccessfulPrometheusRuleReconcileTime.WithLabelValues(namespace, name)
+	if c.isTest {
+		gauge.Set(float64(1))
+	} else {
+		gauge.SetToCurrentTime()
+	}
+
 	return nil
 }
 
diff --git a/main.go b/main.go
index 181e6847..37f60fa7 100644
--- a/main.go
+++ b/main.go
@@ -87,7 +87,16 @@ func main() {
 	if err != nil {
 		logger.Fatal("msg", "instantiating cluster config failed", "err", err)
 	}
-	c, err := controller.New(cfg, controller.DefaultResyncPeriod, r, keepLabelMap, log.With(logger, "component", "controller"))
+
+	opts := controller.Opts{
+		IsTest:             false,
+		Logger:             log.With(logger, "component", "controller"),
+		KeepLabel:          keepLabelMap,
+		PrometheusRegistry: r,
+		Config:             cfg,
+		ResyncPeriod:       controller.DefaultResyncPeriod,
+	}
+	c, err := controller.New(opts)
 	if err != nil {
 		logger.Fatal("msg", "could not instantiate controller", "err", err)
 	}
diff --git a/test/controller_test.go b/test/controller_test.go
index 6b9febcf..8bedff5d 100644
--- a/test/controller_test.go
+++ b/test/controller_test.go
@@ -17,12 +17,20 @@ package test
 import (
 	"context"
 	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"os/exec"
+	"path/filepath"
 	"strings"
 	"time"
 
 	monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -216,5 +224,38 @@ var _ = Describe("Controller", func() {
 				Expect(apierrors.IsNotFound(err)).To(Equal(true))
 			})
 		})
+
+		It("should cleanup orphaned gauge metrics", func() {
+			handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
+			method := http.MethodGet
+			path := "/metrics"
+			var requestBody io.Reader
+			request := httptest.NewRequest(method, path, requestBody)
+
+			recorder := httptest.NewRecorder()
+			handler.ServeHTTP(recorder, request)
+
+			response := recorder.Result()
+			Expect(response.StatusCode).To(Equal(200))
+			responseBytes, err := ioutil.ReadAll(response.Body)
+			Expect(err).ToNot(HaveOccurred())
+
+			// Write actual content to file to make it easy to copy the
+			// computed result over to the fixture path when a new test is
+			// added or an existing one is modified.
+			fixturePath, err := filepath.Abs("fixtures/metrics.prom")
+			Expect(err).ToNot(HaveOccurred())
+			actualPath := fixturePath + ".actual"
+			err = ioutil.WriteFile(actualPath, responseBytes, 0600)
+			Expect(err).ToNot(HaveOccurred())
+
+			cmd := exec.Command("diff", "-u", fixturePath, actualPath)
+			cmd.Stdin = nil
+			cmd.Stdout = GinkgoWriter
+			cmd.Stderr = os.Stderr
+			if err = cmd.Run(); err != nil {
+				Fail(fmt.Sprintf("%s %s: body does not match", method, path))
+			}
+		})
 	})
 })
diff --git a/test/fixtures/metrics.prom b/test/fixtures/metrics.prom
new file mode 100644
index 00000000..8ce193ef
--- /dev/null
+++ b/test/fixtures/metrics.prom
@@ -0,0 +1,4 @@
+# HELP absent_metrics_operator_successful_reconcile_time The time at which a specific PrometheusRule was successfully reconciled by the operator.
+# TYPE absent_metrics_operator_successful_reconcile_time gauge
+absent_metrics_operator_successful_reconcile_time{prometheusrule_name="openstack-limes-api.alerts",prometheusrule_namespace="resmgmt"} 1
+absent_metrics_operator_successful_reconcile_time{prometheusrule_name="openstack-swift.alerts",prometheusrule_namespace="swift"} 1
diff --git a/test/suite_test.go b/test/suite_test.go
index df74b00e..46db8f91 100644
--- a/test/suite_test.go
+++ b/test/suite_test.go
@@ -46,6 +46,7 @@ var (
 	k8sClient client.Client
 
 	c      *controller.Controller
+	reg    *prometheus.Registry
 	wg     *errgroup.Group
 	cancel context.CancelFunc
 )
@@ -81,19 +82,25 @@ var _ = BeforeSuite(func() {
 	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
 	Expect(err).ToNot(HaveOccurred())
 
-	// NOTE: We start the controller before adding objects since the items are
-	// queued by the controller sequentially and we depend on this behavior in
-	// our mock assertion.
 	By("starting controller")
-	l := log.New(GinkgoWriter, log.FormatLogfmt, true)
-	Expect(err).ToNot(HaveOccurred())
-	kL := map[string]bool{
-		controller.LabelTier:    true,
-		controller.LabelService: true,
+	reg = prometheus.NewPedanticRegistry()
+	opts := controller.Opts{
+		IsTest:             true,
+		Logger:             log.New(GinkgoWriter, log.FormatLogfmt, true),
+		PrometheusRegistry: reg,
+		Config:             cfg,
+		ResyncPeriod:       1 * time.Second,
+		KeepLabel: map[string]bool{
+			controller.LabelTier:    true,
+			controller.LabelService: true,
+		},
 	}
-	c, err = controller.New(cfg, 1*time.Second, prometheus.NewRegistry(), kL, l)
+	c, err = controller.New(opts)
 	Expect(err).ToNot(HaveOccurred())
 
+	// NOTE: We start the controller before adding objects since the items are
+	// queued by the controller sequentially and we depend on this behavior in
+	// our mock assertion.
 	ctx := context.Background()
 	ctx, cancel = context.WithCancel(ctx)
 	wg, ctx = errgroup.WithContext(ctx)