Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.27] feat: add annotation to control pls creation rg #4678

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ const (
// ServiceAnnotationPLSCreation determines whether a PLS needs to be created.
ServiceAnnotationPLSCreation = "service.beta.kubernetes.io/azure-pls-create"

// ServiceAnnotationPLSResourceGroup determines the resource group to create the PLS in.
ServiceAnnotationPLSResourceGroup = "service.beta.kubernetes.io/azure-pls-resource-group"

// ServiceAnnotationPLSName determines name of the PLS resource to create.
ServiceAnnotationPLSName = "service.beta.kubernetes.io/azure-pls-name"

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ type Cloud struct {
// key: [resourceGroupName]
// Value: sync.Map of [pipName]*PublicIPAddress
pipCache *azcache.TimedCache
// use LB frontEndIpConfiguration ID as the key and search for PLS attached to the frontEnd
// use [resourceGroupName*LBFrontEndIpConfigurationID] as the key and search for PLS attached to the frontEnd
plsCache *azcache.TimedCache

// Add service lister to always get latest service
Expand Down
20 changes: 10 additions & 10 deletions pkg/provider/azure_backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,14 @@ func (az *Cloud) CreateOrUpdateVMSS(resourceGroupName string, VMScaleSetName str
return nil
}

func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkService) error {
func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, resourceGroup string, pls network.PrivateLinkService) error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, az.PrivateLinkServiceResourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, ""))
rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, resourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, ""))
if rerr == nil {
// Invalidate the cache right after updating
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
return nil
}

Expand All @@ -589,26 +589,26 @@ func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkS
// Invalidate the cache because etag mismatch.
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
klog.V(3).Infof("Private link service cache for %s is cleanup because of http.StatusPreconditionFailed", pointer.StringDeref(pls.Name, ""))
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
}
// Invalidate the cache because another new operation has canceled the current request.
if strings.Contains(strings.ToLower(rerr.Error().Error()), consts.OperationCanceledErrorMessage) {
klog.V(3).Infof("Private link service for %s is cleanup because CreateOrUpdatePrivateLinkService is canceled by another operation", pointer.StringDeref(pls.Name, ""))
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
}
klog.Errorf("PrivateLinkServiceClient.CreateOrUpdate(%s) failed: %v", pointer.StringDeref(pls.Name, ""), rerr.Error())
return rerr.Error()
}

// DeletePLS invokes az.PrivateLinkServiceClient.Delete with exponential backoff retry
func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID string) *retry.Error {
func (az *Cloud) DeletePLS(service *v1.Service, resourceGroup, plsName, plsLBFrontendID string) *retry.Error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.Delete(ctx, az.PrivateLinkServiceResourceGroup, plsName)
rerr := az.PrivateLinkServiceClient.Delete(ctx, resourceGroup, plsName)
if rerr == nil {
// Invalidate the cache right after deleting
_ = az.plsCache.Delete(plsLBFrontendID)
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, plsLBFrontendID))
return nil
}

Expand All @@ -618,11 +618,11 @@ func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID
}

// DeletePEConn invokes az.PrivateLinkServiceClient.DeletePEConnection with exponential backoff retry
func (az *Cloud) DeletePEConn(service *v1.Service, plsName string, peConnName string) *retry.Error {
func (az *Cloud) DeletePEConn(service *v1.Service, resourceGroup, plsName, peConnName string) *retry.Error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, az.PrivateLinkServiceResourceGroup, plsName, peConnName)
rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, resourceGroup, plsName, peConnName)
if rerr == nil {
return nil
}
Expand Down
71 changes: 71 additions & 0 deletions pkg/provider/azure_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/interfaceclient/mockinterfaceclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/loadbalancerclient/mockloadbalancerclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/privatelinkserviceclient/mockprivatelinkserviceclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/publicipclient/mockpublicipclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/routeclient/mockrouteclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/routetableclient/mockroutetableclient"
Expand Down Expand Up @@ -592,6 +593,76 @@ func TestCreateOrUpdateVMSS(t *testing.T) {
}
}

