Skip to content

Commit

Permalink
Full implementation of KEP-1669 ProxyTerminatingEndpoints
Browse files Browse the repository at this point in the history
The previous implementation was an approximation of KEP-1669 ProxyTerminatingEndpoints: we simply included terminating serving endpoints (ready=false, serving=true, terminating=true) along with ready ones in the endpoint selection logic. Let's fully implement KEP-1669 and only include terminating endpoints if none are ready. The selection follows two simple steps:
1) Take all ready endpoints
2) If no ready endpoints were found, take all serving terminating endpoints.

This should also help with an issue found in a production cluster (https://issues.redhat.com/browse/OCPBUGS-24363) where, due to infrequent readiness probes, terminating endpoints were declared as non-serving (that is, their readiness probe failed) only quite late; in the meantime they kept being selected as valid endpoints, while the existing ready endpoints should have been preferred.

Extended the test cases to include testing against multiple slices and dual stack scenarios.

Signed-off-by: Riccardo Ravaioli <rravaiol@redhat.com>
  • Loading branch information
ricky-rav committed Jan 5, 2024
1 parent 473922a commit 5e6d883
Show file tree
Hide file tree
Showing 3 changed files with 855 additions and 94 deletions.
2 changes: 1 addition & 1 deletion go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ func (nc *DefaultNodeNetworkController) reconcileConntrackUponEndpointSliceEvent
oldIPStr := utilnet.ParseIPSloppy(oldIP).String()
// upon an update event, remove conntrack entries for IP addresses that are no longer
// in the endpointslice, skip otherwise
if newEndpointSlice != nil && util.DoesEndpointSliceContainEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
if newEndpointSlice != nil && util.DoesEndpointSliceContainEligibleEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
continue
}
// upon update and delete events, flush conntrack only for UDP
Expand Down
183 changes: 115 additions & 68 deletions go-controller/pkg/util/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,14 @@ type LbEndpoints struct {
Port int32
}

