Skip to content

Commit

Permalink
Check the "Serving" field for endpoints
Browse files Browse the repository at this point in the history
Master, node, and healthcheck code looked up the Ready field in endpoints inside endpoint slices, in line with the previous implementation that dealt with the older and less fine-grained Endpoints resource, which only distinguished between ready and not ready. Healthchecks should indeed rely on the Ready field, while node and master can look up the Serving field, so that we won't prematurely refuse incoming connections to an endpoint that is scheduled for termination but is still up and running.

Making a special case for services with PublishNotReadyAddresses set, as their endpoints should always be considered ready, even if terminating. A couple of upstream kubernetes tests use this feature in 1.26.

Signed-off-by: Riccardo Ravaioli <rravaiol@redhat.com>
(cherry picked from commit fd414d1)
  • Loading branch information
ricky-rav committed Mar 17, 2023
1 parent da42356 commit aceef01
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 35 deletions.
30 changes: 19 additions & 11 deletions go-controller/pkg/node/gateway_shared_intf.go
Expand Up @@ -19,6 +19,7 @@ import (

kapi "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
ktypes "k8s.io/apimachinery/pkg/types"
apierrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -774,7 +775,8 @@ func (npw *nodePortWatcher) AddEndpointSlice(epSlice *discovery.EndpointSlice) e
// Here we make sure the correct rules are programmed whenever an AddEndpointSlice
// event is received, only alter flows if we need to, i.e if cache wasn't
// set or if it was and hasLocalHostNetworkEp state changed, to prevent flow churn
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)