func TestCreateOrUpdatePLS(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

tests := []struct {
clientErr *retry.Error
expectedErr error
}{
{
clientErr: &retry.Error{HTTPStatusCode: http.StatusPreconditionFailed},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 412, RawError: %w", error(nil)),
},
{
clientErr: &retry.Error{RawError: fmt.Errorf(consts.OperationCanceledErrorMessage)},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: %w", fmt.Errorf("canceledandsupersededduetoanotheroperation")),
},
}

for _, test := range tests {
az := GetTestCloud(ctrl)
az.pipCache.Set("rg*frontendID", "test")

mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().CreateOrUpdate(gomock.Any(), "rg", gomock.Any(), gomock.Any(), gomock.Any()).Return(test.clientErr)
mockPLSClient.EXPECT().List(gomock.Any(), az.ResourceGroup).Return([]network.PrivateLinkService{}, nil)

err := az.CreateOrUpdatePLS(&v1.Service{}, "rg", network.PrivateLinkService{
Name: pointer.String("pls"),
Etag: pointer.String("etag"),
PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{
LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
ID: pointer.String("frontendID"),
},
},
},
})
assert.EqualError(t, test.expectedErr, err.Error())

// loadbalancer should be removed from cache if the etag is mismatch or the operation is canceled
pls, err := az.plsCache.GetWithDeepCopy("rg*frontendID", cache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, consts.PrivateLinkServiceNotExistID, *pls.(*network.PrivateLinkService).ID)
}
}

func TestDeletePLS(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().Delete(gomock.Any(), "rg", "pls").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError})

err := az.DeletePLS(&v1.Service{}, "rg", "pls", "frontendID")
assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error()))
}

func TestDeletePEConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().DeletePEConnection(gomock.Any(), "rg", "pls", "peConn").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError})

err := az.DeletePEConn(&v1.Service{}, "rg", "pls", "peConn")
assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error()))
}

func TestRequestBackoff(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
21 changes: 16 additions & 5 deletions pkg/provider/azure_privatelinkservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (az *Cloud) reconcilePrivateLinkService(
}

// Secondly, check if there is a private link service already created
existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault)
existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, pointer.StringDeref(fipConfigID, ""), err)
return err
Expand Down Expand Up @@ -153,14 +153,14 @@ func (az *Cloud) reconcilePrivateLinkService(
return err
}
existingPLS.Etag = pointer.String("")
err = az.CreateOrUpdatePLS(service, existingPLS)
err = az.CreateOrUpdatePLS(service, az.getPLSResourceGroup(service), existingPLS)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s) abort backoff: pls(%s) - updating: %s", serviceName, plsName, err.Error())
return err
}
}
} else if !wantPLS {
existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault)
existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, pointer.StringDeref(fipConfigID, ""), err)
return err
Expand All @@ -181,6 +181,17 @@ func (az *Cloud) reconcilePrivateLinkService(
return nil
}

func (az *Cloud) getPLSResourceGroup(service *v1.Service) string {
if resourceGroup, found := service.Annotations[consts.ServiceAnnotationPLSResourceGroup]; found {
resourceGroupName := strings.TrimSpace(resourceGroup)
if len(resourceGroupName) > 0 {
return resourceGroupName
}
}

return az.PrivateLinkServiceResourceGroup
}

func (az *Cloud) disablePLSNetworkPolicy(service *v1.Service) error {
serviceName := getServiceName(service)
subnetName := getPLSSubnetName(service)
Expand Down Expand Up @@ -218,14 +229,14 @@ func (az *Cloud) safeDeletePLS(pls *network.PrivateLinkService, service *v1.Serv
if peConns != nil {
for _, peConn := range *peConns {
klog.V(2).Infof("deletePLS: deleting PEConnection %s", pointer.StringDeref(peConn.Name, ""))
rerr := az.DeletePEConn(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, ""))
rerr := az.DeletePEConn(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, ""))
if rerr != nil {
return rerr
}
}
}

