Skip to content

Commit

Permalink
Add ability to exclude nodes by labels
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjunmyfm192085 committed Feb 2, 2023
1 parent 5c45bb1 commit 8ac883d
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 24 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -136,6 +136,7 @@ Most useful flags:
- `--kubelet-preferred-address-types` - The priority of node address types used when determining an address for connecting to a particular node (default [Hostname,InternalDNS,InternalIP,ExternalDNS,ExternalIP])
- `--kubelet-insecure-tls` - Do not verify the CA of serving certificates presented by Kubelets. For testing purposes only.
- `--requestheader-client-ca-file` - Specify a root certificate bundle for verifying client certificates on incoming requests.
- `--node-selector` - Scrape metrics only from nodes that match the specified label selector

You can get a full list of Metrics Server configuration flags by running:

Expand Down
2 changes: 2 additions & 0 deletions cmd/metrics-server/app/options/kubelet_client.go
Expand Up @@ -36,6 +36,7 @@ type KubeletClientOptions struct {
KubeletClientCertFile string
DeprecatedCompletelyInsecureKubelet bool
KubeletRequestTimeout time.Duration
NodeSelector string
}

func (o *KubeletClientOptions) Validate() []error {
Expand Down Expand Up @@ -77,6 +78,7 @@ func (o *KubeletClientOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.KubeletClientKeyFile, "kubelet-client-key", "", "Path to a client key file for TLS.")
fs.StringVar(&o.KubeletClientCertFile, "kubelet-client-certificate", "", "Path to a client cert file for TLS.")
fs.DurationVar(&o.KubeletRequestTimeout, "kubelet-request-timeout", o.KubeletRequestTimeout, "The length of time to wait before giving up on a single request to Kubelet. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
fs.StringVarP(&o.NodeSelector, "node-selector", "l", o.NodeSelector, "Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2).")
// MarkDeprecated hides the flag from the help. We don't want that.
fs.BoolVar(&o.DeprecatedCompletelyInsecureKubelet, "deprecated-kubelet-completely-insecure", o.DeprecatedCompletelyInsecureKubelet, "DEPRECATED: Do not use any encryption, authorization, or authentication when communicating with the Kubelet. This is rarely the right option, since it leaves kubelet communication completely insecure. If you encounter auth errors, make sure you've enabled token webhook auth on the Kubelet, and if you're in a test cluster with self-signed Kubelet certificates, consider using kubelet-insecure-tls instead.")
}
Expand Down
1 change: 1 addition & 0 deletions cmd/metrics-server/app/options/options.go
Expand Up @@ -121,6 +121,7 @@ func (o Options) ServerConfig() (*server.Config, error) {
Kubelet: o.KubeletClient.Config(restConfig),
MetricResolution: o.MetricResolution,
ScrapeTimeout: o.KubeletClient.KubeletRequestTimeout,
NodeSelector: o.KubeletClient.NodeSelector,
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions docs/command-line-flags.txt
Expand Up @@ -20,6 +20,7 @@ Kubelet client flags:
--kubelet-preferred-address-types strings The priority of node address types to use when determining which address to use to connect to a particular node (default [Hostname,InternalDNS,InternalIP,ExternalDNS,ExternalIP])
--kubelet-request-timeout duration The length of time to wait before giving up on a single request to Kubelet. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h). (default 10s)
--kubelet-use-node-status-port Use the port in the node status. Takes precedence over --kubelet-port flag.
-l, --node-selector string Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2).

Apiserver secure serving flags:

Expand Down
3 changes: 3 additions & 0 deletions manifests/components/test/kustomization.yaml
Expand Up @@ -20,3 +20,6 @@ patchesJson6902:
- op: add
path: /spec/template/spec/containers/0/imagePullPolicy
value: Never
- op: add
path: /spec/template/spec/containers/0/args/-
value: --node-selector=metrics-server-skip!=true
5 changes: 3 additions & 2 deletions pkg/api/install.go
Expand Up @@ -16,6 +16,7 @@ package api

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -53,8 +54,8 @@ func Build(pod, node rest.Storage) genericapiserver.APIGroupInfo {
}