if err != nil {
return fmt.Errorf("cannot add %s/%s to nodePortWatcher: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand Down Expand Up @@ -814,7 +816,7 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice

klog.V(5).Infof("Deleting endpointslice %s in namespace %s", epSlice.Name, epSlice.Namespace)
// remove rules for endpoints and add back normal ones
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
if err != nil {
return fmt.Errorf("cannot delete %s/%s from nodePortWatcher: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand All @@ -835,10 +837,11 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
return nil
}

func getEndpointAddresses(endpointSlice *discovery.EndpointSlice) []string {
func getEndpointAddresses(endpointSlice *discovery.EndpointSlice, service *kapi.Service) []string {
endpointsAddress := make([]string, 0)
includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
for _, endpoint := range endpointSlice.Endpoints {
if isEndpointReady(endpoint) {
if util.IsEndpointValid(endpoint, includeTerminating) {
for _, ip := range endpoint.Addresses {
endpointsAddress = append(endpointsAddress, utilnet.ParseIPSloppy(ip).String())
}
Expand All @@ -851,18 +854,23 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
var err error
var errors []error

oldEpAddr := getEndpointAddresses(oldEpSlice)
newEpAddr := getEndpointAddresses(newEpSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(newEpSlice)
if err != nil {
return fmt.Errorf("cannot update %s/%s in nodePortWatcher: %v", newEpSlice.Namespace, newEpSlice.Name, err)
}
svc, err := npw.watchFactory.GetService(namespacedName.Namespace, namespacedName.Name)
if err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error while retrieving service for endpointslice %s/%s during update: %v",
oldEpSlice.Namespace, oldEpSlice.Name, err)
}

oldEpAddr := getEndpointAddresses(oldEpSlice, svc)
newEpAddr := getEndpointAddresses(newEpSlice, svc)
if reflect.DeepEqual(oldEpAddr, newEpAddr) {
return nil
}

klog.V(5).Infof("Updating endpointslice %s in namespace %s", oldEpSlice.Name, oldEpSlice.Namespace)

namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
if err != nil {
return fmt.Errorf("cannot update %s/%s in nodePortWatcher: %v", newEpSlice.Namespace, newEpSlice.Name, err)
}
if _, exists := npw.getServiceInfo(namespacedName); !exists {
// When a service is updated from externalName to nodeport type, it won't be
// in nodePortWatcher cache (npw): in this case, have the new nodeport IPtable rules
Expand Down
8 changes: 4 additions & 4 deletions go-controller/pkg/node/healthcheck_service.go
Expand Up @@ -102,7 +102,7 @@ func (l *loadBalancerHealthChecker) SyncServices(svcs []interface{}) error {
}

func (l *loadBalancerHealthChecker) SyncEndPointSlices(epSlice *discovery.EndpointSlice) error {
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
if err != nil {
return fmt.Errorf("skipping %s/%s: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand All @@ -124,7 +124,7 @@ func (l *loadBalancerHealthChecker) SyncEndPointSlices(epSlice *discovery.Endpoi
}

func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.EndpointSlice) error {
namespacedName, err := namespacedNameFromEPSlice(epSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(epSlice)
if err != nil {
return fmt.Errorf("cannot add %s/%s to loadBalancerHealthChecker: %v", epSlice.Namespace, epSlice.Name, err)
}
Expand All @@ -137,7 +137,7 @@ func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.Endpoint
}

func (l *loadBalancerHealthChecker) UpdateEndpointSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice) error {
namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
namespacedName, err := serviceNamespacedNameFromEndpointSlice(newEpSlice)
if err != nil {
return fmt.Errorf("cannot update %s/%s in loadBalancerHealthChecker: %v",
newEpSlice.Namespace, newEpSlice.Name, err)
Expand Down Expand Up @@ -209,7 +209,7 @@ func isEndpointReady(endpoint discovery.Endpoint) bool {
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
}

func namespacedNameFromEPSlice(epSlice *discovery.EndpointSlice) (ktypes.NamespacedName, error) {
func serviceNamespacedNameFromEndpointSlice(epSlice *discovery.EndpointSlice) (ktypes.NamespacedName, error) {
// Return the namespaced name of the corresponding service
var serviceNamespacedName ktypes.NamespacedName
svcName := epSlice.Labels[discovery.LabelServiceName]
Expand Down
24 changes: 17 additions & 7 deletions go-controller/pkg/node/node.go
Expand Up @@ -13,6 +13,7 @@ import (

kapi "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -715,7 +716,15 @@ func (n *OvnNode) reconcileConntrackUponEndpointSliceEvents(oldEndpointSlice, ne
// nothing to do upon an add event
return nil
}

namespacedName, err := serviceNamespacedNameFromEndpointSlice(oldEndpointSlice)
if err != nil {
return fmt.Errorf("cannot reconcile conntrack: %v", err)
}
svc, err := n.watchFactory.GetService(namespacedName.Namespace, namespacedName.Name)
if err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error while retrieving service for endpointslice %s/%s when reconciling conntrack: %v",
newEndpointSlice.Namespace, newEndpointSlice.Name, err)
}
for _, oldPort := range oldEndpointSlice.Ports {
if *oldPort.Protocol != kapi.ProtocolUDP { // flush conntrack only for UDP
continue
Expand All @@ -725,7 +734,7 @@ func (n *OvnNode) reconcileConntrackUponEndpointSliceEvents(oldEndpointSlice, ne
oldIPStr := utilnet.ParseIPSloppy(oldIP).String()
// upon an update event, remove conntrack entries for IP addresses that are no longer
// in the endpointslice, skip otherwise
if newEndpointSlice != nil && doesEPSliceContainReadyEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol) {
if newEndpointSlice != nil && doesEndpointSliceContainValidEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol, svc) {
continue
}
// upon update and delete events, flush conntrack only for UDP
Expand Down Expand Up @@ -876,13 +885,14 @@ func (n *OvnNode) validateVTEPInterfaceMTU() error {
return nil
}

// doesEPSliceContainEndpoint checks whether the endpointslice
// contains a specific endpoint with IP/Port/Protocol and this endpoint is ready
func doesEPSliceContainReadyEndpoint(epSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol) bool {
// doesEndpointSliceContainValidEndpoint returns true if the endpointslice
// contains an endpoint with the given IP/Port/Protocol and this endpoint is considered valid
func doesEndpointSliceContainValidEndpoint(epSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol, service *kapi.Service) bool {
includeTerminating := service != nil && service.Spec.PublishNotReadyAddresses
for _, port := range epSlice.Ports {
for _, endpoint := range epSlice.Endpoints {
if !isEndpointReady(endpoint) {
if !util.IsEndpointValid(endpoint, includeTerminating) {
continue
}
for _, ip := range endpoint.Addresses {
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/ovn/controller/services/load_balancer.go
Expand Up @@ -63,7 +63,7 @@ var protos = []v1.Protocol{
func buildServiceLBConfigs(service *v1.Service, endpointSlices []*discovery.EndpointSlice) (perNodeConfigs []lbConfig, clusterConfigs []lbConfig) {
// For each svcPort, determine if it will be applied per-node or cluster-wide
for _, svcPort := range service.Spec.Ports {
eps := util.GetLbEndpoints(endpointSlices, svcPort)
eps := util.GetLbEndpoints(endpointSlices, svcPort, service.Spec.PublishNotReadyAddresses)

// if ExternalTrafficPolicy or InternalTrafficPolicy is local, then we need to do things a bit differently
externalTrafficLocal := (service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal)
Expand Down
22 changes: 11 additions & 11 deletions go-controller/pkg/util/kube.go
Expand Up @@ -306,31 +306,31 @@ type LbEndpoints struct {
Port int32
}

// GetLbEndpoints return the endpoints that belong to the IPFamily as a slice of IPs
func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort) LbEndpoints {
// GetLbEndpoints returns the IPv4 and IPv6 addresses of valid endpoints as slices inside a struct
func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort, includeTerminating bool) LbEndpoints {
v4ips := sets.NewString()
v6ips := sets.NewString()

out := LbEndpoints{}
// return an empty object so the caller don't have to check for nil and can use it as an iterator
// return an empty object so the caller doesn't have to check for nil and can use it as an iterator
if len(slices) == 0 {
return out
}

for _, slice := range slices {
klog.V(4).Infof("Getting endpoints for slice %s/%s", slice.Namespace, slice.Name)

// build the list of endpoints in the slice
// build the list of valid endpoints in the slice
for _, port := range slice.Ports {
// If Service port name set it must match the name field in the endpoint
// If Service port name is not set we just use the endpoint port
// If Service port name is set, it must match the name field in the endpoint
// If Service port name is not set, we just use the endpoint port
if svcPort.Name != "" && svcPort.Name != *port.Name {
klog.V(5).Infof("Slice %s with different Port name, requested: %s received: %s",
slice.Name, svcPort.Name, *port.Name)
continue
}

// Skip ports that doesn't match the protocol
// Skip ports that don't match the protocol
if *port.Protocol != svcPort.Protocol {
klog.V(5).Infof("Slice %s with different Port protocol, requested: %s received: %s",
slice.Name, svcPort.Protocol, *port.Protocol)
Expand All @@ -339,13 +339,13 @@ func GetLbEndpoints(slices []*discovery.EndpointSlice, svcPort kapi.ServicePort)

out.Port = *port.Port
for _, endpoint := range slice.Endpoints {
// Skip endpoints that are not ready
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
klog.V(4).Infof("Slice endpoints Not Ready")
// Skip endpoint if it's not valid
if !IsEndpointValid(endpoint, includeTerminating) {
klog.V(4).Infof("Slice endpoint not valid")
continue
}
for _, ip := range endpoint.Addresses {
klog.V(4).Infof("Adding slice %s endpoints: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
klog.V(4).Infof("Adding slice %s endpoint: %v, port: %d", slice.Name, endpoint.Addresses, *port.Port)
ipStr := utilnet.ParseIPSloppy(ip).String()
switch slice.AddressType {
case discovery.AddressTypeIPv4:
Expand Down
76 changes: 75 additions & 1 deletion go-controller/pkg/util/kube_test.go
Expand Up @@ -637,10 +637,84 @@ func Test_getLbEndpoints(t *testing.T) {
},
want: LbEndpoints{[]string{"10.0.0.2", "10.1.1.2", "10.2.2.2"}, []string{}, 80},
},
{
name: "slices with non-ready but serving endpoints",
args: args{
slices: []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-ab23",
Namespace: "ns",
Labels: map[string]string{discovery.LabelServiceName: "svc"},
},
Ports: []discovery.EndpointPort{
{
Name: utilpointer.StringPtr("tcp-example"),
Protocol: protoPtr(v1.ProtocolTCP),
Port: utilpointer.Int32Ptr(int32(80)),
},
},
AddressType: discovery.AddressTypeIPv6,
Endpoints: []discovery.Endpoint{
{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(true),
},
Addresses: []string{"2001:db2::2"},
},
},
},
},
svcPort: v1.ServicePort{
Name: "tcp-example",
TargetPort: intstr.FromInt(80),
Protocol: v1.ProtocolTCP,
},
},
want: LbEndpoints{[]string{}, []string{"2001:db2::2"}, 80},
},
{
name: "slices with non-ready non-serving endpoints",
args: args{
slices: []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-ab23",
Namespace: "ns",
Labels: map[string]string{discovery.LabelServiceName: "svc"},
},
Ports: []discovery.EndpointPort{
{
Name: utilpointer.StringPtr("tcp-example"),
Protocol: protoPtr(v1.ProtocolTCP),
Port: utilpointer.Int32Ptr(int32(80)),
},
},
AddressType: discovery.AddressTypeIPv6,
Endpoints: []discovery.Endpoint{
{
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(false),
Serving: utilpointer.BoolPtr(false),
},
Addresses: []string{"2001:db2::2"},
},
},
},
},
svcPort: v1.ServicePort{
Name: "tcp-example",
TargetPort: intstr.FromInt(80),
Protocol: v1.ProtocolTCP,
},
},
want: LbEndpoints{[]string{}, []string{}, 80},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetLbEndpoints(tt.args.slices, tt.args.svcPort)
got := GetLbEndpoints(tt.args.slices, tt.args.svcPort, false)
assert.Equal(t, tt.want, got)
})
}
Expand Down
28 changes: 28 additions & 0 deletions go-controller/pkg/util/util.go
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/urfave/cli/v2"

discovery "k8s.io/api/discovery/v1"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)
Expand Down Expand Up @@ -310,3 +311,30 @@ func UpdateNodeSwitchExcludeIPs(nbClient libovsdbclient.Client, nodeName string,

return nil
}

// IsEndpointReady reports whether the given endpoint-slice endpoint should be
// treated as ready. An endpoint whose Conditions.Ready is nil counts as ready,
// as per the API doc: "In most cases consumers should interpret this unknown state as ready"
// https://github.com/kubernetes/api/blob/0478a3e95231398d8b380dc2a1905972be8ae1d5/discovery/v1/types.go#L129-L131
func IsEndpointReady(endpoint discovery.Endpoint) bool {
	if ready := endpoint.Conditions.Ready; ready != nil {
		return *ready
	}
	// Unknown readiness: treat as ready.
	return true
}

// IsEndpointServing takes as input an endpoint from an endpoint slice and returns
// true if the endpoint is to be considered serving. When the Serving field is nil
// it falls back to IsEndpointReady, as per the API doc:
// "If nil, consumers should defer to the ready condition."
// https://github.com/kubernetes/api/blob/0478a3e95231398d8b380dc2a1905972be8ae1d5/discovery/v1/types.go#L138-L139
func IsEndpointServing(endpoint discovery.Endpoint) bool {
	// Go idiom: no else after a terminating return; keep the fallback flat.
	if endpoint.Conditions.Serving != nil {
		return *endpoint.Conditions.Serving
	}
	return IsEndpointReady(endpoint)
}

// IsEndpointValid takes as input an endpoint from an endpoint slice and a boolean
// indicating whether terminating endpoints must be accepted, as per the
// PublishNotReadyAddresses feature in the kubernetes service spec. When
// includeTerminating is true every endpoint is considered valid; otherwise the
// decision is delegated to IsEndpointServing.
func IsEndpointValid(endpoint discovery.Endpoint, includeTerminating bool) bool {
	if includeTerminating {
		return true
	}
	return IsEndpointServing(endpoint)
}

0 comments on commit aceef01

Please sign in to comment.