diff --git a/cmd/metrics-node-sampler/cmd/cmd.go b/cmd/metrics-node-sampler/cmd/cmd.go index 01b6de8..4db2abe 100644 --- a/cmd/metrics-node-sampler/cmd/cmd.go +++ b/cmd/metrics-node-sampler/cmd/cmd.go @@ -65,14 +65,13 @@ func (s *Server) RunE(cmd *cobra.Command, args []string) error { log.Error(err, "unable to unmarshal sampler-config-filepath") return err } - ctx, stop := context.WithCancel(context.Background()) - ctx, _ = signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) go func() { // force process to exit so we don't get stuck in a terminating state // its much better to get killed and recreated than to hang <-ctx.Done() - log.Info("shutting down metrics-node-sampler") + log.Info("shutting down metrics-node-sampler", "err", ctx.Err()) time.Sleep(time.Duration(terminationSeconds) * time.Second) log.Info("terminating metrics-node-sampler") os.Exit(0) diff --git a/cmd/metrics-node-sampler/integration/testdata/expected.json b/cmd/metrics-node-sampler/integration/testdata/expected.json index 9328cd0..1efa3e3 100644 --- a/cmd/metrics-node-sampler/integration/testdata/expected.json +++ b/cmd/metrics-node-sampler/integration/testdata/expected.json @@ -3,6 +3,9 @@ { "containerID": "abcdef", "podUID": "pod12345", + "containerName": "", + "podName": "", + "namespaceName": "", "cpuCoresNanoSec": [ "200", "250", diff --git a/cmd/metrics-prometheus-collector/cmd/cmd.go b/cmd/metrics-prometheus-collector/cmd/cmd.go index 95da74f..0041fbf 100644 --- a/cmd/metrics-prometheus-collector/cmd/cmd.go +++ b/cmd/metrics-prometheus-collector/cmd/cmd.go @@ -29,13 +29,16 @@ import ( _ "net/http/pprof" - "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" "github.com/pkg/profile" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/cobra" "gopkg.in/yaml.v3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/oidc" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -46,7 +49,6 @@ import ( commonlog "sigs.k8s.io/usage-metrics-collector/pkg/log" "sigs.k8s.io/usage-metrics-collector/pkg/scheme" versioncollector "sigs.k8s.io/usage-metrics-collector/pkg/version" - "sigs.k8s.io/usage-metrics-collector/pkg/watchconfig" ) var ( @@ -58,12 +60,7 @@ var ( profileMemory, pprof bool exitAfterLeaderElectionLoss, leaseDuration, renewDeadline time.Duration - options = Options{Options: ctrl.Options{ - Scheme: scheme.Scheme, - LeaderElectionID: "capacity-metrics-prometheus-collector-lock", - LeaseDuration: &leaseDuration, - RenewDeadline: &renewDeadline, - }} + options = Options{Options: ctrl.Options{Scheme: scheme.Scheme}} RootCmd = &cobra.Command{ RunE: RunE, } @@ -79,6 +76,10 @@ var ( // Options is set by flags type Options struct { ctrl.Options + LeaderElection bool + LeaderElectionNamespace string + LeaderElectionLockName string + PodName string metricsPrometheusCollector string } @@ -102,8 +103,9 @@ func init() { RootCmd.Flags().BoolVar(&options.LeaderElection, "leader-election", false, "Enable leader election") // This is used for integration testing only - RootCmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", "", "Set the namespace used for leader election -- for testing only") - _ = RootCmd.Flags().MarkHidden("leader-election-namespace") + 
RootCmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", os.Getenv("POD_NAMESPACE"), "Set the namespace used for leader election") + RootCmd.Flags().StringVar(&options.LeaderElectionLockName, "leader-election-lock-name", "metrics-prometheus-collector", "Set the lock name used for leader election") + RootCmd.Flags().StringVar(&options.PodName, "leader-election-pod-name", os.Getenv("POD_NAME"), "Set the id used for leader election") RootCmd.Flags().StringVar(&logPath, "log-level-filepath", "", "path to log level file. The file must contain a single integer corresponding to the log level (e.g. 2)") @@ -174,23 +176,8 @@ func RunE(_ *cobra.Command, _ []string) error { &versioncollector.Collector{ Version: version, Commit: commit, Date: date, Name: metrics.Prefix + "_version"}) - ctx, stop := context.WithCancel(context.Background()) + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - ctx, _ = signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) - - if metrics.MetricsPrometheusCollector.ExitOnConfigChange { - err := watchconfig.ConfigFile{ - ConfigFilename: options.metricsPrometheusCollector, - }.WatchConfig(func(w *fsnotify.Watcher, s chan interface{}) { - log.Info("stopping metrics-prometheus-collector to read new config file") - stop() - w.Close() - close(s) - }) - if err != nil { - log.Error(err, "unable to watch config") - } - } options.Options.NewCache = collector.GetNewCacheFunc(metrics.MetricsPrometheusCollector) @@ -212,6 +199,8 @@ func RunE(_ *cobra.Command, _ []string) error { if err != nil { return err } + metrics.Col.IsLeaderElected.Store(!options.LeaderElection) + go func() { // NOTE: we don't want to start getting node sample traffic until we are the leader // so we use a readiness check in the config and don't report ourselves @@ -226,9 +215,8 @@ func RunE(_ *cobra.Command, _ []string) error { }() // start the manager and the reconcilers - // Note: mgr.Start will return an error if leadership is lost and leader-election is used. log.Info("starting manager") - checkError(log, mgr.Start(ctx), "") + checkError(log, metrics.Mgr.Start(ctx), "") return nil } @@ -271,32 +259,6 @@ func (ms *MetricsServer) ReadCollectorSpec() error { // Start starts the metrics server // nolint: gocyclo func (ms *MetricsServer) Start(ctx context.Context) error { - mgr := ms.Mgr - // Important -- don't register metrics until we are the leader - // otherwise we will be duplicating values by sending them from multiple instances. - // This is especially bad since we won't have the utilization data. - - // Note: This shouldn't be necessary -- this function isn't called until we are elected as the leader - // but it is here as a defensive check. 
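Aside -- a minimal sketch (not part of the patch) of the shutdown wiring both commands converge on above: a single signal.NotifyContext call plus a goroutine that force-exits after a grace period, so the pod never hangs in Terminating. The terminationSeconds flag is assumed, as in the node-sampler command.

package shutdown

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// run wires a root context to SIGINT/SIGTERM and force-exits after the
// grace period once a signal arrives, mirroring the pattern in the diff.
func run(terminationSeconds int, serve func(context.Context) error) error {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	go func() {
		<-ctx.Done() // signal received or parent cancelled
		time.Sleep(time.Duration(terminationSeconds) * time.Second)
		os.Exit(0) // better to be killed and recreated than to hang
	}()

	return serve(ctx) // e.g. mgr.Start(ctx)
}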
- <-mgr.Elected() - log.Info("elected as leader -- serving capacity metrics") - - // mark us as ready to start receiving node samples -- this requires configuring a readiness check - // in the pod yaml - ms.Col.UtilizationServer.IsLeaderElected.Store(true) - // this shouldn't be necessary since we shutdown when we aren't the leader, but - // serves as a sanity check in case we don't shutdown gracefully and quickly - defer func() { - ms.Col.UtilizationServer.IsLeaderElected.Store(false) - // Ensure that we exit so we stop getting utilization requests - time.Sleep(exitAfterLeaderElectionLoss) - log.Info("exiting after leader election loss") - os.Exit(0) - }() - - // update the metric showing we are the leader - electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(1) - // TODO: try to do this before becoming the leader. This isn't trivial // because the manager won't let us use the caches prior to starting. log.Info("initializing informers") @@ -313,14 +275,67 @@ func (ms *MetricsServer) Start(ctx context.Context) error { log.V(1).Info("starting metrics-prometheus-collector", "MetricsPrometheusCollector", val) } - // don't pre-compute and cache the metrics until we are the leader otherwise they may be ancient - // when we are elected + ctrlmetrics.Registry.MustRegister(ms.Col) + + // start pre-computing metrics eagerly. we won't publish them until we are the leader. go ms.Col.Run(ctx) - ctrlmetrics.Registry.MustRegister(ms.Col) + if options.LeaderElection { + ms.doLeaderElection(ctx) + } else { + electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(1) + ms.Col.IsLeaderElected.Store(true) + } + + // this shouldn't be necessary since we shutdown when we aren't the leader, but + // serves as a sanity check in case we don't shutdown gracefully and quickly <-ctx.Done() - return ctx.Err() + log.Info("ending sampler server", "reason", ctx.Err()) + return nil +} + +func (ms *MetricsServer) doLeaderElection(ctx context.Context) { + ms.Col.IsLeaderElected.Store(false) + config := ms.Mgr.GetConfig() + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: options.LeaderElectionLockName, + Namespace: options.LeaderElectionNamespace, + }, + Client: clientset.NewForConfigOrDie(config).CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: options.PodName, + }, + } + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Name: options.LeaderElectionLockName, + Lock: lock, + ReleaseOnCancel: true, + LeaseDuration: 30 * time.Second, + RenewDeadline: 20 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(c context.Context) { + log.Info("acquired leadership", "id", options.PodName) + electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(1) + ms.Col.IsLeaderElected.Store(true) + }, + OnStoppedLeading: func() { + log.Info("lost leadership", "id", options.PodName) + electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(0) + ms.Col.IsLeaderElected.Store(false) + }, + OnNewLeader: func(current_id string) { + if current_id == options.PodName { + return + } + log.Info("lost leadership", "id", options.PodName) + electedMetric.WithLabelValues(os.Getenv("POD_NAME")).Set(0) + ms.Col.IsLeaderElected.Store(false) + }, + }, + }) } // checkError exits in in case of errors printing the given output message diff --git a/config/metrics-node-sampler/configmaps/sampler.yaml b/config/metrics-node-sampler/configmaps/sampler.yaml index 06b8d7e..89c4720 100644 --- 
a/config/metrics-node-sampler/configmaps/sampler.yaml +++ b/config/metrics-node-sampler/configmaps/sampler.yaml @@ -1,4 +1,5 @@ -pushAddress: "metrics-prometheus-collector:9090" # push metrics to this service +pushHeadlessService: "metrics-prometheus-collector:9090" # push metrics to this service +pushAddress: "deprecated" pushFrequencyDuration: "1m" # push metrics to the collector every minute exitOnConfigChange: true buffer: diff --git a/pkg/api/collectorcontrollerv1alpha1/types.go b/pkg/api/collectorcontrollerv1alpha1/types.go index b20db8b..a9386bb 100644 --- a/pkg/api/collectorcontrollerv1alpha1/types.go +++ b/pkg/api/collectorcontrollerv1alpha1/types.go @@ -145,6 +145,14 @@ type UtilizationServer struct { SamplerPodLabels map[string]string `json:"samplerPodLabels" yaml:"samplerPodLabels"` // SamplerNamespaceName is the name of the sampler namespace. SamplerNamespaceName string `json:"samplerNamespaceName" yaml:"samplerNamespaceName"` + + CollectorPodLabels map[string]string `json:"collectorPodLabels" yaml:"collectorPodLabels"` + + CollectorPodIPsIndex int `json:"collectorPodIPsIndex" yaml:"collectorPodIPsIndex"` + + SamplerPort int `json:"samplerPort" yaml:"samplerPort"` + + MinResultPctBeforeReady int `json:"minResultPctBeforeReady" yaml:"minResultPctBeforeReady"` } // Condition matches a node condition to determine whether a node is unhealthy. diff --git a/pkg/api/samplerserverv1alpha1/testdata/default/empty/expected.yaml b/pkg/api/samplerserverv1alpha1/testdata/default/empty/expected.yaml index d35ebae..820fad3 100644 --- a/pkg/api/samplerserverv1alpha1/testdata/default/empty/expected.yaml +++ b/pkg/api/samplerserverv1alpha1/testdata/default/empty/expected.yaml @@ -4,10 +4,17 @@ buffer: checkCreatedPodFrequencyDuration: 6s containerdAddress: /run/containerd/containerd.sock containerdNamespace: default +dnsSpec: + backoffSeconds: 0 + collectorServerExpiration: 0 + failuresBeforeExit: 0 + pollSeconds: 0 exitOnConfigChange: false pbPort: 8080 pushAddress: "" pushFrequencyDuration: 1m +pushHeadlessService: "" +pushHeadlessServicePort: 9090 reader: cgroupVersion: v1 containerCacheSyncIntervalSeconds: 1 diff --git a/pkg/api/samplerserverv1alpha1/testdata/default/values/expected.yaml b/pkg/api/samplerserverv1alpha1/testdata/default/values/expected.yaml index 0038931..e89b70f 100644 --- a/pkg/api/samplerserverv1alpha1/testdata/default/values/expected.yaml +++ b/pkg/api/samplerserverv1alpha1/testdata/default/values/expected.yaml @@ -4,10 +4,17 @@ buffer: checkCreatedPodFrequencyDuration: 10s containerdAddress: /var/run/containerd/containerd.sock containerdNamespace: k8s.io +dnsSpec: + backoffSeconds: 0 + collectorServerExpiration: 0 + failuresBeforeExit: 0 + pollSeconds: 0 exitOnConfigChange: false pbPort: 7070 -pushAddress: default.svg +pushAddress: deprecated pushFrequencyDuration: 2m +pushHeadlessService: default.svg +pushHeadlessServicePort: 9090 reader: cgroupVersion: v1 containerCacheSyncIntervalSeconds: 2 diff --git a/pkg/api/samplerserverv1alpha1/testdata/default/values/input.yaml b/pkg/api/samplerserverv1alpha1/testdata/default/values/input.yaml index 375f3e2..1d94bec 100644 --- a/pkg/api/samplerserverv1alpha1/testdata/default/values/input.yaml +++ b/pkg/api/samplerserverv1alpha1/testdata/default/values/input.yaml @@ -5,7 +5,8 @@ checkCreatedPodFrequencyDuration: 10s containerdAddress: /var/run/containerd/containerd.sock containerdNamespace: k8s.io pbPort: 7070 -pushAddress: "default.svg" +pushHeadlessService: "default.svg" +pushAddress: "deprecated" pushFrequencyDuration: 2m 
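Aside -- the new UtilizationServer fields above are plain YAML config. A hedged sketch of reading them (field names come from the patch; the sample labels, namespace, port, and 80% threshold are illustrative, not defaults):

package configdemo

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// utilizationServer mirrors the fields added to the collector config in this patch.
type utilizationServer struct {
	SamplerPodLabels        map[string]string `yaml:"samplerPodLabels"`
	SamplerNamespaceName    string            `yaml:"samplerNamespaceName"`
	CollectorPodLabels      map[string]string `yaml:"collectorPodLabels"`
	CollectorPodIPsIndex    int               `yaml:"collectorPodIPsIndex"`
	SamplerPort             int               `yaml:"samplerPort"`
	MinResultPctBeforeReady int               `yaml:"minResultPctBeforeReady"`
}

func example() (*utilizationServer, error) {
	// Sample document only -- values here are made up for illustration.
	doc := `
samplerPodLabels: {app: metrics-node-sampler}
samplerNamespaceName: usage-metrics-collector
collectorPodLabels: {app: metrics-prometheus-collector}
collectorPodIPsIndex: 0
samplerPort: 8080
minResultPctBeforeReady: 80
`
	var s utilizationServer
	if err := yaml.Unmarshal([]byte(doc), &s); err != nil {
		return nil, err
	}
	fmt.Printf("register collectors labelled %v with samplers on port %d\n",
		s.CollectorPodLabels, s.SamplerPort)
	return &s, nil
}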
reader: cgroupVersion: v1 diff --git a/pkg/api/samplerserverv1alpha1/types.go b/pkg/api/samplerserverv1alpha1/types.go index 48e72e1..1958e25 100644 --- a/pkg/api/samplerserverv1alpha1/types.go +++ b/pkg/api/samplerserverv1alpha1/types.go @@ -16,6 +16,7 @@ package samplerserverv1alpha1 import ( "path/filepath" + "time" "k8s.io/utils/pointer" ) @@ -32,7 +33,9 @@ type MetricsNodeSampler struct { RestPort int `json:"restPort" yaml:"restPort"` SendPushMetricsRetryCount int `json:"sendPushMetricsRetryCount" yaml:"sendPushMetricsRetryCount"` - PushAddress string `json:"pushAddress" yaml:"pushAddress"` + DEPRECATED_PushService string `json:"pushAddress" yaml:"pushAddress"` // TODO: Remove this + PushHeadlessService string `json:"pushHeadlessService" yaml:"pushHeadlessService"` + PushHeadlessServicePort int `json:"pushHeadlessServicePort" yaml:"pushHeadlessServicePort"` PushFrequency string `json:"pushFrequencyDuration" yaml:"pushFrequencyDuration"` CheckCreatedPodFrequency string `json:"checkCreatedPodFrequencyDuration" yaml:"checkCreatedPodFrequencyDuration"` @@ -48,6 +51,18 @@ type MetricsNodeSampler struct { // ContainerdNamespace specifies which containerd namespace to collect metrics for // Default: default ContainerdNamespace string `json:"containerdNamespace" yaml:"containerdNamespace"` + + DNSSpec DNSSpec `json:"dnsSpec" yaml:"dnsSpec"` +} + +type DNSSpec struct { + PollSeconds int `json:"pollSeconds" yaml:"pollSeconds"` + FailuresBeforeExit int `json:"failuresBeforeExit" yaml:"failuresBeforeExit"` + BackoffSeconds int `json:"backoffSeconds" yaml:"backoffSeconds"` + + // CollectorServerExpiration sets the time to stop trying to push metrics to a server after it is no + // longer in the DNS for the service + CollectorServerExpiration time.Duration `json:"collectorServerExpiration" yaml:"collectorServerExpiration"` } // Buffer configures the window of buffered metric samples.
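Aside -- a sketch (not in this patch) of why the sampler config gains a separate pushHeadlessServicePort: a headless Service name resolves to one bare pod IP per collector, so the port has to be joined onto each endpoint. The sampler's actual push client is not shown in this diff.

package dnsdemo

import (
	"net"
	"strconv"
)

// resolveCollectors turns a headless service name such as
// "metrics-prometheus-collector" into one host:port address per ready pod.
func resolveCollectors(headlessService string, port int) ([]string, error) {
	ips, err := net.LookupHost(headlessService) // one A/AAAA record per ready pod
	if err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(ips))
	for _, ip := range ips {
		// net.JoinHostPort brackets IPv6 addresses correctly
		addrs = append(addrs, net.JoinHostPort(ip, strconv.Itoa(port)))
	}
	return addrs, nil
}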
@@ -235,6 +250,9 @@ func (s *MetricsNodeSampler) Default() { s.Reader.NodeAggregationLevelGlobs = DefaultNodeAggregationLevels } + if s.PushHeadlessServicePort == 0 { + s.PushHeadlessServicePort = 9090 + } if s.PushFrequency == "" { s.PushFrequency = DefaultPushFrequency } diff --git a/pkg/collector/capacity_test.go b/pkg/collector/capacity_test.go index 3419b6d..f9f02ca 100644 --- a/pkg/collector/capacity_test.go +++ b/pkg/collector/capacity_test.go @@ -22,6 +22,7 @@ import ( "path" "path/filepath" "strings" + "sync/atomic" "testing" "time" @@ -171,6 +172,8 @@ func TestCollector(t *testing.T) { instance, err := NewCollector(context.Background(), c, &spec) require.NoError(t, err) tc.UnmarshalInputsStrict(map[string]interface{}{"input_usage.yaml": &instance.UtilizationServer}) + instance.UtilizationServer.IsReadyResult.Store(true) + instance.IsLeaderElected.Store(true) if os.Getenv("TEST_COLLECTOR_FUNCS_OVERRIDE") == "true" { fns := CollectorFuncs() @@ -245,6 +248,8 @@ func TestCollectorOverride(t *testing.T) { instance, err := NewCollector(context.Background(), c, &spec) require.NoError(t, err) tc.UnmarshalInputsStrict(map[string]interface{}{"input_usage.yaml": &instance.UtilizationServer}) + instance.UtilizationServer.IsReadyResult.Store(true) + instance.IsLeaderElected.Store(true) reg := prometheus.NewPedanticRegistry() require.NoError(t, reg.Register(instance)) @@ -323,6 +328,8 @@ func testSave(t *testing.T, saveJSON, saveProto bool) { instance, err := NewCollector(context.Background(), c, &spec) require.NoError(t, err) tc.UnmarshalInputsStrict(map[string]interface{}{"input_usage.yaml": &instance.UtilizationServer}) + instance.UtilizationServer.IsReadyResult.Store(true) + instance.IsLeaderElected.Store(true) ctx, cancelCtx := context.WithCancel(context.Background()) go instance.Run(ctx) @@ -458,8 +465,9 @@ func TestLabels(t *testing.T) { "input.yaml": &inputs, }) - c := Collector{MetricsPrometheusCollector: &inputs.Spec} + c := Collector{MetricsPrometheusCollector: &inputs.Spec, IsLeaderElected: &atomic.Bool{}} require.NoError(t, c.init()) + c.IsLeaderElected.Store(true) instance := Labeler{ BuiltIn: builtInLabler{ diff --git a/pkg/collector/collector.go b/pkg/collector/collector.go index 776edf5..ce091ec 100644 --- a/pkg/collector/collector.go +++ b/pkg/collector/collector.go @@ -18,15 +18,20 @@ import ( "bytes" "context" "encoding/json" + "fmt" "os" "path/filepath" "sort" "strings" "sync" + "sync/atomic" "time" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/usage-metrics-collector/pkg/api/collectorcontrollerv1alpha1" "sigs.k8s.io/usage-metrics-collector/pkg/api/quotamanagementv1alpha1" collectorapi "sigs.k8s.io/usage-metrics-collector/pkg/collector/api" @@ -52,6 +57,8 @@ var ( type Collector struct { // collector dependencies + IsLeaderElected *atomic.Bool + client.Client Labeler Labeler Reader ValueReader @@ -90,7 +97,10 @@ func NewCollector( UtilizationServer: utilization.Server{UtilizationServer: config.UtilizationServer}, startTime: time.Now(), metrics: make(chan *cachedMetrics), + IsLeaderElected: &atomic.Bool{}, } + c.UtilizationServer.IsReadyResult.Store(false) + c.IsLeaderElected.Store(false) // initialize the extensions data if err := c.init(); err != nil { @@ -119,8 +129,9 @@ func (c *Collector) InitInformers(ctx context.Context) error { // Start runs the collector func (c *Collector) Run(ctx 
context.Context) { if c.PreComputeMetrics.Enabled { - c.continuouslyCollect(ctx) + go c.continuouslyCollect(ctx) } + c.registerWithSamplers(ctx) } // Describe returns all descriptions of the collector. @@ -129,13 +140,34 @@ func (c *Collector) Describe(ch chan<- *prometheus.Desc) { } func (c *Collector) continuouslyCollect(ctx context.Context) { + cacheTicker := time.NewTicker(time.Minute * 5) + defer cacheTicker.Stop() for { + start := time.Now() + + // calculate the metrics from the current cluster state + log.Info("caching metrics") m := c.cacheMetrics() - select { - case <-ctx.Done(): - return // shutdown - case c.metrics <- m: - // write the cached metrics + log.Info("complete caching metrics", "seconds", time.Since(start).Seconds()) + + if c.IsLeaderElected.Load() && c.UtilizationServer.IsReadyResult.Load() { + // if we are the leader and ready, write the metrics so they are published + select { + case <-ctx.Done(): + return // shutdown + case c.metrics <- m: + // send the cached metrics to the collector + // the collector will throw these away if it isn't the leader + } + } else { + // if we are not the leader, we should continuously re-calculate the metrics + // so that we can publish them immediately when we become the leader. + select { + case <-ctx.Done(): + return // shutdown + case <-cacheTicker.C: + // re-calculate the metrics without publishing them + } } } } @@ -233,12 +265,29 @@ func (c *Collector) cacheMetrics() *cachedMetrics { // Collect returns the current state of all metrics of the collector. // If there are cached metrics, Collect will return the cached metrics. func (c *Collector) Collect(ch chan<- prometheus.Metric) { + if !c.IsLeaderElected.Load() || !c.UtilizationServer.IsReadyResult.Load() { + // Never export metrics if we aren't the leader or aren't ready to report them. + // e.g. don't yet have sufficient utilization metrics + select { + case <-c.metrics: + // we were leader elected when we cached the metrics, but aren't anymore. + // throw the results away so they are recalculated.
+ log.Info("lost leadership after caching metrics, throwing them away") + default: + } + log.Info("skipping collection") + return + } + start := time.Now() if !c.PreComputeMetrics.Enabled { + log.Info("collecting metrics without caching") c.collect(ch, nil) + log.Info("finished collection", "seconds", time.Since(start).Seconds()) return } + log.Info("collecting metrics from cache") // write the cached metrics out as a response metrics := <-c.metrics for i := range metrics.metrics { @@ -266,6 +315,7 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) { }) cacheCollectTime.Set(float64(time.Since(start).Seconds())) cacheCollectTime.Collect(ch) + log.Info("finished collection", "seconds", time.Since(start).Seconds()) } func (c *Collector) getSideCarConfigs() ([]*collectorcontrollerv1alpha1.SideCarConfig, error) { @@ -839,6 +889,7 @@ func (c *Collector) collectContainers(o *CapacityObjects, ch chan<- prometheus.M log := log.WithValues("start", start.Local().Format("2006-01-02 15:04:05")) utilization := c.UtilizationServer.GetContainerUsageSummary(o.UtilizationByNode) + log.Info("found utilization metrics", "container-count", len(utilization)) // metrics containerMetrics := map[MetricName]*Metric{} @@ -917,10 +968,16 @@ func (c *Collector) collectContainers(o *CapacityObjects, ch chan<- prometheus.M c.Labeler.SetLabelsForContainer(&containerLabels, container) sample := sb.NewSample(containerLabels) // For saving locally - id := sampler.ContainerKey{ContainerID: containerNameToID[container.Name], PodUID: podUID} - allContainers.Insert(string(id.PodUID) + "/" + string(id.ContainerID)) + // first try to get the metrics based on the container name and namespace + id := sampler.ContainerKey{ContainerName: container.Name, PodName: pod.Name, NamespaceName: pod.Namespace} + allContainers.Insert(string(pod.UID) + "/" + string(containerNameToID[container.Name])) + usage, ok := utilization[id] + if !ok { + // check for metrics based on uid and id if not present by name + id := sampler.ContainerKey{ContainerID: containerNameToID[container.Name], PodUID: podUID} + usage = utilization[id] + } - usage := utilization[id] if usage != nil { log.V(2).Info("found usage for container", "namespace", pod.Namespace, "pod", pod.Name, "container", container.Name, "id", id, @@ -1526,6 +1583,7 @@ func (c *Collector) listCapacityObjects(ch chan<- prometheus.Metric) (*CapacityO // get this once since it requires locking the cache o.UtilizationByNode = c.UtilizationServer.GetMetrics() + log.Info("found utilization metrics", "node-count", len(o.UtilizationByNode)) return &o, nil } @@ -1566,3 +1624,216 @@ func (c *Collector) wait(fns map[string]func() error, metricCh chan<- prometheus } return nil } + +// registerWithSamplers manually registers the collector with node samplers +// so that it can start getting metrics before it marks itself as "Ready" +// and before it appears in the DNS ips for the service. 
+func (c *Collector) registerWithSamplers(ctx context.Context) { + if c.UtilizationServer.MinResultPctBeforeReady == 0 { + // don't register with samplers + c.UtilizationServer.IsReadyResult.Store(true) + log.Info("not using min utilization results for ready check") + } + if len(c.UtilizationServer.CollectorPodLabels) == 0 { + // don't know the collector pods, we can't register + log.Info("not registering collector with samplers, no collector pod labels were specified") + return + } + + // continuously register with node samplers that we don't have results from + go func() { + tick := time.NewTicker(time.Minute) + for { + log.V(1).Info("ensuring collector is registered with samplers") + // since we can't get the status.ips list from the downward api, + // get the collector status.ips lists from the collector pods + // and register all collectors. + collectors := &corev1.PodList{} + err := c.Client.List(ctx, collectors, + client.InNamespace(c.UtilizationServer.SamplerNamespaceName), + client.MatchingLabels(c.UtilizationServer.CollectorPodLabels), + ) + if err != nil || len(collectors.Items) == 0 { + log.Error(err, "unable to register collector with node samplers -- failed to list collector pods.") + } + // calculate the collector IPs to register with the samplers + req := &samplerapi.RegisterCollectorsRequest{ + Collectors: make([]*samplerapi.Collector, 0, len(collectors.Items)), + Source: "Collector", // the sampler also registers collectors from DNS + FromPod: os.Getenv("POD_NAME"), // used for debugging in the sampler + } + for _, col := range collectors.Items { + if col.Status.Phase != corev1.PodRunning { + // only register running collectors + continue + } + if len(col.Status.PodIPs) <= c.UtilizationServer.CollectorPodIPsIndex { + // only register collectors with ip addresses + continue + } + ip := col.Status.PodIPs[c.UtilizationServer.CollectorPodIPsIndex] // pick the ip to register + req.Collectors = append(req.Collectors, &samplerapi.Collector{ + IpAddress: ip.IP, + PodName: col.Name, // used for debugging in the sampler + }) + } + + // list all sampler pods we may need to register with. + samplers := &corev1.PodList{} + err = c.Client.List(ctx, samplers, + client.InNamespace(c.UtilizationServer.SamplerNamespaceName), + client.MatchingLabels(c.UtilizationServer.SamplerPodLabels), + ) + if err != nil { + log.Error(err, "unable to register collector with node samplers. failed to list samplers.") + } + + // don't re-register with samplers if we are already registered + nodesWithResults := c.UtilizationServer.GetNodeNames() + + // Check if we have gotten enough node results to consider ourselves ready. + // We don't want Kubernetes to terminate the old replica during a rollout + // until we are able to publish utilization metrics to prometheus, and + // accomplish this by not being ready. 
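Aside -- the readiness check that follows boils down to a percentage threshold; distilled as a sketch (not part of the patch):

package readydemo

// isReady reports whether the collector has utilization results from a large
// enough share of running samplers to consider itself ready, using the
// minResultPctBeforeReady integer percentage added in this patch.
func isReady(nodesWithResults, nodesWithSamplers, minResultPctBeforeReady int) bool {
	if nodesWithSamplers == 0 {
		return false // guard against division by zero when no samplers are running
	}
	readyPct := nodesWithResults * 100 / nodesWithSamplers
	return readyPct > minResultPctBeforeReady
}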
+ if c.UtilizationServer.MinResultPctBeforeReady > 0 && !c.UtilizationServer.IsReadyResult.Load() { + nodesWithRunningSamplers := sets.NewString() + nodesWithSamplers := sets.NewString() // used for logging + for i := range samplers.Items { + nodesWithSamplers.Insert(samplers.Items[i].Spec.NodeName) + if samplers.Items[i].Status.Phase != corev1.PodRunning { + // only consider running pods -- we don't expect results from non-running pods + continue + } + nodesWithRunningSamplers.Insert(samplers.Items[i].Spec.NodeName) + } + nodesMissingResults := nodesWithRunningSamplers.Difference(nodesWithResults) + + readyPct := (nodesWithResults.Len() * 100 / nodesWithSamplers.Len()) + if nodesWithSamplers.Len() > 0 && readyPct > c.UtilizationServer.MinResultPctBeforeReady { + // Have enough utilization results to say we are ready + log.Info("collector ready", + "running-minutes", time.Since(c.startTime).Minutes(), + "nodes-with-results-count", nodesWithResults.Len(), + "nodes-with-samplers-count", nodesWithSamplers.Len(), + "nodes-with-running-samplers-count", nodesWithRunningSamplers.Len(), + "nodes-missing-results", nodesMissingResults.List(), + "ready-pct", readyPct, + "min-ready-pct", c.UtilizationServer.MinResultPctBeforeReady, + ) + c.UtilizationServer.IsReadyResult.Store(true) + } else { + // Don't have enough utilization results to say we are ready + log.Info("collector not-ready", + "running-minutes", time.Since(c.startTime).Minutes(), + "nodes-with-results-count", nodesWithResults.Len(), + "nodes-with-samplers-count", nodesWithSamplers.Len(), + "nodes-with-running-samplers-count", nodesWithRunningSamplers.Len(), + "nodes-missing-results", nodesMissingResults.List(), + "ready-pct", readyPct, + "min-ready-pct", c.UtilizationServer.MinResultPctBeforeReady) + } + } + + // identify the sampler pods to register with + wg := sync.WaitGroup{} + var samplerCount, samplerResultsCount, registeredCount, errorCount, notRunningCount int + for i := range samplers.Items { + pod := samplers.Items[i] + switch { + case pod.Status.Phase != corev1.PodRunning: + // sampler isn't running yet + notRunningCount++ + continue + case pod.Spec.NodeName == "": + // defensive: shouldn't hit this + notRunningCount++ + continue + case pod.Status.PodIP == "": + // defensive: pod not running + notRunningCount++ + continue + case nodesWithResults.Has(pod.Spec.NodeName): + // already registered. 
+ samplerResultsCount++ + samplerCount++ + continue + } + samplerCount++ + + // register the collector with the each node sampler + wg.Add(1) + go func() { + defer wg.Done() + // build the address for us to register with the sampler + address := pod.Status.PodIP + if strings.Contains(address, ":") { + // format IPv6 correctly + address = "[" + address + "]" + } + address = fmt.Sprintf("%s:%v", address, c.UtilizationServer.SamplerPort) + + // create the grpc connection + var conn *grpc.ClientConn + for i := 0; i < 3; i++ { // retry connection + conn, err = grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err == nil { + // connection established + break + } + } + if err != nil { + log.Error(err, "unable to connect to node sampler", + "node", pod.Spec.NodeName, + "pod", pod.Name, + ) + return + } + + // register ourself with this node sampler + c := samplerapi.NewMetricsClient(conn) + resp, err := c.RegisterCollectors(ctx, req) + if err != nil { + errorCount++ + log.Error(err, "failed to register with node sampler", + "node", pod.Spec.NodeName, + "pod", pod.Name, + ) + } else { + registeredCount++ + log.Info("registered with node sampler", + "node", pod.Spec.NodeName, + "pod", pod.Name, + "sent", req, + "got", resp.IpAddresses) + } + }() + } + log.Info("finished registering with node-samplers", + "results-pct", samplerResultsCount*100/samplerCount, + "results-count", samplerResultsCount, + "running-sampler-count", samplerCount, + "not-running-sampler-count", notRunningCount, + "register-success-count", registeredCount, + "register-fail-count", errorCount, + ) + registeredWithSamplers.Reset() + registeredWithSamplers.WithLabelValues("node-registered").Set(float64(samplerResultsCount)) + registeredWithSamplers.WithLabelValues("node-registration-error").Set(float64(errorCount)) + registeredWithSamplers.WithLabelValues("node-registration-success").Set(float64(registeredCount)) + registeredWithSamplers.WithLabelValues("sampler-not-running").Set(float64(notRunningCount)) + registeredWithSamplers.WithLabelValues("node-not-registered").Set(float64(samplerCount - notRunningCount - samplerResultsCount - errorCount - registeredCount)) + + wg.Wait() + <-tick.C // wait before registering again + } + }() +} + +var registeredWithSamplers = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "metrics_prometheus_collector_registered_count", + Help: "Nodes the collector has registered with.", +}, []string{"status"}) + +func init() { + ctrlmetrics.Registry.MustRegister(registeredWithSamplers) +} diff --git a/pkg/collector/collector_test.go b/pkg/collector/collector_test.go index cf16120..70640ed 100644 --- a/pkg/collector/collector_test.go +++ b/pkg/collector/collector_test.go @@ -15,6 +15,7 @@ package collector import ( + "sync/atomic" "testing" "github.com/stretchr/testify/require" @@ -42,8 +43,10 @@ func TestGetCGroupMetricSource(t *testing.T) { Name: "root", }, }, - }, + }, IsLeaderElected: &atomic.Bool{}, } + instance.UtilizationServer.IsReadyResult.Store(true) + instance.IsLeaderElected.Store(true) expected := map[string]collectorcontrollerv1alpha1.Source{ "system.slice/foo/bar": "", // not present "system.slice/foo": "system", diff --git a/pkg/collector/types.go b/pkg/collector/types.go index d707b53..fbd601e 100644 --- a/pkg/collector/types.go +++ b/pkg/collector/types.go @@ -19,6 +19,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/pointer" "sigs.k8s.io/usage-metrics-collector/pkg/api/collectorcontrollerv1alpha1" + 
"sigs.k8s.io/usage-metrics-collector/pkg/api/samplerserverv1alpha1" ) func (c *Collector) init() error { @@ -100,6 +101,10 @@ func (c *Collector) init() error { } func (c *Collector) defaultMetricsPrometheusCollector() { + if c.UtilizationServer.SamplerPort == 0 { + c.UtilizationServer.SamplerPort = samplerserverv1alpha1.DefaultPBPort + } + c.Kind = "MetricsPrometheusCollector" c.APIVersion = "v1alpha1" for i := range c.Aggregations { diff --git a/pkg/collector/utilization/sampler_collector.go b/pkg/collector/utilization/sampler_collector.go index 94666cc..2796339 100644 --- a/pkg/collector/utilization/sampler_collector.go +++ b/pkg/collector/utilization/sampler_collector.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/protobuf/types/known/timestamppb" + "k8s.io/apimachinery/pkg/util/sets" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/usage-metrics-collector/pkg/api/collectorcontrollerv1alpha1" commonlog "sigs.k8s.io/usage-metrics-collector/pkg/log" @@ -97,8 +98,8 @@ type Server struct { expireFreq time.Duration ttl time.Duration - IsLeaderElected atomic.Bool - IsHealthy atomic.Bool + IsReadyResult atomic.Bool + IsHealthyResult atomic.Bool grpcServer *grpc.Server } @@ -196,7 +197,7 @@ func (s *Server) Start(ctx context.Context) error { go s.expireEntries() - s.IsHealthy.Store(true) + s.IsHealthyResult.Store(true) select { case err := <-errs: @@ -207,15 +208,15 @@ func (s *Server) Start(ctx context.Context) error { } func (s *Server) Check(ctx context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { - if !s.IsHealthy.Load() { + if !s.IsHealthyResult.Load() { return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, fmt.Errorf("not-healthy") } return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } // Ready returns success if the service should be accepting traffic -func (s *Server) IsLeader(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { - if !s.IsLeaderElected.Load() { +func (s *Server) IsReady(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + if !s.IsReadyResult.Load() { return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, fmt.Errorf("not-leader") } return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil @@ -241,14 +242,6 @@ func (s *Server) PushMetrics(req api.MetricsCollector_PushMetricsServer) error { var nodeName, podName string for { msg, err := req.Recv() // Read metrics message - if !s.IsLeaderElected.Load() { - // We shouldn't be getting metrics if we aren't the leader. - // This can happen if we restart and the endpoints isn't updated to remove us yet. - log.Error(fmt.Errorf("got node samples when not leader"), "only the leader should get node samples. 
Possible that Readiness checks are not setup.") - nonLeaderRequestsTotal.WithLabelValues(nodeName, podName, os.Getenv("POD_NAME")).Inc() - s.grpcServer.Stop() // stop the server immediately - os.Exit(1) // exit so we restart with the new server - } if err == io.EOF { log.V(5).Info("read utilization metrics eof", "node", nodeName, "pod", podName) @@ -295,17 +288,35 @@ func (c *Server) GetContainerUsageSummary(metrics map[string]*api.ListMetricsRes for n, r := range metrics { for _, c := range r.Containers { c.NodeName = n + if c.PodName != "" && c.ContainerName != "" && c.NamespaceName != "" { + log.V(10).Info("found metrics for container", + "container", c.ContainerName, + "pod", c.PodName, + "namespace", c.NamespaceName, + ) + values[sampler.ContainerKey{ + PodName: c.PodName, + ContainerName: c.ContainerName, + NamespaceName: c.NamespaceName, + }] = c + continue + } + log := log.WithValues("node", c.NodeName, "namespace", c.NamespaceName, "pod", c.PodName, "container", c.ContainerName, "pod-id", c.PodUID, "container-id", c.ContainerID) if c.PodUID == "" { // this should never happen - log.Error(errors.New("pod-id missing from summary"), "response", r) - summaryErrorsTotal.WithLabelValues(n, r.PodName, "missing-pod-id").Inc() + log.Info("missing pod UID") + summaryErrorsTotal.WithLabelValues(n, r.NodeName, "missing-pod-id").Inc() continue } if c.ContainerID == "" { // this may happen for microvms or other edge cases - log.V(1).Error(errors.New("container-id missing from summary"), "pod", c.PodUID) - summaryErrorsTotal.WithLabelValues(n, r.PodName, "missing-container-id").Inc() + log.Info("missing container ID") + summaryErrorsTotal.WithLabelValues(n, r.NodeName, "missing-container-id").Inc() continue } - values[sampler.ContainerKey{ContainerID: c.ContainerID, PodUID: c.PodUID}] = c + values[sampler.ContainerKey{ + ContainerID: c.ContainerID, + PodUID: c.PodUID, + }] = c + log.V(10).Info("found metrics for container", "container", c.ContainerID, "pod", c.PodUID) } } log.V(1).Info("return container summary", "container-count", len(values)) @@ -322,6 +333,22 @@ func (s *Server) GetMetrics() map[string]*api.ListMetricsResponse { return s.getMetrics(true) } +// GetNodeNames returns the names of the nodes we have results for +func (s *Server) GetNodeNames() sets.String { + nodes := sets.NewString() + func() { + s.ResponseMutext.Lock() + defer s.ResponseMutext.Unlock() + for k, v := range s.Responses { + if time.Since(v.Timestamp.AsTime()) > s.ttl { + continue + } + nodes.Insert(k) + } + }() + return nodes +} + // GetMetrics returns a copy of the current cached metrics func (s *Server) getMetrics(filterExpired bool) map[string]*api.ListMetricsResponse { values := make(map[string]*api.ListMetricsResponse, len(s.Responses)) diff --git a/pkg/ctrstats/ctrstats.go b/pkg/ctrstats/ctrstats.go index 195365c..ed7171b 100644 --- a/pkg/ctrstats/ctrstats.go +++ b/pkg/ctrstats/ctrstats.go @@ -34,6 +34,7 @@ type Container struct { SandboxNamespace string ContainerName string Container containerd.Container + NamespaceName string } var ctrStatsLog = logrus.WithField("source", "ctrstats") @@ -55,7 +56,7 @@ func GetContainers(client *containerd.Client) ([]Container, error) { } for _, c := range containers { - var sandboxID, podName, sandboxNamespace, containerName string + var sandboxID, sandboxNamespace string container, err := c.Info(ctx) if err != nil { @@ -77,18 +78,17 @@ func GetContainers(client *containerd.Client) ([]Container, error) { } sandboxID = spec.Annotations["io.kubernetes.cri.sandbox-id"] - 
podName = spec.Annotations["io.kubernetes.cri.sandbox-name"] sandboxNamespace = spec.Annotations["io.kubernetes.cri.sandbox-namespace"] - containerName = spec.Annotations["io.kubernetes.cri.container-name"] } containerInfo := Container{ ContainerID: c.ID(), SandboxID: sandboxID, PodID: container.Labels["io.kubernetes.pod.uid"], - PodName: podName, + PodName: container.Labels["io.kubernetes.pod.name"], SandboxNamespace: sandboxNamespace, - ContainerName: containerName, + ContainerName: container.Labels["io.kubernetes.container.name"], + NamespaceName: container.Labels["io.kubernetes.pod.namespace"], Container: c, } diff --git a/pkg/sampler/api/api.pb.go b/pkg/sampler/api/api.pb.go index 0415b6b..951e01b 100644 --- a/pkg/sampler/api/api.pb.go +++ b/pkg/sampler/api/api.pb.go @@ -23,6 +23,171 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type RegisterCollectorsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Collectors []*Collector `protobuf:"bytes,1,rep,name=collectors,proto3" json:"collectors,omitempty"` + Source string `protobuf:"bytes,2,opt,name=source,proto3" json:"source,omitempty"` + FromPod string `protobuf:"bytes,3,opt,name=fromPod,proto3" json:"fromPod,omitempty"` +} + +func (x *RegisterCollectorsRequest) Reset() { + *x = RegisterCollectorsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_sampler_api_api_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterCollectorsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterCollectorsRequest) ProtoMessage() {} + +func (x *RegisterCollectorsRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_sampler_api_api_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterCollectorsRequest.ProtoReflect.Descriptor instead. 
+func (*RegisterCollectorsRequest) Descriptor() ([]byte, []int) { + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{0} +} + +func (x *RegisterCollectorsRequest) GetCollectors() []*Collector { + if x != nil { + return x.Collectors + } + return nil +} + +func (x *RegisterCollectorsRequest) GetSource() string { + if x != nil { + return x.Source + } + return "" +} + +func (x *RegisterCollectorsRequest) GetFromPod() string { + if x != nil { + return x.FromPod + } + return "" +} + +type Collector struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + IpAddress string `protobuf:"bytes,1,opt,name=ipAddress,proto3" json:"ipAddress,omitempty"` + PodName string `protobuf:"bytes,2,opt,name=podName,proto3" json:"podName,omitempty"` +} + +func (x *Collector) Reset() { + *x = Collector{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_sampler_api_api_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Collector) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Collector) ProtoMessage() {} + +func (x *Collector) ProtoReflect() protoreflect.Message { + mi := &file_pkg_sampler_api_api_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Collector.ProtoReflect.Descriptor instead. +func (*Collector) Descriptor() ([]byte, []int) { + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{1} +} + +func (x *Collector) GetIpAddress() string { + if x != nil { + return x.IpAddress + } + return "" +} + +func (x *Collector) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + +type RegisterCollectorsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + IpAddresses []string `protobuf:"bytes,1,rep,name=ipAddresses,proto3" json:"ipAddresses,omitempty"` +} + +func (x *RegisterCollectorsResponse) Reset() { + *x = RegisterCollectorsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_sampler_api_api_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterCollectorsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterCollectorsResponse) ProtoMessage() {} + +func (x *RegisterCollectorsResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_sampler_api_api_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterCollectorsResponse.ProtoReflect.Descriptor instead. 
+func (*RegisterCollectorsResponse) Descriptor() ([]byte, []int) { + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{2} +} + +func (x *RegisterCollectorsResponse) GetIpAddresses() []string { + if x != nil { + return x.IpAddresses + } + return nil +} + type ListMetricsRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -32,7 +197,7 @@ type ListMetricsRequest struct { func (x *ListMetricsRequest) Reset() { *x = ListMetricsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_sampler_api_api_proto_msgTypes[0] + mi := &file_pkg_sampler_api_api_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -45,7 +210,7 @@ func (x *ListMetricsRequest) String() string { func (*ListMetricsRequest) ProtoMessage() {} func (x *ListMetricsRequest) ProtoReflect() protoreflect.Message { - mi := &file_pkg_sampler_api_api_proto_msgTypes[0] + mi := &file_pkg_sampler_api_api_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -58,7 +223,7 @@ func (x *ListMetricsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListMetricsRequest.ProtoReflect.Descriptor instead. func (*ListMetricsRequest) Descriptor() ([]byte, []int) { - return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{0} + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{3} } type ListMetricsResponse struct { @@ -77,7 +242,7 @@ type ListMetricsResponse struct { func (x *ListMetricsResponse) Reset() { *x = ListMetricsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_sampler_api_api_proto_msgTypes[1] + mi := &file_pkg_sampler_api_api_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -90,7 +255,7 @@ func (x *ListMetricsResponse) String() string { func (*ListMetricsResponse) ProtoMessage() {} func (x *ListMetricsResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_sampler_api_api_proto_msgTypes[1] + mi := &file_pkg_sampler_api_api_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -103,7 +268,7 @@ func (x *ListMetricsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListMetricsResponse.ProtoReflect.Descriptor instead. 
func (*ListMetricsResponse) Descriptor() ([]byte, []int) { - return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{1} + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{4} } func (x *ListMetricsResponse) GetContainers() []*ContainerMetrics { @@ -155,6 +320,9 @@ type ContainerMetrics struct { ContainerID string `protobuf:"bytes,1,opt,name=containerID,proto3" json:"containerID,omitempty"` PodUID string `protobuf:"bytes,2,opt,name=podUID,proto3" json:"podUID,omitempty"` + ContainerName string `protobuf:"bytes,12,opt,name=containerName,proto3" json:"containerName,omitempty"` + PodName string `protobuf:"bytes,13,opt,name=podName,proto3" json:"podName,omitempty"` + NamespaceName string `protobuf:"bytes,14,opt,name=namespaceName,proto3" json:"namespaceName,omitempty"` CpuCoresNanoSec []int64 `protobuf:"varint,3,rep,packed,name=cpuCoresNanoSec,proto3" json:"cpuCoresNanoSec,omitempty"` MemoryBytes []int64 `protobuf:"varint,4,rep,packed,name=memoryBytes,proto3" json:"memoryBytes,omitempty"` OomCount []int64 `protobuf:"varint,5,rep,packed,name=oomCount,proto3" json:"oomCount,omitempty"` @@ -169,7 +337,7 @@ type ContainerMetrics struct { func (x *ContainerMetrics) Reset() { *x = ContainerMetrics{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_sampler_api_api_proto_msgTypes[2] + mi := &file_pkg_sampler_api_api_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -182,7 +350,7 @@ func (x *ContainerMetrics) String() string { func (*ContainerMetrics) ProtoMessage() {} func (x *ContainerMetrics) ProtoReflect() protoreflect.Message { - mi := &file_pkg_sampler_api_api_proto_msgTypes[2] + mi := &file_pkg_sampler_api_api_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -195,7 +363,7 @@ func (x *ContainerMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use ContainerMetrics.ProtoReflect.Descriptor instead. 
func (*ContainerMetrics) Descriptor() ([]byte, []int) { - return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{2} + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{5} } func (x *ContainerMetrics) GetContainerID() string { @@ -212,6 +380,27 @@ func (x *ContainerMetrics) GetPodUID() string { return "" } +func (x *ContainerMetrics) GetContainerName() string { + if x != nil { + return x.ContainerName + } + return "" +} + +func (x *ContainerMetrics) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + +func (x *ContainerMetrics) GetNamespaceName() string { + if x != nil { + return x.NamespaceName + } + return "" +} + func (x *ContainerMetrics) GetCpuCoresNanoSec() []int64 { if x != nil { return x.CpuCoresNanoSec @@ -286,7 +475,7 @@ type NodeMetrics struct { func (x *NodeMetrics) Reset() { *x = NodeMetrics{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_sampler_api_api_proto_msgTypes[3] + mi := &file_pkg_sampler_api_api_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -299,7 +488,7 @@ func (x *NodeMetrics) String() string { func (*NodeMetrics) ProtoMessage() {} func (x *NodeMetrics) ProtoReflect() protoreflect.Message { - mi := &file_pkg_sampler_api_api_proto_msgTypes[3] + mi := &file_pkg_sampler_api_api_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -312,7 +501,7 @@ func (x *NodeMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use NodeMetrics.ProtoReflect.Descriptor instead. func (*NodeMetrics) Descriptor() ([]byte, []int) { - return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{3} + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{6} } func (x *NodeMetrics) GetAggregatedMetrics() []*NodeAggregatedMetrics { @@ -335,7 +524,7 @@ type NodeAggregatedMetrics struct { func (x *NodeAggregatedMetrics) Reset() { *x = NodeAggregatedMetrics{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_sampler_api_api_proto_msgTypes[4] + mi := &file_pkg_sampler_api_api_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -348,7 +537,7 @@ func (x *NodeAggregatedMetrics) String() string { func (*NodeAggregatedMetrics) ProtoMessage() {} func (x *NodeAggregatedMetrics) ProtoReflect() protoreflect.Message { - mi := &file_pkg_sampler_api_api_proto_msgTypes[4] + mi := &file_pkg_sampler_api_api_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -361,7 +550,7 @@ func (x *NodeAggregatedMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use NodeAggregatedMetrics.ProtoReflect.Descriptor instead. 
func (*NodeAggregatedMetrics) Descriptor() ([]byte, []int) { - return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{4} + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{7} } func (x *NodeAggregatedMetrics) GetAggregationLevel() string { @@ -394,7 +583,7 @@ type ConfigurePush struct { func (x *ConfigurePush) Reset() { *x = ConfigurePush{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_sampler_api_api_proto_msgTypes[5] + mi := &file_pkg_sampler_api_api_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -407,7 +596,7 @@ func (x *ConfigurePush) String() string { func (*ConfigurePush) ProtoMessage() {} func (x *ConfigurePush) ProtoReflect() protoreflect.Message { - mi := &file_pkg_sampler_api_api_proto_msgTypes[5] + mi := &file_pkg_sampler_api_api_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -420,7 +609,7 @@ func (x *ConfigurePush) ProtoReflect() protoreflect.Message { // Deprecated: Use ConfigurePush.ProtoReflect.Descriptor instead. func (*ConfigurePush) Descriptor() ([]byte, []int) { - return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{5} + return file_pkg_sampler_api_api_proto_rawDescGZIP(), []int{8} } var File_pkg_sampler_api_api_proto protoreflect.FileDescriptor @@ -434,7 +623,24 @@ var file_pkg_sampler_api_api_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x18, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x14, 0x0a, 0x12, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x88, 0x01, 0x0a, 0x19, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, + 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x52, 0x0a, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x12, 0x16, 0x0a, + 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x66, 0x72, 0x6f, 0x6d, 0x50, 0x6f, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x66, 0x72, 0x6f, 0x6d, 0x50, 0x6f, 0x64, 0x22, + 0x43, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x1c, 0x0a, 0x09, + 0x69, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x69, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x6f, + 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, + 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x3e, 0x0a, 0x1a, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0b, 0x69, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, + 0x73, 0x73, 0x65, 0x73, 0x22, 0x14, 0x0a, 0x12, 0x4c, 0x69, 
0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x90, 0x02, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x73, @@ -452,81 +658,96 @@ var file_pkg_sampler_api_api_proto_rawDesc = []byte{ 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, - 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0xc4, 0x03, + 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0xaa, 0x04, 0x0a, 0x10, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6f, 0x64, 0x55, 0x49, 0x44, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6f, 0x64, 0x55, 0x49, 0x44, 0x12, 0x28, 0x0a, 0x0f, - 0x63, 0x70, 0x75, 0x43, 0x6f, 0x72, 0x65, 0x73, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x18, - 0x03, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0f, 0x63, 0x70, 0x75, 0x43, 0x6f, 0x72, 0x65, 0x73, 0x4e, - 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x12, 0x20, 0x0a, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, - 0x42, 0x79, 0x74, 0x65, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0b, 0x6d, 0x65, 0x6d, - 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x6f, 0x6f, 0x6d, 0x43, - 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x03, 0x28, 0x03, 0x52, 0x08, 0x6f, 0x6f, 0x6d, 0x43, - 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x22, 0x0a, 0x0c, 0x6f, 0x6f, 0x6d, 0x4b, 0x69, 0x6c, 0x6c, 0x43, - 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x06, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0c, 0x6f, 0x6f, 0x6d, 0x4b, - 0x69, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x30, 0x0a, 0x13, 0x63, 0x70, 0x75, 0x54, - 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x18, - 0x07, 0x20, 0x03, 0x28, 0x03, 0x52, 0x13, 0x63, 0x70, 0x75, 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, - 0x6c, 0x65, 0x64, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x12, 0x3e, 0x0a, 0x1a, 0x63, 0x70, - 0x75, 0x50, 0x65, 0x72, 0x63, 0x65, 0x6e, 0x74, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x54, - 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x18, 0x08, 0x20, 0x03, 0x28, 0x02, 0x52, 0x1a, - 0x63, 0x70, 0x75, 0x50, 0x65, 0x72, 0x63, 0x65, 0x6e, 0x74, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, - 0x73, 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x63, 0x70, - 0x75, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x53, 0x65, 0x63, 0x18, 0x09, 0x20, 0x03, 0x28, - 0x03, 0x52, 0x0d, 0x63, 0x70, 0x75, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x53, 0x65, 0x63, - 0x12, 0x36, 0x0a, 0x16, 0x63, 0x70, 0x75, 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, - 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x53, 0x65, 0x63, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x03, - 0x52, 0x16, 0x63, 0x70, 0x75, 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x50, 0x65, - 0x72, 0x69, 0x6f, 0x64, 0x73, 0x53, 0x65, 0x63, 0x12, 0x1a, 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, - 0x4e, 0x61, 0x6d, 0x65, 0x18, 
0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, - 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x62, 0x0a, 0x0b, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x65, 0x74, 0x72, - 0x69, 0x63, 0x73, 0x12, 0x53, 0x0a, 0x11, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, - 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, - 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, - 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x64, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x11, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, - 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x8f, 0x01, 0x0a, 0x15, 0x4e, 0x6f, 0x64, - 0x65, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x61, 0x67, - 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x28, - 0x0a, 0x0f, 0x63, 0x70, 0x75, 0x43, 0x6f, 0x72, 0x65, 0x73, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, - 0x63, 0x18, 0x02, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0f, 0x63, 0x70, 0x75, 0x43, 0x6f, 0x72, 0x65, - 0x73, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x12, 0x20, 0x0a, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, - 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0b, 0x6d, - 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x0f, 0x0a, 0x0d, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x50, 0x75, 0x73, 0x68, 0x32, 0x76, 0x0a, 0x07, 0x4d, - 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x6b, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, - 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x63, 0x6f, 0x6e, 0x74, - 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, - 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x13, - 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0d, 0x12, 0x0b, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x74, 0x72, - 0x69, 0x63, 0x73, 0x32, 0xdb, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x65, - 0x0a, 0x05, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, - 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, - 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, - 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, - 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x13, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0d, 0x12, 0x0b, 0x2f, 0x76, 0x31, 0x2f, 0x68, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x7a, 0x12, 0x6a, 0x0a, 0x08, 0x49, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, - 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6f, 0x64, 
0x55, 0x49, 0x44, 0x12, 0x24, 0x0a, 0x0d, + 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x0c, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x0d, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x24, 0x0a, 0x0d, + 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x0e, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x0f, 0x63, 0x70, 0x75, 0x43, 0x6f, 0x72, 0x65, 0x73, 0x4e, 0x61, + 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x18, 0x03, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0f, 0x63, 0x70, 0x75, + 0x43, 0x6f, 0x72, 0x65, 0x73, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x12, 0x20, 0x0a, 0x0b, + 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, + 0x03, 0x52, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x1a, + 0x0a, 0x08, 0x6f, 0x6f, 0x6d, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x03, 0x28, 0x03, + 0x52, 0x08, 0x6f, 0x6f, 0x6d, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x22, 0x0a, 0x0c, 0x6f, 0x6f, + 0x6d, 0x4b, 0x69, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x06, 0x20, 0x03, 0x28, 0x03, + 0x52, 0x0c, 0x6f, 0x6f, 0x6d, 0x4b, 0x69, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x30, + 0x0a, 0x13, 0x63, 0x70, 0x75, 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x4e, 0x61, + 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x18, 0x07, 0x20, 0x03, 0x28, 0x03, 0x52, 0x13, 0x63, 0x70, 0x75, + 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, + 0x12, 0x3e, 0x0a, 0x1a, 0x63, 0x70, 0x75, 0x50, 0x65, 0x72, 0x63, 0x65, 0x6e, 0x74, 0x50, 0x65, + 0x72, 0x69, 0x6f, 0x64, 0x73, 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x18, 0x08, + 0x20, 0x03, 0x28, 0x02, 0x52, 0x1a, 0x63, 0x70, 0x75, 0x50, 0x65, 0x72, 0x63, 0x65, 0x6e, 0x74, + 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x54, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, + 0x12, 0x24, 0x0a, 0x0d, 0x63, 0x70, 0x75, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x53, 0x65, + 0x63, 0x18, 0x09, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0d, 0x63, 0x70, 0x75, 0x50, 0x65, 0x72, 0x69, + 0x6f, 0x64, 0x73, 0x53, 0x65, 0x63, 0x12, 0x36, 0x0a, 0x16, 0x63, 0x70, 0x75, 0x54, 0x68, 0x72, + 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x53, 0x65, 0x63, + 0x18, 0x0a, 0x20, 0x03, 0x28, 0x03, 0x52, 0x16, 0x63, 0x70, 0x75, 0x54, 0x68, 0x72, 0x6f, 0x74, + 0x74, 0x6c, 0x65, 0x64, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x73, 0x53, 0x65, 0x63, 0x12, 0x1a, + 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x62, 0x0a, 0x0b, 0x4e, 0x6f, + 0x64, 0x65, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x53, 0x0a, 0x11, 0x61, 0x67, 0x67, + 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, + 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, + 0x61, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x11, 0x61, 0x67, 0x67, + 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 
0x73, 0x22, 0x8f, + 0x01, 0x0a, 0x15, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, + 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x61, 0x67, 0x67, 0x72, + 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x10, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, + 0x65, 0x76, 0x65, 0x6c, 0x12, 0x28, 0x0a, 0x0f, 0x63, 0x70, 0x75, 0x43, 0x6f, 0x72, 0x65, 0x73, + 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x18, 0x02, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0f, 0x63, + 0x70, 0x75, 0x43, 0x6f, 0x72, 0x65, 0x73, 0x4e, 0x61, 0x6e, 0x6f, 0x53, 0x65, 0x63, 0x12, 0x20, + 0x0a, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x18, 0x03, 0x20, + 0x03, 0x28, 0x03, 0x52, 0x0b, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, + 0x22, 0x0f, 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x50, 0x75, 0x73, + 0x68, 0x32, 0xfd, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x6b, 0x0a, + 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x22, 0x2e, 0x63, + 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4c, 0x69, + 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x23, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, + 0x69, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x13, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0d, 0x12, 0x0b, 0x2f, + 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x84, 0x01, 0x0a, 0x12, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, + 0x73, 0x12, 0x29, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6c, 0x6c, 0x65, + 0x63, 0x74, 0x6f, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x63, + 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x17, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x11, + 0x22, 0x0c, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x3a, 0x01, + 0x2a, 0x32, 0xd9, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x65, 0x0a, 0x05, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, - 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x15, 0x82, 0xd3, 0xe4, 0x93, - 0x02, 0x0f, 0x12, 0x0d, 0x2f, 0x76, 0x31, 0x2f, 0x69, 0x73, 0x2d, 0x6c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x32, 0x6b, 0x0a, 0x10, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x43, 0x6f, 0x6c, 0x6c, - 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x57, 0x0a, 0x0b, 0x50, 0x75, 0x73, 0x68, 0x4d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x73, 0x12, 0x23, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, - 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, - 0x61, 
0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x75, 0x72, 0x65, 0x50, 0x75, 0x73, 0x68, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x07, - 0x5a, 0x05, 0x2e, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, + 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, + 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x13, + 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0d, 0x12, 0x0b, 0x2f, 0x76, 0x31, 0x2f, 0x68, 0x65, 0x61, 0x6c, + 0x74, 0x68, 0x7a, 0x12, 0x68, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x22, + 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, + 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x14, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0e, 0x12, + 0x0c, 0x2f, 0x76, 0x31, 0x2f, 0x69, 0x73, 0x2d, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x6b, 0x0a, + 0x10, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x12, 0x57, 0x0a, 0x0b, 0x50, 0x75, 0x73, 0x68, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, + 0x12, 0x23, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2e, 0x61, 0x70, + 0x69, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, + 0x72, 0x64, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, + 0x50, 0x75, 0x73, 0x68, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2f, + 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -541,36 +762,42 @@ func file_pkg_sampler_api_api_proto_rawDescGZIP() []byte { return file_pkg_sampler_api_api_proto_rawDescData } -var file_pkg_sampler_api_api_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_pkg_sampler_api_api_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_pkg_sampler_api_api_proto_goTypes = []interface{}{ - (*ListMetricsRequest)(nil), // 0: containerd.api.ListMetricsRequest - (*ListMetricsResponse)(nil), // 1: containerd.api.ListMetricsResponse - (*ContainerMetrics)(nil), // 2: containerd.api.ContainerMetrics - (*NodeMetrics)(nil), // 3: containerd.api.NodeMetrics - (*NodeAggregatedMetrics)(nil), // 4: containerd.api.NodeAggregatedMetrics - (*ConfigurePush)(nil), // 5: containerd.api.ConfigurePush - (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp - (*grpc_health_v1.HealthCheckRequest)(nil), // 7: grpc.health.v1.HealthCheckRequest - (*grpc_health_v1.HealthCheckResponse)(nil), // 8: grpc.health.v1.HealthCheckResponse + (*RegisterCollectorsRequest)(nil), // 0: containerd.api.RegisterCollectorsRequest + (*Collector)(nil), // 1: containerd.api.Collector + (*RegisterCollectorsResponse)(nil), // 2: containerd.api.RegisterCollectorsResponse + (*ListMetricsRequest)(nil), // 3: containerd.api.ListMetricsRequest + (*ListMetricsResponse)(nil), // 4: containerd.api.ListMetricsResponse + (*ContainerMetrics)(nil), // 5: containerd.api.ContainerMetrics + 
(*NodeMetrics)(nil), // 6: containerd.api.NodeMetrics + (*NodeAggregatedMetrics)(nil), // 7: containerd.api.NodeAggregatedMetrics + (*ConfigurePush)(nil), // 8: containerd.api.ConfigurePush + (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp + (*grpc_health_v1.HealthCheckRequest)(nil), // 10: grpc.health.v1.HealthCheckRequest + (*grpc_health_v1.HealthCheckResponse)(nil), // 11: grpc.health.v1.HealthCheckResponse } var file_pkg_sampler_api_api_proto_depIdxs = []int32{ - 2, // 0: containerd.api.ListMetricsResponse.containers:type_name -> containerd.api.ContainerMetrics - 3, // 1: containerd.api.ListMetricsResponse.node:type_name -> containerd.api.NodeMetrics - 6, // 2: containerd.api.ListMetricsResponse.timestamp:type_name -> google.protobuf.Timestamp - 4, // 3: containerd.api.NodeMetrics.aggregatedMetrics:type_name -> containerd.api.NodeAggregatedMetrics - 0, // 4: containerd.api.Metrics.ListMetrics:input_type -> containerd.api.ListMetricsRequest - 7, // 5: containerd.api.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest - 7, // 6: containerd.api.Health.IsLeader:input_type -> grpc.health.v1.HealthCheckRequest - 1, // 7: containerd.api.MetricsCollector.PushMetrics:input_type -> containerd.api.ListMetricsResponse - 1, // 8: containerd.api.Metrics.ListMetrics:output_type -> containerd.api.ListMetricsResponse - 8, // 9: containerd.api.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse - 8, // 10: containerd.api.Health.IsLeader:output_type -> grpc.health.v1.HealthCheckResponse - 5, // 11: containerd.api.MetricsCollector.PushMetrics:output_type -> containerd.api.ConfigurePush - 8, // [8:12] is the sub-list for method output_type - 4, // [4:8] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 1, // 0: containerd.api.RegisterCollectorsRequest.collectors:type_name -> containerd.api.Collector + 5, // 1: containerd.api.ListMetricsResponse.containers:type_name -> containerd.api.ContainerMetrics + 6, // 2: containerd.api.ListMetricsResponse.node:type_name -> containerd.api.NodeMetrics + 9, // 3: containerd.api.ListMetricsResponse.timestamp:type_name -> google.protobuf.Timestamp + 7, // 4: containerd.api.NodeMetrics.aggregatedMetrics:type_name -> containerd.api.NodeAggregatedMetrics + 3, // 5: containerd.api.Metrics.ListMetrics:input_type -> containerd.api.ListMetricsRequest + 0, // 6: containerd.api.Metrics.RegisterCollectors:input_type -> containerd.api.RegisterCollectorsRequest + 10, // 7: containerd.api.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest + 10, // 8: containerd.api.Health.IsReady:input_type -> grpc.health.v1.HealthCheckRequest + 4, // 9: containerd.api.MetricsCollector.PushMetrics:input_type -> containerd.api.ListMetricsResponse + 4, // 10: containerd.api.Metrics.ListMetrics:output_type -> containerd.api.ListMetricsResponse + 2, // 11: containerd.api.Metrics.RegisterCollectors:output_type -> containerd.api.RegisterCollectorsResponse + 11, // 12: containerd.api.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse + 11, // 13: containerd.api.Health.IsReady:output_type -> grpc.health.v1.HealthCheckResponse + 8, // 14: containerd.api.MetricsCollector.PushMetrics:output_type -> containerd.api.ConfigurePush + 10, // [10:15] is the sub-list for method output_type + 5, // [5:10] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the 
sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_pkg_sampler_api_api_proto_init() } @@ -580,7 +807,7 @@ func file_pkg_sampler_api_api_proto_init() { } if !protoimpl.UnsafeEnabled { file_pkg_sampler_api_api_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListMetricsRequest); i { + switch v := v.(*RegisterCollectorsRequest); i { case 0: return &v.state case 1: @@ -592,7 +819,7 @@ func file_pkg_sampler_api_api_proto_init() { } } file_pkg_sampler_api_api_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListMetricsResponse); i { + switch v := v.(*Collector); i { case 0: return &v.state case 1: @@ -604,7 +831,7 @@ func file_pkg_sampler_api_api_proto_init() { } } file_pkg_sampler_api_api_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ContainerMetrics); i { + switch v := v.(*RegisterCollectorsResponse); i { case 0: return &v.state case 1: @@ -616,7 +843,7 @@ func file_pkg_sampler_api_api_proto_init() { } } file_pkg_sampler_api_api_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NodeMetrics); i { + switch v := v.(*ListMetricsRequest); i { case 0: return &v.state case 1: @@ -628,7 +855,7 @@ func file_pkg_sampler_api_api_proto_init() { } } file_pkg_sampler_api_api_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NodeAggregatedMetrics); i { + switch v := v.(*ListMetricsResponse); i { case 0: return &v.state case 1: @@ -640,6 +867,42 @@ func file_pkg_sampler_api_api_proto_init() { } } file_pkg_sampler_api_api_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ContainerMetrics); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_sampler_api_api_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NodeMetrics); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_sampler_api_api_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NodeAggregatedMetrics); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_sampler_api_api_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConfigurePush); i { case 0: return &v.state @@ -658,7 +921,7 @@ func file_pkg_sampler_api_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_sampler_api_api_proto_rawDesc, NumEnums: 0, - NumMessages: 6, + NumMessages: 9, NumExtensions: 0, NumServices: 3, }, diff --git a/pkg/sampler/api/api.pb.gw.go b/pkg/sampler/api/api.pb.gw.go index b4ac9a8..4deff87 100644 --- a/pkg/sampler/api/api.pb.gw.go +++ b/pkg/sampler/api/api.pb.gw.go @@ -50,6 +50,40 @@ func local_request_Metrics_ListMetrics_0(ctx context.Context, marshaler runtime. 
} +func request_Metrics_RegisterCollectors_0(ctx context.Context, marshaler runtime.Marshaler, client MetricsClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq RegisterCollectorsRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.RegisterCollectors(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Metrics_RegisterCollectors_0(ctx context.Context, marshaler runtime.Marshaler, server MetricsServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq RegisterCollectorsRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.RegisterCollectors(ctx, &protoReq) + return msg, metadata, err + +} + var ( filter_Health_Check_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} ) @@ -87,37 +121,37 @@ func local_request_Health_Check_0(ctx context.Context, marshaler runtime.Marshal } var ( - filter_Health_IsLeader_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} + filter_Health_IsReady_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} ) -func request_Health_IsLeader_0(ctx context.Context, marshaler runtime.Marshaler, client HealthClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { +func request_Health_IsReady_0(ctx context.Context, marshaler runtime.Marshaler, client HealthClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var protoReq grpc_health_v1.HealthCheckRequest var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Health_IsLeader_0); err != nil { + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Health_IsReady_0); err != nil { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := client.IsLeader(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + msg, err := client.IsReady(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err } -func local_request_Health_IsLeader_0(ctx context.Context, marshaler runtime.Marshaler, server HealthServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { +func local_request_Health_IsReady_0(ctx context.Context, marshaler runtime.Marshaler, server HealthServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var protoReq grpc_health_v1.HealthCheckRequest 
var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Health_IsLeader_0); err != nil { + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Health_IsReady_0); err != nil { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := server.IsLeader(ctx, &protoReq) + msg, err := server.IsReady(ctx, &protoReq) return msg, metadata, err } @@ -153,6 +187,31 @@ func RegisterMetricsHandlerServer(ctx context.Context, mux *runtime.ServeMux, se }) + mux.Handle("POST", pattern_Metrics_RegisterCollectors_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/containerd.api.Metrics/RegisterCollectors", runtime.WithHTTPPathPattern("/v1/register")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Metrics_RegisterCollectors_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Metrics_RegisterCollectors_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) 
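+	// Illustrative only: this route accepts the JSON form of RegisterCollectorsRequest.
+	// The address, pod name, and source values below are made up for the example:
+	//
+	//	POST /v1/register
+	//	{"collectors":[{"ipAddress":"10.12.3.4","podName":"metrics-prometheus-collector-0"}],"source":"manual"}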
+ + }) + return nil } @@ -187,7 +246,7 @@ func RegisterHealthHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser }) - mux.Handle("GET", pattern_Health_IsLeader_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle("GET", pattern_Health_IsReady_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() var stream runtime.ServerTransportStream @@ -195,12 +254,12 @@ func RegisterHealthHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) var err error var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/containerd.api.Health/IsLeader", runtime.WithHTTPPathPattern("/v1/is-leader")) + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/containerd.api.Health/IsReady", runtime.WithHTTPPathPattern("/v1/is-ready")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return } - resp, md, err := local_request_Health_IsLeader_0(annotatedContext, inboundMarshaler, server, req, pathParams) + resp, md, err := local_request_Health_IsReady_0(annotatedContext, inboundMarshaler, server, req, pathParams) md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) if err != nil { @@ -208,7 +267,7 @@ func RegisterHealthHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser return } - forward_Health_IsLeader_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + forward_Health_IsReady_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) }) @@ -275,15 +334,41 @@ func RegisterMetricsHandlerClient(ctx context.Context, mux *runtime.ServeMux, cl }) + mux.Handle("POST", pattern_Metrics_RegisterCollectors_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/containerd.api.Metrics/RegisterCollectors", runtime.WithHTTPPathPattern("/v1/register")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Metrics_RegisterCollectors_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Metrics_RegisterCollectors_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) 
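+	// For reference, a successful registration returns RegisterCollectorsResponse with the
+	// accepted addresses, e.g. {"ipAddresses":["10.12.3.4"]} (address illustrative).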
+ + }) + return nil } var ( pattern_Metrics_ListMetrics_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "metrics"}, "")) + + pattern_Metrics_RegisterCollectors_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "register"}, "")) ) var ( forward_Metrics_ListMetrics_0 = runtime.ForwardResponseMessage + + forward_Metrics_RegisterCollectors_0 = runtime.ForwardResponseMessage ) // RegisterHealthHandlerFromEndpoint is same as RegisterHealthHandler but @@ -346,25 +431,25 @@ func RegisterHealthHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli }) - mux.Handle("GET", pattern_Health_IsLeader_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle("GET", pattern_Health_IsReady_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) var err error var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/containerd.api.Health/IsLeader", runtime.WithHTTPPathPattern("/v1/is-leader")) + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/containerd.api.Health/IsReady", runtime.WithHTTPPathPattern("/v1/is-ready")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return } - resp, md, err := request_Health_IsLeader_0(annotatedContext, inboundMarshaler, client, req, pathParams) + resp, md, err := request_Health_IsReady_0(annotatedContext, inboundMarshaler, client, req, pathParams) annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) if err != nil { runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_Health_IsLeader_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + forward_Health_IsReady_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) }) @@ -374,11 +459,11 @@ func RegisterHealthHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli var ( pattern_Health_Check_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "healthz"}, "")) - pattern_Health_IsLeader_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "is-leader"}, "")) + pattern_Health_IsReady_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "is-ready"}, "")) ) var ( forward_Health_Check_0 = runtime.ForwardResponseMessage - forward_Health_IsLeader_0 = runtime.ForwardResponseMessage + forward_Health_IsReady_0 = runtime.ForwardResponseMessage ) diff --git a/pkg/sampler/api/api_grpc.pb.go b/pkg/sampler/api/api_grpc.pb.go index 8459bec..95bb0c1 100644 --- a/pkg/sampler/api/api_grpc.pb.go +++ b/pkg/sampler/api/api_grpc.pb.go @@ -24,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
type MetricsClient interface { ListMetrics(ctx context.Context, in *ListMetricsRequest, opts ...grpc.CallOption) (*ListMetricsResponse, error) + RegisterCollectors(ctx context.Context, in *RegisterCollectorsRequest, opts ...grpc.CallOption) (*RegisterCollectorsResponse, error) } type metricsClient struct { @@ -43,11 +44,21 @@ func (c *metricsClient) ListMetrics(ctx context.Context, in *ListMetricsRequest, return out, nil } +func (c *metricsClient) RegisterCollectors(ctx context.Context, in *RegisterCollectorsRequest, opts ...grpc.CallOption) (*RegisterCollectorsResponse, error) { + out := new(RegisterCollectorsResponse) + err := c.cc.Invoke(ctx, "/containerd.api.Metrics/RegisterCollectors", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // MetricsServer is the server API for Metrics service. // All implementations must embed UnimplementedMetricsServer // for forward compatibility type MetricsServer interface { ListMetrics(context.Context, *ListMetricsRequest) (*ListMetricsResponse, error) + RegisterCollectors(context.Context, *RegisterCollectorsRequest) (*RegisterCollectorsResponse, error) mustEmbedUnimplementedMetricsServer() } @@ -58,6 +69,9 @@ type UnimplementedMetricsServer struct { func (UnimplementedMetricsServer) ListMetrics(context.Context, *ListMetricsRequest) (*ListMetricsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ListMetrics not implemented") } +func (UnimplementedMetricsServer) RegisterCollectors(context.Context, *RegisterCollectorsRequest) (*RegisterCollectorsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RegisterCollectors not implemented") +} func (UnimplementedMetricsServer) mustEmbedUnimplementedMetricsServer() {} // UnsafeMetricsServer may be embedded to opt out of forward compatibility for this service. @@ -89,6 +103,24 @@ func _Metrics_ListMetrics_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _Metrics_RegisterCollectors_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RegisterCollectorsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MetricsServer).RegisterCollectors(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/containerd.api.Metrics/RegisterCollectors", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MetricsServer).RegisterCollectors(ctx, req.(*RegisterCollectorsRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Metrics_ServiceDesc is the grpc.ServiceDesc for Metrics service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -100,6 +132,10 @@ var Metrics_ServiceDesc = grpc.ServiceDesc{ MethodName: "ListMetrics", Handler: _Metrics_ListMetrics_Handler, }, + { + MethodName: "RegisterCollectors", + Handler: _Metrics_RegisterCollectors_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "pkg/sampler/api/api.proto", @@ -110,7 +146,7 @@ var Metrics_ServiceDesc = grpc.ServiceDesc{ // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
type HealthClient interface { Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) - IsLeader(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) + IsReady(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) } type healthClient struct { @@ -130,9 +166,9 @@ func (c *healthClient) Check(ctx context.Context, in *grpc_health_v1.HealthCheck return out, nil } -func (c *healthClient) IsLeader(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { +func (c *healthClient) IsReady(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { out := new(grpc_health_v1.HealthCheckResponse) - err := c.cc.Invoke(ctx, "/containerd.api.Health/IsLeader", in, out, opts...) + err := c.cc.Invoke(ctx, "/containerd.api.Health/IsReady", in, out, opts...) if err != nil { return nil, err } @@ -144,7 +180,7 @@ func (c *healthClient) IsLeader(ctx context.Context, in *grpc_health_v1.HealthCh // for forward compatibility type HealthServer interface { Check(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) - IsLeader(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) + IsReady(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) mustEmbedUnimplementedHealthServer() } @@ -155,8 +191,8 @@ type UnimplementedHealthServer struct { func (UnimplementedHealthServer) Check(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Check not implemented") } -func (UnimplementedHealthServer) IsLeader(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method IsLeader not implemented") +func (UnimplementedHealthServer) IsReady(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") } func (UnimplementedHealthServer) mustEmbedUnimplementedHealthServer() {} @@ -189,20 +225,20 @@ func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interf return interceptor(ctx, in, info, handler) } -func _Health_IsLeader_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Health_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(grpc_health_v1.HealthCheckRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(HealthServer).IsLeader(ctx, in) + return srv.(HealthServer).IsReady(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/containerd.api.Health/IsLeader", + FullMethod: "/containerd.api.Health/IsReady", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HealthServer).IsLeader(ctx, req.(*grpc_health_v1.HealthCheckRequest)) + return srv.(HealthServer).IsReady(ctx, 
req.(*grpc_health_v1.HealthCheckRequest)) } return interceptor(ctx, in, info, handler) } @@ -219,8 +255,8 @@ var Health_ServiceDesc = grpc.ServiceDesc{ Handler: _Health_Check_Handler, }, { - MethodName: "IsLeader", - Handler: _Health_IsLeader_Handler, + MethodName: "IsReady", + Handler: _Health_IsReady_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/pkg/sampler/cache.go b/pkg/sampler/cache.go index a3f0596..fa40388 100644 --- a/pkg/sampler/cache.go +++ b/pkg/sampler/cache.go @@ -63,7 +63,7 @@ func (s *sampleCache) Start(ctx context.Context) error { select { case <-ctx.Done(): // stop scraping - log.Info("stopping cgroup sampler") + log.Info("stopping cgroup sampler", "err", ctx.Err()) return nil case <-ticker.C: // retry fetching the sample in case of a failure diff --git a/pkg/sampler/ctrstats_linux_amd64.go b/pkg/sampler/ctrstats_linux_amd64.go index b8352b0..c494e7a 100644 --- a/pkg/sampler/ctrstats_linux_amd64.go +++ b/pkg/sampler/ctrstats_linux_amd64.go @@ -68,16 +68,27 @@ func (s *sampleCache) getContainerCPUAndMemoryCM() (cpuMetrics, memoryMetrics, e } else if stats != nil { cpu, err := cmStatsToCPUResult(stats, readTime) if err != nil { - return nil, nil, err + log.Error(err, "no cpu stats available for container", + "namespace", c.NamespaceName, + "pod", c.PodName, + "container", c.ContainerName, + ) } mem, err := cmStatsToMemoryResult(stats, readTime) if err != nil { - return nil, nil, err + log.Error(err, "no memory stats available for container", + "namespace", c.NamespaceName, + "pod", c.PodName, + "container", c.ContainerName, + ) } var container ContainerKey container.ContainerID = c.ContainerID container.PodUID = c.PodID + container.NamespaceName = c.NamespaceName + container.PodName = c.PodName + container.ContainerName = c.ContainerName cpuResult[container] = cpu memResult[container] = mem diff --git a/pkg/sampler/server.go b/pkg/sampler/server.go index e28aaf4..5914e10 100644 --- a/pkg/sampler/server.go +++ b/pkg/sampler/server.go @@ -23,16 +23,15 @@ import ( "net/http" "os" "sort" - "sync/atomic" + "strings" + "sync" "time" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/pkg/errors" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -68,11 +67,12 @@ type Server struct { pushFrequency time.Duration checkCreatedPodFrequency time.Duration - PushHealthy atomic.Bool - PushErrorMsg atomic.Value + pushers map[string]*pusher + pushersLock sync.Mutex + + CTX context.Context } -// Stop stops the server from sampling metrics func (s *Server) Stop() { if s.stop != nil { s.stop() @@ -81,7 +81,24 @@ func (s *Server) Stop() { // Start starts the server sampling metrics and serving them func (s *Server) Start(ctx context.Context, stop context.CancelFunc) error { - defer stop() // stop the context when we are done + s.CTX = ctx + s.stop = stop + + s.pushers = make(map[string]*pusher) + if s.DNSSpec.FailuresBeforeExit == 0 { + s.DNSSpec.FailuresBeforeExit = 30 + } + if s.DNSSpec.BackoffSeconds == 0 { + s.DNSSpec.BackoffSeconds = 60 + } + if s.DNSSpec.PollSeconds == 0 { + s.DNSSpec.PollSeconds = 60 + } + if s.DNSSpec.CollectorServerExpiration == 0 { + s.DNSSpec.CollectorServerExpiration = time.Minute * 20 + } + + defer stop() // stop the context if we encounter an error s.Default() if log.Enabled() { @@ 
-89,7 +106,8 @@ func (s *Server) Start(ctx context.Context, stop context.CancelFunc) error { if err != nil { return err } - log.Info("starting metrics-node-sampler", "MetricsNodeSampler", val) + // use println so it renders on multiple lines instead of 1 + fmt.Println(string(val)) } var err error s.pushFrequency, err = time.ParseDuration(s.PushFrequency) @@ -178,50 +196,78 @@ func (s *Server) Start(ctx context.Context, stop context.CancelFunc) error { // run the REST service go func() { log.V(1).Info("serving json", "address", addr) - defer rpcServer.GracefulStop() // stop the server when we are done - go func() { errs <- errors.WithStack(gwServer.ListenAndServe()) }() // run the server - <-ctx.Done() // wait until the context is cancelled + defer rpcServer.GracefulStop() // stop the server when we are done + go func() { + err := errors.WithStack(gwServer.ListenAndServe()) + if err != nil { + log.Error(err, "error starting server") + errs <- err + } + }() // run the server + <-ctx.Done() // wait until the context is cancelled }() - - if s.PushAddress != "" { - s.PushErrorMsg.Store("starting push metrics") - s.PushHealthy.Store(false) - go func() { errs <- s.PushMetrics(ctx) }() - } else { - s.PushHealthy.Store(true) - } + log.V(1).Info("start registering collectors from DNS", "ctx-err", ctx.Err(), + "ctx", ctx, "ctx-type", fmt.Sprintf("%T", ctx), + "ctx-address", fmt.Sprintf("%p", ctx)) + go s.RegisterCollectorsFromDNS(ctx) // block until we are shutdown or there is an error in a service select { case err := <-errs: + log.Error(err, "stopping node-sampler due to error") return err case <-ctx.Done(): + log.Info("stopping node-sampler due to context done", "err", ctx.Err()) return nil } } -// PushMetrics pushes utilization metrics to a server -func (s *Server) PushMetrics(ctx context.Context) error { - var connectErrorMessage string - log := log.WithValues("address", s.PushAddress) - return wait.PollImmediateInfiniteWithContext(ctx, time.Second*30, func(ctx context.Context) (done bool, err error) { - // continously try to connect and push metrics - retry, err := s.connectAndPushMetrics(ctx) +// RegisterCollectorsFromDNS pushes utilization metrics to collector servers. +// The list of servers is configured through reading the DNS 'A' records. +// This is usually accomplished through a Kubernetes headless service. 
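+// For example (service and namespace names here are illustrative): a headless Service
+// "metrics-prometheus-collector" in namespace "monitoring" resolves to one A record per
+// ready collector pod, so the lookup below yields one IP per collector:
+//
+//	ips, _ := net.LookupIP("metrics-prometheus-collector.monitoring.svc.cluster.local")
+//	// e.g. ips = [10.0.1.12 10.0.2.7]; each IP then gets its own pusher via RegisterCollectors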
+func (s *Server) RegisterCollectorsFromDNS(ctx context.Context) { + // periodically update the list of servers and start pushing metrics + t := time.NewTicker(time.Duration(s.DNSSpec.PollSeconds) * time.Second) + log.Info("starting register collectors from DNS") + for { + // find the list of servers to push metrics to + ips, err := net.LookupIP(s.PushHeadlessService) if err != nil { - s.PushErrorMsg.Store(err.Error()) - s.PushHealthy.Store(false) - if err.Error() != connectErrorMessage { // don't spam the log file since we retry - log.Error(err, - "unable to establish grpc connection to prometheus-collector instance") + if strings.Contains(err.Error(), "no such host") { + log.Info("unable to lookup collector servers from dns records", + "service", s.PushHeadlessService, "msg", err.Error()) + } else { + log.Error(err, "unable to lookup collector servers from dns records", "service", s.PushHeadlessService) } } - return retry, nil // never return an error, it stops polling if we do - }) + + // register the collectors + if len(ips) > 0 { + req := &api.RegisterCollectorsRequest{ + Source: "DNS", + Collectors: make([]*api.Collector, 0, len(ips)), + } + for _, i := range ips { + req.Collectors = append(req.Collectors, &api.Collector{IpAddress: i.String()}) + } + log.V(1).Info("registering collectors from DNS", "ips", req) + _, _ = s.RegisterCollectors(ctx, req) + } + + select { + case <-t.C: + // keep going + case <-ctx.Done(): + // we are done + t.Stop() + return + } + } } // foundUnsentPods returns true if we have discovered containers that we haven't // sent metrics for their pods. -func (s *Server) foundUnsentPods(ctx context.Context, lastResp *api.ListMetricsResponse) bool { +func (s *Server) foundUnsentPods(lastResp *api.ListMetricsResponse) bool { if lastResp == nil { // we've never sent pods return true @@ -234,17 +280,50 @@ func (s *Server) foundUnsentPods(ctx context.Context, lastResp *api.ListMetricsR } // get the pods we have metrics for - known := s.cache.metricsReader.knownContainersSet.Load().(sets.String) + if k := s.cache.metricsReader.knownContainersSet.Load(); k != nil { + return k.(sets.String).Difference(last).Len() > 0 // we know about pods we haven't sent + } else { + // we don't have metrics for any pods + // this should never happen since we shouldn't have sent metrics if we don't have any + return false + } +} + +type pusher struct { + IP string + Context context.Context + Cancel context.CancelFunc + SawCollector time.Time +} - return known.Difference(last).Len() > 0 // we know about pods we haven't sent +func (p *pusher) connect(s *Server) error { + defer s.UnRegisterCollectors(p.Context, p.IP) + return wait.PollImmediateInfiniteWithContext(p.Context, time.Second*30, func(ctx context.Context) (done bool, err error) { + log.Info("starting metrics pushing", "ip", p.IP) + // continously try to connect and push metrics + done, err = p.connectAndPushMetrics(s) + if done { + return done, err + } + done = time.Since(p.SawCollector) > s.DNSSpec.CollectorServerExpiration + return done, err + }) } -func (s *Server) connectAndPushMetrics(ctx context.Context) (bool, error) { - log := log.WithValues("address", s.PushAddress) +func (p *pusher) connectAndPushMetrics(s *Server) (bool, error) { + a := p.IP + if strings.Contains(a, ":") { + a = fmt.Sprintf("[%s]:%v", a, s.PushHeadlessServicePort) + } else { + a = fmt.Sprintf("%s:%v", a, s.PushHeadlessServicePort) + } + log := log.WithValues("address", a, "node", nodeName) // setup a connection to the server to push metrics - conn, err 
:= grpc.Dial(s.PushAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + log.Info("connecting to collector") + conn, err := grpc.Dial(a, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { + log.Error(err, "failed collector connection") return false, err // re-establish the connection } @@ -257,6 +336,7 @@ func (s *Server) connectAndPushMetrics(ctx context.Context) (bool, error) { defer stop() // cancel the send context stream, err := client.PushMetrics(sendCtx) if err != nil { + log.Error(err, "failed metrics push") return false, err // re-establish the connection } defer func() { @@ -274,53 +354,60 @@ func (s *Server) connectAndPushMetrics(ctx context.Context) (bool, error) { sendTryCount := s.SendPushMetricsRetryCount + 1 // try to send at least once reason := "unknown" for { - select { - case <-ctx.Done(): - log.Info("sending final metrics to collector before shutdown", "node", nodeName) - resp, err := s.ListMetrics(ctx, &api.ListMetricsRequest{}) - resp.Reason = "sampler-shutdown" - if err != nil { - log.Error(errors.WithStack(err), "unable to list metrics") - // don't re-enter loop -- we need to shutdown - } else if err := stream.Send(resp); err != nil { - log.Error(errors.WithStack(err), "unable to send metrics") - // don't re-enter loop -- we need to shutdown - } - log.Info("closing push metrics stream") - return true, nil // we are done - case <-ticker.C: - // block until it is time to send metrics - reason = "periodic-sync" - case <-createdPodTicker.C: // check for new pods that we haven't seen - // check if there are any pods we haven't sent metrics for - if !s.foundUnsentPods(ctx, lastSent) { - // don't send the metrics if we don't have any new pods - continue - } - reason = "found-new-pods" - } if lastSent == nil { // we haven't sent metrics yet reason = "initial-sync" + } else { + select { + case <-p.Context.Done(): + log.Info("sending final metrics to collector before shutdown", "err", p.Context.Err()) + resp, err := s.ListMetrics(p.Context, &api.ListMetricsRequest{}) + resp.Reason = "sampler-shutdown" + if err != nil { + log.Error(errors.WithStack(err), "unable to list metrics") + // don't re-enter loop -- we need to shutdown + } else if err := stream.Send(resp); err != nil { + log.Error(errors.WithStack(err), "unable to send metrics") + // don't re-enter loop -- we need to shutdown + } + log.Info("closing push metrics stream") + return true, nil // we are done + case <-ticker.C: + // block until it is time to send metrics + reason = "periodic-sync" + case <-createdPodTicker.C: // check for new pods that we haven't seen + // check if there are any pods we haven't sent metrics for + if !s.foundUnsentPods(lastSent) { + // don't send the metrics if we don't have any new pods + continue + } + reason = "found-new-pods" + } } + var resp *api.ListMetricsResponse for i := 0; i < sendTryCount; i++ { - resp, err = s.ListMetrics(ctx, &api.ListMetricsRequest{}) + resp, err = s.ListMetrics(p.Context, &api.ListMetricsRequest{}) if err == nil { break } - log.Error(errors.WithStack(err), "unable to list metrics locally") - s.PushErrorMsg.Store(err.Error()) - s.PushHealthy.Store(false) // we were not able to push metrics + log.Error(errors.WithStack(err), "unable to list metrics") } + if len(resp.Containers) == 0 { + // we haven't seen any containers, don't send metrics until we do + continue + } + resp.Timestamp = timestamppb.Now() resp.Reason = reason if err := stream.Send(resp); err != nil { log.Error(errors.WithStack(err), "unable to send metrics to 
collector server") - return false, err // may be an error with the connection -- re-establish the connection + return false, err // may be an error with the connection -- re-establish the connection if possible } - s.PushHealthy.Store(true) // we are able to push metrics - log.V(1).Info("sent metrics to collector", "node", nodeName) + log.V(1).Info("sent metrics to collector", + "containers-len", len(resp.Containers), + "reason", reason, + "push-frequency-seconds", s.pushFrequency.Seconds()) lastSent = resp } } @@ -332,6 +419,47 @@ var ( podName = os.Getenv("POD_NAME") ) +func (s *Server) UnRegisterCollectors(ctx context.Context, ip string) { + s.pushersLock.Lock() + defer s.pushersLock.Unlock() + delete(s.pushers, ip) +} + +// RegisterCollector provides an endpoint for collectors to manually register with the +// samplers. This is useful in cases where DNS isn't adequate. +// - Collector needs to get utilization data before marking itself as ready and serving metrics +// - DNS is slow to propagate +func (s *Server) RegisterCollectors(ctx context.Context, req *api.RegisterCollectorsRequest) (*api.RegisterCollectorsResponse, error) { + if req.Source != "DNS" { + log.Info("registration request from collector", "req", req) + } + resp := &api.RegisterCollectorsResponse{} + s.pushersLock.Lock() + defer s.pushersLock.Unlock() + for _, c := range req.Collectors { + if len(c.IpAddress) == 0 { + log.Info("got empty IP for registration") + continue + } + + if p, ok := s.pushers[c.IpAddress]; ok { + p.SawCollector = time.Now() + // already running, do nothing + continue + } + // IMPORTANT: use the server context not the request context or the + // pusher will shutdown when the request ends + p := &pusher{IP: c.IpAddress, Context: s.CTX, SawCollector: time.Now()} + log.Info("starting metrics pusher for new server", "req", req) + + s.pushers[c.IpAddress] = p + go p.connect(s) // run the metrics pusher + resp.IpAddresses = append(resp.IpAddresses, c.IpAddress) + } + + return resp, nil +} + // ListMetrics lists the aggregated metrics for all containers and nodes func (s *Server) ListMetrics(context.Context, *api.ListMetricsRequest) (*api.ListMetricsResponse, error) { var result api.ListMetricsResponse @@ -340,8 +468,11 @@ func (s *Server) ListMetrics(context.Context, *api.ListMetricsRequest) (*api.Lis for k, v := range samples.containers { c := &api.ContainerMetrics{ - ContainerID: k.ContainerID, - PodUID: k.PodUID, + ContainerID: k.ContainerID, + PodUID: k.PodUID, + ContainerName: k.ContainerName, + PodName: k.PodName, + NamespaceName: k.NamespaceName, } for _, s := range v { c.CpuCoresNanoSec = append(c.CpuCoresNanoSec, int64(s.CPUCoresNanoSec)) @@ -424,12 +555,5 @@ func (x NodeAggregatedMetricsSlice) Less(i, j int) bool { func (x NodeAggregatedMetricsSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] } func (s *Server) Check(ctx context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { - if !s.PushHealthy.Load() { - var msg string - if v := s.PushErrorMsg.Load(); v != nil { - msg = v.(string) - } - return nil, status.Error(codes.Internal, msg) - } return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } diff --git a/pkg/sampler/types.go b/pkg/sampler/types.go index 08709a6..51960c4 100644 --- a/pkg/sampler/types.go +++ b/pkg/sampler/types.go @@ -27,6 +27,15 @@ type ContainerKey struct { // PodUID is the uid of the pod the container is running in, and corresponds to the pod.metadata.uid, or for // mirror pods the 
config.mirror annotation. PodUID string + + // NamespaceName is the namespace of the pod + NamespaceName string + + // ContainerName is the name of the container + ContainerName string + + // PodName is the name of the pod + PodName string } type sampleInstant struct { diff --git a/pkg/testutil/integrationutil.go b/pkg/testutil/integrationutil.go index b630aaa..15cc447 100644 --- a/pkg/testutil/integrationutil.go +++ b/pkg/testutil/integrationutil.go @@ -89,7 +89,7 @@ var PodMetricsCRD = apiextensionsv1.CustomResourceDefinition{ Schema: &apiextensionsv1.CustomResourceValidation{ // Keep all the fields so we don't have to put the full schema here OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ - XPreserveUnknownFields: pointer.BoolPtr(true), + XPreserveUnknownFields: pointer.Bool(true), Type: "object", }, }, @@ -122,7 +122,7 @@ var ResourceQuotaDescriptorCRD = apiextensionsv1.CustomResourceDefinition{ Schema: &apiextensionsv1.CustomResourceValidation{ // Keep all the fields so we don't have to put the full schema here OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ - XPreserveUnknownFields: pointer.BoolPtr(true), + XPreserveUnknownFields: pointer.Bool(true), Type: "object", Properties: map[string]apiextensionsv1.JSONSchemaProps{ "metadata": { @@ -310,7 +310,7 @@ func (suite *IntegrationTestSuite) RunCommand(t *testing.T, cmd string, args ... var buff bytes.Buffer command.Stdout = &buff - command.Stdout = &buff + command.Stderr = os.Stderr go func() { err := command.Run() err = errors.Wrap(err, buff.String()) diff --git a/proto/pkg/sampler/api/api.proto b/proto/pkg/sampler/api/api.proto index a1deb9e..287139f 100644 --- a/proto/pkg/sampler/api/api.proto +++ b/proto/pkg/sampler/api/api.proto @@ -10,6 +10,28 @@ service Metrics { rpc ListMetrics (ListMetricsRequest) returns (ListMetricsResponse) { option (google.api.http) = { get: "/v1/metrics" }; } + +rpc RegisterCollectors (RegisterCollectorsRequest) returns (RegisterCollectorsResponse) { + option (google.api.http) = { + post: "/v1/register" + body: "*" + }; + } +} + +message RegisterCollectorsRequest { + repeated Collector collectors=1; + string source=2; + string fromPod=3; +} + +message Collector { + string ipAddress=1; + string podName=2; +} + +message RegisterCollectorsResponse { + repeated string ipAddresses=1; } service Health { @@ -17,8 +39,8 @@ service Health { option (google.api.http) = { get: "/v1/healthz" }; } - rpc IsLeader(grpc.health.v1.HealthCheckRequest) returns (grpc.health.v1.HealthCheckResponse) { - option (google.api.http) = { get: "/v1/is-leader" }; + rpc IsReady(grpc.health.v1.HealthCheckRequest) returns (grpc.health.v1.HealthCheckResponse) { + option (google.api.http) = { get: "/v1/is-ready" }; } } @@ -34,8 +56,13 @@ message ListMetricsResponse { } message ContainerMetrics { + // nextID: 15 + string containerID =1; string podUID=2; + string containerName=12; + string podName=13; + string namespaceName=14; repeated int64 cpuCoresNanoSec=3; repeated int64 memoryBytes=4;