Skip to content

Commit

Permalink
Get latest service from serviceLister before UpdateLoadBalancer and s…
Browse files Browse the repository at this point in the history
…erialize LB reconcile
  • Loading branch information
jwtty authored and k8s-infra-cherrypick-robot committed Sep 28, 2022
1 parent 5913ff2 commit 72f2e06
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
8 changes: 8 additions & 0 deletions pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
Expand Down Expand Up @@ -359,6 +360,11 @@ type Cloud struct {
// use LB frontEndIpConfiguration ID as the key and search for PLS attached to the frontEnd
plsCache *azcache.TimedCache

// Add service lister to always get latest service
serviceLister corelisters.ServiceLister
// node-sync-loop routine and service-reconcile routine should not update LoadBalancer at the same time
serviceReconcileLock sync.Mutex

*ManagedDiskController
*controllerCommon
}
Expand Down Expand Up @@ -1061,6 +1067,8 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
},
})
az.nodeInformerSynced = nodeInformer.HasSynced

az.serviceLister = informerFactory.Core().V1().Services().Lister()
}

// updateNodeCaches updates local cache for node's zones and external resource groups.
Expand Down
40 changes: 39 additions & 1 deletion pkg/provider/azure_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/Azure/go-autorest/autorest/to"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
Expand Down Expand Up @@ -144,6 +145,11 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
// When a client updates the internal load balancer annotation,
// the service may be switched from an internal LB to a public one, or vise versa.
// Here we'll firstly ensure service do not lie in the opposite LB.

// Serialize service reconcile process
az.serviceReconcileLock.Lock()
defer az.serviceReconcileLock.Unlock()

var err error
serviceName := getServiceName(service)
mc := metrics.NewMetricContext("services", "ensure_loadbalancer", az.ResourceGroup, az.getNetworkResourceSubscriptionID(), serviceName)
Expand All @@ -164,8 +170,25 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
return lbStatus, nil
}

func (az *Cloud) getLatestService(service *v1.Service) (*v1.Service, bool, error) {
latestService, err := az.serviceLister.Services(service.Namespace).Get(service.Name)
switch {
case apierrors.IsNotFound(err):
// service absence in store means the service deletion is caught by watcher
return nil, false, nil
case err != nil:
return nil, false, err
default:
return latestService.DeepCopy(), true, nil
}
}

// UpdateLoadBalancer updates hosts under the specified load balancer.
func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
// Serialize service reconcile process
az.serviceReconcileLock.Lock()
defer az.serviceReconcileLock.Unlock()

var err error
serviceName := getServiceName(service)
mc := metrics.NewMetricContext("services", "update_loadbalancer", az.ResourceGroup, az.getNetworkResourceSubscriptionID(), serviceName)
Expand All @@ -176,6 +199,17 @@ func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, ser
klog.V(5).InfoS("UpdateLoadBalancer Finish", "service", serviceName, "cluster", clusterName, "service_spec", service, "error", err)
}()

// In case UpdateLoadBalancer gets stale service spec, retrieve the latest from lister
service, serviceExists, err := az.getLatestService(service)
if err != nil {
return fmt.Errorf("UpdateLoadBalancer: failed to get latest service %s: %w", service.Name, err)
}
if !serviceExists {
isOperationSucceeded = true
klog.V(2).Infof("UpdateLoadBalancer: skipping service %s because service is going to be deleted", service.Name)
return nil
}

shouldUpdateLB, err := az.shouldUpdateLoadBalancer(clusterName, service, nodes)
if err != nil {
return err
Expand Down Expand Up @@ -203,6 +237,10 @@ func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, ser
// have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around.
func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
// Serialize service reconcile process
az.serviceReconcileLock.Lock()
defer az.serviceReconcileLock.Unlock()

var err error
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
Expand Down Expand Up @@ -2599,7 +2637,7 @@ func (az *Cloud) shouldUpdateLoadBalancer(clusterName string, service *v1.Servic
}

_, _, existsLb, _ := az.getServiceLoadBalancer(service, clusterName, nodes, false, existingManagedLBs)
return existsLb && service.ObjectMeta.DeletionTimestamp == nil, nil
return existsLb && service.ObjectMeta.DeletionTimestamp == nil && service.Spec.Type == v1.ServiceTypeLoadBalancer, nil
}

func logSafe(s *string) string {
Expand Down
13 changes: 13 additions & 0 deletions pkg/provider/azure_loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4221,30 +4221,42 @@ func TestShouldUpdateLoadBalancer(t *testing.T) {
testCases := []struct {
desc string
lbHasDeletionTimestamp bool
serviceType v1.ServiceType
existsLb bool
expectedOutput bool
}{
{
desc: "should update a load balancer that does not have a deletion timestamp and exists in Azure",
lbHasDeletionTimestamp: false,
serviceType: v1.ServiceTypeLoadBalancer,
existsLb: true,
expectedOutput: true,
},
{
desc: "should not update a load balancer that is being deleted / already deleted in K8s",
lbHasDeletionTimestamp: true,
serviceType: v1.ServiceTypeLoadBalancer,
existsLb: true,
expectedOutput: false,
},
{
desc: "should not update a load balancer that is no longer LoadBalancer type in K8s",
lbHasDeletionTimestamp: false,
serviceType: v1.ServiceTypeClusterIP,
existsLb: true,
expectedOutput: false,
},
{
desc: "should not update a load balancer that does not exist in Azure",
lbHasDeletionTimestamp: false,
serviceType: v1.ServiceTypeLoadBalancer,
existsLb: false,
expectedOutput: false,
},
{
desc: "should not update a load balancer that has a deletion timestamp and does not exist in Azure",
lbHasDeletionTimestamp: true,
serviceType: v1.ServiceTypeLoadBalancer,
existsLb: false,
expectedOutput: false,
},
Expand All @@ -4253,6 +4265,7 @@ func TestShouldUpdateLoadBalancer(t *testing.T) {
for i, test := range testCases {
az := GetTestCloud(ctrl)
service := getTestService("test1", v1.ProtocolTCP, nil, false, 80)
service.Spec.Type = test.serviceType
setMockPublicIPs(az, ctrl, 1)
mockLBsClient := mockloadbalancerclient.NewMockInterface(ctrl)
az.LoadBalancerClient = mockLBsClient
Expand Down

0 comments on commit 72f2e06

Please sign in to comment.