Skip to content

Commit

Permalink
non-leader metrics-prometheus-collector eagerly computes metrics
Browse files Browse the repository at this point in the history
- sampler pushes metrics to all collector replicas
- non-leader collector computes and calculates metrics, but doesn't publish
  • Loading branch information
pwittrock committed Apr 3, 2023
1 parent c3d8370 commit 683b9b1
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 81 deletions.
78 changes: 43 additions & 35 deletions cmd/metrics-prometheus-collector/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ var (

// Options is set by flags
type Options struct {
ctrl.Options
Options ctrl.Options
LeaderOptions ctrl.Options
metricsPrometheusCollector string
}

Expand All @@ -91,13 +92,13 @@ func init() {
RootCmd.Flags().StringVar(&options.metricsPrometheusCollector, "collector-config-filepath", "", "Path to a MetricsPrometheusCollector json or yaml configuration file.")
_ = RootCmd.MarkFlagRequired("collector-config-filepath")

RootCmd.Flags().StringVar(&options.MetricsBindAddress, "http-addr", ":8080", "Bind address of the webservice exposing the metrics and other endpoints.")
RootCmd.Flags().StringVar(&options.Options.MetricsBindAddress, "http-addr", ":8080", "Bind address of the webservice exposing the metrics and other endpoints.")

// Default this to false so that it doesn't try to do leader election when run locally with `go run`
RootCmd.Flags().BoolVar(&options.LeaderElection, "leader-election", false, "Enable leader election")
RootCmd.Flags().BoolVar(&options.LeaderOptions.LeaderElection, "leader-election", false, "Enable leader election")

// This is used for integration testing only
RootCmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", "", "Set the namespace used for leader election -- for testing only")
RootCmd.Flags().StringVar(&options.LeaderOptions.LeaderElectionNamespace, "leader-election-namespace", "", "Set the namespace used for leader election -- for testing only")
_ = RootCmd.Flags().MarkHidden("leader-election-namespace")

RootCmd.Flags().StringVar(&logPath, "log-level-filepath", "", "path to log level file. The file must contain a single integer corresponding to the log level (e.g. 2)")
Expand Down Expand Up @@ -131,9 +132,9 @@ func RunE(_ *cobra.Command, _ []string) error {

log.Info("initializing with options",
"collector-config-filepath", options.metricsPrometheusCollector,
"http-addr", options.MetricsBindAddress,
"leader-election", options.LeaderElection,
"leader-election-namespace", options.LeaderElectionNamespace,
"http-addr", options.Options.MetricsBindAddress,
"leader-election", options.LeaderOptions.LeaderElection,
"leader-election-namespace", options.LeaderOptions.LeaderElectionNamespace,
"log-level-filepath", logPath,
"profile-memory", profileMemory)

Expand Down Expand Up @@ -174,6 +175,11 @@ func RunE(_ *cobra.Command, _ []string) error {
mgr, err := ctrl.NewManager(restConfig, options.Options)
checkError(log, err, "unable to create manager config")

// the leader manager is separate
leaderMGR, err := ctrl.NewManager(restConfig, options.Options)
checkError(log, err, "unable to create leader manager config")
metrics.LeaderMgr = leaderMGR

// register a simple ping health-check endpoint with the manager
err = mgr.AddMetricsExtraHandler("/healthz", healthz.CheckHandler{Checker: healthz.Ping})
checkError(log, err, "unable to configure health-check endpoint")
Expand All @@ -188,6 +194,11 @@ func RunE(_ *cobra.Command, _ []string) error {
if err != nil {
return err
}
if options.LeaderOptions.LeaderElection {
metrics.Col.IsLeaderElected.Store(false)
} else {
metrics.Col.IsLeaderElected.Store(true)
}
go func() {
// NOTE: we don't want to start getting node sample traffic until we are the leader
// so we use a readiness check in the config and don't report ourselves
Expand All @@ -202,9 +213,10 @@ func RunE(_ *cobra.Command, _ []string) error {
}()

// start the manager and the reconcilers
// Note: mgr.Start will return an error if leadership is lost and leader-election is used.
// Note: metrifcs.LeaderMgr.Start will return an error if leadership is lost and leader-election is used.
log.Info("starting manager")
checkError(log, mgr.Start(ctx), "")
go checkError(log, metrics.LeaderMgr.Start(ctx), "") // start the leader election manager
checkError(log, metrics.Mgr.Start(ctx), "")
return nil
}

Expand All @@ -213,8 +225,9 @@ func RunE(_ *cobra.Command, _ []string) error {
// metrics collectors *after* this instance is elected as the leader
// rather than at startup.
type MetricsServer struct {
Mgr ctrl.Manager
Col *collector.Collector
Mgr ctrl.Manager
LeaderMgr ctrl.Manager
Col *collector.Collector
collectorcontrollerv1alpha1.MetricsPrometheusCollector
}

Expand Down Expand Up @@ -247,28 +260,6 @@ func (ms *MetricsServer) ReadCollectorSpec() error {
// Start starts the metrics server
// nolint: gocyclo
func (ms *MetricsServer) Start(ctx context.Context) error {
mgr := ms.Mgr
// Important -- don't register metrics until we are the leader
// otherwise we will be duplicating values by sending them from multiple instances.
// This is especially bad since we won't have the utilization data.

// Note: This shouldn't be necessary -- this function isn't called until we are elected as the leader
// but it is here as a defensive check.
<-mgr.Elected()
log.Info("elected as leader -- serving capacity metrics")

// mark us as ready to start receiving node samples -- this requires configuring a readiness check
// in the pod yaml
ms.Col.UtilizationServer.IsLeaderElected.Store(true)
// this shouldn't be necessary since we shutdown when we aren't the leader, but
// serves as a sanity check in case we don't shutdown gracefully and quickly
defer func() {
ms.Col.UtilizationServer.IsLeaderElected.Store(false)
// Ensure that we exit so we stop getting utilization requests
time.Sleep(exitAfterLeaderElectionLoss)
log.Info("exiting after leader election loss")
os.Exit(0)
}()

// update the metric showing we are the leader
electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(1)
Expand All @@ -289,12 +280,29 @@ func (ms *MetricsServer) Start(ctx context.Context) error {
log.V(1).Info("starting metrics-prometheus-collector", "MetricsPrometheusCollector", val)
}

// don't pre-compute and cache the metrics until we are the leader otherwise they may be ancient
// when we are elected
// start pre-computing metrics eagerly. we won't publish them until we are the leader.
go ms.Col.Run(ctx)

// start exporting metrics when we are the leader
// use a separate manager for leader election so that the infromer caches are warm
<-ms.LeaderMgr.Elected()
log.Info("elected as leader -- serving capacity metrics")

ctrlmetrics.Registry.MustRegister(ms.Col)

// mark us as ready to start receiving node samples -- this requires configuring a readiness check
// in the pod yaml
ms.Col.IsLeaderElected.Store(true)
// this shouldn't be necessary since we shutdown when we aren't the leader, but
// serves as a sanity check in case we don't shutdown gracefully and quickly
defer func() {
ms.Col.IsLeaderElected.Store(false)
// Ensure that we exit so we stop getting utilization requests
time.Sleep(exitAfterLeaderElectionLoss)
log.Info("exiting after leader election loss")
os.Exit(0)
}()

<-ctx.Done()
return ctx.Err()
}
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion config/metrics-node-sampler/configmaps/sampler.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pushAddress: "metrics-prometheus-collector:9090" # push metrics to this service
pushHeadlessService: "metrics-prometheus-collector:9090" # push metrics to this service
pushFrequencyDuration: "1m" # push metrics to the collector every minute
exitOnConfigChange: true
buffer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ buffer:
checkCreatedPodFrequencyDuration: 6s
containerdAddress: /run/containerd/containerd.sock
containerdNamespace: default
dnsSpec:
backoffSeconds: 0
failuresBeforeExit: 0
pollMinutes: 0
exitOnConfigChange: false
pbPort: 8080
pushAddress: ""
pushFrequencyDuration: 1m
pushHeadlessService: ""
reader:
cgroupVersion: v1
containerCacheSyncIntervalSeconds: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ buffer:
checkCreatedPodFrequencyDuration: 10s
containerdAddress: /var/run/containerd/containerd.sock
containerdNamespace: k8s.io
dnsSpec:
backoffSeconds: 0
failuresBeforeExit: 0
pollMinutes: 0
exitOnConfigChange: false
pbPort: 7070
pushAddress: default.svg
pushFrequencyDuration: 2m
pushHeadlessService: default.svg
reader:
cgroupVersion: v1
containerCacheSyncIntervalSeconds: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ checkCreatedPodFrequencyDuration: 10s
containerdAddress: /var/run/containerd/containerd.sock
containerdNamespace: k8s.io
pbPort: 7070
pushAddress: "default.svg"
pushHeadlessService: "default.svg"
pushFrequencyDuration: 2m
reader:
cgroupVersion: v1
Expand Down
10 changes: 9 additions & 1 deletion pkg/api/samplerserverv1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type MetricsNodeSampler struct {
RestPort int `json:"restPort" yaml:"restPort"`
SendPushMetricsRetryCount int `json:"sendPushMetricsRetryCount" yaml:"sendPushMetricsRetryCount"`

PushAddress string `json:"pushAddress" yaml:"pushAddress"`
PushHeadlessService string `json:"pushHeadlessService" yaml:"pushHeadlessService"`
PushFrequency string `json:"pushFrequencyDuration" yaml:"pushFrequencyDuration"`
CheckCreatedPodFrequency string `json:"checkCreatedPodFrequencyDuration" yaml:"checkCreatedPodFrequencyDuration"`

Expand All @@ -48,6 +48,14 @@ type MetricsNodeSampler struct {
// ContainerdNamespace specifies which containerd namespace to collect metrics for
// Default: default
ContainerdNamespace string `json:"containerdNamespace" yaml:"containerdNamespace"`

DNSSpec DNSSpec `json:"dnsSpec" yaml:"dnsSpec"`
}

type DNSSpec struct {
PollMinutes int `json:"pollMinutes" yaml:"pollMinutes"`
FailuresBeforeExit int `json:"failuresBeforeExit" yaml:"failuresBeforeExit"`
BackoffSeconds int `json:"backoffSeconds" yaml:"backoffSeconds"`
}

// Buffer configures the window of buffered metric samples.
Expand Down
7 changes: 6 additions & 1 deletion pkg/collector/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"path"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -171,6 +172,7 @@ func TestCollector(t *testing.T) {
instance, err := NewCollector(context.Background(), c, &spec)
require.NoError(t, err)
tc.UnmarshalInputsStrict(map[string]interface{}{"input_usage.yaml": &instance.UtilizationServer})
instance.IsLeaderElected.Store(true)

if os.Getenv("TEST_COLLECTOR_FUNCS_OVERRIDE") == "true" {
fns := CollectorFuncs()
Expand Down Expand Up @@ -245,6 +247,7 @@ func TestCollectorOverride(t *testing.T) {
instance, err := NewCollector(context.Background(), c, &spec)
require.NoError(t, err)
tc.UnmarshalInputsStrict(map[string]interface{}{"input_usage.yaml": &instance.UtilizationServer})
instance.IsLeaderElected.Store(true)

reg := prometheus.NewPedanticRegistry()
require.NoError(t, reg.Register(instance))
Expand Down Expand Up @@ -323,6 +326,7 @@ func testSave(t *testing.T, saveJSON, saveProto bool) {
instance, err := NewCollector(context.Background(), c, &spec)
require.NoError(t, err)
tc.UnmarshalInputsStrict(map[string]interface{}{"input_usage.yaml": &instance.UtilizationServer})
instance.IsLeaderElected.Store(true)

ctx, cancelCtx := context.WithCancel(context.Background())
go instance.Run(ctx)
Expand Down Expand Up @@ -458,8 +462,9 @@ func TestLabels(t *testing.T) {
"input.yaml": &inputs,
})

c := Collector{MetricsPrometheusCollector: &inputs.Spec}
c := Collector{MetricsPrometheusCollector: &inputs.Spec, IsLeaderElected: &atomic.Bool{}}
require.NoError(t, c.init())
c.IsLeaderElected.Store(true)

instance := Labeler{
BuiltIn: builtInLabler{
Expand Down
43 changes: 38 additions & 5 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -51,6 +52,8 @@ var (
type Collector struct {
// collector dependencies

IsLeaderElected *atomic.Bool

client.Client
Labeler Labeler
Reader ValueReader
Expand Down Expand Up @@ -89,6 +92,7 @@ func NewCollector(
UtilizationServer: utilization.Server{UtilizationServer: config.UtilizationServer},
startTime: time.Now(),
metrics: make(chan *cachedMetrics),
IsLeaderElected: &atomic.Bool{},
}

// initialize the extensions data
Expand Down Expand Up @@ -128,13 +132,33 @@ func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
}

func (c *Collector) continuouslyCollect(ctx context.Context) {
cacheTicker := time.NewTicker(time.Minute * 5)
defer cacheTicker.Stop()
for {
start := time.Now()

// calculate the metrics from the current cluster state
log.Info("caching metrics")
m := c.cacheMetrics()
select {
case <-ctx.Done():
return // shutdown
case c.metrics <- m:
// write the cached metrics
log.Info("complete caching metrics", "seconds", time.Since(start).Seconds())

if c.IsLeaderElected.Load() {
// if we are the leader, write the metrics so they are published
select {
case <-ctx.Done():
return // shutdown
case c.metrics <- m:
// write the cached metrics
}
} else {
// if we are not the leader, we should continuously re-caculate the metrics
// so that we can publish them immediately when we become the leader.
select {
case <-ctx.Done():
return // shutdown
case <-cacheTicker.C:
// re-cacalculate the metrics without publishing them
}
}
}
}
Expand Down Expand Up @@ -232,9 +256,16 @@ func (c *Collector) cacheMetrics() *cachedMetrics {
// Collect returns the current state of all metrics of the collector.
// This there are cached metrics, Collect will return the cached metrics.
func (c *Collector) Collect(ch chan<- prometheus.Metric) {
if !c.IsLeaderElected.Load() {
// Never export metrics if we aren't the leader.
return
}

start := time.Now()
log.Info("starting collection")
if !c.PreComputeMetrics.Enabled {
c.collect(ch, nil)
log.Info("finished collection", "seconds", time.Since(start).Seconds())
return
}

Expand Down Expand Up @@ -265,6 +296,7 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) {
})
cacheCollectTime.Set(float64(time.Since(start).Seconds()))
cacheCollectTime.Collect(ch)
log.Info("finished collection", "seconds", time.Since(start).Seconds())
}

func (c *Collector) getSideCarConfigs() ([]*collectorcontrollerv1alpha1.SideCarConfig, error) {
Expand Down Expand Up @@ -838,6 +870,7 @@ func (c *Collector) collectContainers(o *CapacityObjects, ch chan<- prometheus.M

log := log.WithValues("start", start.Local().Format("2006-01-02 15:04:05"))
utilization := c.UtilizationServer.GetContainerUsageSummary(o.UtilizationByNode)
log.Info("found utilization metrics", "container-count", len(utilization))

// metrics
containerMetrics := map[MetricName]*Metric{}
Expand Down
4 changes: 3 additions & 1 deletion pkg/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package collector

import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -42,8 +43,9 @@ func TestGetCGroupMetricSource(t *testing.T) {
Name: "root",
},
},
},
}, IsLeaderElected: &atomic.Bool{},
}
instance.IsLeaderElected.Store(true)
expected := map[string]string{
"system.slice/foo/bar": "", // not present
"system.slice/foo": "system",
Expand Down
8 changes: 0 additions & 8 deletions pkg/collector/utilization/sampler_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,6 @@ func (s *Server) PushMetrics(req api.MetricsCollector_PushMetricsServer) error {
var nodeName, podName string
for {
msg, err := req.Recv() // Read metrics message
if !s.IsLeaderElected.Load() {
// We shouldn't be getting metrics if we aren't the leader.
// This can happen if we restart and the endpoints isn't updated to remove us yet.
log.Error(fmt.Errorf("got node samples when not leader"), "only the leader should get node samples. Possible that Readiness checks are not setup.")
nonLeaderRequestsTotal.WithLabelValues(nodeName, podName, os.Getenv("POD_NAME")).Inc()
s.grpcServer.Stop() // stop the server immediately
os.Exit(1) // exit so we restart with the new server
}

if err == io.EOF {
log.V(5).Info("read utilization metrics eof", "node", nodeName, "pod", podName)
Expand Down
Loading

0 comments on commit 683b9b1

Please sign in to comment.