diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index 14615a09c74..f1fd82672f5 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -41,9 +41,11 @@ var ( type Endpoints struct { logger log.Logger - endpointsInf cache.SharedInformer - serviceInf cache.SharedInformer - podInf cache.SharedInformer + endpointsInf cache.SharedIndexInformer + serviceInf cache.SharedInformer + podInf cache.SharedInformer + nodeInf cache.SharedInformer + withNodeMetadata bool podStore cache.Store endpointsStore cache.Store @@ -53,19 +55,21 @@ type Endpoints struct { } // NewEndpoints returns a new endpoints discovery. -func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints { +func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *Endpoints { if l == nil { l = log.NewNopLogger() } e := &Endpoints{ - logger: l, - endpointsInf: eps, - endpointsStore: eps.GetStore(), - serviceInf: svc, - serviceStore: svc.GetStore(), - podInf: pod, - podStore: pod.GetStore(), - queue: workqueue.NewNamed("endpoints"), + logger: l, + endpointsInf: eps, + endpointsStore: eps.GetStore(), + serviceInf: svc, + serviceStore: svc.GetStore(), + podInf: pod, + podStore: pod.GetStore(), + nodeInf: node, + withNodeMetadata: node != nil, + queue: workqueue.NewNamed("endpoints"), } e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -118,10 +122,38 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints { serviceUpdate(o) }, }) + if e.withNodeMetadata { + e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + node := o.(*apiv1.Node) + e.enqueueNode(node.Name) + }, + UpdateFunc: func(_, o interface{}) { + node := o.(*apiv1.Node) + e.enqueueNode(node.Name) + }, + DeleteFunc: func(o interface{}) { + node := o.(*apiv1.Node) + e.enqueueNode(node.Name) + }, + }) + } return e } +func (e *Endpoints) enqueueNode(nodeName string) { + endpoints, err := e.endpointsInf.GetIndexer().ByIndex(nodeIndex, nodeName) + if err != nil { + level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err) + return + } + + for _, endpoint := range endpoints { + e.enqueue(endpoint) + } +} + func (e *Endpoints) enqueue(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { @@ -135,7 +167,12 @@ func (e *Endpoints) enqueue(obj interface{}) { func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { defer e.queue.ShutDown() - if !cache.WaitForCacheSync(ctx.Done(), e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) { + cacheSyncs := []cache.InformerSynced{e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced} + if e.withNodeMetadata { + cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced) + } + + if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { if !errors.Is(ctx.Err(), context.Canceled) { level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache") } @@ -257,6 +294,10 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group { target[model.LabelName(endpointHostname)] = lv(addr.Hostname) } + if e.withNodeMetadata { + target = addNodeLabels(target, e.nodeInf, e.logger, addr.NodeName) + } + pod := e.resolvePodRef(addr.TargetRef) if pod == nil { // This target is not a Pod, so don't continue with Pod specific logic. @@ -387,3 +428,31 @@ func (e *Endpoints) addServiceLabels(ns, name string, tg *targetgroup.Group) { tg.Labels = tg.Labels.Merge(serviceLabels(svc)) } + +func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger log.Logger, nodeName *string) model.LabelSet { + if nodeName == nil { + return tg + } + + obj, exists, err := nodeInf.GetStore().GetByKey(*nodeName) + if err != nil { + level.Error(logger).Log("msg", "Error getting node", "node", *nodeName, "err", err) + return tg + } + + if !exists { + return tg + } + + node := obj.(*apiv1.Node) + // Allocate one target label for the node name, + // and two target labels for each node label. + nodeLabelset := make(model.LabelSet, 1+2*len(node.GetLabels())) + nodeLabelset[nodeNameLabel] = lv(*nodeName) + for k, v := range node.GetLabels() { + ln := strutil.SanitizeLabelName(k) + nodeLabelset[model.LabelName(nodeLabelPrefix+ln)] = lv(v) + nodeLabelset[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue + } + return tg.Merge(nodeLabelset) +} diff --git a/discovery/kubernetes/endpoints_test.go b/discovery/kubernetes/endpoints_test.go index 4d72272564c..5fd9460ae25 100644 --- a/discovery/kubernetes/endpoints_test.go +++ b/discovery/kubernetes/endpoints_test.go @@ -478,6 +478,126 @@ func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) { }.Run(t) } +func TestEndpointsDiscoveryWithNodeMetadata(t *testing.T) { + metadataConfig := AttachMetadataConfig{Node: true} + nodeLabels := map[string]string{"az": "us-east1"} + node := makeNode("foobar", "", "", nodeLabels, nil) + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), svc, node) + + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpoint_node_name": "foobar", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + "__meta_kubernetes_node_label_az": "us-east1", + "__meta_kubernetes_node_labelpresent_az": "true", + "__meta_kubernetes_node_name": "foobar", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "false", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_endpoints_name": "testendpoints", + "__meta_kubernetes_service_label_app_name": "test", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpoints/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) { + nodeLabels := map[string]string{"az": "us-east1"} + nodes := makeNode("foobar", "", "", nodeLabels, nil) + metadataConfig := AttachMetadataConfig{Node: true} + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), nodes, svc) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + nodes.Labels["az"] = "eu-central1" + c.CoreV1().Nodes().Update(context.Background(), nodes, metav1.UpdateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpoint_node_name": "foobar", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + "__meta_kubernetes_node_label_az": "eu-central1", + "__meta_kubernetes_node_labelpresent_az": "true", + "__meta_kubernetes_node_name": "foobar", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "false", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_endpoints_name": "testendpoints", + "__meta_kubernetes_service_label_app_name": "test", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpoints/default/testendpoints", + }, + }, + }.Run(t) +} + func TestEndpointsDiscoveryNamespaces(t *testing.T) { epOne := makeEndpoints() epOne.Namespace = "ns1" diff --git a/discovery/kubernetes/endpointslice.go b/discovery/kubernetes/endpointslice.go index 31bc14dd77a..6d4c2c8f257 100644 --- a/discovery/kubernetes/endpointslice.go +++ b/discovery/kubernetes/endpointslice.go @@ -42,9 +42,11 @@ var ( type EndpointSlice struct { logger log.Logger - endpointSliceInf cache.SharedInformer + endpointSliceInf cache.SharedIndexInformer serviceInf cache.SharedInformer podInf cache.SharedInformer + nodeInf cache.SharedInformer + withNodeMetadata bool podStore cache.Store endpointSliceStore cache.Store @@ -54,7 +56,7 @@ type EndpointSlice struct { } // NewEndpointSlice returns a new endpointslice discovery. -func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *EndpointSlice { +func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *EndpointSlice { if l == nil { l = log.NewNopLogger() } @@ -66,6 +68,8 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin serviceStore: svc.GetStore(), podInf: pod, podStore: pod.GetStore(), + nodeInf: node, + withNodeMetadata: node != nil, queue: workqueue.NewNamed("endpointSlice"), } @@ -120,9 +124,38 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin }, }) + if e.withNodeMetadata { + e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + node := o.(*apiv1.Node) + e.enqueueNode(node.Name) + }, + UpdateFunc: func(_, o interface{}) { + node := o.(*apiv1.Node) + e.enqueueNode(node.Name) + }, + DeleteFunc: func(o interface{}) { + node := o.(*apiv1.Node) + e.enqueueNode(node.Name) + }, + }) + } + return e } +func (e *EndpointSlice) enqueueNode(nodeName string) { + endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(nodeIndex, nodeName) + if err != nil { + level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err) + return + } + + for _, endpoint := range endpoints { + e.enqueue(endpoint) + } +} + func (e *EndpointSlice) enqueue(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { @@ -136,7 +169,11 @@ func (e *EndpointSlice) enqueue(obj interface{}) { func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { defer e.queue.ShutDown() - if !cache.WaitForCacheSync(ctx.Done(), e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) { + cacheSyncs := []cache.InformerSynced{e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced} + if e.withNodeMetadata { + cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced) + } + if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { if ctx.Err() != context.Canceled { level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache") } @@ -282,6 +319,10 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue } + if e.withNodeMetadata { + target = addNodeLabels(target, e.nodeInf, e.logger, ep.nodename()) + } + pod := e.resolvePodRef(ep.targetRef()) if pod == nil { // This target is not a Pod, so don't continue with Pod specific logic. diff --git a/discovery/kubernetes/endpointslice_adaptor.go b/discovery/kubernetes/endpointslice_adaptor.go index f22affb6f6b..87484b06fde 100644 --- a/discovery/kubernetes/endpointslice_adaptor.go +++ b/discovery/kubernetes/endpointslice_adaptor.go @@ -41,6 +41,7 @@ type endpointSlicePortAdaptor interface { type endpointSliceEndpointAdaptor interface { addresses() []string hostname() *string + nodename() *string conditions() endpointSliceEndpointConditionsAdaptor targetRef() *corev1.ObjectReference topology() map[string]string @@ -164,6 +165,10 @@ func (e *endpointSliceEndpointAdaptorV1) hostname() *string { return e.endpoint.Hostname } +func (e *endpointSliceEndpointAdaptorV1) nodename() *string { + return e.endpoint.NodeName +} + func (e *endpointSliceEndpointAdaptorV1) conditions() endpointSliceEndpointConditionsAdaptor { return newEndpointSliceEndpointConditionsAdaptorFromV1(e.endpoint.Conditions) } @@ -204,6 +209,10 @@ func (e *endpointSliceEndpointAdaptorV1beta1) hostname() *string { return e.endpoint.Hostname } +func (e *endpointSliceEndpointAdaptorV1beta1) nodename() *string { + return e.endpoint.NodeName +} + func (e *endpointSliceEndpointAdaptorV1beta1) conditions() endpointSliceEndpointConditionsAdaptor { return newEndpointSliceEndpointConditionsAdaptorFromV1beta1(e.endpoint.Conditions) } diff --git a/discovery/kubernetes/endpointslice_test.go b/discovery/kubernetes/endpointslice_test.go index 16148d2a0f5..91408c009c5 100644 --- a/discovery/kubernetes/endpointslice_test.go +++ b/discovery/kubernetes/endpointslice_test.go @@ -68,6 +68,7 @@ func makeEndpointSliceV1() *v1.EndpointSlice { Conditions: v1.EndpointConditions{Ready: boolptr(true)}, Hostname: strptr("testendpoint1"), TargetRef: &corev1.ObjectReference{}, + NodeName: strptr("foobar"), DeprecatedTopology: map[string]string{ "topology": "value", }, @@ -688,6 +689,147 @@ func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) { }.Run(t) } +func TestEndpointsSlicesDiscoveryWithNodeMetadata(t *testing.T) { + metadataConfig := AttachMetadataConfig{Node: true} + nodeLabels := map[string]string{"az": "us-east1"} + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + objs := []runtime.Object{makeEndpointSliceV1(), makeNode("foobar", "", "", nodeLabels, nil), svc} + n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...) + + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "", + "__meta_kubernetes_endpointslice_address_target_name": "", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true", + "__meta_kubernetes_endpointslice_endpoint_topology_topology": "value", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + "__meta_kubernetes_node_label_az": "us-east1", + "__meta_kubernetes_node_labelpresent_az": "true", + "__meta_kubernetes_node_name": "foobar", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_service_label_app_name": "test", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) { + metadataConfig := AttachMetadataConfig{Node: true} + nodeLabels := map[string]string{"az": "us-east1"} + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + node := makeNode("foobar", "", "", nodeLabels, nil) + objs := []runtime.Object{makeEndpointSliceV1(), node, svc} + n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...) + + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 2, + afterStart: func() { + node.Labels["az"] = "us-central1" + c.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{}) + }, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "", + "__meta_kubernetes_endpointslice_address_target_name": "", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true", + "__meta_kubernetes_endpointslice_endpoint_topology_topology": "value", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + "__meta_kubernetes_node_label_az": "us-central1", + "__meta_kubernetes_node_labelpresent_az": "true", + "__meta_kubernetes_node_name": "foobar", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_service_label_app_name": "test", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + func TestEndpointSliceDiscoveryNamespaces(t *testing.T) { epOne := makeEndpointSliceV1() epOne.Namespace = "ns1" diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 3f417c49b3c..2d1e36d57d5 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -23,6 +23,8 @@ import ( "sync" "time" + disv1beta1 "k8s.io/api/discovery/v1beta1" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -31,7 +33,6 @@ import ( "github.com/prometheus/common/version" apiv1 "k8s.io/api/core/v1" disv1 "k8s.io/api/discovery/v1" - disv1beta1 "k8s.io/api/discovery/v1beta1" networkv1 "k8s.io/api/networking/v1" "k8s.io/api/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -406,7 +407,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { } for _, namespace := range namespaces { - var informer cache.SharedInformer + var informer cache.SharedIndexInformer if v1Supported { e := d.client.DiscoveryV1().EndpointSlices(namespace) elw := &cache.ListWatch{ @@ -421,7 +422,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return e.Watch(ctx, options) }, } - informer = cache.NewSharedInformer(elw, &disv1.EndpointSlice{}, resyncPeriod) + informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{}) } else { e := d.client.DiscoveryV1beta1().EndpointSlices(namespace) elw := &cache.ListWatch{ @@ -436,7 +437,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return e.Watch(ctx, options) }, } - informer = cache.NewSharedInformer(elw, &disv1beta1.EndpointSlice{}, resyncPeriod) + informer = d.newEndpointSlicesByNodeInformer(elw, &disv1beta1.EndpointSlice{}) } s := d.client.CoreV1().Services(namespace) @@ -465,11 +466,17 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return p.Watch(ctx, options) }, } + var nodeInf cache.SharedInformer + if d.attachMetadata.Node { + nodeInf = d.newNodeInformer(context.Background()) + go nodeInf.Run(ctx.Done()) + } eps := NewEndpointSlice( log.With(d.logger, "role", "endpointslice"), - cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), informer, + cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), + nodeInf, ) d.discoverers = append(d.discoverers, eps) go eps.endpointSliceInf.Run(ctx.Done()) @@ -517,11 +524,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return p.Watch(ctx, options) }, } + var nodeInf cache.SharedInformer + if d.attachMetadata.Node { + nodeInf = d.newNodeInformer(ctx) + go nodeInf.Run(ctx.Done()) + } + eps := NewEndpoints( log.With(d.logger, "role", "endpoint"), + d.newEndpointsByNodeInformer(elw), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), - cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), + nodeInf, ) d.discoverers = append(d.discoverers, eps) go eps.endpointsInf.Run(ctx.Done()) @@ -735,6 +749,65 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncPeriod, indexers) } +func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { + indexers := make(map[string]cache.IndexFunc) + if !d.attachMetadata.Node { + return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers) + } + + indexers[nodeIndex] = func(obj interface{}) ([]string, error) { + e, ok := obj.(*apiv1.Endpoints) + if !ok { + return nil, fmt.Errorf("object is not a pod") + } + var nodes []string + for _, target := range e.Subsets { + for _, addr := range target.Addresses { + if addr.NodeName == nil { + continue + } + nodes = append(nodes, *addr.NodeName) + } + } + return nodes, nil + } + + return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncPeriod, indexers) +} + +func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer { + indexers := make(map[string]cache.IndexFunc) + if !d.attachMetadata.Node { + cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncPeriod, indexers) + } + + indexers[nodeIndex] = func(obj interface{}) ([]string, error) { + var nodes []string + switch e := obj.(type) { + case *disv1.EndpointSlice: + for _, target := range e.Endpoints { + if target.NodeName == nil { + continue + } + nodes = append(nodes, *target.NodeName) + } + case *disv1beta1.EndpointSlice: + for _, target := range e.Endpoints { + if target.NodeName == nil { + continue + } + nodes = append(nodes, *target.NodeName) + } + default: + return nil, fmt.Errorf("object is not an endpointslice") + } + + return nodes, nil + } + + return cache.NewSharedIndexInformer(plw, object, resyncPeriod, indexers) +} + func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) { k8sVer, err := client.Discovery().ServerVersion() if err != nil { diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index 10ec4512a2c..2e55dce7891 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -253,7 +253,7 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group { tg.Labels = podLabels(pod) tg.Labels[namespaceLabel] = lv(pod.Namespace) if p.withNodeMetadata { - p.attachNodeMetadata(tg, pod) + tg.Labels = addNodeLabels(tg.Labels, p.nodeInf, p.logger, &pod.Spec.NodeName) } containers := append(pod.Spec.Containers, pod.Spec.InitContainers...) @@ -291,27 +291,6 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group { return tg } -func (p *Pod) attachNodeMetadata(tg *targetgroup.Group, pod *apiv1.Pod) { - tg.Labels[nodeNameLabel] = lv(pod.Spec.NodeName) - - obj, exists, err := p.nodeInf.GetStore().GetByKey(pod.Spec.NodeName) - if err != nil { - level.Error(p.logger).Log("msg", "Error getting node", "node", pod.Spec.NodeName, "err", err) - return - } - - if !exists { - return - } - - node := obj.(*apiv1.Node) - for k, v := range node.GetLabels() { - ln := strutil.SanitizeLabelName(k) - tg.Labels[model.LabelName(nodeLabelPrefix+ln)] = lv(v) - tg.Labels[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue - } -} - func (p *Pod) enqueuePodsForNode(nodeName string) { pods, err := p.podInf.GetIndexer().ByIndex(nodeIndex, nodeName) if err != nil { diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index b1d59158229..b1e3bb47db0 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1851,7 +1851,7 @@ namespaces: # Optional metadata to attach to discovered targets. If omitted, no additional metadata is attached. attach_metadata: -# Attaches node metadata to discovered targets. Only valid for role: pod. +# Attaches node metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice. # When set to true, Prometheus must have permissions to get Nodes. [ node: | default = false ] ```