Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCPBUGS-24363: Full implementation of KEP-1669 ProxyTerminatingEndpoints #4072

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ func (nc *DefaultNodeNetworkController) reconcileConntrackUponEndpointSliceEvent
oldIPStr := utilnet.ParseIPSloppy(oldIP).String()
// upon an update event, remove conntrack entries for IP addresses that are no longer
// in the endpointslice, skip otherwise
if newEndpointSlice != nil && util.DoesEndpointSliceContainEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
if newEndpointSlice != nil && util.DoesEndpointSliceContainEligibleEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
continue
}
// upon update and delete events, flush conntrack only for UDP
Expand Down
20 changes: 10 additions & 10 deletions go-controller/pkg/node/gateway_shared_intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (npw *nodePortWatcher) AddService(service *kapi.Service) error {
hasLocalHostNetworkEp = false
} else {
nodeIPs := npw.nodeIPManager.ListAddresses()
localEndpoints = npw.GetLocalEndpointAddresses(epSlices, service)
localEndpoints = npw.GetLocalEligibleEndpointAddresses(epSlices, service)
hasLocalHostNetworkEp = util.HasLocalHostNetworkEndpoints(localEndpoints, nodeIPs)
}
// If something didn't already do it add correct Service rules
Expand Down Expand Up @@ -757,7 +757,7 @@ func (npw *nodePortWatcher) SyncServices(services []interface{}) error {
continue
}
nodeIPs := npw.nodeIPManager.ListAddresses()
localEndpoints := npw.GetLocalEndpointAddresses(epSlices, service)
localEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, service)
hasLocalHostNetworkEp := util.HasLocalHostNetworkEndpoints(localEndpoints, nodeIPs)
npw.getAndSetServiceInfo(name, service, hasLocalHostNetworkEp, localEndpoints)

Expand Down Expand Up @@ -821,7 +821,7 @@ func (npw *nodePortWatcher) AddEndpointSlice(epSlice *discovery.EndpointSlice) e
// No need to continue adding the new endpoint slice, if we can't retrieve all slices for this service
return fmt.Errorf("error retrieving endpointslices for service %s/%s during endpointslice add: %w", svc.Namespace, svc.Name, err)
}
localEndpoints := npw.GetLocalEndpointAddresses(epSlices, svc)
localEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, svc)
hasLocalHostNetworkEp := util.HasLocalHostNetworkEndpoints(localEndpoints, nodeIPs)

// Here we make sure the correct rules are programmed whenever an AddEndpointSlice event is
Expand Down Expand Up @@ -880,7 +880,7 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
return fmt.Errorf("error retrieving service %s/%s for endpointslice %s during endpointslice delete: %v",
namespacedName.Namespace, namespacedName.Name, epSlice.Name, err)
}
localEndpoints := npw.GetLocalEndpointAddresses(epSlices, svc)
localEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, svc)
if svcConfig, exists := npw.updateServiceInfo(namespacedName, nil, &hasLocalHostNetworkEp, localEndpoints); exists {
// Lock the cache mutex here so we don't miss a service delete during an endpoint delete
// we have to do this because deleting and adding iptables rules is slow.
Expand All @@ -899,8 +899,8 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
}

// GetLocalEndpointAddresses returns a list of eligible endpoints that are local to the node
func (npw *nodePortWatcher) GetLocalEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return util.GetLocalEndpointAddresses(endpointSlices, service, npw.nodeIPManager.nodeName)
func (npw *nodePortWatcher) GetLocalEligibleEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return util.GetLocalEligibleEndpointAddresses(endpointSlices, service, npw.nodeIPManager.nodeName)
}

func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice) error {
Expand All @@ -920,8 +920,8 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
namespacedName.Namespace, namespacedName.Name, newEpSlice.Name, err)
}

