Skip to content

Commit

Permalink
Full implementation of KEP-1669 ProxyTerminatingEndpoints
Browse files Browse the repository at this point in the history
The previous implementation was an approximation of KEP-1669 ProxyTerminatingEndpoints: we simply included terminating serving endpoints (ready=false, serving=true, terminating=true) along with ready ones in the endpoint selection logic. Let's fully implement KEP-1669 and only include terminating endpoints if none are ready. The selection follows two simple steps:
1) Take all ready endpoints
2) If no ready endpoints were found, take all serving terminating endpoints.

This should also help with an issue found in a production cluster (https://issues.redhat.com/browse/OCPBUGS-24363) where, due to infrequent readiness probes, terminating endpoints were declared as non-serving (that is, their readiness probe failed) only quite late and were included as valid endpoints for quite a bit, while the existing ready endpoints should have been preferred.

Extended the test cases to include testing against multiple slices and dual stack scenarios.

Signed-off-by: Riccardo Ravaioli <rravaiol@redhat.com>
  • Loading branch information
ricky-rav committed Jan 11, 2024
1 parent 51e53f9 commit 1c52192
Show file tree
Hide file tree
Showing 4 changed files with 949 additions and 135 deletions.
2 changes: 1 addition & 1 deletion go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ func (nc *DefaultNodeNetworkController) reconcileConntrackUponEndpointSliceEvent
oldIPStr := utilnet.ParseIPSloppy(oldIP).String()
// upon an update event, remove conntrack entries for IP addresses that are no longer
// in the endpointslice, skip otherwise
if newEndpointSlice != nil && util.DoesEndpointSliceContainEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
if newEndpointSlice != nil && util.DoesEndpointSliceContainEligibleEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
continue
}
// upon update and delete events, flush conntrack only for UDP
Expand Down
20 changes: 10 additions & 10 deletions go-controller/pkg/node/gateway_shared_intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (npw *nodePortWatcher) AddService(service *kapi.Service) error {
hasLocalHostNetworkEp = false
} else {
nodeIPs := npw.nodeIPManager.ListAddresses()
localEndpoints = npw.GetLocalEndpointAddresses(epSlices, service)
localEndpoints = npw.GetLocalEligibleEndpointAddresses(epSlices, service)
hasLocalHostNetworkEp = util.HasLocalHostNetworkEndpoints(localEndpoints, nodeIPs)
}
// If something didn't already do it add correct Service rules
Expand Down Expand Up @@ -757,7 +757,7 @@ func (npw *nodePortWatcher) SyncServices(services []interface{}) error {
continue
}
nodeIPs := npw.nodeIPManager.ListAddresses()
localEndpoints := npw.GetLocalEndpointAddresses(epSlices, service)
localEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, service)
hasLocalHostNetworkEp := util.HasLocalHostNetworkEndpoints(localEndpoints, nodeIPs)
npw.getAndSetServiceInfo(name, service, hasLocalHostNetworkEp, localEndpoints)

Expand Down Expand Up @@ -821,7 +821,7 @@ func (npw *nodePortWatcher) AddEndpointSlice(epSlice *discovery.EndpointSlice) e
// No need to continue adding the new endpoint slice, if we can't retrieve all slices for this service
return fmt.Errorf("error retrieving endpointslices for service %s/%s during endpointslice add: %w", svc.Namespace, svc.Name, err)
}
localEndpoints := npw.GetLocalEndpointAddresses(epSlices, svc)
localEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, svc)
hasLocalHostNetworkEp := util.HasLocalHostNetworkEndpoints(localEndpoints, nodeIPs)

// Here we make sure the correct rules are programmed whenever an AddEndpointSlice event is
Expand Down Expand Up @@ -880,7 +880,7 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
return fmt.Errorf("error retrieving service %s/%s for endpointslice %s during endpointslice delete: %v",
namespacedName.Namespace, namespacedName.Name, epSlice.Name, err)
}
localEndpoints := npw.GetLocalEndpointAddresses(epSlices, svc)
localEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, svc)
if svcConfig, exists := npw.updateServiceInfo(namespacedName, nil, &hasLocalHostNetworkEp, localEndpoints); exists {
// Lock the cache mutex here so we don't miss a service delete during an endpoint delete
// we have to do this because deleting and adding iptables rules is slow.
Expand All @@ -899,8 +899,8 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
}

