Merge pull request #3245 from ricky-rav/dev_OCPBUGS-1427_upstream_fix
OCPBUGS-1427: Ignore non-ready endpoints when processing endpointslices
trozet authored Oct 24, 2022
2 parents e3a7806 + 78a150c commit c72e852
Showing 3 changed files with 81 additions and 29 deletions.
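
The change common to all three files is the new isEndpointReady helper in healthcheck.go: an endpoint is treated as ready when its Conditions.Ready field is nil or true (a nil condition is interpreted as ready, in line with the EndpointSlice API convention), and every consumer of endpoint addresses now filters on that check. A minimal, self-contained sketch of the filtering idea, assuming k8s.io/api/discovery/v1; the filterReadyAddresses name and the sample addresses are illustrative and not part of this commit:

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
)

// filterReadyAddresses keeps only addresses from endpoints whose Ready condition
// is nil or true, mirroring the isEndpointReady check introduced by this commit.
func filterReadyAddresses(slice *discovery.EndpointSlice) []string {
	var addrs []string
	for _, ep := range slice.Endpoints {
		if ep.Conditions.Ready != nil && !*ep.Conditions.Ready {
			continue // skip endpoints explicitly marked not ready
		}
		addrs = append(addrs, ep.Addresses...)
	}
	return addrs
}

func main() {
	ready, notReady := true, false
	slice := &discovery.EndpointSlice{
		Endpoints: []discovery.Endpoint{
			{Addresses: []string{"10.0.0.1"}, Conditions: discovery.EndpointConditions{Ready: &ready}},
			{Addresses: []string{"10.0.0.2"}, Conditions: discovery.EndpointConditions{Ready: &notReady}},
			{Addresses: []string{"10.0.0.3"}}, // nil Ready: treated as ready
		},
	}
	fmt.Println(filterReadyAddresses(slice)) // prints [10.0.0.1 10.0.0.3]
}

With that filter in place, not-ready endpoints no longer contribute to gateway flows, health-check endpoint counts, host-network endpoint detection, or conntrack cleanup decisions; the per-file diffs below apply the same check in each of those paths.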
24 changes: 19 additions & 5 deletions go-controller/pkg/node/gateway_shared_intf.go
@@ -710,7 +710,11 @@ func (npw *nodePortWatcher) AddEndpointSlice(epSlice *discovery.EndpointSlice) {
// Here we make sure the correct rules are programmed whenever an AddEndpointSlice
// event is received, only alter flows if we need to, i.e if cache wasn't
// set or if it was and hasLocalHostNetworkEp state changed, to prevent flow churn
namespacedName := namespacedNameFromEPSlice(epSlice)
namespacedName, err := namespacedNameFromEPSlice(epSlice)
if err != nil {
klog.Errorf("Skipping %s/%s: %v", epSlice.Namespace, epSlice.Name, err)
return
}
out, exists := npw.getAndSetServiceInfo(namespacedName, svc, hasLocalHostNetworkEp)
if !exists {
klog.V(5).Infof("Endpointslice %s ADD event in namespace %s is creating rules", epSlice.Name, epSlice.Namespace)
@@ -740,7 +744,11 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice

klog.V(5).Infof("Deleting endpointslice %s in namespace %s", epSlice.Name, epSlice.Namespace)
// remove rules for endpoints and add back normal ones
namespacedName := namespacedNameFromEPSlice(epSlice)
namespacedName, err := namespacedNameFromEPSlice(epSlice)
if err != nil {
klog.Errorf("Skipping %s/%s: %v", epSlice.Namespace, epSlice.Name, err)
return
}
if svcConfig, exists := npw.updateServiceInfo(namespacedName, nil, &hasLocalHostNetworkEp); exists {
// Lock the cache mutex here so we don't miss a service delete during an endpoint delete
// we have to do this because deleting and adding iptables rules is slow.
@@ -755,8 +763,10 @@ func (npw *nodePortWatcher) DeleteEndpointSlice(epSlice *discovery.EndpointSlice
func getEndpointAddresses(endpointSlice *discovery.EndpointSlice) []string {
endpointsAddress := make([]string, 0)
for _, endpoint := range endpointSlice.Endpoints {
for _, ip := range endpoint.Addresses {
endpointsAddress = append(endpointsAddress, utilnet.ParseIPSloppy(ip).String())
if isEndpointReady(endpoint) {
for _, ip := range endpoint.Addresses {
endpointsAddress = append(endpointsAddress, utilnet.ParseIPSloppy(ip).String())
}
}
}
return endpointsAddress
@@ -769,7 +779,11 @@ func (npw *nodePortWatcher) UpdateEndpointSlice(oldEpSlice, newEpSlice *discover
return
}

namespacedName := namespacedNameFromEPSlice(oldEpSlice)
namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
if err != nil {
klog.Errorf("Skipping %s/%s: %v", newEpSlice.Namespace, newEpSlice.Name, err)
return
}

if _, exists := npw.getServiceInfo(namespacedName); !exists {
// When a service is updated from externalName to nodeport type, it won't be
63 changes: 49 additions & 14 deletions go-controller/pkg/node/healthcheck.go
@@ -1,6 +1,7 @@
package node

import (
"fmt"
"net"
"os"
"strings"
@@ -67,7 +68,7 @@ func (l *loadBalancerHealthChecker) UpdateService(old, new *kapi.Service) {
}
namespacedName := ktypes.NamespacedName{Namespace: new.Namespace, Name: new.Name}
l.Lock()
l.endpoints[namespacedName] = l.GetLocalEndpointAddressesCount(epSlices)
l.endpoints[namespacedName] = l.CountLocalEndpointAddresses(epSlices)
_ = l.server.SyncEndpoints(l.endpoints)
l.Unlock()
}
@@ -93,23 +94,33 @@ func (l *loadBalancerHealthChecker) SyncServices(svcs []interface{}) error {
}

func (l *loadBalancerHealthChecker) SyncEndPointSlices(epSlice *discovery.EndpointSlice) {
namespacedName := namespacedNameFromEPSlice(epSlice)
namespacedName, err := namespacedNameFromEPSlice(epSlice)
if err != nil {
klog.Errorf("Skipping %s/%s: %v", epSlice.Namespace, epSlice.Name, err)
return
}
epSlices, err := l.watchFactory.GetEndpointSlices(epSlice.Namespace, epSlice.Labels[discovery.LabelServiceName])
if err != nil {
// should be a rare occurence
klog.V(4).Infof("Could not fetch endpointslices for %v during health check", namespacedName)
klog.V(4).Infof("Could not fetch endpointslices for %s during health check", namespacedName.String())
}

localEndpointAddressCount := l.CountLocalEndpointAddresses(epSlices)
if len(epSlices) == 0 {
// let's delete it from cache and wait for the next update; this will show as 0 endpoints for health checks
delete(l.endpoints, namespacedName)
} else {
l.endpoints[namespacedName] = l.GetLocalEndpointAddressesCount(epSlices)
l.endpoints[namespacedName] = localEndpointAddressCount
}
_ = l.server.SyncEndpoints(l.endpoints)
}

func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.EndpointSlice) {
namespacedName := namespacedNameFromEPSlice(epSlice)
namespacedName, err := namespacedNameFromEPSlice(epSlice)
if err != nil {
klog.Errorf("Skipping %s/%s: %v", epSlice.Namespace, epSlice.Name, err)
return
}
l.Lock()
defer l.Unlock()
if _, exists := l.services[namespacedName]; exists {
@@ -118,7 +129,11 @@ func (l *loadBalancerHealthChecker) AddEndpointSlice(epSlice *discovery.Endpoint
}

func (l *loadBalancerHealthChecker) UpdateEndpointSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice) {
namespacedName := namespacedNameFromEPSlice(newEpSlice)
namespacedName, err := namespacedNameFromEPSlice(newEpSlice)
if err != nil {
klog.Errorf("Skipping %s/%s: %v", newEpSlice.Namespace, newEpSlice.Name, err)
return
}
l.Lock()
defer l.Unlock()
if _, exists := l.services[namespacedName]; exists {
@@ -132,17 +147,19 @@ func (l *loadBalancerHealthChecker) DeleteEndpointSlice(epSlice *discovery.Endpo
l.SyncEndPointSlices(epSlice)
}

// GetLocalEndpointAddresses returns the number of endpoints that are local to the node for a service
func (l *loadBalancerHealthChecker) GetLocalEndpointAddressesCount(endpointSlices []*discovery.EndpointSlice) int {
localEndpoints := sets.NewString()
// CountLocalEndpointAddresses returns the number of IP addresses from ready endpoints that are local
// to the node for a service
func (l *loadBalancerHealthChecker) CountLocalEndpointAddresses(endpointSlices []*discovery.EndpointSlice) int {
localEndpointAddresses := sets.NewString()
for _, endpointSlice := range endpointSlices {
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.NodeName != nil && *endpoint.NodeName == l.nodeName {
localEndpoints.Insert(endpoint.Addresses...)
isLocal := endpoint.NodeName != nil && *endpoint.NodeName == l.nodeName
if isEndpointReady(endpoint) && isLocal {
localEndpointAddresses.Insert(endpoint.Addresses...)
}
}
}
return len(localEndpoints)
return len(localEndpointAddresses)
}

// hasLocalHostNetworkEndpoints returns true if there is at least one host-networked endpoint
@@ -151,6 +168,9 @@ func (l *loadBalancerHealthChecker) GetLocalEndpointAddressesCount(endpointSlice
func hasLocalHostNetworkEndpoints(epSlices []*discovery.EndpointSlice, nodeAddresses []net.IP) bool {
for _, epSlice := range epSlices {
for _, endpoint := range epSlice.Endpoints {
if !isEndpointReady(endpoint) {
continue
}
for _, ip := range endpoint.Addresses {
for _, nodeIP := range nodeAddresses {
if nodeIP.String() == utilnet.ParseIPSloppy(ip).String() {
@@ -173,6 +193,13 @@ func isHostEndpoint(endpointIP string) bool {
return true
}

// discovery.EndpointSlice reports in the same slice all endpoints along with their status.
// isEndpointReady takes an endpoint from an endpoint slice and returns true if the endpoint is
// to be considered ready.
func isEndpointReady(endpoint discovery.Endpoint) bool {
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
}

// checkForStaleOVSInternalPorts checks for OVS internal ports without any ofport assigned,
// they are stale ports that must be deleted
func checkForStaleOVSInternalPorts() {
@@ -444,7 +471,15 @@ func checkPorts(patchIntf, ofPortPatch, physIntf, ofPortPhys string) error {
return nil
}

func namespacedNameFromEPSlice(epSlice *discovery.EndpointSlice) ktypes.NamespacedName {
func namespacedNameFromEPSlice(epSlice *discovery.EndpointSlice) (ktypes.NamespacedName, error) {
// Return the namespaced name of the corresponding service
var serviceNamespacedName ktypes.NamespacedName
svcName := epSlice.Labels[discovery.LabelServiceName]
return ktypes.NamespacedName{Namespace: epSlice.Namespace, Name: svcName}
if svcName == "" {
// should not happen, since the informer already filters out endpoint slices with an empty service label
return serviceNamespacedName,
fmt.Errorf("endpointslice %s/%s: empty value for label %s",
epSlice.Namespace, epSlice.Name, discovery.LabelServiceName)
}
return ktypes.NamespacedName{Namespace: epSlice.Namespace, Name: svcName}, nil
}
23 changes: 13 additions & 10 deletions go-controller/pkg/node/node.go
@@ -676,17 +676,17 @@ func (n *OvnNode) WatchEndpointSlices() error {
UpdateFunc: func(old, new interface{}) {
newEndpointSlice := new.(*discovery.EndpointSlice)
oldEndpointSlice := old.(*discovery.EndpointSlice)
for _, port := range oldEndpointSlice.Ports {
for _, endpoint := range oldEndpointSlice.Endpoints {
for _, ip := range endpoint.Addresses {
ipStr := utilnet.ParseIPSloppy(ip).String()
if doesEPSliceContainEndpoint(newEndpointSlice, ipStr, *port.Port, *port.Protocol) {
for _, oldPort := range oldEndpointSlice.Ports {
for _, oldEndpoint := range oldEndpointSlice.Endpoints {
for _, oldIP := range oldEndpoint.Addresses {
oldIPStr := utilnet.ParseIPSloppy(oldIP).String()
if doesEPSliceContainReadyEndpoint(newEndpointSlice, oldIPStr, *oldPort.Port, *oldPort.Protocol) {
continue
}
if *port.Protocol == kapi.ProtocolUDP { // flush conntrack only for UDP
err := util.DeleteConntrack(ipStr, *port.Port, *port.Protocol, netlink.ConntrackReplyAnyIP, nil)
if *oldPort.Protocol == kapi.ProtocolUDP { // flush conntrack only for UDP
err := util.DeleteConntrack(oldIPStr, *oldPort.Port, *oldPort.Protocol, netlink.ConntrackReplyAnyIP, nil)
if err != nil {
klog.Errorf("Failed to delete conntrack entry for %s: %v", ipStr, err)
klog.Errorf("Failed to delete conntrack entry for %s: %v", oldIPStr, err)
}
}
}
@@ -851,11 +851,14 @@ func (n *OvnNode) validateVTEPInterfaceMTU() error {
}

// doesEPSliceContainEndpoint checks whether the endpointslice
// contains a specific endpoint with IP/Port/Protocol
func doesEPSliceContainEndpoint(epSlice *discovery.EndpointSlice,
// contains a specific endpoint with IP/Port/Protocol and this endpoint is ready
func doesEPSliceContainReadyEndpoint(epSlice *discovery.EndpointSlice,
epIP string, epPort int32, protocol kapi.Protocol) bool {
for _, port := range epSlice.Ports {
for _, endpoint := range epSlice.Endpoints {
if !isEndpointReady(endpoint) {
continue
}
for _, ip := range endpoint.Addresses {
if utilnet.ParseIPSloppy(ip).String() == epIP && *port.Port == epPort && *port.Protocol == protocol {
return true
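
The UpdateFunc in WatchEndpointSlices above relies on doesEPSliceContainReadyEndpoint for its conntrack cleanup: for each address/port pair in the old slice, conntrack state is flushed only when the protocol is UDP and the pair is no longer backed by a ready endpoint in the new slice. A hedged, self-contained sketch of that selection step, assuming k8s.io/api/core/v1 and k8s.io/api/discovery/v1; the staleUDPTuples helper and the sample data are illustrative and not part of this commit:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
)

type tuple struct {
	ip       string
	port     int32
	protocol corev1.Protocol
}

// staleUDPTuples lists the UDP ip/port pairs from the old slice that are no longer
// backed by a ready endpoint in the new slice, i.e. the pairs for which the watcher
// above would flush conntrack entries.
func staleUDPTuples(oldSlice, newSlice *discovery.EndpointSlice) []tuple {
	isReady := func(ep discovery.Endpoint) bool {
		return ep.Conditions.Ready == nil || *ep.Conditions.Ready
	}
	// stillServed reports whether ip/port/proto is present on a ready endpoint of the new slice.
	stillServed := func(ip string, port int32, proto corev1.Protocol) bool {
		for _, p := range newSlice.Ports {
			if *p.Port != port || *p.Protocol != proto {
				continue
			}
			for _, ep := range newSlice.Endpoints {
				if !isReady(ep) {
					continue
				}
				for _, a := range ep.Addresses {
					if a == ip {
						return true
					}
				}
			}
		}
		return false
	}
	var stale []tuple
	for _, p := range oldSlice.Ports {
		if *p.Protocol != corev1.ProtocolUDP {
			continue // conntrack is only flushed for UDP
		}
		for _, ep := range oldSlice.Endpoints {
			for _, a := range ep.Addresses {
				if !stillServed(a, *p.Port, *p.Protocol) {
					stale = append(stale, tuple{ip: a, port: *p.Port, protocol: *p.Protocol})
				}
			}
		}
	}
	return stale
}

func main() {
	udp := corev1.ProtocolUDP
	port := int32(53)
	notReady := false
	oldSlice := &discovery.EndpointSlice{
		Ports:     []discovery.EndpointPort{{Port: &port, Protocol: &udp}},
		Endpoints: []discovery.Endpoint{{Addresses: []string{"10.0.0.2"}}},
	}
	newSlice := &discovery.EndpointSlice{
		Ports: []discovery.EndpointPort{{Port: &port, Protocol: &udp}},
		Endpoints: []discovery.Endpoint{{
			Addresses:  []string{"10.0.0.2"},
			Conditions: discovery.EndpointConditions{Ready: &notReady},
		}},
	}
	fmt.Println(staleUDPTuples(oldSlice, newSlice)) // prints [{10.0.0.2 53 UDP}]
}

In the example, 10.0.0.2:53/UDP is still listed in the new slice but its Ready condition has flipped to false, so it is reported as stale and its conntrack entries would be flushed; this is exactly the case the old doesEPSliceContainEndpoint missed.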
