Skip to content

Commit

Permalink
Merge pull request #4675 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…4660-to-release-1.28

[release-1.28] feat: add annotation to control pls creation rg
  • Loading branch information
k8s-ci-robot committed Sep 21, 2023
2 parents d8ba411 + f560040 commit b0b18ed
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 21 deletions.
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ const (
// ServiceAnnotationPLSCreation determines whether a PLS needs to be created.
ServiceAnnotationPLSCreation = "service.beta.kubernetes.io/azure-pls-create"

// ServiceAnnotationPLSResourceGroup determines the resource group to create the PLS in.
ServiceAnnotationPLSResourceGroup = "service.beta.kubernetes.io/azure-pls-resource-group"

// ServiceAnnotationPLSName determines name of the PLS resource to create.
ServiceAnnotationPLSName = "service.beta.kubernetes.io/azure-pls-name"

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ type Cloud struct {
// key: [resourceGroupName]
// Value: sync.Map of [pipName]*PublicIPAddress
pipCache azcache.Resource
// use LB frontEndIpConfiguration ID as the key and search for PLS attached to the frontEnd
// use [resourceGroupName*LBFrontEndIpConfigurationID] as the key and search for PLS attached to the frontEnd
plsCache azcache.Resource
// a timed cache storing storage account properties to avoid querying storage account frequently
storageAccountCache azcache.Resource
Expand Down
21 changes: 16 additions & 5 deletions pkg/provider/azure_privatelinkservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (az *Cloud) reconcilePrivateLinkService(
}

// Secondly, check if there is a private link service already created
existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault)
existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, pointer.StringDeref(fipConfigID, ""), err)
return err
Expand Down Expand Up @@ -153,14 +153,14 @@ func (az *Cloud) reconcilePrivateLinkService(
return err
}
existingPLS.Etag = pointer.String("")
err = az.CreateOrUpdatePLS(service, existingPLS)
err = az.CreateOrUpdatePLS(service, az.getPLSResourceGroup(service), existingPLS)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s) abort backoff: pls(%s) - updating: %s", serviceName, plsName, err.Error())
return err
}
}
} else if !wantPLS {
existingPLS, err := az.getPrivateLinkService(fipConfigID, azcache.CacheReadTypeDefault)
existingPLS, err := az.getPrivateLinkService(az.getPLSResourceGroup(service), fipConfigID, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("reconcilePrivateLinkService for service(%s): getPrivateLinkService(%s) failed: %v", serviceName, pointer.StringDeref(fipConfigID, ""), err)
return err
Expand All @@ -181,6 +181,17 @@ func (az *Cloud) reconcilePrivateLinkService(
return nil
}

func (az *Cloud) getPLSResourceGroup(service *v1.Service) string {
if resourceGroup, found := service.Annotations[consts.ServiceAnnotationPLSResourceGroup]; found {
resourceGroupName := strings.TrimSpace(resourceGroup)
if len(resourceGroupName) > 0 {
return resourceGroupName
}
}

return az.PrivateLinkServiceResourceGroup
}

func (az *Cloud) disablePLSNetworkPolicy(service *v1.Service) error {
serviceName := getServiceName(service)
subnetName := getPLSSubnetName(service)
Expand Down Expand Up @@ -218,14 +229,14 @@ func (az *Cloud) safeDeletePLS(pls *network.PrivateLinkService, service *v1.Serv
if peConns != nil {
for _, peConn := range *peConns {
klog.V(2).Infof("deletePLS: deleting PEConnection %s", pointer.StringDeref(peConn.Name, ""))
rerr := az.DeletePEConn(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, ""))
rerr := az.DeletePEConn(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref(peConn.Name, ""))
if rerr != nil {
return rerr
}
}
}

rerr := az.DeletePLS(service, pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
rerr := az.DeletePLS(service, az.getPLSResourceGroup(service), pointer.StringDeref(pls.Name, ""), pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
if rerr != nil {
return rerr
}
Expand Down
41 changes: 26 additions & 15 deletions pkg/provider/azure_privatelinkservice_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package provider

import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
Expand All @@ -32,14 +33,14 @@ import (
"sigs.k8s.io/cloud-provider-azure/pkg/retry"
)

func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkService) error {
func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, resourceGroup string, pls network.PrivateLinkService) error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, az.PrivateLinkServiceResourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, ""))
rerr := az.PrivateLinkServiceClient.CreateOrUpdate(ctx, resourceGroup, pointer.StringDeref(pls.Name, ""), pls, pointer.StringDeref(pls.Etag, ""))
if rerr == nil {
// Invalidate the cache right after updating
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
return nil
}

Expand All @@ -49,26 +50,26 @@ func (az *Cloud) CreateOrUpdatePLS(service *v1.Service, pls network.PrivateLinkS
// Invalidate the cache because etag mismatch.
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
klog.V(3).Infof("Private link service cache for %s is cleanup because of http.StatusPreconditionFailed", pointer.StringDeref(pls.Name, ""))
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
}
// Invalidate the cache because another new operation has canceled the current request.
if strings.Contains(strings.ToLower(rerr.Error().Error()), consts.OperationCanceledErrorMessage) {
klog.V(3).Infof("Private link service for %s is cleanup because CreateOrUpdatePrivateLinkService is canceled by another operation", pointer.StringDeref(pls.Name, ""))
_ = az.plsCache.Delete(pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, ""))
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, pointer.StringDeref((*pls.LoadBalancerFrontendIPConfigurations)[0].ID, "")))
}
klog.Errorf("PrivateLinkServiceClient.CreateOrUpdate(%s) failed: %v", pointer.StringDeref(pls.Name, ""), rerr.Error())
return rerr.Error()
}