// Install builds the metrics for the metrics.k8s.io API, and then installs it into the given API metrics-server.
func Install(m MetricsGetter, podMetadataLister cache.GenericLister, nodeLister corev1.NodeLister, server *genericapiserver.GenericAPIServer) error {
node := newNodeMetrics(metrics.Resource("nodemetrics"), m, nodeLister)
func Install(m MetricsGetter, podMetadataLister cache.GenericLister, nodeLister corev1.NodeLister, server *genericapiserver.GenericAPIServer, nodeSelector []labels.Requirement) error {
node := newNodeMetrics(metrics.Resource("nodemetrics"), m, nodeLister, nodeSelector)
pod := newPodMetrics(metrics.Resource("podmetrics"), m, podMetadataLister)
info := Build(pod, node)
return server.InstallAPIGroup(&info)
Expand Down
7 changes: 6 additions & 1 deletion pkg/api/node.go
Expand Up @@ -38,6 +38,7 @@ type nodeMetrics struct {
groupResource schema.GroupResource
metrics NodeMetricsGetter
nodeLister v1listers.NodeLister
nodeSelector []labels.Requirement
}

var _ rest.KindProvider = &nodeMetrics{}
Expand All @@ -47,11 +48,12 @@ var _ rest.Lister = &nodeMetrics{}
var _ rest.Scoper = &nodeMetrics{}
var _ rest.TableConvertor = &nodeMetrics{}

func newNodeMetrics(groupResource schema.GroupResource, metrics NodeMetricsGetter, nodeLister v1listers.NodeLister) *nodeMetrics {
// newNodeMetrics constructs the REST storage for the nodemetrics resource.
// nodeSelector carries the label requirements parsed from the
// --node-selector flag; it is appended to every caller-supplied label
// selector when listing nodes, so excluded nodes never appear in results.
// A nil nodeSelector means no server-side filtering.
func newNodeMetrics(groupResource schema.GroupResource, metrics NodeMetricsGetter, nodeLister v1listers.NodeLister, nodeSelector []labels.Requirement) *nodeMetrics {
	return &nodeMetrics{
		groupResource: groupResource,
		metrics:       metrics,
		nodeLister:    nodeLister,
		nodeSelector:  nodeSelector,
	}
}

Expand Down Expand Up @@ -94,6 +96,9 @@ func (m *nodeMetrics) nodes(ctx context.Context, options *metainternalversion.Li
if options != nil && options.LabelSelector != nil {
labelSelector = options.LabelSelector
}
if m.nodeSelector != nil {
labelSelector = labelSelector.Add(m.nodeSelector...)
}
nodes, err := m.nodeLister.List(labelSelector)
if err != nil {
klog.ErrorS(err, "Failed listing nodes", "labelSelector", labelSelector)
Expand Down
15 changes: 13 additions & 2 deletions pkg/api/node_test.go
Expand Up @@ -284,12 +284,18 @@ func (mp fakeNodeMetricsGetter) GetNodeMetrics(nodes ...*corev1.Node) ([]metrics
}

// NewTestNodeStorage returns a nodeMetrics storage backed by fake node data
// and a fake metrics getter, configured with a node selector that excludes
// nodes labeled skipKey=skipValue (mirroring the --node-selector flag).
// listerError, when non-nil, is returned by the fake lister to exercise
// error paths.
func NewTestNodeStorage(listerError error) *nodeMetrics {
	// The selector literal is hard-coded and must parse; swallowing the
	// error (the previous behavior) would silently run the tests without
	// any selector. Fail loudly instead — a panic in a test helper points
	// straight at the broken literal.
	nodeSelector, err := labels.ParseToRequirements("skipKey!=skipValue")
	if err != nil {
		panic(err)
	}

	return &nodeMetrics{
		nodeLister: fakeNodeLister{
			data: createTestNodes(),
			err:  listerError,
		},
		metrics:      fakeNodeMetricsGetter{now: myClock.Now()},
		nodeSelector: nodeSelector,
	}
}

Expand Down Expand Up @@ -317,7 +323,10 @@ func createTestNodes() []*corev1.Node {
node4 := &corev1.Node{}
node4.Name = "node4"
node4.Labels = nodeLabels(node4.Name)
return []*corev1.Node{node1, node2, node3, node4}
node5 := &corev1.Node{}
node5.Name = "node5"
node5.Labels = nodeLabels(node5.Name)
return []*corev1.Node{node1, node2, node3, node4, node5}
}

func nodeLabels(name string) map[string]string {
Expand All @@ -331,6 +340,8 @@ func nodeLabels(name string) map[string]string {
labels["labelKey"] = "otherValue"
case "node4":
labels["otherKey"] = "otherValue"
case "node5":
labels["skipKey"] = "skipValue"
}
return labels
}
12 changes: 9 additions & 3 deletions pkg/scraper/scraper.go
Expand Up @@ -83,18 +83,24 @@ func RegisterScraperMetrics(registrationFunc func(metrics.Registerable) error) e
return nil
}

func NewScraper(nodeLister v1listers.NodeLister, client client.KubeletMetricsGetter, scrapeTimeout time.Duration) *scraper {
func NewScraper(nodeLister v1listers.NodeLister, client client.KubeletMetricsGetter, scrapeTimeout time.Duration, labelRequirement []labels.Requirement) *scraper {
labelSelector := labels.Everything()
if labelRequirement != nil {
labelSelector = labelSelector.Add(labelRequirement...)
}
return &scraper{
nodeLister: nodeLister,
kubeletClient: client,
scrapeTimeout: scrapeTimeout,
labelSelector: labelSelector,
}
}

// scraper collects metrics from the kubelets of all nodes that match
// labelSelector.
type scraper struct {
	nodeLister    v1listers.NodeLister        // lists cluster nodes from the informer cache
	kubeletClient client.KubeletMetricsGetter // fetches metrics from individual kubelets
	scrapeTimeout time.Duration               // per-scrape deadline for each kubelet request
	labelSelector labels.Selector             // node filter built from --node-selector; labels.Everything() when unset
}

var _ Scraper = (*scraper)(nil)
Expand All @@ -107,12 +113,12 @@ type NodeInfo struct {
}

func (c *scraper) Scrape(baseCtx context.Context) *storage.MetricsBatch {
nodes, err := c.nodeLister.List(labels.Everything())
nodes, err := c.nodeLister.List(c.labelSelector)
if err != nil {
// report the error and continue on in case of partial results
klog.ErrorS(err, "Failed to list nodes")
}
klog.V(1).InfoS("Scraping metrics from nodes", "nodeCount", len(nodes))
klog.V(1).InfoS("Scraping metrics from nodes", "nodes", klog.KObjSlice(nodes), "nodeCount", len(nodes), "nodeSelector", c.labelSelector)

responseChannel := make(chan *storage.MetricsBatch, len(nodes))
defer close(responseChannel)
Expand Down
29 changes: 16 additions & 13 deletions pkg/scraper/scraper_test.go
Expand Up @@ -43,13 +43,14 @@ func TestScraper(t *testing.T) {

var _ = Describe("Scraper", func() {
var (
scrapeTime = time.Now()
nodeLister fakeNodeLister
client fakeKubeletClient
node1 = makeNode("node1", "node1.somedomain", "10.0.1.2", true)
node2 = makeNode("node-no-host", "", "10.0.1.3", true)
node3 = makeNode("node3", "node3.somedomain", "10.0.1.4", false)
node4 = makeNode("node4", "node4.somedomain", "10.0.1.5", true)
scrapeTime = time.Now()
nodeLister fakeNodeLister
client fakeKubeletClient
labelRequirement []labels.Requirement
node1 = makeNode("node1", "node1.somedomain", "10.0.1.2", true)
node2 = makeNode("node-no-host", "", "10.0.1.3", true)
node3 = makeNode("node3", "node3.somedomain", "10.0.1.4", false)
node4 = makeNode("node4", "node4.somedomain", "10.0.1.5", true)
)
BeforeEach(func() {
mb := &storage.MetricsBatch{
Expand Down Expand Up @@ -90,6 +91,8 @@ var _ = Describe("Scraper", func() {
node4: {Nodes: map[string]storage.MetricsPoint{node4.Name: metricPoint(100, 200, scrapeTime)}},
},
}

labelRequirement, _ = labels.ParseToRequirements("metrics-server-skip!=true")
})

Context("when all nodes return in time", func() {
Expand All @@ -99,7 +102,7 @@ var _ = Describe("Scraper", func() {

By("running the scraper with a context timeout of 3*seconds")
start := time.Now()
scraper := NewScraper(&nodeLister, &client, 3*time.Second)
scraper := NewScraper(&nodeLister, &client, 3*time.Second, labelRequirement)
timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 4*time.Second)
dataBatch := scraper.Scrape(timeoutCtx)
doneWithWork()
Expand All @@ -122,7 +125,7 @@ var _ = Describe("Scraper", func() {

By("running the source scraper with a scrape timeout of 3 seconds")
start := time.Now()
scraper := NewScraper(&nodeLister, &client, 3*time.Second)
scraper := NewScraper(&nodeLister, &client, 3*time.Second, labelRequirement)
dataBatch := scraper.Scrape(context.Background())

By("ensuring that scraping took around 3 seconds")
Expand All @@ -139,7 +142,7 @@ var _ = Describe("Scraper", func() {

By("running the source scraper with a scrape timeout of 5 seconds, but a context timeout of 1 second")
start := time.Now()
scraper := NewScraper(&nodeLister, &client, 5*time.Second)
scraper := NewScraper(&nodeLister, &client, 5*time.Second, labelRequirement)
timeoutCtx, doneWithWork := context.WithTimeout(context.Background(), 1*time.Second)
dataBatch := scraper.Scrape(timeoutCtx)
doneWithWork()
Expand All @@ -165,7 +168,7 @@ var _ = Describe("Scraper", func() {
}
nodes := fakeNodeLister{nodes: []*corev1.Node{node1}}

scraper := NewScraper(&nodes, &client, 3*time.Second)
scraper := NewScraper(&nodes, &client, 3*time.Second, labelRequirement)
scraper.Scrape(context.Background())

err := testutil.CollectAndCompare(requestDuration, strings.NewReader(`
Expand Down Expand Up @@ -207,7 +210,7 @@ var _ = Describe("Scraper", func() {
By("deleting node")
nodeLister.nodes[0].Status.Addresses = nil
delete(client.metrics, node1)
scraper := NewScraper(&nodeLister, &client, 5*time.Second)
scraper := NewScraper(&nodeLister, &client, 5*time.Second, labelRequirement)

By("running the scraper")
dataBatch := scraper.Scrape(context.Background())
Expand All @@ -218,7 +221,7 @@ var _ = Describe("Scraper", func() {
It("should gracefully handle list errors", func() {
By("setting a fake error from the lister")
nodeLister.listErr = fmt.Errorf("something went wrong, expectedly")
scraper := NewScraper(&nodeLister, &client, 5*time.Second)
scraper := NewScraper(&nodeLister, &client, 5*time.Second, labelRequirement)

By("running the scraper")
scraper.Scrape(context.Background())
Expand Down
16 changes: 14 additions & 2 deletions pkg/server/config.go
Expand Up @@ -16,9 +16,11 @@ package server
import (
"fmt"
"net/http"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/rest"
Expand All @@ -39,9 +41,12 @@ type Config struct {
Kubelet *client.KubeletClientConfig
MetricResolution time.Duration
ScrapeTimeout time.Duration
NodeSelector string
}

func (c Config) Complete() (*server, error) {
var labelRequirement []labels.Requirement

podInformerFactory, err := runningPodMetadataInformer(c.Rest)
if err != nil {
return nil, err
Expand All @@ -56,7 +61,14 @@ func (c Config) Complete() (*server, error) {
return nil, fmt.Errorf("unable to construct a client to connect to the kubelets: %v", err)
}
nodes := informer.Core().V1().Nodes()
scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, c.ScrapeTimeout)
ns := strings.TrimSpace(c.NodeSelector)
if ns != "" {
labelRequirement, err = labels.ParseToRequirements(ns)
if err != nil {
return nil, err
}
}
scrape := scraper.NewScraper(nodes.Lister(), kubeletClient, c.ScrapeTimeout, labelRequirement)

// Disable default metrics handler and create custom one
c.Apiserver.EnableMetrics = false
Expand All @@ -71,7 +83,7 @@ func (c Config) Complete() (*server, error) {
genericServer.Handler.NonGoRestfulMux.HandleFunc("/metrics", metricsHandler)

store := storage.NewStorage(c.MetricResolution)
if err := api.Install(store, podInformer.Lister(), nodes.Lister(), genericServer); err != nil {
if err := api.Install(store, podInformer.Lister(), nodes.Lister(), genericServer, labelRequirement); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion skaffold.yaml
Expand Up @@ -41,4 +41,4 @@ profiles:
podLabels: { k8s-app: metrics-server }
containerPort: 10250
valuesFiles:
- charts/metrics-server/ci/ci-values.yaml
- test/chart-values.yaml
3 changes: 3 additions & 0 deletions test/chart-values.yaml
@@ -0,0 +1,3 @@
args:
- --kubelet-insecure-tls
- --node-selector=metrics-server-skip!=true
25 changes: 25 additions & 0 deletions test/e2e_test.go
Expand Up @@ -348,6 +348,31 @@ livez check passed
Expect(diff).To(BeEmpty(), "Unexpected metrics")
}
})
It("skip scrape metrics about nodes with label node-selector filtered in cluster", func() {
	// Label every node so it matches the exclusion selector
	// (--node-selector=metrics-server-skip!=true) configured for the
	// test deployment.
	nodeList, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	Expect(err).NotTo(HaveOccurred(), "Failed to list nodes")
	for _, node := range nodeList.Items {
		// Guard against a nil label map: writing to a nil map panics.
		if node.Labels == nil {
			node.Labels = map[string]string{}
		}
		node.Labels["metrics-server-skip"] = "true"
		_, err := client.CoreV1().Nodes().Update(context.TODO(), &node, metav1.UpdateOptions{})
		Expect(err).NotTo(HaveOccurred(), "Update labels for node %s failed", node.Name)
	}
	// Wait long enough for a scrape cycle to run and for previously
	// stored node metrics to expire.
	time.Sleep(30 * time.Second)
	nodeList, err = client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	Expect(err).NotTo(HaveOccurred(), "Failed to list nodes")
	for _, node := range nodeList.Items {
		delete(node.Labels, "metrics-server-skip")
		_, getErr := mclient.MetricsV1beta1().NodeMetricses().Get(context.TODO(), node.Name, metav1.GetOptions{})
		// Restore the node labels before asserting on the metrics
		// result, and report failures through Gomega instead of
		// panicking so Ginkgo can record them properly.
		_, updateErr := client.CoreV1().Nodes().Update(context.TODO(), &node, metav1.UpdateOptions{})
		Expect(updateErr).NotTo(HaveOccurred(), "Removing label from node %s failed", node.Name)
		Expect(getErr).To(HaveOccurred(), "Metrics for node %s are available with label node-selector filtered", node.Name)
	}
})
})

func getRestConfig() (*rest.Config, error) {
Expand Down

0 comments on commit 8ac883d

Please sign in to comment.