oldEndpointAddresses := util.GetEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newEndpointAddresses := util.GetEndpointAddresses([]*discovery.EndpointSlice{newEpSlice}, svc)
oldEndpointAddresses := util.GetEligibleEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newEndpointAddresses := util.GetEligibleEndpointAddresses([]*discovery.EndpointSlice{newEpSlice}, svc)
if reflect.DeepEqual(oldEndpointAddresses, newEndpointAddresses) {
return nil
}
Expand Down Expand Up @@ -961,8 +961,8 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
// endpoints has changed. For this second comparison, check first between the old endpoint slice and all current
// endpointslices for this service. This is a partial comparison, in case serviceInfo is not set. When it is set, compare
// between /all/ old endpoint slices and all new ones.
oldLocalEndpoints := npw.GetLocalEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newLocalEndpoints := npw.GetLocalEndpointAddresses(epSlices, svc)
oldLocalEndpoints := npw.GetLocalEligibleEndpointAddresses([]*discovery.EndpointSlice{oldEpSlice}, svc)
newLocalEndpoints := npw.GetLocalEligibleEndpointAddresses(epSlices, svc)
hasLocalHostNetworkEpOld := util.HasLocalHostNetworkEndpoints(oldLocalEndpoints, nodeIPs)
hasLocalHostNetworkEpNew := util.HasLocalHostNetworkEndpoints(newLocalEndpoints, nodeIPs)

Expand Down
232 changes: 142 additions & 90 deletions go-controller/pkg/util/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,14 @@ type LbEndpoints struct {
Port int32
}