// DeletePLS invokes az.PrivateLinkServiceClient.Delete with exponential backoff retry
func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID string) *retry.Error {
func (az *Cloud) DeletePLS(service *v1.Service, resourceGroup, plsName, plsLBFrontendID string) *retry.Error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.Delete(ctx, az.PrivateLinkServiceResourceGroup, plsName)
rerr := az.PrivateLinkServiceClient.Delete(ctx, resourceGroup, plsName)
if rerr == nil {
// Invalidate the cache right after deleting
_ = az.plsCache.Delete(plsLBFrontendID)
_ = az.plsCache.Delete(getPLSCacheKey(resourceGroup, plsLBFrontendID))
return nil
}

Expand All @@ -78,11 +79,11 @@ func (az *Cloud) DeletePLS(service *v1.Service, plsName string, plsLBFrontendID
}

// DeletePEConn invokes az.PrivateLinkServiceClient.DeletePEConnection with exponential backoff retry
func (az *Cloud) DeletePEConn(service *v1.Service, plsName string, peConnName string) *retry.Error {
func (az *Cloud) DeletePEConn(service *v1.Service, resourceGroup, plsName, peConnName string) *retry.Error {
ctx, cancel := getContextWithCancel()
defer cancel()

rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, az.PrivateLinkServiceResourceGroup, plsName, peConnName)
rerr := az.PrivateLinkServiceClient.DeletePEConnection(ctx, resourceGroup, plsName, peConnName)
if rerr == nil {
return nil
}
Expand All @@ -97,7 +98,8 @@ func (az *Cloud) newPLSCache() (azcache.Resource, error) {
getter := func(key string) (interface{}, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
plsList, err := az.PrivateLinkServiceClient.List(ctx, az.PrivateLinkServiceResourceGroup)
resourceGroup, frontendID := parsePLSCacheKey(key)
plsList, err := az.PrivateLinkServiceClient.List(ctx, resourceGroup)
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return nil, rerr.Error()
Expand All @@ -114,15 +116,15 @@ func (az *Cloud) newPLSCache() (azcache.Resource, error) {
continue
}
for _, fipConfig := range *fipConfigs {
if strings.EqualFold(*fipConfig.ID, key) {
if strings.EqualFold(*fipConfig.ID, frontendID) {
return &pls, nil
}
}

}
}

klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q", key)
klog.V(2).Infof("No privateLinkService found for frontendIPConfig %q in rg %q", frontendID, resourceGroup)
plsNotExistID := consts.PrivateLinkServiceNotExistID
return &network.PrivateLinkService{ID: &plsNotExistID}, nil
}
Expand All @@ -133,10 +135,19 @@ func (az *Cloud) newPLSCache() (azcache.Resource, error) {
return azcache.NewTimedCache(time.Duration(az.PlsCacheTTLInSeconds)*time.Second, getter, az.Config.DisableAPICallCache)
}

func (az *Cloud) getPrivateLinkService(frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) {
cachedPLS, err := az.plsCache.GetWithDeepCopy(*frontendIPConfigID, crt)
func (az *Cloud) getPrivateLinkService(resourceGroup string, frontendIPConfigID *string, crt azcache.AzureCacheReadType) (pls network.PrivateLinkService, err error) {
cachedPLS, err := az.plsCache.GetWithDeepCopy(getPLSCacheKey(resourceGroup, *frontendIPConfigID), crt)
if err != nil {
return pls, err
}
return *(cachedPLS.(*network.PrivateLinkService)), nil
}

func getPLSCacheKey(resourceGroup, plsLBFrontendID string) string {
return fmt.Sprintf("%s*%s", resourceGroup, plsLBFrontendID)
}

func parsePLSCacheKey(key string) (string, string) {
splits := strings.Split(key, "*")
return splits[0], splits[1]
}
148 changes: 148 additions & 0 deletions pkg/provider/azure_privatelinkservice_repo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provider

import (
"fmt"
"net/http"
"testing"

"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2022-07-01/network"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"

"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/privatelinkserviceclient/mockprivatelinkserviceclient"
"sigs.k8s.io/cloud-provider-azure/pkg/cache"
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
"sigs.k8s.io/cloud-provider-azure/pkg/consts"
"sigs.k8s.io/cloud-provider-azure/pkg/retry"
)

func TestCreateOrUpdatePLS(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

tests := []struct {
clientErr *retry.Error
expectedErr error
}{
{
clientErr: &retry.Error{HTTPStatusCode: http.StatusPreconditionFailed},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 412, RawError: %w", error(nil)),
},
{
clientErr: &retry.Error{RawError: fmt.Errorf(consts.OperationCanceledErrorMessage)},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: %w", fmt.Errorf("canceledandsupersededduetoanotheroperation")),
},
}

for _, test := range tests {
az := GetTestCloud(ctrl)
az.pipCache.Set("rg*frontendID", "test")

mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().CreateOrUpdate(gomock.Any(), "rg", gomock.Any(), gomock.Any(), gomock.Any()).Return(test.clientErr)
mockPLSClient.EXPECT().List(gomock.Any(), az.ResourceGroup).Return([]network.PrivateLinkService{}, nil)

err := az.CreateOrUpdatePLS(&v1.Service{}, "rg", network.PrivateLinkService{
Name: pointer.String("pls"),
Etag: pointer.String("etag"),
PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{
LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
ID: pointer.String("frontendID"),
},
},
},
})
assert.EqualError(t, test.expectedErr, err.Error())

// loadbalancer should be removed from cache if the etag is mismatch or the operation is canceled
pls, err := az.plsCache.GetWithDeepCopy("rg*frontendID", cache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, consts.PrivateLinkServiceNotExistID, *pls.(*network.PrivateLinkService).ID)
}
}

func TestDeletePLS(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().Delete(gomock.Any(), "rg", "pls").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError})

