Skip to content

Commit

Permalink
non-leader metrics-prometheus-collector eagerly computes metrics
Browse files Browse the repository at this point in the history
* sampler pushes metrics to all collector replicas using headless service
* sampler sets pod name, namespace and container name from container labels when using containerd
* non-leader collector computes and calculates metrics, but doesn't publish
  • Loading branch information
pwittrock committed Apr 10, 2023
1 parent 0de3f4c commit cbffb8b
Show file tree
Hide file tree
Showing 24 changed files with 1,267 additions and 339 deletions.
5 changes: 2 additions & 3 deletions cmd/metrics-node-sampler/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,13 @@ func (s *Server) RunE(cmd *cobra.Command, args []string) error {
log.Error(err, "unable to unmarshal sampler-config-filepath")
return err
}
ctx, stop := context.WithCancel(context.Background())
ctx, _ = signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

go func() {
// force process to exit so we don't get stuck in a terminating state
// it's much better to get killed and recreated than to hang
<-ctx.Done()
log.Info("shutting down metrics-node-sampler")
log.Info("shutting down metrics-node-sampler", "err", ctx.Err())
time.Sleep(time.Duration(terminationSeconds) * time.Second)
log.Info("terminating metrics-node-sampler")
os.Exit(0)
Expand Down
3 changes: 3 additions & 0 deletions cmd/metrics-node-sampler/integration/testdata/expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
{
"containerID": "abcdef",
"podUID": "pod12345",
"containerName": "",
"podName": "",
"namespaceName": "",
"cpuCoresNanoSec": [
"200",
"250",
Expand Down
131 changes: 73 additions & 58 deletions cmd/metrics-prometheus-collector/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ import (

_ "net/http/pprof"

"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
"github.com/pkg/profile"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand All @@ -46,7 +49,6 @@ import (
commonlog "sigs.k8s.io/usage-metrics-collector/pkg/log"
"sigs.k8s.io/usage-metrics-collector/pkg/scheme"
versioncollector "sigs.k8s.io/usage-metrics-collector/pkg/version"
"sigs.k8s.io/usage-metrics-collector/pkg/watchconfig"
)

var (
Expand All @@ -58,12 +60,7 @@ var (
profileMemory, pprof bool
exitAfterLeaderElectionLoss, leaseDuration, renewDeadline time.Duration

options = Options{Options: ctrl.Options{
Scheme: scheme.Scheme,
LeaderElectionID: "capacity-metrics-prometheus-collector-lock",
LeaseDuration: &leaseDuration,
RenewDeadline: &renewDeadline,
}}
options = Options{Options: ctrl.Options{Scheme: scheme.Scheme}}
RootCmd = &cobra.Command{
RunE: RunE,
}
Expand All @@ -79,6 +76,10 @@ var (
// Options is set by flags
// It embeds the controller-runtime manager options and adds the settings
// used by this command's own leader election.
// NOTE(review): LeaderElection and LeaderElectionNamespace appear to shadow
// same-named fields on the embedded ctrl.Options -- confirm that the manager
// is not expected to see these values.
type Options struct {
ctrl.Options
// LeaderElection enables leader election (flag: --leader-election).
LeaderElection bool
// LeaderElectionNamespace is the namespace used for leader election
// (flag: --leader-election-namespace, defaults to $POD_NAMESPACE).
LeaderElectionNamespace string
// LeaderElectionLockName is the lock name used for leader election
// (flag: --leader-election-lock-name).
LeaderElectionLockName string
// PodName is the id used for leader election
// (flag: --leader-election-pod-name, defaults to $POD_NAME).
PodName string
// metricsPrometheusCollector is presumably the path to the collector
// config file -- TODO confirm against the flag that sets it.
metricsPrometheusCollector string
}

Expand All @@ -102,8 +103,9 @@ func init() {
RootCmd.Flags().BoolVar(&options.LeaderElection, "leader-election", false, "Enable leader election")

// This is used for integration testing only
RootCmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", "", "Set the namespace used for leader election -- for testing only")
_ = RootCmd.Flags().MarkHidden("leader-election-namespace")
RootCmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", os.Getenv("POD_NAMESPACE"), "Set the namespace used for leader election")
RootCmd.Flags().StringVar(&options.LeaderElectionLockName, "leader-election-lock-name", "metrics-prometheus-collector", "Set the lock name used for leader election")
RootCmd.Flags().StringVar(&options.PodName, "leader-election-pod-name", os.Getenv("POD_NAME"), "Set the id used for leader election")

RootCmd.Flags().StringVar(&logPath, "log-level-filepath", "", "path to log level file. The file must contain a single integer corresponding to the log level (e.g. 2)")

Expand Down Expand Up @@ -174,23 +176,8 @@ func RunE(_ *cobra.Command, _ []string) error {
&versioncollector.Collector{
Version: version, Commit: commit, Date: date, Name: metrics.Prefix + "_version"})

ctx, stop := context.WithCancel(context.Background())
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
ctx, _ = signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)

if metrics.MetricsPrometheusCollector.ExitOnConfigChange {
err := watchconfig.ConfigFile{
ConfigFilename: options.metricsPrometheusCollector,
}.WatchConfig(func(w *fsnotify.Watcher, s chan interface{}) {
log.Info("stopping metrics-prometheus-collector to read new config file")
stop()
w.Close()
close(s)
})
if err != nil {
log.Error(err, "unable to watch config")
}
}

options.Options.NewCache = collector.GetNewCacheFunc(metrics.MetricsPrometheusCollector)

Expand All @@ -212,6 +199,8 @@ func RunE(_ *cobra.Command, _ []string) error {
if err != nil {
return err
}
metrics.Col.IsLeaderElected.Store(!options.LeaderElection)

go func() {
// NOTE: we don't want to start getting node sample traffic until we are the leader
// so we use a readiness check in the config and don't report ourselves
Expand All @@ -226,9 +215,8 @@ func RunE(_ *cobra.Command, _ []string) error {
}()

// start the manager and the reconcilers
// Note: mgr.Start will return an error if leadership is lost and leader-election is used.
log.Info("starting manager")
checkError(log, mgr.Start(ctx), "")
checkError(log, metrics.Mgr.Start(ctx), "")
return nil
}

Expand Down Expand Up @@ -271,32 +259,6 @@ func (ms *MetricsServer) ReadCollectorSpec() error {
// Start starts the metrics server
// nolint: gocyclo
func (ms *MetricsServer) Start(ctx context.Context) error {
mgr := ms.Mgr
// Important -- don't register metrics until we are the leader
// otherwise we will be duplicating values by sending them from multiple instances.
// This is especially bad since we won't have the utilization data.

// Note: This shouldn't be necessary -- this function isn't called until we are elected as the leader
// but it is here as a defensive check.
<-mgr.Elected()
log.Info("elected as leader -- serving capacity metrics")

// mark us as ready to start receiving node samples -- this requires configuring a readiness check
// in the pod yaml
ms.Col.UtilizationServer.IsLeaderElected.Store(true)
// this shouldn't be necessary since we shutdown when we aren't the leader, but
// serves as a sanity check in case we don't shutdown gracefully and quickly
defer func() {
ms.Col.UtilizationServer.IsLeaderElected.Store(false)
// Ensure that we exit so we stop getting utilization requests
time.Sleep(exitAfterLeaderElectionLoss)
log.Info("exiting after leader election loss")
os.Exit(0)
}()

// update the metric showing we are the leader
electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(1)

// TODO: try to do this before becoming the leader. This isn't trivial
// because the manager won't let us use the caches prior to starting.
log.Info("initializing informers")
Expand All @@ -313,14 +275,67 @@ func (ms *MetricsServer) Start(ctx context.Context) error {
log.V(1).Info("starting metrics-prometheus-collector", "MetricsPrometheusCollector", val)
}

// don't pre-compute and cache the metrics until we are the leader otherwise they may be ancient
// when we are elected
ctrlmetrics.Registry.MustRegister(ms.Col)

// start pre-computing metrics eagerly. we won't publish them until we are the leader.
go ms.Col.Run(ctx)

ctrlmetrics.Registry.MustRegister(ms.Col)
if options.LeaderElection {
ms.doLeaderElection(ctx)
} else {
electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(1)
ms.Col.IsLeaderElected.Store(true)
}

// this shouldn't be necessary since we shutdown when we aren't the leader, but
// serves as a sanity check in case we don't shutdown gracefully and quickly

<-ctx.Done()
return ctx.Err()
log.Info("ending sampler server", "reason", ctx.Err())
return nil
}

// doLeaderElection contends for the leader Lease named by
// options.LeaderElectionLockName in options.LeaderElectionNamespace, using
// options.PodName as this replica's identity. The callbacks keep
// ms.Col.IsLeaderElected and the elected metric in sync with the election
// result.
//
// The call blocks: leaderelection.RunOrDie only returns once ctx is cancelled
// or this replica stops holding the lease, and it panics if the election
// config fails validation.
// NOTE(review): the election is not retried after RunOrDie returns, so a
// replica that loses the lease stays a non-leader until it is restarted --
// confirm this is intended.
func (ms *MetricsServer) doLeaderElection(ctx context.Context) {
	// Start out as a non-leader until the lease is actually acquired.
	ms.Col.IsLeaderElected.Store(false)

	config := ms.Mgr.GetConfig()
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      options.LeaderElectionLockName,
			Namespace: options.LeaderElectionNamespace,
		},
		Client: clientset.NewForConfigOrDie(config).CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: options.PodName,
		},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Name:            options.LeaderElectionLockName,
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   30 * time.Second,
		RenewDeadline:   20 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(c context.Context) {
				log.Info("acquired leadership", "id", options.PodName)
				electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(1)
				ms.Col.IsLeaderElected.Store(true)
			},
			OnStoppedLeading: func() {
				log.Info("lost leadership", "id", options.PodName)
				electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(0)
				ms.Col.IsLeaderElected.Store(false)
			},
			OnNewLeader: func(currentID string) {
				if currentID == options.PodName {
					return
				}
				// Another replica holds the lease. This also fires when this
				// replica never led, so it is reported as an observation and
				// not as a loss; real losses are logged by OnStoppedLeading.
				log.Info("observed new leader", "id", options.PodName, "leader", currentID)
				electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(0)
				ms.Col.IsLeaderElected.Store(false)
			},
		},
	})
}

// checkError exits in case of errors, printing the given output message
Expand Down
3 changes: 2 additions & 1 deletion config/metrics-node-sampler/configmaps/sampler.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pushAddress: "metrics-prometheus-collector:9090" # push metrics to this service
pushHeadlessService: "metrics-prometheus-collector:9090" # push metrics to this service
pushAddress: "deprecated"
pushFrequencyDuration: "1m" # push metrics to the collector every minute
exitOnConfigChange: true
buffer:
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/collectorcontrollerv1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ type UtilizationServer struct {
SamplerPodLabels map[string]string `json:"samplerPodLabels" yaml:"samplerPodLabels"`
// SamplerNamespaceName is the name of the sampler namespace.
SamplerNamespaceName string `json:"samplerNamespaceName" yaml:"samplerNamespaceName"`

CollectorPodLabels map[string]string `json:"collectorPodLabels" yaml:"collectorPodLabels"`

CollectorPodIPsIndex int `json:"collectorPodIPsIndex" yaml:"collectorPodIPsIndex"`

SamplerPort int `json:"samplerPort" yaml:"samplerPort"`

MinResultPctBeforeReady int `json:"minResultPctBeforeReady" yaml:"minResultPctBeforeReady"`
}

// Condition matches a node condition to determine whether a node is unhealthy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ buffer:
checkCreatedPodFrequencyDuration: 6s
containerdAddress: /run/containerd/containerd.sock
containerdNamespace: default
dnsSpec:
backoffSeconds: 0
collectorServerExpiration: 0
failuresBeforeExit: 0
pollSeconds: 0
exitOnConfigChange: false
pbPort: 8080
pushAddress: ""
pushFrequencyDuration: 1m
pushHeadlessService: ""
pushHeadlessServicePort: 9090
reader:
cgroupVersion: v1
containerCacheSyncIntervalSeconds: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ buffer:
checkCreatedPodFrequencyDuration: 10s
containerdAddress: /var/run/containerd/containerd.sock
containerdNamespace: k8s.io
dnsSpec:
backoffSeconds: 0
collectorServerExpiration: 0
failuresBeforeExit: 0
pollSeconds: 0
exitOnConfigChange: false
pbPort: 7070
pushAddress: default.svg
pushAddress: deprecated
pushFrequencyDuration: 2m
pushHeadlessService: default.svg
pushHeadlessServicePort: 9090
reader:
cgroupVersion: v1
containerCacheSyncIntervalSeconds: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ checkCreatedPodFrequencyDuration: 10s
containerdAddress: /var/run/containerd/containerd.sock
containerdNamespace: k8s.io
pbPort: 7070
pushAddress: "default.svg"
pushHeadlessService: "default.svg"
pushAddress: "deprecated"
pushFrequencyDuration: 2m
reader:
cgroupVersion: v1
Expand Down
20 changes: 19 additions & 1 deletion pkg/api/samplerserverv1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package samplerserverv1alpha1

import (
"path/filepath"
"time"

"k8s.io/utils/pointer"
)
Expand All @@ -32,7 +33,9 @@ type MetricsNodeSampler struct {
RestPort int `json:"restPort" yaml:"restPort"`
SendPushMetricsRetryCount int `json:"sendPushMetricsRetryCount" yaml:"sendPushMetricsRetryCount"`

PushAddress string `json:"pushAddress" yaml:"pushAddress"`
DEPRECATED_PushService string `json:"pushAddress" yaml:"pushAddress"` // TODO: Remove this
PushHeadlessService string `json:"pushHeadlessService" yaml:"pushHeadlessService"`
PushHeadlessServicePort int `json:"pushHeadlessServicePort" yaml:"pushHeadlessServicePort"`
PushFrequency string `json:"pushFrequencyDuration" yaml:"pushFrequencyDuration"`
CheckCreatedPodFrequency string `json:"checkCreatedPodFrequencyDuration" yaml:"checkCreatedPodFrequencyDuration"`

Expand All @@ -48,6 +51,18 @@ type MetricsNodeSampler struct {
// ContainerdNamespace specifies which containerd namespace to collect metrics for
// Default: default
ContainerdNamespace string `json:"containerdNamespace" yaml:"containerdNamespace"`

DNSSpec DNSSpec `json:"dnsSpec" yaml:"dnsSpec"`
}

// DNSSpec groups the sampler's DNS-related settings.
// NOTE(review): presumably these control how the sampler resolves the push
// headless service to find collector replicas -- confirm against the code
// that reads them.
type DNSSpec struct {
// PollSeconds is presumably the interval, in seconds, between DNS lookups -- TODO confirm.
PollSeconds int `json:"pollSeconds" yaml:"pollSeconds"`
// FailuresBeforeExit is presumably the number of failed lookups tolerated before the sampler exits -- TODO confirm.
FailuresBeforeExit int `json:"failuresBeforeExit" yaml:"failuresBeforeExit"`
// BackoffSeconds is presumably the wait, in seconds, after a failed lookup -- TODO confirm.
BackoffSeconds int `json:"backoffSeconds" yaml:"backoffSeconds"`

// CollectorServerExpiration sets the time to stop trying to push metrics to a server after it is no
// longer in the DNS for the service
// NOTE(review): a time.Duration may serialize as integer nanoseconds depending on the codec -- verify the config format.
CollectorServerExpiration time.Duration `json:"collectorServerExpiration" yaml:"collectorServerExpiration"`
}

// Buffer configures the window of buffered metric samples.
Expand Down Expand Up @@ -235,6 +250,9 @@ func (s *MetricsNodeSampler) Default() {
s.Reader.NodeAggregationLevelGlobs = DefaultNodeAggregationLevels
}

if s.PushHeadlessServicePort == 0 {
s.PushHeadlessServicePort = 9090
}
if s.PushFrequency == "" {
s.PushFrequency = DefaultPushFrequency
}
Expand Down
Loading

0 comments on commit cbffb8b

Please sign in to comment.