// GetLbEndpoints returns the IPv4 and IPv6 addresses of eligible endpoints as slices inside a struct
// GetLbEndpoints returns the IPv4 and IPv6 addresses of eligible endpoints from a
// provided list of endpoint slices, for a given service and service port.
func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort, service *v1.Service) LbEndpoints {
v4ips := sets.NewString()
v6ips := sets.NewString()

var allEndpoints []discovery.Endpoint
v4IPs := sets.NewString()
v6IPs := sets.NewString()
out := LbEndpoints{}

// return an empty object so the caller doesn't have to check for nil and can use it as an iterator
if len(slices) == 0 {
return out
Expand All @@ -613,43 +615,58 @@ func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort,
for _, slice := range slices {
klog.V(5).Infof("Getting endpoints for slice %s/%s", slice.Namespace, slice.Name)

// build the list of valid endpoints in the slice
for _, port := range slice.Ports {
for _, slicePort := range slice.Ports {
// If Service port name is set, it must match the name field in the endpoint
// If Service port name is not set, we just use the endpoint port
if svcPort.Name != "" && svcPort.Name != *port.Name {
if svcPort.Name != "" && svcPort.Name != *slicePort.Name {
klog.V(5).Infof("Slice %s with different Port name, requested: %s received: %s",
slice.Name, svcPort.Name, *port.Name)
slice.Name, svcPort.Name, *slicePort.Name)
continue
}

// Skip ports that don't match the protocol
if *port.Protocol != svcPort.Protocol {
if *slicePort.Protocol != svcPort.Protocol {
klog.V(5).Infof("Slice %s with different Port protocol, requested: %s received: %s",
slice.Name, svcPort.Protocol, *port.Protocol)
slice.Name, svcPort.Protocol, *slicePort.Protocol)
continue
}

out.Port = *port.Port
ForEachEligibleEndpoint(slice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
for _, ip := range endpoint.Addresses {
klog.V(5).Infof("Adding slice %s endpoint: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
ipStr := utilnet.ParseIPSloppy(ip).String()
switch slice.AddressType {
case discovery.AddressTypeIPv4:
v4ips.Insert(ipStr)
case discovery.AddressTypeIPv6:
v6ips.Insert(ipStr)
default:
klog.V(5).Infof("Skipping FQDN slice %s/%s", slice.Namespace, slice.Name)
}
}
})
out.Port = *slicePort.Port

if slice.AddressType == discovery.AddressTypeFQDN {
klog.V(5).Infof("Skipping FQDN slice %s/%s", slice.Namespace, slice.Name)
} else { // endpoints are here either IPv4 or IPv6
allEndpoints = append(allEndpoints, slice.Endpoints...)
}
}
}

out.V4IPs = v4ips.List()
out.V6IPs = v6ips.List()
serviceStr := ""
if service != nil {
serviceStr = fmt.Sprintf(" for service %s/%s", service.Namespace, service.Name)
}
// separate IPv4 from IPv6 addresses for eligible endpoints
ForEachEligibleEndpoint(allEndpoints, service, func(endpoint discovery.Endpoint) {
for _, ip := range endpoint.Addresses {
if utilnet.IsIPv4String(ip) {
klog.V(5).Infof("Adding endpoint IPv4 address %s port %d%s",
ip, out.Port, serviceStr)
v4IPs.Insert(utilnet.ParseIPSloppy(ip).String())

} else if utilnet.IsIPv6String(ip) {
klog.V(5).Infof("Adding endpoint IPv6 address %s port %d%s",
ip, out.Port, serviceStr)
v6IPs.Insert(utilnet.ParseIPSloppy(ip).String())

} else {
klog.V(5).Infof("Skipping unrecognized address %s port %d%s",
ip, out.Port, serviceStr)
}
}
})

out.V4IPs = v4IPs.List()
out.V6IPs = v6IPs.List()
klog.V(5).Infof("LB Endpoints for %s/%s are: %v / %v on port: %d",
slices[0].Namespace, slices[0].Labels[discovery.LabelServiceName],
out.V4IPs, out.V6IPs, out.Port)
Expand Down Expand Up @@ -702,11 +719,8 @@ func IsEndpointServing(endpoint discovery.Endpoint) bool {
}
}

// IsEndpointEligible takes as input an endpoint from an endpoint slice and a boolean that indicates whether to include
// all terminating endpoints, as per the PublishNotReadyAddresses feature in kubernetes service spec. It always returns true
// if includeTerminating is true and falls back to IsEndpointServing otherwise.
func IsEndpointEligible(endpoint discovery.Endpoint, includeTerminating bool) bool {
return includeTerminating || IsEndpointServing(endpoint)
// IsEndpointTerminating reports whether the given endpoint is marked as terminating,
// i.e. its Terminating condition is set and true. An unset (nil) condition counts
// as not terminating.
func IsEndpointTerminating(endpoint discovery.Endpoint) bool {
	if terminating := endpoint.Conditions.Terminating; terminating != nil {
		return *terminating
	}
	return false
}

// NoHostSubnet() compares the no-hostsubnet-nodes flag with node labels to see if the node is managing its
Expand All @@ -719,51 +733,88 @@ func NoHostSubnet(node *v1.Node) bool {
return config.Kubernetes.NoHostSubnetNodes.Matches(labels.Set(node.Labels))
}

// ForEachEligibleEndpoint iterates through each eligible endpoint in the given endpointslice and applies the input function fn to it.
// An endpoint is eligible if it is serving or if its corresponding service has Spec.PublishNotReadyAddresses set.
// PublishNotReadyAddresses tells endpoint consumers to disregard any indications of ready/not-ready and is generally used
// together with headless services so that DNS records of all endpoints (ready or not) are always published.
// Function fn takes a bool pointer "shortcut" that, when set to true inside fn, ends the iteration; this is useful when fn
// checks for a condition on the endpoints and we want to return as soon as the condition is satisfied.
func ForEachEligibleEndpoint(endpointSlice *discovery.EndpointSlice, service *kapi.Service, fn func(ep discovery.Endpoint, shortcut *bool)) {
// GetEligibleEndpoints takes a list of endpoints (provided from endpoint slices) and a pointer to a service
// and returns a list of eligible endpoints. The endpoint selection implements KEP-1669
// (https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/1669-proxy-terminating-endpoints/README.md)
// First it selects only ready endpoints, which effectively filters out all terminating endpoints
// (an endpoint that is terminating shows ready=false). Then, only if no ready endpoints are found,
// it falls back to serving terminating endpoints.
// However, if the service has Spec.PublishNotReadyAddresses set, all provided endpoints are always returned.
// PublishNotReadyAddresses tells endpoint consumers to disregard any indications of ready/not-ready and is generally
// used together with headless services so that DNS records of all endpoints (ready or not) are always published.
// A nil service is treated as not having PublishNotReadyAddresses set.
func GetEligibleEndpoints(endpoints []discovery.Endpoint, service *kapi.Service) []discovery.Endpoint {
includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
var shortcut bool
for _, endpoint := range endpointSlice.Endpoints {
if IsEndpointEligible(endpoint, includeTerminating) {
fn(endpoint, &shortcut)
if shortcut {
// shortcircuit the whole iteration
return
var eligibleEndpoints []discovery.Endpoint

for _, endpoint := range endpoints {
if includeTerminating || IsEndpointReady(endpoint) {
eligibleEndpoints = append(eligibleEndpoints, endpoint)
}
}

serviceStr := ""
if service != nil {
serviceStr = fmt.Sprintf(" (service %s/%s)", service.Namespace, service.Name)
}
klog.Infof("rira: GetEligibleEndpoints%s: found %d ready endpoints", serviceStr, len(eligibleEndpoints)) // TODO remove this
// Fallback to serving terminating endpoints (ready=false, serving=true, terminating=true) only if none are ready.
if len(eligibleEndpoints) == 0 {
for _, endpoint := range endpoints {
if IsEndpointServing(endpoint) && IsEndpointTerminating(endpoint) {
eligibleEndpoints = append(eligibleEndpoints, endpoint)
}
}
klog.Infof("rira: GetEligibleEndpoints%s: FALLBACK to %d serving & terminating endpoints",
serviceStr, len(eligibleEndpoints)) // TODO remove this

}
return eligibleEndpoints
}

// ForEachEligibleEndpoint narrows the provided endpoints down to the eligible ones,
// as computed by GetEligibleEndpoints for the given service, and then invokes fn once
// for every eligible endpoint, in the order GetEligibleEndpoints returned them.
func ForEachEligibleEndpoint(endpoints []discovery.Endpoint, service *kapi.Service, fn func(ep discovery.Endpoint)) {
	eligible := GetEligibleEndpoints(endpoints, service)
	for i := range eligible {
		fn(eligible[i])
	}
}

// GetEndpointAddresses returns a list of IP addresses of all eligible endpoints in the given endpoint slices.
func GetEndpointAddressesWithCondition(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, fn func(discovery.Endpoint) bool) sets.Set[string] {
// GetEndpointAddressesWithPreCondition first applies the (optional) function fn as a
// filter on the endpoints of the given endpoint slices; a nil fn keeps every endpoint.
// From the endpoints that pass the filter, it returns the set of IP addresses of the
// eligible ones, with eligibility decided across all provided slices together.
func GetEndpointAddressesWithPreCondition(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, fn func(discovery.Endpoint) bool) sets.Set[string] {
var endpoints []discovery.Endpoint
endpointsAddress := sets.New[string]()

// Apply precondition
for _, endpointSlice := range endpointSlices {
ForEachEligibleEndpoint(endpointSlice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
for _, endpoint := range endpointSlice.Endpoints {
includeEndpoint := fn == nil || fn(endpoint)
if !includeEndpoint {
return
}
for _, ip := range endpoint.Addresses {
endpointsAddress.Insert(utilnet.ParseIPSloppy(ip).String())
if includeEndpoint {
endpoints = append(endpoints, endpoint)
}
})
}
}

// Filter out terminating endpoints if possible, otherwise fall back to serving but terminating endpoints
ForEachEligibleEndpoint(endpoints, service, func(endpoint discovery.Endpoint) {
for _, ip := range endpoint.Addresses {
endpointsAddress.Insert(utilnet.ParseIPSloppy(ip).String())
}
})

return endpointsAddress
}

// GetEndpointAddresses returns the set of IP addresses of all eligible endpoints in the
// given endpoint slices; no additional precondition is applied to the endpoints (nil filter).
func GetEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return GetEndpointAddressesWithCondition(endpointSlices, service, nil)
return GetEndpointAddressesWithPreCondition(endpointSlices, service, nil)
}

// GetLocalEndpointAddresses returns a list of endpoints that are local to the specified node
// GetLocalEndpointAddresses returns the set of IP addresses of the eligible endpoints that are
// local to the specified node, i.e. whose NodeName is set and equal to nodeName.
func GetLocalEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, nodeName string) sets.Set[string] {
return GetEndpointAddressesWithCondition(endpointSlices, service, func(endpoint discovery.Endpoint) bool {
return GetEndpointAddressesWithPreCondition(endpointSlices, service, func(endpoint discovery.Endpoint) bool {
return endpoint.NodeName != nil && *endpoint.NodeName == nodeName
})
}
Expand All @@ -783,23 +834,19 @@ func HasLocalHostNetworkEndpoints(localEndpointAddresses sets.Set[string], nodeA

// DoesEndpointSliceContainEligibleEndpoint returns true if the endpointslice
// contains an endpoint with the given IP/Port/Protocol and this endpoint is considered eligible.
// Eligibility is evaluated by GetEligibleEndpoints over the endpoints of this single slice.
func DoesEndpointSliceContainEndpoint(endpointSlice *discovery.EndpointSlice,
func DoesEndpointSliceContainEligibleEndpoint(endpointSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
var res bool
eligibleEndpoints := GetEligibleEndpoints(endpointSlice.Endpoints, service)
for _, port := range endpointSlice.Ports {
ForEachEligibleEndpoint(endpointSlice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
for _, endpoint := range eligibleEndpoints {
for _, ip := range endpoint.Addresses {
if utilnet.ParseIPSloppy(ip).String() == epIP && *port.Port == epPort && *port.Protocol == protocol {
if shortcut != nil {
*shortcut = true
}
res = true
return
return true
}
}
})
}
}
return res
return false
}

// ServiceNamespacedNameFromEndpointSlice returns the namespaced name of the service
Expand Down

0 comments on commit 5e6d883

Please sign in to comment.