err := az.DeletePLS(&v1.Service{}, "rg", "pls", "frontendID")
assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error()))
}

func TestDeletePEConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().DeletePEConnection(gomock.Any(), "rg", "pls", "peConn").Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError})

err := az.DeletePEConn(&v1.Service{}, "rg", "pls", "peConn")
assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), fmt.Sprintf("%s", err.Error()))
}

func TestGetPrivateLinkService(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
az.plsCache.Set("rg*frontendID", &network.PrivateLinkService{Name: pointer.String("pls")})

// cache hit
pls, err := az.getPrivateLinkService("rg", pointer.String("frontendID"), azcache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, "pls", *pls.Name)

// cache miss
mockPLSClient := az.PrivateLinkServiceClient.(*mockprivatelinkserviceclient.MockInterface)
mockPLSClient.EXPECT().List(gomock.Any(), "rg1").Return([]network.PrivateLinkService{
{
Name: pointer.String("pls1"),
PrivateLinkServiceProperties: &network.PrivateLinkServiceProperties{
LoadBalancerFrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
ID: pointer.String("frontendID1"),
},
},
},
},
}, nil)
pls, err = az.getPrivateLinkService("rg1", pointer.String("frontendID1"), azcache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Equal(t, "pls1", *pls.Name)
}

func TestGetPLSCacheKey(t *testing.T) {
rg, frontendID := "rg", "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig"
assert.Equal(t, "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", getPLSCacheKey(rg, frontendID))
}

func TestParsePLSCacheKey(t *testing.T) {
key := "rg*/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig"
rg, frontendID := parsePLSCacheKey(key)
assert.Equal(t, "rg", rg)
assert.Equal(t, "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/frontendIPConfigurations/ipconfig", frontendID)
}
29 changes: 29 additions & 0 deletions pkg/provider/azure_privatelinkservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,35 @@ func TestReconcilePrivateLinkService(t *testing.T) {
}
}

func TestGetPLSResourceGroup(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testCases := []struct {
desc string
annotations map[string]string
expectedRG string
}{
{
desc: "getPLSResourceGroup should return resource group from annotation",
annotations: map[string]string{
consts.ServiceAnnotationPLSResourceGroup: "testRG",
},
expectedRG: "testRG",
},
{
desc: "getPLSResourceGroup should return resource group from azure config when annotation is not set",
expectedRG: "rg",
},
}
for i, test := range testCases {
az := GetTestCloud(ctrl)
service := getTestServiceWithAnnotation("test", test.annotations, false, 80)
rg := az.getPLSResourceGroup(&service)
assert.Equal(t, test.expectedRG, rg, "TestCase[%d]: %s", i, test.desc)
}
}

func TestDisablePLSNetworkPolicy(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit b0b18ed

Please sign in to comment.