rerr := az.DeletePLS(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
rerr := az.DeletePLS(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
if rerr != nil {
return rerr
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/provider/azure_privatelinkservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,35 @@ func TestReconcilePrivateLinkService(t *testing.T) {
}
}

func TestGetPLSResourceGroup(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testCases := []struct {
desc string
annotations map[string]string
expectedRG string
}{
{
desc: "getPLSResourceGroup should return resource group from annotation",
annotations: map[string]string{
consts.ServiceAnnotationPLSResourceGroup: "testRG",
},
expectedRG: "testRG",
},
{
desc: "getPLSResourceGroup should return resource group from azure config when annotation is not set",
expectedRG: "rg",
},
}
for i, test := range testCases {
az := GetTestCloud(ctrl)
service := getTestServiceWithAnnotation("test", test.annotations, false, 80)
rg := az.getPLSResourceGroup(&service)
assert.Equal(t, test.expectedRG, rg, "TestCase[%d]: %s", i, test.desc)
}
}

func TestDisablePLSNetworkPolicy(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
20 changes: 15 additions & 5 deletions pkg/provider/azure_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func (az *Cloud) getSecurityGroup(crt azcache.AzureCacheReadType) (network.Secur
return *(securityGroup.(*network.SecurityGroup)), nil
}

func (az *Cloud) getPrivateLinkService(frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) {
cachedPLS, err := az.plsCache.GetWithDeepCopy(*frontendIPConfigID, crt)
func (az *Cloud) getPrivateLinkService(resourceGroup string, frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) {
cachedPLS, err := az.plsCache.GetWithDeepCopy(getPLSCacheKey(resourceGroup, *frontendIPConfigID), crt)
if err != nil {
return pls, err
}
Expand Down Expand Up @@ -341,12 +341,22 @@ func (az *Cloud) newPIPCache() (*azcache.TimedCache, error) {
return azcache.NewTimedcache(time.Duration(az.PublicIPCacheTTLInSeconds)*time.Second, getter)
}

func getPLSCacheKey(resourceGroup, plsLBFrontendID string) string {
return fmt.Sprintf("%s*%s", resourceGroup, plsLBFrontendID)
}

func parsePLSCacheKey(key string) (string, string) {
splits := strings.Split(key, "*")
return splits[0], splits[1]
}

func (az *Cloud) newPLSCache() (*azcache.TimedCache, error) {
// for PLS cache, key is LBFrontendIPConfiguration ID
getter := func(key string) (interface{}, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
plsList, err := az.PrivateLinkServiceClient.List(ctx, az.PrivateLinkServiceResourceGroup)
resourceGroup, frontendID := parsePLSCacheKey(key)
plsList, err := az.PrivateLinkServiceClient.List(ctx, resourceGroup)
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return nil, rerr.Error()
Expand All @@ -363,15 +373,15 @@ func (az *Cloud) newPLSCache() (*azcache.TimedCache, error) {
continue
}
for _, fipConfig := range *fipConfigs {
if strings.EqualFold(*fipConfig.ID, key) {
if strings.EqualFold(*fipConfig.ID, frontendID) {
return &pls, nil
}
}

}
}

klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q", key)
klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q in rg %q", frontendID, resourceGroup)
plsNotExistID := consts.PrivateLinkServiceNotExistID
return &network.PrivateLinkService{ID: &plsNotExistID}, nil
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/provider/azure_wrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"

"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/privatelinkserviceclient/mockprivatelinkserviceclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/publicipclient/mockpublicipclient"
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
"sigs.k8s.io/cloud-provider-azure/pkg/consts"
Expand Down Expand Up @@ -406,3 +407,46 @@ func TestListPIP(t *testing.T) {
})
}
}

func TestGetPrivateLinkService(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
az.plsCache.Set("rg*frontendID", &network.PrivateLinkService{Name: pointer.String("pls")})

// cache hit
pls, err := az.getPrivateLinkService("rg", pointer.String("frontendID"), azcache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, "pls", *pls.Name)

// cache miss
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().List(gomock.Any(), "rg1").Return([]network.PrivateLinkService{
{
Name: pointer.String("pls1"),
PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{
LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
ID: pointer.String("frontendID1"),
},
},
},
},
}, nil)
pls, err = az.getPrivateLinkService("rg1", pointer.String("frontendID1"), azcache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, "pls1", *pls.Name)
}

func TestGetPLSCacheKey(t *testing.T) {
rg, frontendID := "rg", "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig"
assert.Equal(t, "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", getPLSCacheKey(rg, frontendID))
}

func TestParsePLSCacheKey(t *testing.T) {
key := "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig"
rg, frontendID := parsePLSCacheKey(key)
assert.Equal(t, "rg", rg)
assert.Equal(t, "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", frontendID)
}