Skip to content

Commit

Permalink
controller: enforce additional reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
talal committed Aug 12, 2020
1 parent 7a64dcc commit 11b66e3
Show file tree
Hide file tree
Showing 148 changed files with 30,097 additions and 44 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ $ absent-metrics-operator --kubeconfig="$KUBECONFIG"

`kubeconfig` flag is only required if running outside a cluster.

You can configure the resync period (time period between each operator scan)
using the `resync-period` flag.

For detailed usage instructions:

```
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ require (
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/prometheus v1.8.2-0.20200609102542-5d7e3e970602
github.com/sapcc/go-bits v0.0.0-20200719195243-6f202ca5296a
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6
Expand Down
30 changes: 27 additions & 3 deletions go.sum

Large diffs are not rendered by default.

100 changes: 75 additions & 25 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
monitoringlisters "github.com/coreos/prometheus-operator/pkg/client/listers/monitoring/v1"
monitoringclient "github.com/coreos/prometheus-operator/pkg/client/versioned"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -45,10 +46,26 @@ const (
labelDisable = "absent-metrics-operator/disable"
)

const (
// DefaultResyncPeriod is the period after which the shared informer will
// refresh its cache.
DefaultResyncPeriod = 10 * time.Minute

// DefaultReconciliationPeriod is the period after which all the objects in
// the informer's cache will be added to the workqueue so that they can be
// processed by the syncHandler().
//
// The informer calls the event handlers only if the resource state
// changes. We force this additional reconciliation as a liveness check to
// see if the operator is working as intended.
DefaultReconciliationPeriod = 5 * time.Minute
)

// Controller is the controller implementation for acting on PrometheusRule
// resources.
type Controller struct {
logger *log.Logger
logger *log.Logger
metrics *Metrics

kubeClientset kubernetes.Interface
promClientset monitoringclient.Interface
Expand All @@ -59,7 +76,7 @@ type Controller struct {
}

// New creates a new Controller.
func New(cfg *rest.Config, resyncPeriod time.Duration, logger *log.Logger) (*Controller, error) {
func New(cfg *rest.Config, resyncPeriod time.Duration, r prometheus.Registerer, logger *log.Logger) (*Controller, error) {
kClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, errors.Wrap(err, "instantiating kubernetes client failed")
Expand All @@ -72,6 +89,7 @@ func New(cfg *rest.Config, resyncPeriod time.Duration, logger *log.Logger) (*Con

c := &Controller{
logger: logger,
metrics: NewMetrics(r),
kubeClientset: kClient,
promClientset: pClient,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "prometheusrules"),
Expand Down Expand Up @@ -164,12 +182,39 @@ func (c *Controller) enqueuePromRule(obj interface{}) {
c.workqueue.Add(key)
}

// enqueueAllObjects adds all objects in the shared informer's cache to the
// workqueue.
func (c *Controller) enqueueAllObjects() {
objs := c.promRuleInformer.GetStore().List()
for _, v := range objs {
if pr, ok := v.(*monitoringv1.PrometheusRule); ok {
c.enqueuePromRule(pr)
}
}
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
ticker := time.NewTicker(DefaultReconciliationPeriod)
defer ticker.Stop()
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
c.enqueueAllObjects()
}
}
}()

for c.processNextWorkItem() {
}

done <- struct{}{}
}

// processNextWorkItem will read a single work item off the workqueue and
Expand Down Expand Up @@ -210,6 +255,7 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

//nolint:gocyclo,gocognit
// syncHandler gets a PrometheusRule from the queue and updates the
// corresponding absent metric alert PrometheusRule for it.
func (c *Controller) syncHandler(key string) error {
Expand Down Expand Up @@ -243,25 +289,24 @@ func (c *Controller) syncHandler(key string) error {
return nil
}

// Get the PrometheusRule resource that defines the absent metric alert
// rules for this namespace.
absentPromRuleName := fmt.Sprintf("%s-absent-metric-alert-rules", promServerName)
absentPromRule, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
Get(context.Background(), absentPromRuleName, metav1.GetOptions{})

// Default tier and service label values to use for absent metric alerts.
// See parseRuleGroups() for info on why we need this.
var tier, service string

// Get the PrometheusRule resource that defines the absent metric alert
// rules for this namespace.
ctx := context.Background()
absentPromRuleExists := false
absentPromRuleName := fmt.Sprintf("%s-absent-metric-alert-rules", promServerName)
absentPromRule, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).Get(ctx, absentPromRuleName, metav1.GetOptions{})
switch {
case err == nil:
absentPromRuleExists = true
tier, service = getTierAndService(absentPromRule.Spec.Groups)
case apierrors.IsNotFound(err):
// Try to get a value for tier and service by traversing through all the
// PrometheusRules for the specific Prometheus server in this namespace.
prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).
List(context.Background(), metav1.ListOptions{})
prList, err := c.promClientset.MonitoringV1().PrometheusRules(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
// Requeue object for later processing.
return errors.Wrap(err, "could not list PrometheusRules")
Expand All @@ -279,9 +324,10 @@ func (c *Controller) syncHandler(key string) error {
return errors.Wrap(err, "could not get absent PrometheusRule "+absentPromRuleName)
}
if tier == "" || service == "" {
// We shouldn't arrive at this point because this would mean that
// there was not a single alert rule in the namespace that did not
// use templating for its tier and service labels.
// We shouldn't arrive at this point because this would mean that there
// was not a single alert rule for the prometheus server in this
// namespace that did not use templating for its tier and service
// labels.
// Requeue object for later processing.
return errors.New("could not find default tier and service")
}
Expand All @@ -296,18 +342,22 @@ func (c *Controller) syncHandler(key string) error {
c.logger.ErrorWithBackoff("msg", "could not parse rule groups", "key", key, "err", err)
return nil
}
if len(rg) == 0 {
// This can happen when changes have been made to a PrometheusRule
// that result in no absent alert rules. E.g. absent() operator was used.
if absentPromRuleExists {
// In this case we clean up orphaned absent alert rules.
return c.deleteAbsentAlertRules(namespace, name, absentPromRule)
}
return nil // nothing to do
}

if absentPromRuleExists {
return c.updateAbsentPrometheusRule(namespace, absentPromRule, rg)
switch lenRg := len(rg); {
case lenRg == 0 && absentPromRuleExists:
// This can happen when changes have been made to a PrometheusRule that
// result in no absent alert rules. E.g. absent() operator was used. In
// this case we clean up orphaned absent alert rules.
err = c.deleteAbsentAlertRules(namespace, name, absentPromRule)
case lenRg > 0 && absentPromRuleExists:
err = c.updateAbsentPrometheusRule(namespace, absentPromRule, rg)
case lenRg > 0:
err = c.createAbsentPrometheusRule(namespace, absentPromRuleName, promServerName, rg)
}
return c.createAbsentPrometheusRule(namespace, absentPromRuleName, promServerName, rg)
if err != nil {
return err
}

c.metrics.SuccessfulPrometheusRuleReconcileTime.WithLabelValues(namespace, name).SetToCurrentTime()
return nil
}
38 changes: 38 additions & 0 deletions internal/controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2020 SAP SE
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import "github.com/prometheus/client_golang/prometheus"

// Metrics represents the metrics associated with the controller.
type Metrics struct {
SuccessfulPrometheusRuleReconcileTime *prometheus.GaugeVec
}

// NewMetrics returns a new Metrics and registers its metrics with the provided
// Registerer.
func NewMetrics(r prometheus.Registerer) *Metrics {
m := Metrics{
SuccessfulPrometheusRuleReconcileTime: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "absent_metrics_operator_successful_reconcile_time",
Help: "The time at which a specific PrometheusRule was successfully reconciled by the operator.",
},
[]string{"namespace", "name"},
),
}
r.MustRegister(m.SuccessfulPrometheusRuleReconcileTime)
return &m
}
45 changes: 34 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sapcc/go-bits/httpee"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc" // load auth plugin
"k8s.io/client-go/tools/clientcmd"

Expand Down Expand Up @@ -53,25 +57,16 @@ var (
)

func main() {
var logLevel, logFormat, kubeconfig, resyncPeriod string
var logLevel, logFormat, kubeconfig string
flagset := flag.CommandLine
flagset.StringVar(&logLevel, "log-level", log.LevelInfo,
fmt.Sprintf("Log level to use. Possible values: %s", strings.Join(availableLogLevels, ", ")))
flagset.StringVar(&logFormat, "log-format", log.FormatLogfmt,
fmt.Sprintf("Log format to use. Possible values: %s", strings.Join(availableLogFormats, ", ")))
flagset.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster")
flagset.StringVar(&resyncPeriod, "resync-period", "30s",
"The controller's resync period. Valid time units are 's', 'm', 'h'. Minimum acceptable value is 15s")
if err := flagset.Parse(os.Args[1:]); err != nil {
logFatalf("could not parse flagset: %s", err.Error())
}
dur, err := time.ParseDuration(resyncPeriod)
if err != nil {
logFatalf("could not parse resync period: %s", err.Error())
}
if dur < 15*time.Second {
logFatalf("minimum acceptable value for resync period is 15s, got: %s", dur)
}

logger, err := log.New(os.Stdout, logFormat, logLevel)
if err != nil {
Expand All @@ -81,19 +76,30 @@ func main() {
logger.Info("msg", "starting absent-metrics-operator",
"version", version, "git-commit", gitCommitHash, "build-date", buildDate)

r := prometheus.NewRegistry()

// Create controller
cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
logger.Fatal("msg", "instantiating cluster config failed", "err", err)
}
c, err := controller.New(cfg, dur, log.With(*logger, "component", "controller"))
c, err := controller.New(cfg, controller.DefaultResyncPeriod, r, log.With(*logger, "component", "controller"))
if err != nil {
logger.Fatal("msg", "could not instantiate controller", "err", err)
}

// Set up signal handling for graceful shutdown
wg, ctx := signals.SetupSignalHandlerAndRoutineGroup(logger)

// Serve metrics at port "9659". This port has been allocated for absent
// metrics operator.
// See: https://github.com/prometheus/prometheus/wiki/Default-port-allocations
listenAddr := ":9659"
http.HandleFunc("/", landingPageHandler(logger))
http.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{}))
logger.Info("msg", "listening on "+listenAddr)
wg.Go(func() error { return httpee.ListenAndServeContext(ctx, listenAddr, nil) })

// Start controller
wg.Go(func() error { return c.Run(ctx.Done()) })

Expand All @@ -107,3 +113,20 @@ func logFatalf(format string, a ...interface{}) {
fmt.Fprintf(os.Stderr, "FATAL: "+format+"\n", a...)
os.Exit(1)
}

func landingPageHandler(logger *log.Logger) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
pageBytes := []byte(`<html>
<head><title>Absent Metrics Operator</title></head>
<body>
<h1>Absent Metrics Operator</h1>
<p><a href="/metrics">Metrics</a></p>
<p><a href="https://github.com/sapcc/absent-metrics-operator">Source Code</a></p>
</body>
</html>`)

if _, err := w.Write(pageBytes); err != nil {
logger.ErrorWithBackoff("msg", "could not write landing page bytes", "err", err)
}
}
}
3 changes: 2 additions & 1 deletion test/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -85,7 +86,7 @@ var _ = BeforeSuite(func() {
By("starting controller")
l, err := log.New(GinkgoWriter, log.FormatLogfmt, log.LevelAll)
Expect(err).ToNot(HaveOccurred())
c, err := controller.New(cfg, 1*time.Second, l)
c, err := controller.New(cfg, 1*time.Second, prometheus.NewRegistry(), l)
Expect(err).ToNot(HaveOccurred())

ctx := context.Background()
Expand Down
20 changes: 20 additions & 0 deletions vendor/github.com/beorn7/perks/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 11b66e3

Please sign in to comment.