// GetLocalEndpointAddresses returns a list of eligible endpoints that are local to the node
func (npw *nodePortWatcher) GetLocalEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return util.GetLocalEndpointAddresses(endpointSlices, service, npw.nodeIPManager.nodeName)
func (npw *nodePortWatcher) GetLocalEligibleEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return util.GetLocalEligibleEndpointAddresses(endpointSlices, service, npw.nodeIPManager.nodeName)
}

func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice) error {
Expand All @@ -920,8 +920,8 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
namespacedName.Namespace, namespacedName.Name, newEpSlice.Name, err)
}

oldEndpointAddresses := util.GetEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newEndpointAddresses := util.GetEndpointAddresses([]*discovery.EndpointSlice{newEpSlice}, svc)
oldEndpointAddresses := util.GetEligibleEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newEndpointAddresses := util.GetEligibleEndpointAddresses([]*discovery.EndpointSlice{newEpSlice}, svc)
if reflect.DeepEqual(oldEndpointAddresses, newEndpointAddresses) {
return nil
}
Expand Down Expand Up @@ -961,8 +961,8 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
// endpoints has changed. For this second comparison, check first between the old endpoint slice and all current
// endpointslices for this service. This is a partial comparison, in case serviceInfo is not set. When it is set, compare
// between /all/ old endpoint slices and all new ones.
oldLocalEndpoints := npw.GetLocalEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newLocalEndpoints := npw.GetLocalEndpointAddresses(epSlices, svc)
oldLocalEndpoints := npw.GetLocalEligibleEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newLocalEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, svc)
hasLocalHostNetworkEpOld := util.HasLocalHostNetworkEndpoints(oldLocalEndpoints, nodeIPs)
hasLocalHostNetworkEpNew := util.HasLocalHostNetworkEndpoints(newLocalEndpoints, nodeIPs)

Expand Down
242 changes: 157 additions & 85 deletions go-controller/pkg/util/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,14 @@ type LbEndpoints struct {
Port int32
}

