Skip to content

Commit

Permalink
fix: check endpoint slice update after backend pool update for local …
Browse files Browse the repository at this point in the history
…service to prevent mismatch
  • Loading branch information
nilo19 committed Sep 5, 2023
1 parent 8e4b637 commit 6d3c662
Show file tree
Hide file tree
Showing 15 changed files with 343 additions and 148 deletions.
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package consts

import (
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
Expand Down Expand Up @@ -203,6 +204,8 @@ const (
IPVersionDualStackString string = "DualStack"
)

// IPVersionIPv6StringLower is the lower-cased form of IPVersionIPv6String,
// computed once at package initialization.
var IPVersionIPv6StringLower = strings.ToLower(IPVersionIPv6String)

// LB variables for dual-stack
var (
// Service.Spec.LoadBalancerIP has been deprecated and may be removed in a future release. Those two annotations are introduced as alternatives to set IPv4/IPv6 LoadBalancer IPs.
Expand Down
13 changes: 4 additions & 9 deletions pkg/nodemanager/nodemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ var updateNetworkConditionBackoff = wait.Backoff{
Jitter: 1.0,
}

// IP-family suffixes. nodeAddressesChangeDetected appends one of them to a
// node address type ("<type>/<suffix>") to build its comparison map keys.
const (
// v4Suffix marks an address that parses as IPv4.
v4Suffix = "IPv4"
// v6Suffix marks an address that does not parse as IPv4.
v6Suffix = "IPv6"
)

// CloudNodeController reconciles node information.
type CloudNodeController struct {
nodeName string
Expand Down Expand Up @@ -586,18 +581,18 @@ func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool
addressMap1 := map[string]string{}

for i := range addressSet1 {
suffix := v4Suffix
suffix := consts.IPVersionIPv4String
if net.ParseIP(addressSet1[i].Address).To4() == nil {
suffix = v6Suffix
suffix = consts.IPVersionIPv6String
}
addrType := fmt.Sprintf("%s/%s", addressSet1[i].Type, suffix)
addressMap1[addrType] = addressSet1[i].Address
}

for _, v := range addressSet2 {
suffix := v4Suffix
suffix := consts.IPVersionIPv4String
if net.ParseIP(v.Address).To4() == nil {
suffix = v6Suffix
suffix = consts.IPVersionIPv6String
}
addrType := fmt.Sprintf("%s/%s", v.Type, suffix)
if addressMap1[addrType] != v.Address {
Expand Down
19 changes: 10 additions & 9 deletions pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,18 +1258,19 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
az.unmanagedNodes.Delete(prevNode.ObjectMeta.Name)
}

// if the node is being deleted from the cluster, exclude it from load balancers
if newNode == nil {
az.excludeLoadBalancerNodes.Insert(prevNode.ObjectMeta.Name)
az.nodesWithCorrectLoadBalancerByPrimaryVMSet.Delete(strings.ToLower(prevNode.ObjectMeta.Name))
}

// Remove from nodePrivateIPs cache.
for _, address := range getNodePrivateIPAddresses(prevNode) {
klog.V(4).Infof("removing IP address %s of the node %s", address, prevNode.Name)
az.nodePrivateIPs[prevNode.Name].Delete(address)
delete(az.nodePrivateIPToNodeNameMap, address)
}

// if the node is being deleted from the cluster, exclude it from load balancers
if newNode == nil {
az.excludeLoadBalancerNodes.Insert(prevNode.ObjectMeta.Name)
az.nodesWithCorrectLoadBalancerByPrimaryVMSet.Delete(strings.ToLower(prevNode.ObjectMeta.Name))
delete(az.nodePrivateIPs, strings.ToLower(prevNode.Name))
}
}

if newNode != nil {
Expand Down Expand Up @@ -1318,15 +1319,15 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {

// Add to nodePrivateIPs cache
for _, address := range getNodePrivateIPAddresses(newNode) {
if az.nodePrivateIPs[newNode.Name] == nil {
az.nodePrivateIPs[newNode.Name] = sets.New[string]()
if az.nodePrivateIPs[strings.ToLower(newNode.Name)] == nil {
az.nodePrivateIPs[strings.ToLower(newNode.Name)] = sets.New[string]()
}
if az.nodePrivateIPToNodeNameMap == nil {
az.nodePrivateIPToNodeNameMap = make(map[string]string)
}

klog.V(6).Infof("adding IP address %s of the node %s", address, newNode.Name)
az.nodePrivateIPs[newNode.Name].Insert(address)
az.nodePrivateIPs[strings.ToLower(newNode.Name)].Insert(address)
az.nodePrivateIPToNodeNameMap[address] = newNode.Name
}
}
Expand Down
58 changes: 46 additions & 12 deletions pkg/provider/azure_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func (az *Cloud) reconcileService(ctx context.Context, clusterName string, servi
key := strings.ToLower(serviceName)
if az.useMultipleStandardLoadBalancers() && isLocalService(service) {
az.localServiceNameToServiceInfoMap.Store(key, newServiceInfo(getServiceIPFamily(service), lbName))
// The endpointslice may change after EnsureHostsInPool has run, so it
// needs to be checked a second time.
if err := az.checkAndApplyLocalServiceBackendPoolUpdates(*lb, service); err != nil {
return nil, err
}
} else {
az.localServiceNameToServiceInfoMap.Delete(key)
}
Expand Down Expand Up @@ -1801,18 +1806,9 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
if az.useMultipleStandardLoadBalancers() {
lbToReconcile = *existingLBs
}
for _, lb := range lbToReconcile {
lbName := pointer.StringDeref(lb.Name, "")
if lb.LoadBalancerPropertiesFormat != nil && lb.LoadBalancerPropertiesFormat.BackendAddressPools != nil {
for _, backendPool := range *lb.LoadBalancerPropertiesFormat.BackendAddressPools {
isIPv6 := isBackendPoolIPv6(pointer.StringDeref(backendPool.Name, ""))
if strings.EqualFold(pointer.StringDeref(backendPool.Name, ""), az.getBackendPoolNameForService(service, clusterName, isIPv6)) {
if err := az.LoadBalancerBackendPool.EnsureHostsInPool(service, nodes, lbBackendPoolIDs[isIPv6], vmSetName, clusterName, lbName, backendPool); err != nil {
return nil, err
}
}
}
}
lb, err = az.reconcileBackendPoolHosts(lb, lbToReconcile, service, nodes, clusterName, vmSetName, lbBackendPoolIDs)
if err != nil {
return nil, err
}
}

Expand All @@ -1824,6 +1820,44 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
return lb, nil
}

// reconcileBackendPoolHosts calls LoadBalancerBackendPool.EnsureHostsInPool for
// every backend pool in lbs that belongs to the given service, and returns the
// load balancer the caller should treat as the current one afterwards.
//
// A backend pool belongs to the service when its name equals
// (case-insensitively) the name returned by getBackendPoolNameForService for
// the pool's IP family. lbBackendPoolIDs is indexed by that same IP family
// flag (true for IPv6).
//
// The returned load balancer is the entry of lbs whose name equals
// (case-insensitively) the name of currentLB; if several entries match, the
// last one wins. When no entry matches, or when currentLB or its Name is nil,
// currentLB is returned unchanged. On the first EnsureHostsInPool failure the
// function stops and returns (nil, err).
func (az *Cloud) reconcileBackendPoolHosts(
	currentLB *network.LoadBalancer,
	lbs []network.LoadBalancer,
	service *v1.Service,
	nodes []*v1.Node,
	clusterName, vmSetName string,
	lbBackendPoolIDs map[bool]string,
) (*network.LoadBalancer, error) {
	res := currentLB

	// Resolve the current LB name once, up front. Dereferencing
	// currentLB.Name inside the loop would panic on a nil LB or a nil name;
	// without a name there is simply nothing to match against.
	hasCurrentLBName := currentLB != nil && currentLB.Name != nil
	var currentLBName string
	if hasCurrentLBName {
		currentLBName = *currentLB.Name
	}

	for _, lb := range lbs {
		// Take a per-iteration copy: its address may be returned below, and
		// before Go 1.22 the range variable is shared across iterations.
		lb := lb
		lbName := pointer.StringDeref(lb.Name, "")
		if lb.LoadBalancerPropertiesFormat != nil && lb.LoadBalancerPropertiesFormat.BackendAddressPools != nil {
			backendPools := *lb.LoadBalancerPropertiesFormat.BackendAddressPools
			for i := range backendPools {
				backendPoolName := pointer.StringDeref(backendPools[i].Name, "")
				isIPv6 := isBackendPoolIPv6(backendPoolName)
				if !strings.EqualFold(backendPoolName, az.getBackendPoolNameForService(service, clusterName, isIPv6)) {
					continue
				}
				if err := az.LoadBalancerBackendPool.EnsureHostsInPool(
					service,
					nodes,
					lbBackendPoolIDs[isIPv6],
					vmSetName,
					clusterName,
					lbName,
					backendPools[i],
				); err != nil {
					return nil, err
				}
			}
		}
		if hasCurrentLBName && strings.EqualFold(lbName, currentLBName) {
			res = &lb
		}
	}
	return res, nil
}

// addOrUpdateLBInList adds or updates the given lb in the list
func addOrUpdateLBInList(lbs *[]network.LoadBalancer, targetLB *network.LoadBalancer) {
for i, lb := range *lbs {
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/azure_loadbalancer_backendpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (bc *backendPoolTypeNodeIPConfig) GetBackendPrivateIPs(clusterName string,
klog.Errorf("bc.GetBackendPrivateIPs for service (%s): GetNodeNameByIPConfigurationID failed with error: %v", serviceName, err)
continue
}
privateIPsSet, ok := bc.nodePrivateIPs[nodeName]
privateIPsSet, ok := bc.nodePrivateIPs[strings.ToLower(nodeName)]
if !ok {
klog.Warningf("bc.GetBackendPrivateIPs for service (%s): failed to get private IPs of node %s", serviceName, nodeName)
continue
Expand Down Expand Up @@ -658,7 +658,7 @@ func (bi *backendPoolTypeNodeIP) ReconcileBackendPools(clusterName string, servi
bp := newBackendPools[i]
var nodeIPAddressesToBeDeleted []string
for nodeName := range bi.excludeLoadBalancerNodes {
for ip := range bi.nodePrivateIPs[nodeName] {
for ip := range bi.nodePrivateIPs[strings.ToLower(nodeName)] {
klog.V(2).Infof("bi.ReconcileBackendPools for service (%s): found unwanted node private IP %s, decouple it from the LB %s", serviceName, ip, lbName)
nodeIPAddressesToBeDeleted = append(nodeIPAddressesToBeDeleted, ip)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure_loadbalancer_healthprobe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func getTestProbes(protocol, path string, interval, servicePort, probePort, numO
func getTestProbe(protocol, path string, interval, servicePort, probePort, numOfProbe *int32, isIPv6 bool) network.Probe {
suffix := ""
if isIPv6 {
suffix = "-" + v6Suffix
suffix = "-" + consts.IPVersionIPv6String
}
expectedProbes := network.Probe{
Name: pointer.String(fmt.Sprintf("atest1-TCP-%d", *servicePort) + suffix),
Expand Down
68 changes: 65 additions & 3 deletions pkg/provider/azure_loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3023,7 +3023,7 @@ func getTCPResetTestRules(enableTCPReset bool) map[bool][]network.LoadBalancingR
func getTestRule(enableTCPReset bool, port int32, isIPv6 bool) network.LoadBalancingRule {
suffix := ""
if isIPv6 {
suffix = "-" + v6Suffix
suffix = "-" + consts.IPVersionIPv6String
}
expectedRules := network.LoadBalancingRule{
Name: pointer.String(fmt.Sprintf("atest1-TCP-%d", port) + suffix),
Expand Down Expand Up @@ -3057,7 +3057,7 @@ func getHATestRules(enableTCPReset, hasProbe bool, protocol v1.Protocol, isIPv6,
suffix := ""
enableFloatingIP := true
if isIPv6 {
suffix = "-" + v6Suffix
suffix = "-" + consts.IPVersionIPv6String
if isInternal {
enableFloatingIP = false
}
Expand Down Expand Up @@ -3096,7 +3096,7 @@ func getHATestRules(enableTCPReset, hasProbe bool, protocol v1.Protocol, isIPv6,
func getFloatingIPTestRule(enableTCPReset, enableFloatingIP bool, port int32, isIPv6 bool) network.LoadBalancingRule {
suffix := ""
if isIPv6 {
suffix = "-" + v6Suffix
suffix = "-" + consts.IPVersionIPv6String
}
expectedRules := network.LoadBalancingRule{
Name: pointer.String(fmt.Sprintf("atest1-TCP-%d%s", port, suffix)),
Expand Down Expand Up @@ -9002,3 +9002,65 @@ func TestAddOrUpdateLBInList(t *testing.T) {
addOrUpdateLBInList(&existingLBs, &targetLB)
assert.Equal(t, expectedLBs, existingLBs)
}

// TestReconcileBackendPoolHosts checks that reconcileBackendPoolHosts invokes
// EnsureHostsInPool for the matching backend pool of each existing load
// balancer and returns the load balancer named like the current one, with the
// backend addresses written by the fake EnsureHostsInPool.
func TestReconcileBackendPoolHosts(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	const cluster = "kubernetes"
	service := getTestService("test", v1.ProtocolTCP, nil, false)
	poolIDs := map[bool]string{false: "id"}

	// newLB builds a load balancer holding exactly one backend pool.
	newLB := func(name string, pool network.BackendAddressPool) *network.LoadBalancer {
		return &network.LoadBalancer{
			Name: pointer.String(name),
			LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{
				BackendAddressPools: &[]network.BackendAddressPool{pool},
			},
		}
	}

	initialIPs := []string{"10.0.0.1"}
	currentPool := buildTestLoadBalancerBackendPoolWithIPs(cluster, initialIPs)
	otherPool := buildTestLoadBalancerBackendPoolWithIPs(cluster, initialIPs)
	// The addresses fakeEnsureHostsInPool writes into the pool it is given.
	wantPool := buildTestLoadBalancerBackendPoolWithIPs(cluster, []string{"10.0.0.2", "10.0.0.3"})

	currentLB := newLB(cluster, currentPool)
	otherLB := newLB("lb2", otherPool)
	wantLB := newLB(cluster, wantPool)
	lbs := []network.LoadBalancer{*currentLB, *otherLB}

	az := GetTestCloud(mockCtrl)
	backendPoolMock := NewMockBackendPool(mockCtrl)
	anyArg := gomock.Any()
	backendPoolMock.EXPECT().
		EnsureHostsInPool(anyArg, anyArg, anyArg, anyArg, anyArg, anyArg, currentPool).
		DoAndReturn(fakeEnsureHostsInPool())
	backendPoolMock.EXPECT().
		EnsureHostsInPool(anyArg, anyArg, anyArg, anyArg, anyArg, anyArg, otherPool).
		Return(nil)
	az.LoadBalancerBackendPool = backendPoolMock

	gotLB, err := az.reconcileBackendPoolHosts(currentLB, lbs, &service, []*v1.Node{}, cluster, "vmss", poolIDs)
	assert.NoError(t, err)
	assert.Equal(t, wantLB, gotLB)
}

// fakeEnsureHostsInPool returns a stand-in for EnsureHostsInPool that ignores
// every argument except backendPool, sets that pool's
// LoadBalancerBackendAddresses to the two IPs 10.0.0.2 and 10.0.0.3, and
// always returns nil.
//
// NOTE(review): backendPool is passed by value, so the caller only sees the
// change if LoadBalancerBackendAddresses is reached through a pointer shared
// with the caller's copy — presumably an embedded properties pointer; confirm
// against the network package.
func fakeEnsureHostsInPool() func(*v1.Service, []*v1.Node, string, string, string, string, network.BackendAddressPool) error {
	return func(_ *v1.Service, _ []*v1.Node, _, _, _, _ string, backendPool network.BackendAddressPool) error {
		fakeIPs := []string{"10.0.0.2", "10.0.0.3"}
		addresses := make([]network.LoadBalancerBackendAddress, 0, len(fakeIPs))
		for _, ip := range fakeIPs {
			addresses = append(addresses, network.LoadBalancerBackendAddress{
				LoadBalancerBackendAddressPropertiesFormat: &network.LoadBalancerBackendAddressPropertiesFormat{
					IPAddress: pointer.String(ip),
				},
			})
		}
		backendPool.LoadBalancerBackendAddresses = &addresses
		return nil
	}
}

0 comments on commit 6d3c662

Please sign in to comment.