Merge pull request #10759 from fpetkovski/endpoints-node-meta
kubernetes_sd: Allow attaching node labels for endpoint role
brancz committed Jun 16, 2022
2 parents 80fbe1d + 05da373 commit 0d0e44d
Showing 8 changed files with 478 additions and 45 deletions.
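The feature is opt-in: the endpoints role only attaches node metadata when the scrape configuration asks for it (the attach_metadata setting), which the tests below drive through AttachMetadataConfig{Node: true}. A minimal sketch of enabling it programmatically, assuming the package's exported SDConfig carries an AttachMetadata field shaped like the value the test helper makeDiscoveryWithMetadata passes in; the exact field layout and the New signature are assumptions, not part of this diff:

package main

import (
	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/discovery/kubernetes"
)

func main() {
	// Hypothetical SD config: endpoints role plus node metadata attachment.
	// Field names mirror AttachMetadataConfig{Node: true} used in the tests below.
	cfg := &kubernetes.SDConfig{
		Role:           kubernetes.RoleEndpoint,
		AttachMetadata: kubernetes.AttachMetadataConfig{Node: true},
	}
	// Outside a cluster this would also need an explicit API server address.
	if _, err := kubernetes.New(log.NewNopLogger(), cfg); err != nil {
		panic(err)
	}
}

With this enabled, endpoint targets gain __meta_kubernetes_node_name plus a label/labelpresent pair per node label, as implemented in endpoints.go below.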
95 changes: 82 additions & 13 deletions discovery/kubernetes/endpoints.go
@@ -41,9 +41,11 @@ var (
type Endpoints struct {
logger log.Logger

endpointsInf cache.SharedInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
endpointsInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool

podStore cache.Store
endpointsStore cache.Store
@@ -53,19 +55,21 @@ type Endpoints struct {
}

// NewEndpoints returns a new endpoints discovery.
func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *Endpoints {
if l == nil {
l = log.NewNopLogger()
}
e := &Endpoints{
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
queue: workqueue.NewNamed("endpoints"),
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
serviceInf: svc,
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed("endpoints"),
}

e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -118,10 +122,38 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
serviceUpdate(o)
},
})
if e.withNodeMetadata {
e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
UpdateFunc: func(_, o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
})
}

return e
}

func (e *Endpoints) enqueueNode(nodeName string) {
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
return
}

for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
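enqueueNode relies on the endpoints informer now being a SharedIndexInformer that carries an index from node name to the Endpoints objects referencing that node; the index is registered where the informer is constructed, elsewhere in this PR. A self-contained sketch of such a registration, assuming nodeIndex is the same index-name constant used above (its value here is a guess) and that the list/watch and resync period are whatever the discovery already uses:

// Sketch, not part of this commit: register a node-name index on the
// Endpoints informer so ByIndex(nodeIndex, nodeName) in enqueueNode works.
package sketch

import (
	"fmt"
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

const nodeIndex = "node" // assumed value of the constant referenced above

func newEndpointsByNodeInformer(lw cache.ListerWatcher, resync time.Duration) cache.SharedIndexInformer {
	indexers := cache.Indexers{
		nodeIndex: func(obj interface{}) ([]string, error) {
			ep, ok := obj.(*apiv1.Endpoints)
			if !ok {
				return nil, fmt.Errorf("object is not an Endpoints")
			}
			// Collect every node name referenced by the endpoints' addresses.
			var nodes []string
			for _, subset := range ep.Subsets {
				for _, addr := range subset.Addresses {
					if addr.NodeName != nil {
						nodes = append(nodes, *addr.NodeName)
					}
				}
			}
			return nodes, nil
		},
	}
	return cache.NewSharedIndexInformer(lw, &apiv1.Endpoints{}, resync, indexers)
}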

func (e *Endpoints) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
@@ -135,7 +167,12 @@ func (e *Endpoints) enqueue(obj interface{}) {
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown()

if !cache.WaitForCacheSync(ctx.Done(), e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) {
cacheSyncs := []cache.InformerSynced{e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}

if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
}
@@ -257,6 +294,10 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group {
target[model.LabelName(endpointHostname)] = lv(addr.Hostname)
}

if e.withNodeMetadata {
target = addNodeLabels(target, e.nodeInf, e.logger, addr.NodeName)
}

pod := e.resolvePodRef(addr.TargetRef)
if pod == nil {
// This target is not a Pod, so don't continue with Pod specific logic.
@@ -387,3 +428,31 @@ func (e *Endpoints) addServiceLabels(ns, name string, tg *targetgroup.Group) {

tg.Labels = tg.Labels.Merge(serviceLabels(svc))
}

func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger log.Logger, nodeName *string) model.LabelSet {
if nodeName == nil {
return tg
}

obj, exists, err := nodeInf.GetStore().GetByKey(*nodeName)
if err != nil {
level.Error(logger).Log("msg", "Error getting node", "node", *nodeName, "err", err)
return tg
}

if !exists {
return tg
}

node := obj.(*apiv1.Node)
// Allocate one target label for the node name,
// and two target labels for each node label.
nodeLabelset := make(model.LabelSet, 1+2*len(node.GetLabels()))
nodeLabelset[nodeNameLabel] = lv(*nodeName)
for k, v := range node.GetLabels() {
ln := strutil.SanitizeLabelName(k)
nodeLabelset[model.LabelName(nodeLabelPrefix+ln)] = lv(v)
nodeLabelset[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue
}
return tg.Merge(nodeLabelset)
}
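For a node named foobar carrying the single label az=us-east1, the merge above contributes exactly the labels asserted by the tests added below:

// Labels contributed by addNodeLabels for node "foobar" with label az=us-east1.
nodeLabels := model.LabelSet{
	"__meta_kubernetes_node_name":            "foobar",
	"__meta_kubernetes_node_label_az":        "us-east1",
	"__meta_kubernetes_node_labelpresent_az": "true",
}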
120 changes: 120 additions & 0 deletions discovery/kubernetes/endpoints_test.go
@@ -478,6 +478,126 @@ func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) {
}.Run(t)
}

func TestEndpointsDiscoveryWithNodeMetadata(t *testing.T) {
metadataConfig := AttachMetadataConfig{Node: true}
nodeLabels := map[string]string{"az": "us-east1"}
node := makeNode("foobar", "", "", nodeLabels, nil)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), svc, node)

k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_node_label_az": "us-east1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}

func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
nodeLabels := map[string]string{"az": "us-east1"}
nodes := makeNode("foobar", "", "", nodeLabels, nil)
metadataConfig := AttachMetadataConfig{Node: true}
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Labels: map[string]string{
"app/name": "test",
},
},
}
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), nodes, svc)

k8sDiscoveryTest{
discovery: n,
afterStart: func() {
nodes.Labels["az"] = "eu-central1"
c.CoreV1().Nodes().Update(context.Background(), nodes, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
"endpoints/default/testendpoints": {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
"__meta_kubernetes_node_label_az": "eu-central1",
"__meta_kubernetes_node_labelpresent_az": "true",
"__meta_kubernetes_node_name": "foobar",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}

func TestEndpointsDiscoveryNamespaces(t *testing.T) {
epOne := makeEndpoints()
epOne.Namespace = "ns1"
47 changes: 44 additions & 3 deletions discovery/kubernetes/endpointslice.go
@@ -42,9 +42,11 @@ var (
type EndpointSlice struct {
logger log.Logger

endpointSliceInf cache.SharedInformer
endpointSliceInf cache.SharedIndexInformer
serviceInf cache.SharedInformer
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool

podStore cache.Store
endpointSliceStore cache.Store
@@ -54,7 +56,7 @@ type EndpointSlice struct {
}

// NewEndpointSlice returns a new endpointslice discovery.
func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *EndpointSlice {
func NewEndpointSlice(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer) *EndpointSlice {
if l == nil {
l = log.NewNopLogger()
}
@@ -66,6 +68,8 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin
serviceStore: svc.GetStore(),
podInf: pod,
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
queue: workqueue.NewNamed("endpointSlice"),
}

@@ -120,9 +124,38 @@ func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoin
},
})

if e.withNodeMetadata {
e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
UpdateFunc: func(_, o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
},
})
}

return e
}

func (e *EndpointSlice) enqueueNode(nodeName string) {
endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(nodeIndex, nodeName)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting endpoints for node", "node", nodeName, "err", err)
return
}

for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
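The EndpointSlice variant needs the analogous index, except that node names come from the slice's endpoints rather than from subset addresses. A sketch of that index function, again assuming the registration site (not in this hunk) uses the same nodeIndex constant:

// Sketch, not part of this commit: index EndpointSlice objects by the node
// name of each endpoint; disv1 = k8s.io/api/discovery/v1 (alias assumed).
func endpointSliceNodeIndexFunc(obj interface{}) ([]string, error) {
	es, ok := obj.(*disv1.EndpointSlice)
	if !ok {
		return nil, fmt.Errorf("object is not an EndpointSlice")
	}
	var nodes []string
	for _, ep := range es.Endpoints {
		if ep.NodeName != nil {
			nodes = append(nodes, *ep.NodeName)
		}
	}
	return nodes, nil
}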

func (e *EndpointSlice) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
@@ -136,7 +169,11 @@ func (e *EndpointSlice) enqueue(obj interface{}) {
func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown()

if !cache.WaitForCacheSync(ctx.Done(), e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) {
cacheSyncs := []cache.InformerSynced{e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced}
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if ctx.Err() != context.Canceled {
level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache")
}
@@ -282,6 +319,10 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue
}

if e.withNodeMetadata {
target = addNodeLabels(target, e.nodeInf, e.logger, ep.nodename())
}

pod := e.resolvePodRef(ep.targetRef())
if pod == nil {
// This target is not a Pod, so don't continue with Pod specific logic.
9 changes: 9 additions & 0 deletions discovery/kubernetes/endpointslice_adaptor.go
@@ -41,6 +41,7 @@ type endpointSlicePortAdaptor interface {
type endpointSliceEndpointAdaptor interface {
addresses() []string
hostname() *string
nodename() *string
conditions() endpointSliceEndpointConditionsAdaptor
targetRef() *corev1.ObjectReference
topology() map[string]string
@@ -164,6 +165,10 @@ func (e *endpointSliceEndpointAdaptorV1) hostname() *string {
return e.endpoint.Hostname
}

func (e *endpointSliceEndpointAdaptorV1) nodename() *string {
return e.endpoint.NodeName
}

func (e *endpointSliceEndpointAdaptorV1) conditions() endpointSliceEndpointConditionsAdaptor {
return newEndpointSliceEndpointConditionsAdaptorFromV1(e.endpoint.Conditions)
}
@@ -204,6 +209,10 @@ func (e *endpointSliceEndpointAdaptorV1beta1) hostname() *string {
return e.endpoint.Hostname
}

func (e *endpointSliceEndpointAdaptorV1beta1) nodename() *string {
return e.endpoint.NodeName
}

func (e *endpointSliceEndpointAdaptorV1beta1) conditions() endpointSliceEndpointConditionsAdaptor {
return newEndpointSliceEndpointConditionsAdaptorFromV1beta1(e.endpoint.Conditions)
}