diff --git a/go-controller/pkg/node/gateway_shared_intf.go b/go-controller/pkg/node/gateway_shared_intf.go
index e14bac9aa5..1e7a4f4b68 100644
--- a/go-controller/pkg/node/gateway_shared_intf.go
+++ b/go-controller/pkg/node/gateway_shared_intf.go
@@ -19,6 +19,7 @@ import (
 
 	kapi "k8s.io/api/core/v1"
 	discovery "k8s.io/api/discovery/v1"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	ktypes "k8s.io/apimachinery/pkg/types"
 	apierrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -774,7 +775,8 @@ func (npw *nodePortWatcher) AddEndpointSlice(epSlice *discovery.EndpointSlice) e
 	// Here we make sure the correct rules are programmed whenever an AddEndpointSlice
 	// event is received, only alter flows if we need to, i.e if cache wasn't
 	// set or if it was and hasLocalHostNetworkEp state changed, to prevent flow churn
-	namespacedName, err := namespacedNameFromEPSlice(epSlice)
+	namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
+
 	if err != nil {
 		return fmt.Errorf("cannot add %s/%s to nodePortWatcher: %v", epSlice.Namespace, epSlice.Name, err)
 	}
@@ -814,7 +816,7 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
 
 	klog.V(5).Infof("Deleting endpointslice %s in namespace %s", epSlice.Name, epSlice.Namespace)
 	// remove rules for endpoints and add back normal ones
-	namespacedName, err := namespacedNameFromEPSlice(epSlice)
+	namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
 	if err != nil {
 		return fmt.Errorf("cannot delete %s/%s from nodePortWatcher: %v", epSlice.Namespace, epSlice.Name, err)
 	}
@@ -835,10 +837,11 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
 	return nil
 }
 
-func getEndpointAddresses(endpointSlice *discovery.EndpointSlice) []string {
+func getEndpointAddresses(endpointSlice *discovery.EndpointSlice, service *kapi.Service) []string {
 	endpointsAddress := make([]string, 0)
+	includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
 	for _, endpoint := range endpointSlice.Endpoints {
-		if isEndpointReady(endpoint) {
+		if util.IsEndpointValid(endpoint, includeTerminating) {
 			for _, ip := range endpoint.Addresses {
 				endpointsAddress = append(endpointsAddress, utilnet.ParseIPSloppy(ip).String())
 			}
@@ -851,18 +854,23 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
 	var err error
 	var errors []error
 
-	oldEpAddr := getEndpointAddresses(oldEpSlice)
-	newEpAddr := getEndpointAddresses(newEpSlice)
+	namespacedName, err := serviceNamespacedNameFromEndpointSlice(newEpSlice)
+	if err != nil {
+		return fmt.Errorf("cannot update %s/%s in nodePortWatcher: %v", newEpSlice.Namespace, newEpSlice.Name, err)
+	}
+	svc, err := npw.watchFactory.GetService(namespacedName.Namespace, namespacedName.Name)
+	if err != nil && !kerrors.IsNotFound(err) {
+		return fmt.Errorf("error while retrieving service for endpointslice %s/%s during update: %v",
+			oldEpSlice.Namespace, oldEpSlice.Name, err)
+	}
+
+	oldEpAddr := getEndpointAddresses(oldEpSlice, svc)
+	newEpAddr := getEndpointAddresses(newEpSlice, svc)
 	if reflect.DeepEqual(oldEpAddr, newEpAddr) {
 		return nil
 	}
 
 	klog.V(5).Infof("Updating endpointslice %s in namespace %s", oldEpSlice.Name, oldEpSlice.Namespace)
-
-	namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
-	if err != nil {
-		return fmt.Errorf("cannot update %s/%s in nodePortWatcher: %v", newEpSlice.Namespace, newEpSlice.Name, err)
-	}
 	if _, exists := npw.getServiceInfo(namespacedName); !exists {
 		// When a service is updated from externalName to nodeport type, it won't be
 		// in nodePortWatcher cache (npw): in this case, have the new nodeport IPtable rules
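
Note on the getEndpointAddresses change above: the Service argument exists only to derive includeTerminating from spec.publishNotReadyAddresses, and a nil Service (UpdateEndpointSlice tolerates a NotFound lookup) falls back to the stricter serving-based filter. The standalone sketch below mirrors that derivation; the service and slice literals are invented for illustration, and the util import assumes the repo's module path.

    package main

    import (
        "fmt"

        kapi "k8s.io/api/core/v1"
        discovery "k8s.io/api/discovery/v1"
        utilnet "k8s.io/utils/net"

        "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
    )

    func bptr(b bool) *bool { return &b }

    func main() {
        // Invented fixture: one endpoint that is neither ready nor serving,
        // under a service that publishes not-ready addresses.
        svc := &kapi.Service{Spec: kapi.ServiceSpec{PublishNotReadyAddresses: true}}
        slice := &discovery.EndpointSlice{
            Endpoints: []discovery.Endpoint{{
                Addresses:  []string{"10.244.0.5"},
                Conditions: discovery.EndpointConditions{Ready: bptr(false), Serving: bptr(false)},
            }},
        }

        // Same derivation as the patched getEndpointAddresses: a nil service
        // defaults to includeTerminating=false.
        includeTerminating := svc != nil && svc.Spec.PublishNotReadyAddresses
        addrs := []string{}
        for _, ep := range slice.Endpoints {
            if util.IsEndpointValid(ep, includeTerminating) {
                for _, ip := range ep.Addresses {
                    addrs = append(addrs, utilnet.ParseIPSloppy(ip).String())
                }
            }
        }
        fmt.Println(addrs) // [10.244.0.5]: kept only because PublishNotReadyAddresses is set
    }
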
diff --git a/go-controller/pkg/node/healthcheck_service.go b/go-controller/pkg/node/healthcheck_service.go
index b0452c9b41..16becae814 100644
--- a/go-controller/pkg/node/healthcheck_service.go
+++ b/go-controller/pkg/node/healthcheck_service.go
@@ -102,7 +102,7 @@ func (l *loadBalancerHealthChecker) SyncServices(svcs []interface{}) error {
 }
 
 func (l *loadBalancerHealthChecker) SyncEndPointSlices(epSlice *discovery.EndpointSlice) error {
-	namespacedName, err := namespacedNameFromEPSlice(epSlice)
+	namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
 	if err != nil {
 		return fmt.Errorf("skipping %s/%s: %v", epSlice.Namespace, epSlice.Name, err)
 	}
@@ -124,7 +124,7 @@ func (l *loadBalancerHealthChecker) SyncEndPointSlices(epSlice *discovery.Endpoi
 }
 
 func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.EndpointSlice) error {
-	namespacedName, err := namespacedNameFromEPSlice(epSlice)
+	namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
 	if err != nil {
 		return fmt.Errorf("cannot add %s/%s to loadBalancerHealthChecker: %v", epSlice.Namespace, epSlice.Name, err)
 	}
@@ -137,7 +137,7 @@ func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.Endpoint
 }
 
 func (l *loadBalancerHealthChecker) UpdateEndpointSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice) error {
-	namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
+	namespacedName, err := serviceNamespacedNameFromEndpointSlice(newEpSlice)
 	if err != nil {
 		return fmt.Errorf("cannot update %s/%s in loadBalancerHealthChecker: %v",
 			newEpSlice.Namespace, newEpSlice.Name, err)
@@ -209,7 +209,7 @@ func isEndpointReady(endpoint discovery.Endpoint) bool {
 	return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
 }
 
-func namespacedNameFromEPSlice(epSlice *discovery.EndpointSlice) (ktypes.NamespacedName, error) {
+func serviceNamespacedNameFromEndpointSlice(epSlice *discovery.EndpointSlice) (ktypes.NamespacedName, error) {
 	// Return the namespaced name of the corresponding service
 	var serviceNamespacedName ktypes.NamespacedName
 	svcName := epSlice.Labels[discovery.LabelServiceName]
diff --git a/go-controller/pkg/node/node.go b/go-controller/pkg/node/node.go
index 62c14acb8e..9fb4d658be 100644
--- a/go-controller/pkg/node/node.go
+++ b/go-controller/pkg/node/node.go
@@ -13,6 +13,7 @@ import (
 
 	kapi "k8s.io/api/core/v1"
 	discovery "k8s.io/api/discovery/v1"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
@@ -715,7 +716,15 @@ func (n *OvnNode) reconcileConntrackUponEndpointSliceEvents(oldEndpointSlice, ne
 		// nothing to do upon an add event
 		return nil
 	}
-
+	namespacedName, err := serviceNamespacedNameFromEndpointSlice(oldEndpointSlice)
+	if err != nil {
+		return fmt.Errorf("cannot reconcile conntrack: %v", err)
+	}
+	svc, err := n.watchFactory.GetService(namespacedName.Namespace, namespacedName.Name)
+	if err != nil && !kerrors.IsNotFound(err) {
+		return fmt.Errorf("error while retrieving service for endpointslice %s/%s when reconciling conntrack: %v",
+			newEndpointSlice.Namespace, newEndpointSlice.Name, err)
+	}
 	for _, oldPort := range oldEndpointSlice.Ports {
 		if *oldPort.Protocol != kapi.ProtocolUDP { // flush conntrack only for UDP
 			continue
@@ -725,7 +734,7 @@ func (n *OvnNode) reconcileConntrackUponEndpointSliceEvents(oldEndpointSlice, ne
 			oldIPStr := utilnet.ParseIPSloppy(oldIP).String()
 			// upon an update event, remove conntrack entries for IP addresses that are no longer
 			// in the endpointslice, skip otherwise
-			if newEndpointSlice != nil && doesEPSliceContainReadyEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol) {
+			if newEndpointSlice != nil && doesEndpointSliceContainValidEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
 				continue
 			}
 			// upon update and delete events, flush conntrack only for UDP
@@ -876,13 +885,14 @@ func (n *OvnNode) validateVTEPInterfaceMTU() error {
 	return nil
 }
 
-// doesEPSliceContainEndpoint checks whether the endpointslice
-// contains a specific endpoint with IP/Port/Protocol and this endpoint is ready
-func doesEPSliceContainReadyEndpoint(epSlice *discovery.EndpointSlice,
-	epIP string, epPort int32, protocol kapi.Protocol) bool {
+// doesEndpointSliceContainValidEndpoint returns true if the endpointslice
+// contains an endpoint with the given IP/Port/Protocol and this endpoint is considered valid
+func doesEndpointSliceContainValidEndpoint(epSlice *discovery.EndpointSlice,
+	epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
+	includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
 	for _, port := range epSlice.Ports {
 		for _, endpoint := range epSlice.Endpoints {
-			if !isEndpointReady(endpoint) {
+			if !util.IsEndpointValid(endpoint, includeTerminating) {
 				continue
 			}
 			for _, ip := range endpoint.Addresses {
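
The conntrack change above reads best as a predicate: upon an update or delete event, a UDP conntrack entry for an old endpoint IP is flushed unless the new slice still carries a valid endpoint for the same IP/port/protocol. Below is a condensed, hypothetical restatement of that rule, not the literal patch; shouldFlush is an invented name and the util import path is assumed.

    package main

    import (
        "fmt"

        kapi "k8s.io/api/core/v1"
        discovery "k8s.io/api/discovery/v1"

        "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
    )

    // shouldFlush condenses the patched loop in
    // reconcileConntrackUponEndpointSliceEvents into a single predicate.
    func shouldFlush(newSlice *discovery.EndpointSlice, ip string, port int32,
        proto kapi.Protocol, svc *kapi.Service) bool {
        if proto != kapi.ProtocolUDP {
            return false // conntrack is flushed only for UDP
        }
        if newSlice == nil {
            return true // delete event: no endpoint can still match
        }
        includeTerminating := svc != nil && svc.Spec.PublishNotReadyAddresses
        for _, p := range newSlice.Ports {
            if p.Port == nil || *p.Port != port || p.Protocol == nil || *p.Protocol != proto {
                continue
            }
            for _, ep := range newSlice.Endpoints {
                if !util.IsEndpointValid(ep, includeTerminating) {
                    continue
                }
                for _, a := range ep.Addresses {
                    if a == ip {
                        return false // endpoint still valid: keep its entries
                    }
                }
            }
        }
        return true
    }

    func main() {
        // Invented example: a UDP endpoint removed by a delete event is flushed.
        fmt.Println(shouldFlush(nil, "10.244.1.7", 53, kapi.ProtocolUDP, nil)) // true
    }
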
diff --git a/go-controller/pkg/ovn/controller/services/load_balancer.go b/go-controller/pkg/ovn/controller/services/load_balancer.go
index d91f8820ad..24e3b48712 100644
--- a/go-controller/pkg/ovn/controller/services/load_balancer.go
+++ b/go-controller/pkg/ovn/controller/services/load_balancer.go
@@ -63,7 +63,7 @@ var protos = []v1.Protocol{
 
 func buildServiceLBConfigs(service *v1.Service, endpointSlices []*discovery.EndpointSlice) (perNodeConfigs []lbConfig, clusterConfigs []lbConfig) {
 	// For each svcPort, determine if it will be applied per-node or cluster-wide
 	for _, svcPort := range service.Spec.Ports {
-		eps := util.GetLbEndpoints(endpointSlices, svcPort)
+		eps := util.GetLbEndpoints(endpointSlices, svcPort, service.Spec.PublishNotReadyAddresses)
 		// if ExternalTrafficPolicy or InternalTrafficPolicy is local, then we need to do things a bit differently
 		externalTrafficLocal := (service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal)
diff --git a/go-controller/pkg/util/kube.go b/go-controller/pkg/util/kube.go
index f417f3c253..2cd36a9611 100644
--- a/go-controller/pkg/util/kube.go
+++ b/go-controller/pkg/util/kube.go
@@ -306,13 +306,13 @@ type LbEndpoints struct {
 	Port int32
 }
 
-// GetLbEndpoints return the endpoints that belong to the IPFamily as a slice of IPs
-func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort) LbEndpoints {
+// GetLbEndpoints returns the IPv4 and IPv6 addresses of valid endpoints as slices inside a struct
+func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort, includeTerminating bool) LbEndpoints {
 	v4ips := sets.NewString()
 	v6ips := sets.NewString()
 
 	out := LbEndpoints{}
-	// return an empty object so the caller don't have to check for nil and can use it as an iterator
+	// return an empty object so the caller doesn't have to check for nil and can use it as an iterator
 	if len(slices) == 0 {
 		return out
 	}
@@ -320,17 +320,17 @@ func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort)
 	for _, slice := range slices {
 		klog.V(4).Infof("Getting endpoints for slice %s/%s", slice.Namespace, slice.Name)
 
-		// build the list of endpoints in the slice
+		// build the list of valid endpoints in the slice
 		for _, port := range slice.Ports {
-			// If Service port name set it must match the name field in the endpoint
-			// If Service port name is not set we just use the endpoint port
+			// If Service port name is set, it must match the name field in the endpoint
+			// If Service port name is not set, we just use the endpoint port
 			if svcPort.Name != "" && svcPort.Name != *port.Name {
 				klog.V(5).Infof("Slice %s with different Port name, requested: %s received: %s",
 					slice.Name, svcPort.Name, *port.Name)
 				continue
 			}
 
-			// Skip ports that doesn't match the protocol
+			// Skip ports that don't match the protocol
 			if *port.Protocol != svcPort.Protocol {
 				klog.V(5).Infof("Slice %s with different Port protocol, requested: %s received: %s",
 					slice.Name, svcPort.Protocol, *port.Protocol)
@@ -339,13 +339,13 @@ func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort)
 
 			out.Port = *port.Port
 			for _, endpoint := range slice.Endpoints {
-				// Skip endpoints that are not ready
-				if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
-					klog.V(4).Infof("Slice endpoints Not Ready")
+				// Skip endpoint if it's not valid
+				if !IsEndpointValid(endpoint, includeTerminating) {
+					klog.V(4).Infof("Slice endpoint not valid")
 					continue
 				}
 				for _, ip := range endpoint.Addresses {
-					klog.V(4).Infof("Adding slice %s endpoints: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
+					klog.V(4).Infof("Adding slice %s endpoint: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
 					ipStr := utilnet.ParseIPSloppy(ip).String()
 					switch slice.AddressType {
 					case discovery.AddressTypeIPv4:
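
On the OVN load-balancer side the only API change is the third parameter of util.GetLbEndpoints, threaded through from service.Spec.PublishNotReadyAddresses in buildServiceLBConfigs. A usage sketch with an invented fixture follows; note that even false now admits a serving-but-not-ready endpoint, which is the behavioral change relative to the old Ready-only filter. The util import path is assumed from the repo layout.

    package main

    import (
        "fmt"

        kapi "k8s.io/api/core/v1"
        discovery "k8s.io/api/discovery/v1"

        "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
    )

    func bptr(b bool) *bool { return &b }

    func main() {
        name := "http"
        port := int32(80)
        proto := kapi.ProtocolTCP
        // Invented fixture: one IPv4 endpoint that is serving but not ready,
        // e.g. a pod in its terminating grace period.
        slices := []*discovery.EndpointSlice{{
            AddressType: discovery.AddressTypeIPv4,
            Ports:       []discovery.EndpointPort{{Name: &name, Port: &port, Protocol: &proto}},
            Endpoints: []discovery.Endpoint{{
                Addresses:  []string{"10.244.2.8"},
                Conditions: discovery.EndpointConditions{Ready: bptr(false), Serving: bptr(true)},
            }},
        }}
        svcPort := kapi.ServicePort{Name: "http", Protocol: kapi.ProtocolTCP}

        // Third argument mirrors service.Spec.PublishNotReadyAddresses in
        // buildServiceLBConfigs; false still keeps serving-but-not-ready endpoints.
        eps := util.GetLbEndpoints(slices, svcPort, false)
        fmt.Println(eps) // {[10.244.2.8] [] 80}
    }
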
diff --git a/go-controller/pkg/util/kube_test.go b/go-controller/pkg/util/kube_test.go
index 2ceb335f15..2de78a2b19 100644
--- a/go-controller/pkg/util/kube_test.go
+++ b/go-controller/pkg/util/kube_test.go
@@ -637,10 +637,84 @@ func Test_getLbEndpoints(t *testing.T) {
 			},
 			want: LbEndpoints{[]string{"10.0.0.2", "10.1.1.2", "10.2.2.2"}, []string{}, 80},
 		},
+		{
+			name: "slices with non-ready but serving endpoints",
+			args: args{
+				slices: []*discovery.EndpointSlice{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "svc-ab23",
+							Namespace: "ns",
+							Labels:    map[string]string{discovery.LabelServiceName: "svc"},
+						},
+						Ports: []discovery.EndpointPort{
+							{
+								Name:     utilpointer.StringPtr("tcp-example"),
+								Protocol: protoPtr(v1.ProtocolTCP),
+								Port:     utilpointer.Int32Ptr(int32(80)),
+							},
+						},
+						AddressType: discovery.AddressTypeIPv6,
+						Endpoints: []discovery.Endpoint{
+							{
+								Conditions: discovery.EndpointConditions{
+									Ready:   utilpointer.BoolPtr(false),
+									Serving: utilpointer.BoolPtr(true),
+								},
+								Addresses: []string{"2001:db2::2"},
+							},
+						},
+					},
+				},
+				svcPort: v1.ServicePort{
+					Name:       "tcp-example",
+					TargetPort: intstr.FromInt(80),
+					Protocol:   v1.ProtocolTCP,
+				},
+			},
+			want: LbEndpoints{[]string{}, []string{"2001:db2::2"}, 80},
+		},
+		{
+			name: "slices with non-ready non-serving endpoints",
+			args: args{
+				slices: []*discovery.EndpointSlice{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "svc-ab23",
+							Namespace: "ns",
+							Labels:    map[string]string{discovery.LabelServiceName: "svc"},
+						},
+						Ports: []discovery.EndpointPort{
+							{
+								Name:     utilpointer.StringPtr("tcp-example"),
+								Protocol: protoPtr(v1.ProtocolTCP),
+								Port:     utilpointer.Int32Ptr(int32(80)),
+							},
+						},
+						AddressType: discovery.AddressTypeIPv6,
+						Endpoints: []discovery.Endpoint{
+							{
+								Conditions: discovery.EndpointConditions{
+									Ready:   utilpointer.BoolPtr(false),
+									Serving: utilpointer.BoolPtr(false),
+								},
+								Addresses: []string{"2001:db2::2"},
+							},
+						},
+					},
+				},
+				svcPort: v1.ServicePort{
+					Name:       "tcp-example",
+					TargetPort: intstr.FromInt(80),
+					Protocol:   v1.ProtocolTCP,
+				},
+			},
+			want: LbEndpoints{[]string{}, []string{}, 80},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := GetLbEndpoints(tt.args.slices, tt.args.svcPort)
+			got := GetLbEndpoints(tt.args.slices, tt.args.svcPort, false)
 			assert.Equal(t, tt.want, got)
 		})
 	}
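
Both new test cases exercise includeTerminating=false. A possible follow-up case, not part of this patch, would pin down the includeTerminating=true branch, where even a non-ready, non-serving endpoint must be kept; the sketch below reuses the fixture shape above and assumes the file's existing imports and the protoPtr helper stay in scope.

    func Test_getLbEndpoints_includeTerminating(t *testing.T) {
        // Hypothetical extra case: with includeTerminating=true the
        // non-ready, non-serving endpoint is still returned.
        slices := []*discovery.EndpointSlice{
            {
                ObjectMeta: metav1.ObjectMeta{
                    Name:      "svc-ab23",
                    Namespace: "ns",
                    Labels:    map[string]string{discovery.LabelServiceName: "svc"},
                },
                Ports: []discovery.EndpointPort{
                    {
                        Name:     utilpointer.StringPtr("tcp-example"),
                        Protocol: protoPtr(v1.ProtocolTCP),
                        Port:     utilpointer.Int32Ptr(int32(80)),
                    },
                },
                AddressType: discovery.AddressTypeIPv6,
                Endpoints: []discovery.Endpoint{
                    {
                        Conditions: discovery.EndpointConditions{
                            Ready:   utilpointer.BoolPtr(false),
                            Serving: utilpointer.BoolPtr(false),
                        },
                        Addresses: []string{"2001:db2::2"},
                    },
                },
            },
        }
        svcPort := v1.ServicePort{
            Name:       "tcp-example",
            TargetPort: intstr.FromInt(80),
            Protocol:   v1.ProtocolTCP,
        }
        got := GetLbEndpoints(slices, svcPort, true)
        assert.Equal(t, LbEndpoints{[]string{}, []string{"2001:db2::2"}, 80}, got)
    }
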
diff --git a/go-controller/pkg/util/util.go b/go-controller/pkg/util/util.go
index 98f7ae1973..0d1e763b24 100644
--- a/go-controller/pkg/util/util.go
+++ b/go-controller/pkg/util/util.go
@@ -19,6 +19,7 @@ import (
 
 	"github.com/urfave/cli/v2"
 
+	discovery "k8s.io/api/discovery/v1"
 	"k8s.io/klog/v2"
 	utilnet "k8s.io/utils/net"
 )
@@ -310,3 +311,30 @@ func UpdateNodeSwitchExcludeIPs(nbClient libovsdbclient.Client, nodeName string,
 
 	return nil
 }
+
+// IsEndpointReady takes as input an endpoint from an endpoint slice and returns true if the endpoint is
+// to be considered ready. Considering as ready an endpoint with Conditions.Ready==nil
+// as per doc: "In most cases consumers should interpret this unknown state as ready"
+// https://github.com/kubernetes/api/blob/0478a3e95231398d8b380dc2a1905972be8ae1d5/discovery/v1/types.go#L129-L131
+func IsEndpointReady(endpoint discovery.Endpoint) bool {
+	return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
+}
+
+// IsEndpointServing takes as input an endpoint from an endpoint slice and returns true if the endpoint is
+// to be considered serving. Falling back to IsEndpointReady when the Serving field is nil, as per doc:
+// "If nil, consumers should defer to the ready condition."
+// https://github.com/kubernetes/api/blob/0478a3e95231398d8b380dc2a1905972be8ae1d5/discovery/v1/types.go#L138-L139
+func IsEndpointServing(endpoint discovery.Endpoint) bool {
+	if endpoint.Conditions.Serving != nil {
+		return *endpoint.Conditions.Serving
+	} else {
+		return IsEndpointReady(endpoint)
+	}
+}
+
+// IsEndpointValid takes as input an endpoint from an endpoint slice and a boolean that indicates whether to
+// include all terminating endpoints, as per the PublishNotReadyAddresses field in the Kubernetes service spec.
+// It always returns true if includeTerminating is true and falls back to IsEndpointServing otherwise.
+func IsEndpointValid(endpoint discovery.Endpoint, includeTerminating bool) bool {
+	return includeTerminating || IsEndpointServing(endpoint)
+}
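
The three helpers above form a small fallback ladder: IsEndpointValid defers to IsEndpointServing unless includeTerminating short-circuits it, and IsEndpointServing defers to IsEndpointReady when Serving is unset, with a nil Ready counting as ready. A minimal illustrative program walking the ladder; the util import path is assumed from the repo layout.

    package main

    import (
        "fmt"

        discovery "k8s.io/api/discovery/v1"

        "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
    )

    func bptr(b bool) *bool { return &b }

    func main() {
        // A terminating endpoint: no longer ready, but still serving traffic.
        terminating := discovery.Endpoint{Conditions: discovery.EndpointConditions{
            Ready:   bptr(false),
            Serving: bptr(true),
        }}
        fmt.Println(util.IsEndpointReady(terminating))        // false
        fmt.Println(util.IsEndpointServing(terminating))      // true
        fmt.Println(util.IsEndpointValid(terminating, false)) // true: serving is enough

        // Both conditions nil: treated as ready, hence serving, hence valid.
        unknown := discovery.Endpoint{}
        fmt.Println(util.IsEndpointValid(unknown, false)) // true
    }
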