// GetLbEndpoints returns the IPv4 and IPv6 addresses of eligible endpoints as slices inside a struct
// GetLbEndpoints returns the IPv4 and IPv6 addresses of eligible endpoints from a
// provided list of endpoint slices, for a given service and service port.
func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort, service *v1.Service) LbEndpoints {
v4ips := sets.NewString()
v6ips := sets.NewString()

var allEndpoints []discovery.Endpoint
v4IPs := sets.NewString()
v6IPs := sets.NewString()
out := LbEndpoints{}

// return an empty object so the caller doesn't have to check for nil and can use it as an iterator
if len(slices) == 0 {
return out
Expand All @@ -613,43 +615,58 @@ func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort,
for _, slice := range slices {
klog.V(5).Infof("Getting endpoints for slice %s/%s", slice.Namespace, slice.Name)

// build the list of valid endpoints in the slice
for _, port := range slice.Ports {
for _, slicePort := range slice.Ports {
// If Service port name is set, it must match the name field in the endpoint
// If Service port name is not set, we just use the endpoint port
if svcPort.Name != "" && svcPort.Name != *port.Name {
if svcPort.Name != "" && svcPort.Name != *slicePort.Name {
klog.V(5).Infof("Slice %s with different Port name, requested: %s received: %s",
slice.Name, svcPort.Name, *port.Name)
slice.Name, svcPort.Name, *slicePort.Name)
continue
}

// Skip ports that don't match the protocol
if *port.Protocol != svcPort.Protocol {
if *slicePort.Protocol != svcPort.Protocol {
klog.V(5).Infof("Slice %s with different Port protocol, requested: %s received: %s",
slice.Name, svcPort.Protocol, *port.Protocol)
slice.Name, svcPort.Protocol, *slicePort.Protocol)
continue
}

out.Port = *port.Port
ForEachEligibleEndpoint(slice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
for _, ip := range endpoint.Addresses {
klog.V(5).Infof("Adding slice %s endpoint: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
ipStr := utilnet.ParseIPSloppy(ip).String()
switch slice.AddressType {
case discovery.AddressTypeIPv4:
v4ips.Insert(ipStr)
case discovery.AddressTypeIPv6:
v6ips.Insert(ipStr)
default:
klog.V(5).Infof("Skipping FQDN slice %s/%s", slice.Namespace, slice.Name)
}
}
})
out.Port = *slicePort.Port

if slice.AddressType == discovery.AddressTypeFQDN {
klog.V(5).Infof("Skipping FQDN slice %s/%s", slice.Namespace, slice.Name)
} else { // endpoints are here either IPv4 or IPv6
allEndpoints = append(allEndpoints, slice.Endpoints...)
}
}
}

out.V4IPs = v4ips.List()
out.V6IPs = v6ips.List()
serviceStr := ""
if service != nil {
serviceStr = fmt.Sprintf(" for service %s/%s", service.Namespace, service.Name)
}
// separate IPv4 from IPv6 addresses for eligible endpoints
ForEachEligibleEndpoint(allEndpoints, service, func(endpoint discovery.Endpoint) {
for _, ip := range endpoint.Addresses {
if utilnet.IsIPv4String(ip) {
klog.V(5).Infof("Adding endpoint IPv4 address %s port %d%s",
ip, out.Port, serviceStr)
v4IPs.Insert(utilnet.ParseIPSloppy(ip).String())

} else if utilnet.IsIPv6String(ip) {
klog.V(5).Infof("Adding endpoint IPv6 address %s port %d%s",
ip, out.Port, serviceStr)
v6IPs.Insert(utilnet.ParseIPSloppy(ip).String())

} else {
klog.V(5).Infof("Skipping unrecognized address %s port %d%s",
ip, out.Port, serviceStr)
}
}
})

out.V4IPs = v4IPs.List()
out.V6IPs = v6IPs.List()
klog.V(5).Infof("LB Endpoints for %s/%s are: %v / %v on port: %d",
slices[0].Namespace, slices[0].Labels[discovery.LabelServiceName],
out.V4IPs, out.V6IPs, out.Port)
Expand Down Expand Up @@ -702,11 +719,8 @@ func IsEndpointServing(endpoint discovery.Endpoint) bool {
}
}

// IsEndpointEligible takes as input an endpoint from an endpoint slice and a boolean that indicates whether to include
// all terminating endpoints, as per the PublishNotReadyAddresses feature in kubernetes service spec. It always returns true
// if includeTerminating is true and falls back to IsEndpointServing otherwise.
func IsEndpointEligible(endpoint discovery.Endpoint, includeTerminating bool) bool {
return includeTerminating || IsEndpointServing(endpoint)
func IsEndpointTerminating(endpoint discovery.Endpoint) bool {
return endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating
}

// NoHostSubnet() compares the no-hostsubnet-nodes flag with node labels to see if the node is managing its
Expand All @@ -719,55 +733,134 @@ func NoHostSubnet(node *v1.Node) bool {
return config.Kubernetes.NoHostSubnetNodes.Matches(labels.Set(node.Labels))
}

// ForEachEligibleEndpoint iterates through each eligible endpoint in the given endpointslice and applies the input function fn to it.
// An endpoint is eligible if it is serving or if its corresponding service has Spec.PublishNotReadyAddresses set.
// PublishNotReadyAddresses tells endpoint consumers to disregard any indications of ready/not-ready and is generally used
// together with headless services so that DNS records of all endpoints (ready or not) are always published.
// Function fn takes a bool pointer "shortcut" that, when set to true inside fn, ends the iteration; this is useful when fn
// checks for a condition on the endpoints and we want to return as soon as the condition is satisfied.
func ForEachEligibleEndpoint(endpointSlice *discovery.EndpointSlice, service *kapi.Service, fn func(ep discovery.Endpoint, shortcut *bool)) {
// ForEachEligibleEndpoint first finds eligible endpoints from a provided list of endpoints
// and then iterates through them to apply the given function fn to each eligible endpoint.
func ForEachEligibleEndpoint(endpoints []discovery.Endpoint, service *kapi.Service, fn func(ep discovery.Endpoint)) {
forEachSelectedAndEligibleEndpoint(endpoints, service, nil, fn)
}

// forEachSelectedAndEligibleEndpoint does the following:
// (1) filters the given endpoints by the given condition function condFn;
// (2) selects the eligible endpoints
// (3) applies the provided function fn to them.
// Eligible endpoints are ready endpoints. If there are none, eligible endpoints are serving & terminating endpoints
// (an endpoint that is terminating shows ready=false).
// Returns true when elegible endpoints where ready endpoints or false if they were terminating endpoints
// The endpoint selection in step 2 above implements KEP-1669
// (https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/1669-proxy-terminating-endpoints/README.md)
// The corresponding service needs to provided as an input argument because if Spec.PublishNotReadyAddresses is set,
// then all provided endpoints must always be returned. PublishNotReadyAddresses tells endpoint consumers to disregard
// any indications of ready/not-ready and is generally used together with headless services so that DNS records of
// all endpoints (ready or not) are always published.
func forEachSelectedAndEligibleEndpoint(endpoints []discovery.Endpoint, service *kapi.Service, condFn func(ep discovery.Endpoint) bool, fn func(ep discovery.Endpoint)) bool {
includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
var shortcut bool
for _, endpoint := range endpointSlice.Endpoints {
if IsEndpointEligible(endpoint, includeTerminating) {
fn(endpoint, &shortcut)
if shortcut {
// shortcircuit the whole iteration
return
var selectedEndpoints []discovery.Endpoint
var eligibleSelectedEndpoints []discovery.Endpoint
var fallbackToTerminatingEndpoints bool

// Apply precondition on endpoints, if provided
if condFn != nil {
for _, endpoint := range endpoints {
if condFn(endpoint) {
selectedEndpoints = append(selectedEndpoints, endpoint)
}
}
} else {
selectedEndpoints = endpoints
}

// Apply endpoint selection based on readiness
for _, endpoint := range selectedEndpoints {
if includeTerminating || IsEndpointReady(endpoint) {
eligibleSelectedEndpoints = append(eligibleSelectedEndpoints, endpoint)
}
}
serviceStr := ""
if service != nil {
serviceStr = fmt.Sprintf(" (service %s/%s)", service.Namespace, service.Name)
}
klog.V(5).Infof("Endpoint selection%s: found %d ready endpoints", serviceStr, len(eligibleSelectedEndpoints))

// Fallback to serving terminating endpoints (ready=false, serving=true, terminating=true) only if none are ready.
if len(eligibleSelectedEndpoints) == 0 {
for _, endpoint := range selectedEndpoints {
if IsEndpointServing(endpoint) && IsEndpointTerminating(endpoint) {
eligibleSelectedEndpoints = append(eligibleSelectedEndpoints, endpoint)
}
}
klog.V(5).Infof("Endpoint selection%s: fallback to %d serving & terminating endpoints",
serviceStr, len(eligibleSelectedEndpoints))
fallbackToTerminatingEndpoints = true
}

// Finally, apply the provided fn function to each eligible selected endpoint
if fn != nil {
for _, endpoint := range eligibleSelectedEndpoints {
fn(endpoint)
}
}
return fallbackToTerminatingEndpoints
}

// GetEndpointAddresses returns a list of IP addresses of all eligible endpoints in the given endpoint slices.
func GetEndpointAddressesWithCondition(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, fn func(discovery.Endpoint) bool) sets.Set[string] {
endpointsAddress := sets.New[string]()
func getAllEndpointsFromEndpointSlices(endpointSlices ...*discovery.EndpointSlice) []discovery.Endpoint {
var endpoints []discovery.Endpoint
for _, endpointSlice := range endpointSlices {
ForEachEligibleEndpoint(endpointSlice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
includeEndpoint := fn == nil || fn(endpoint)
if !includeEndpoint {
return
}
for _, ip := range endpoint.Addresses {
endpointsAddress.Insert(utilnet.ParseIPSloppy(ip).String())
}
})
endpoints = append(endpoints, endpointSlice.Endpoints...)
}
return endpoints
}

// GetEligibleEndpointAddressesWithPreCondition first applies the (optionally) provided condition function condFn
// to filter endpoints from the given list of endpoint slices; from the resulting list,
// it returns the IP addresses of eligible endpoints.
func GetEligibleEndpointAddressesWithPreCondition(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, condFn func(discovery.Endpoint) bool) sets.Set[string] {
endpointsAddress := sets.New[string]()

endpoints := getAllEndpointsFromEndpointSlices(endpointSlices...)

forEachSelectedAndEligibleEndpoint(endpoints, service, condFn, func(endpoint discovery.Endpoint) {
for _, ip := range endpoint.Addresses {
endpointsAddress.Insert(utilnet.ParseIPSloppy(ip).String())
}
})

return endpointsAddress
}

// GetEndpointAddresses returns a list of IP addresses of all eligible endpoints in the given endpoint slice.
func GetEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return GetEndpointAddressesWithCondition(endpointSlices, service, nil)
// GetEligibleEndpointAddresses returns a list of IP addresses of all eligible endpoints in the given endpoint slice.
func GetEligibleEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return GetEligibleEndpointAddressesWithPreCondition(endpointSlices, service, nil)
}

// GetLocalEndpointAddresses returns a list of endpoints that are local to the specified node
func GetLocalEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, nodeName string) sets.Set[string] {
return GetEndpointAddressesWithCondition(endpointSlices, service, func(endpoint discovery.Endpoint) bool {
// GetLocalEligibleEndpointAddresses returns a list of endpoints that are local to the specified node and are eligible.
func GetLocalEligibleEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, nodeName string) sets.Set[string] {
return GetEligibleEndpointAddressesWithPreCondition(endpointSlices, service, func(endpoint discovery.Endpoint) bool {
return endpoint.NodeName != nil && *endpoint.NodeName == nodeName
})
}

// DoesEndpointSliceContainEndpoint returns true if the endpointslice
// contains an endpoint with the given IP/Port/Protocol and this endpoint is considered eligible
func DoesEndpointSliceContainEligibleEndpoint(endpointSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
var containsEligibleEndpoint bool
endpoints := getAllEndpointsFromEndpointSlices(endpointSlice)
forEachSelectedAndEligibleEndpoint(endpoints, service, nil, func(ep discovery.Endpoint) {
if !containsEligibleEndpoint {
out:
for _, ip := range ep.Addresses {
for _, port := range endpointSlice.Ports {
if utilnet.ParseIPSloppy(ip).String() == epIP && *port.Port == epPort && *port.Protocol == protocol {
containsEligibleEndpoint = true
break out
}
}
}
}
})
return containsEligibleEndpoint
}

// HasLocalHostNetworkEndpoints returns true if any of the nodeAddresses appear in given the set of
// localEndpointAddresses. This is useful to check whether any of the provided local endpoints are host-networked.
func HasLocalHostNetworkEndpoints(localEndpointAddresses sets.Set[string], nodeAddresses []net.IP) bool {
Expand All @@ -781,27 +874,6 @@ func HasLocalHostNetworkEndpoints(localEndpointAddresses sets.Set[string], nodeA
return len(localEndpointAddresses.Intersection(nodeAddressesSet)) != 0
}

// DoesEndpointSliceContainEndpoint returns true if the endpointslice
// contains an endpoint with the given IP/Port/Protocol and this endpoint is considered eligible
func DoesEndpointSliceContainEndpoint(endpointSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
var res bool
for _, port := range endpointSlice.Ports {
ForEachEligibleEndpoint(endpointSlice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
for _, ip := range endpoint.Addresses {
if utilnet.ParseIPSloppy(ip).String() == epIP && *port.Port == epPort && *port.Protocol == protocol {
if shortcut != nil {
*shortcut = true
}
res = true
return
}
}
})
}
return res
}

// ServiceNamespacedNameFromEndpointSlice returns the namespaced name of the service
// that corresponds to the given endpointSlice
func ServiceNamespacedNameFromEndpointSlice(endpointSlice *discovery.EndpointSlice) (k8stypes.NamespacedName, error) {
Expand Down

0 comments on commit 1c52192

Please sign in to comment.