// GetLbEndpoints returns the IPv4 and IPv6 addresses of eligible endpoints as slices inside a struct
// GetLbEndpoints returns the IPv4 and IPv6 addresses of eligible endpoints from a
// provided list of endpoint slices, for a given service and service port.
func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort, service *v1.Service) LbEndpoints {
v4ips := sets.NewString()
v6ips := sets.NewString()

var validSlices []*discovery.EndpointSlice
v4IPs := sets.NewString()
v6IPs := sets.NewString()
out := LbEndpoints{}

// return an empty object so the caller doesn't have to check for nil and can use it as an iterator
if len(slices) == 0 {
return out
Expand All @@ -613,43 +615,58 @@ func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort,
for _, slice := range slices {
klog.V(5).Infof("Getting endpoints for slice %s/%s", slice.Namespace, slice.Name)

// build the list of valid endpoints in the slice
for _, port := range slice.Ports {
for _, slicePort := range slice.Ports {
// If Service port name is set, it must match the name field in the endpoint
// If Service port name is not set, we just use the endpoint port
if svcPort.Name != "" && svcPort.Name != *port.Name {
if svcPort.Name != "" && svcPort.Name != *slicePort.Name {
klog.V(5).Infof("Slice %s with different Port name, requested: %s received: %s",
slice.Name, svcPort.Name, *port.Name)
slice.Name, svcPort.Name, *slicePort.Name)
continue
}

// Skip ports that don't match the protocol
if *port.Protocol != svcPort.Protocol {
if *slicePort.Protocol != svcPort.Protocol {
klog.V(5).Infof("Slice %s with different Port protocol, requested: %s received: %s",
slice.Name, svcPort.Protocol, *port.Protocol)
slice.Name, svcPort.Protocol, *slicePort.Protocol)
continue
}

out.Port = *port.Port
ForEachEligibleEndpoint(slice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
for _, ip := range endpoint.Addresses {
klog.V(5).Infof("Adding slice %s endpoint: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
ipStr := utilnet.ParseIPSloppy(ip).String()
switch slice.AddressType {
case discovery.AddressTypeIPv4:
v4ips.Insert(ipStr)
case discovery.AddressTypeIPv6:
v6ips.Insert(ipStr)
default:
klog.V(5).Infof("Skipping FQDN slice %s/%s", slice.Namespace, slice.Name)
}
}
})
out.Port = *slicePort.Port

if slice.AddressType == discovery.AddressTypeFQDN {
klog.V(5).Infof("Skipping FQDN slice %s/%s", slice.Namespace, slice.Name)
} else { // endpoints are here either IPv4 or IPv6
validSlices = append(validSlices, slice)
}
}
}

out.V4IPs = v4ips.List()
out.V6IPs = v6ips.List()
serviceStr := ""
if service != nil {
serviceStr = fmt.Sprintf(" for service %s/%s", service.Namespace, service.Name)
}
// separate IPv4 from IPv6 addresses for eligible endpoints
for _, endpoint := range getEligibleEndpoints(validSlices, service) {
Copy link
Contributor

@jcaamano jcaamano Jan 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the logic above this line, could it be a condFn for getEligibleEndpoints getSelectedEligibleEndpoints?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we're separating IPv4 from IPv6 endpoints addresses. Do I need a condFn for that?
Here actually I should use the existing getEligibleEndpointAddresses, but it's at the cost of adding one more iteration through the endpoints. I'm a little hesitant to do that because it gets executed by ovnkube-controller, which configures /all/ the endpoints for a given service: https://github.com/ovn-org/ovn-kubernetes/blob/master/go-controller/pkg/ovn/controller/services/services_controller.go#L392-L402

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the logic above this line, not the logic below. So the one where you are matching the port name & protocol of the service.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh ok, I see. condFn as I defined it is applied to single endpoints, because we need to check their node name, while in GetLbEndpoints we're filtering out entire slices. I don't know if rewriting it once again for this particular usage would improve the overall readability :)

for _, ip := range endpoint.Addresses {
if utilnet.IsIPv4String(ip) {
klog.V(5).Infof("Adding endpoint IPv4 address %s port %d%s",
ip, out.Port, serviceStr)
v4IPs.Insert(utilnet.ParseIPSloppy(ip).String())

} else if utilnet.IsIPv6String(ip) {
klog.V(5).Infof("Adding endpoint IPv6 address %s port %d%s",
ip, out.Port, serviceStr)
v6IPs.Insert(utilnet.ParseIPSloppy(ip).String())

} else {
klog.V(5).Infof("Skipping unrecognized address %s port %d%s",
ip, out.Port, serviceStr)
}
}
}

out.V4IPs = v4IPs.List()
out.V6IPs = v6IPs.List()
klog.V(5).Infof("LB Endpoints for %s/%s are: %v / %v on port: %d",
slices[0].Namespace, slices[0].Labels[discovery.LabelServiceName],
out.V4IPs, out.V6IPs, out.Port)
Expand Down Expand Up @@ -702,11 +719,8 @@ func IsEndpointServing(endpoint discovery.Endpoint) bool {
}
}

// IsEndpointEligible takes as input an endpoint from an endpoint slice and a boolean that indicates whether to include
// all terminating endpoints, as per the PublishNotReadyAddresses feature in kubernetes service spec. It always returns true
// if includeTerminating is true and falls back to IsEndpointServing otherwise.
func IsEndpointEligible(endpoint discovery.Endpoint, includeTerminating bool) bool {
return includeTerminating || IsEndpointServing(endpoint)
func IsEndpointTerminating(endpoint discovery.Endpoint) bool {
return endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating
}

// NoHostSubnet() compares the no-hostsubnet-nodes flag with node labels to see if the node is managing its
Expand All @@ -719,55 +733,114 @@ func NoHostSubnet(node *v1.Node) bool {
return config.Kubernetes.NoHostSubnetNodes.Matches(labels.Set(node.Labels))
}

// ForEachEligibleEndpoint iterates through each eligible endpoint in the given endpointslice and applies the input function fn to it.
// An endpoint is eligible if it is serving or if its corresponding service has Spec.PublishNotReadyAddresses set.
// PublishNotReadyAddresses tells endpoint consumers to disregard any indications of ready/not-ready and is generally used
// together with headless services so that DNS records of all endpoints (ready or not) are always published.
// Function fn takes a bool pointer "shortcut" that, when set to true inside fn, ends the iteration; this is useful when fn
// checks for a condition on the endpoints and we want to return as soon as the condition is satisfied.
func ForEachEligibleEndpoint(endpointSlice *discovery.EndpointSlice, service *kapi.Service, fn func(ep discovery.Endpoint, shortcut *bool)) {
includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
var shortcut bool
for _, endpoint := range endpointSlice.Endpoints {
if IsEndpointEligible(endpoint, includeTerminating) {
fn(endpoint, &shortcut)
if shortcut {
// shortcircuit the whole iteration
return
// getSelectedEligibleEndpoints does the following:
// (1) filters the given endpoints with the provided condition function condFn;
// (2) further selects eligible endpoints based on readiness.
// Eligible endpoints are ready endpoints; if there are none, eligible endpoints are serving & terminating
// endpoints, as defined in KEP-1669
// (https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/1669-proxy-terminating-endpoints/README.md).
// The service corresponding to the given endpoints needs to provided as an input argument
// because if Spec.PublishNotReadyAddresses is set, then all provided endpoints must always be returned.
// PublishNotReadyAddresses tells endpoint consumers to disregard any indications of ready/not-ready and
// is generally used together with headless services so that DNS records of all endpoints (ready or not)
// are always published.
// Note that condFn, when specified, is used by utility functions to filter out non-local endpoints.
// It's important to run it /before/ the eligible endpoint selection, since the order impacts the output.
func getSelectedEligibleEndpoints(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, condFn func(ep discovery.Endpoint) bool) []discovery.Endpoint {
var readySelectedEndpoints []discovery.Endpoint
var servingTerminatingSelectedEndpoints []discovery.Endpoint
var eligibleEndpoints []discovery.Endpoint

includeAllEndpoints := service != nil && service.Spec.PublishNotReadyAddresses

for _, slice := range endpointSlices {
for _, endpoint := range slice.Endpoints {
// Apply precondition on endpoints, if provided
if condFn == nil || condFn(endpoint) {
// Assign to the ready or the serving&terminating slice for a later decision
if includeAllEndpoints || IsEndpointReady(endpoint) {
readySelectedEndpoints = append(readySelectedEndpoints, endpoint)
} else if IsEndpointServing(endpoint) && IsEndpointTerminating(endpoint) {
servingTerminatingSelectedEndpoints = append(servingTerminatingSelectedEndpoints, endpoint)
}
}
}
}
}
serviceStr := ""
if service != nil {
serviceStr = fmt.Sprintf(" (service %s/%s)", service.Namespace, service.Name)
}
klog.V(5).Infof("Endpoint selection%s: found %d ready endpoints", serviceStr, len(readySelectedEndpoints))

// GetEndpointAddresses returns a list of IP addresses of all eligible endpoints in the given endpoint slices.
func GetEndpointAddressesWithCondition(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, fn func(discovery.Endpoint) bool) sets.Set[string] {
endpointsAddress := sets.New[string]()
for _, endpointSlice := range endpointSlices {
ForEachEligibleEndpoint(endpointSlice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
includeEndpoint := fn == nil || fn(endpoint)
if !includeEndpoint {
return
}
for _, ip := range endpoint.Addresses {
endpointsAddress.Insert(utilnet.ParseIPSloppy(ip).String())
}
})
// Select eligible endpoints based on readiness
eligibleEndpoints = readySelectedEndpoints
// Fallback to serving terminating endpoints (ready=false, serving=true, terminating=true) only if none are ready
if len(readySelectedEndpoints) == 0 {
eligibleEndpoints = servingTerminatingSelectedEndpoints
klog.V(5).Infof("Endpoint selection%s: fallback to %d serving & terminating endpoints",
serviceStr, len(servingTerminatingSelectedEndpoints))
}
return endpointsAddress
}

// GetEndpointAddresses returns a list of IP addresses of all eligible endpoints in the given endpoint slice.
func GetEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return GetEndpointAddressesWithCondition(endpointSlices, service, nil)
return eligibleEndpoints
}

// GetLocalEndpointAddresses returns a list of endpoints that are local to the specified node
func GetLocalEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, nodeName string) sets.Set[string] {
return GetEndpointAddressesWithCondition(endpointSlices, service, func(endpoint discovery.Endpoint) bool {
func getLocalEligibleEndpoints(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, nodeName string) []discovery.Endpoint {
return getSelectedEligibleEndpoints(endpointSlices, service, func(endpoint discovery.Endpoint) bool {
return endpoint.NodeName != nil && *endpoint.NodeName == nodeName
})
}

func getEligibleEndpoints(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) []discovery.Endpoint {
return getSelectedEligibleEndpoints(endpointSlices, service, nil)
}

// getEligibleEndpointAddresses takes a list of endpointSlices, a service and, optionally, a nodeName
// and applies the endpoint selection logic. It returns the IP addresses of eligible endpoints.
func getEligibleEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, nodeName string) sets.Set[string] {
endpointsAddresses := sets.New[string]()
var eligibleEndpoints []discovery.Endpoint

if nodeName != "" {
eligibleEndpoints = getLocalEligibleEndpoints(endpointSlices, service, nodeName)
} else {
eligibleEndpoints = getEligibleEndpoints(endpointSlices, service)
}
for _, endpoint := range eligibleEndpoints {
for _, ip := range endpoint.Addresses {
endpointsAddresses.Insert(utilnet.ParseIPSloppy(ip).String())
}
}

return endpointsAddresses
}

// GetEligibleEndpointAddresses returns a list of IP addresses of all eligible endpoints from the given endpoint slices.
func GetEligibleEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service) sets.Set[string] {
return getEligibleEndpointAddresses(endpointSlices, service, "")
}

// GetLocalEligibleEndpointAddresses returns a list of IP address of endpoints that are local to the specified node
// and are eligible.
func GetLocalEligibleEndpointAddresses(endpointSlices []*discovery.EndpointSlice, service *kapi.Service, nodeName string) sets.Set[string] {
return getEligibleEndpointAddresses(endpointSlices, service, nodeName)
}

// DoesEndpointSliceContainEndpoint returns true if the endpointslice
// contains an endpoint with the given IP, port and Protocol and if this endpoint is considered eligible.
func DoesEndpointSliceContainEligibleEndpoint(endpointSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
for _, ep := range getEligibleEndpoints([]*discovery.EndpointSlice{endpointSlice}, service) {
for _, ip := range ep.Addresses {
for _, port := range endpointSlice.Ports {
if utilnet.ParseIPSloppy(ip).String() == epIP && *port.Port == epPort && *port.Protocol == protocol {
return true
}
}
}
}
return false
}

// HasLocalHostNetworkEndpoints returns true if any of the nodeAddresses appear in given the set of
// localEndpointAddresses. This is useful to check whether any of the provided local endpoints are host-networked.
func HasLocalHostNetworkEndpoints(localEndpointAddresses sets.Set[string], nodeAddresses []net.IP) bool {
Expand All @@ -781,27 +854,6 @@ func HasLocalHostNetworkEndpoints(localEndpointAddresses sets.Set[string], nodeA
return len(localEndpointAddresses.Intersection(nodeAddressesSet)) != 0
}

// DoesEndpointSliceContainEndpoint returns true if the endpointslice
// contains an endpoint with the given IP/Port/Protocol and this endpoint is considered eligible
func DoesEndpointSliceContainEndpoint(endpointSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
var res bool
for _, port := range endpointSlice.Ports {
ForEachEligibleEndpoint(endpointSlice, service, func(endpoint discovery.Endpoint, shortcut *bool) {
for _, ip := range endpoint.Addresses {
if utilnet.ParseIPSloppy(ip).String() == epIP && *port.Port == epPort && *port.Protocol == protocol {
if shortcut != nil {
*shortcut = true
}
res = true
return
}
}
})
}
return res
}

// ServiceNamespacedNameFromEndpointSlice returns the namespaced name of the service
// that corresponds to the given endpointSlice
func ServiceNamespacedNameFromEndpointSlice(endpointSlice *discovery.EndpointSlice) (k8stypes.NamespacedName, error) {
Expand Down