Skip to content

Commit

Permalink
feat: support migrate from NIC-based to IP-based backend pool by migr…
Browse files Browse the repository at this point in the history
…ation API by setting `"enableMigrateToIPBasedBackendPoolAPI": true`
  • Loading branch information
nilo19 committed Jun 6, 2023
1 parent 9c5b598 commit 84c6462
Show file tree
Hide file tree
Showing 18 changed files with 630 additions and 125 deletions.
124 changes: 122 additions & 2 deletions pkg/azureclients/loadbalancerclient/azure_loadbalancerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type Client struct {
RetryAfterWriter time.Time
}

type backendPoolsToBeMigrated struct {
BackendPoolNames []string `json:"pools"`
}

// New creates a new LoadBalancer client with ratelimiting.
func New(config *azclients.ClientConfig) *Client {
baseURI := config.ResourceManagerEndpoint
Expand Down Expand Up @@ -421,6 +425,69 @@ func (page LoadBalancerListResultPage) Values() []network.LoadBalancer {
return *page.lblr.Value
}

// GetLBBackendPool gets a LoadBalancer backend pool.
func (c *Client) GetLBBackendPool(ctx context.Context, resourceGroupName string, loadBalancerName string, backendPoolName string, expand string) (network.BackendAddressPool, *retry.Error) {
mc := metrics.NewMetricContext("load_balancers", "get_backend_pool", resourceGroupName, c.subscriptionID, "")

// Report errors if the client is rate limited.
if !c.rateLimiterReader.TryAccept() {
mc.RateLimitedCount()
return network.BackendAddressPool{}, retry.GetRateLimitError(false, "LBGet")
}

// Report errors if the client is throttled.
if c.RetryAfterReader.After(time.Now()) {
mc.ThrottledCount()
rerr := retry.GetThrottlingError("LBBackendPoolGet", "client throttled", c.RetryAfterReader)
return network.BackendAddressPool{}, rerr
}

result, rerr := c.getLBBackendPool(ctx, resourceGroupName, loadBalancerName, backendPoolName, expand)
mc.Observe(rerr)
if rerr != nil {
if rerr.IsThrottled() {
// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
c.RetryAfterReader = rerr.RetryAfter
}

return result, rerr
}

return result, nil
}

// getLBBackendPool gets a LoadBalancer backend pool.
func (c *Client) getLBBackendPool(ctx context.Context, resourceGroupName string, loadBalancerName string, backendPoolName string, expand string) (network.BackendAddressPool, *retry.Error) {
resourceID := armclient.GetChildResourceID(
c.subscriptionID,
resourceGroupName,
lbResourceType,
loadBalancerName,
"backendAddressPools",
backendPoolName,
)
result := network.BackendAddressPool{}

response, rerr := c.armClient.GetResourceWithExpandQuery(ctx, resourceID, expand)
defer c.armClient.CloseResponse(ctx, response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.backendpool.get.request", resourceID, rerr.Error())
return result, rerr
}

err := autorest.Respond(
response,
azure.WithErrorUnlessStatusCode(http.StatusOK),
autorest.ByUnmarshallingJSON(&result))
if err != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.backendpool.get.respond", resourceID, err)
return result, retry.GetError(response, err)
}

result.Response = autorest.Response{Response: response}
return result, nil
}

// CreateOrUpdateBackendPools creates or updates a LoadBalancer backend pool.
func (c *Client) CreateOrUpdateBackendPools(ctx context.Context, resourceGroupName string, loadBalancerName string, backendPoolName string, parameters network.BackendAddressPool, etag string) *retry.Error {
mc := metrics.NewMetricContext("load_balancers", "create_or_update_backend_pools", resourceGroupName, c.subscriptionID, "")
Expand Down Expand Up @@ -470,14 +537,14 @@ func (c *Client) createOrUpdateLBBackendPool(ctx context.Context, resourceGroupN
response, rerr := c.armClient.PutResource(ctx, resourceID, parameters, decorators...)
defer c.armClient.CloseResponse(ctx, response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancerbackendpool.put.request", resourceID, rerr.Error())
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.backendpool.put.request", resourceID, rerr.Error())
return rerr
}

if response != nil && response.StatusCode != http.StatusNoContent {
_, rerr = c.createOrUpdateBackendPoolResponder(response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancerbackendpool.put.respond", resourceID, rerr.Error())
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.backendpool.put.respond", resourceID, rerr.Error())
return rerr
}
}
Expand Down Expand Up @@ -537,3 +604,56 @@ func (c *Client) createOrUpdateBackendPoolResponder(resp *http.Response) (*netwo
result.Response = autorest.Response{Response: resp}
return result, retry.GetError(resp, err)
}

// MigrateToIPBasedBackendPool migrates a NIC-based backend pool to IP-based.
func (c *Client) MigrateToIPBasedBackendPool(ctx context.Context, resourceGroupName string, loadBalancerName string, backendPoolNames []string) *retry.Error {
mc := metrics.NewMetricContext("load_balancers", "migrate_to_ip_based_backend_pool", resourceGroupName, c.subscriptionID, "")

// Report errors if the client is rate limited.
if !c.rateLimiterWriter.TryAccept() {
mc.RateLimitedCount()
return retry.GetRateLimitError(true, "LBMigrateToIPBasedBackendPool")
}

// Report errors if the client is throttled.
if c.RetryAfterWriter.After(time.Now()) {
mc.ThrottledCount()
rerr := retry.GetThrottlingError("LBMigrateToIPBasedBackendPool", "client throttled", c.RetryAfterWriter)
return rerr
}

parameters := backendPoolsToBeMigrated{
BackendPoolNames: backendPoolNames,
}
rerr := c.migrateToIPBasedBackendPool(ctx, resourceGroupName, loadBalancerName, parameters)
mc.Observe(rerr)
if rerr != nil {
if rerr.IsThrottled() {
// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
c.RetryAfterWriter = rerr.RetryAfter
}

return rerr
}

return nil
}

func (c *Client) migrateToIPBasedBackendPool(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters backendPoolsToBeMigrated) *retry.Error {
resourceID := armclient.GetResourceID(
c.subscriptionID,
resourceGroupName,
lbResourceType,
loadBalancerName,
)

response, rerr := c.armClient.PostResource(ctx, resourceID, "migrateToIpBased", parameters, map[string]interface{}{})
defer c.armClient.CloseResponse(ctx, response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancerbackendpool.migrate.request", resourceID, rerr.Error())
return rerr
}

klog.Infof("Response: %v", response)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"testing"
Expand All @@ -39,8 +40,9 @@ import (
)

const (
testResourceID = "/subscriptions/subscriptionID/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb1"
testResourcePrefix = "/subscriptions/subscriptionID/resourceGroups/rg/providers/Microsoft.Network/loadBalancers"
testResourceID = "/subscriptions/subscriptionID/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb1"
testBackendPoolResourceID = "/subscriptions/subscriptionID/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb1/backendAddressPools/lb1"
testResourcePrefix = "/subscriptions/subscriptionID/resourceGroups/rg/providers/Microsoft.Network/loadBalancers"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -328,6 +330,67 @@ func TestDelete(t *testing.T) {
}
}

func TestGetLBBackendPoolInternalError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

response := &http.Response{
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(bytes.NewReader([]byte("{}"))),
}
armClient := mockarmclient.NewMockInterface(ctrl)
armClient.EXPECT().GetResourceWithExpandQuery(gomock.Any(), testBackendPoolResourceID, "").Return(response, nil)
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any())

lbClient := getTestLoadBalancerClient(armClient)
expected := network.BackendAddressPool{Response: autorest.Response{}}
result, rerr := lbClient.GetLBBackendPool(context.TODO(), "rg", "lb1", "lb1", "")
assert.Equal(t, expected, result)
assert.NotNil(t, rerr)
assert.Equal(t, http.StatusInternalServerError, rerr.HTTPStatusCode)
}

func TestGetLBBackendPoolThrottle(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

response := &http.Response{
StatusCode: http.StatusTooManyRequests,
Body: io.NopCloser(bytes.NewReader([]byte("{}"))),
}
throttleErr := &retry.Error{
HTTPStatusCode: http.StatusTooManyRequests,
RawError: fmt.Errorf("error"),
Retriable: true,
RetryAfter: time.Unix(100, 0),
}
armClient := mockarmclient.NewMockInterface(ctrl)
armClient.EXPECT().GetResourceWithExpandQuery(gomock.Any(), testBackendPoolResourceID, "").Return(response, throttleErr)
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any())

lbClient := getTestLoadBalancerClient(armClient)
result, rerr := lbClient.GetLBBackendPool(context.TODO(), "rg", "lb1", "lb1", "")
assert.Empty(t, result)
assert.Equal(t, throttleErr, rerr)
}

func TestMigrateToIpBasedBackendPools(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

armClient := mockarmclient.NewMockInterface(ctrl)
response := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(""))),
}
armClient.EXPECT().PostResource(gomock.Any(), testResourceID, "migrateToIpBased", backendPoolsToBeMigrated{BackendPoolNames: []string{"lb1"}}, gomock.Any()).Return(response, nil)
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any())

lbClient := getTestLoadBalancerClient(armClient)
rerr := lbClient.MigrateToIPBasedBackendPool(context.TODO(), "rg", "lb1", []string{"lb1"})
assert.Nil(t, rerr)
}

func getTestLoadBalancer(name string) network.LoadBalancer {
return network.LoadBalancer{
ID: pointer.String(fmt.Sprintf("/subscriptions/subscriptionID/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/%s", name)),
Expand Down
6 changes: 6 additions & 0 deletions pkg/azureclients/loadbalancerclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type Interface interface {
// Delete deletes a LoadBalancer by name.
Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) *retry.Error

// GetLBBackendPool gets a LoadBalancer backend pool.
GetLBBackendPool(ctx context.Context, resourceGroupName string, loadBalancerName string, backendPoolName string, expand string) (network.BackendAddressPool, *retry.Error)

// DeleteLBBackendPool deletes a LoadBalancer backend pool by name.
DeleteLBBackendPool(ctx context.Context, resourceGroupName, loadBalancerName, backendPoolName string) *retry.Error

// MigrateToIPBasedBackendPool migrates a NIC-based backend pool to IP-based.
MigrateToIPBasedBackendPool(ctx context.Context, resourceGroupName string, loadBalancerName string, backendPoolNames []string) *retry.Error
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ type Config struct {
PutVMSSVMBatchSize int `json:"putVMSSVMBatchSize" yaml:"putVMSSVMBatchSize"`
// PrivateLinkServiceResourceGroup determines the specific resource group of the private link services user want to use
PrivateLinkServiceResourceGroup string `json:"privateLinkServiceResourceGroup,omitempty" yaml:"privateLinkServiceResourceGroup,omitempty"`

// EnableMigrateToIPBasedBackendPoolAPI uses the migration API to migrate from NIC-based to IP-based backend pool.
// The migration API can provide a migration from NIC-based to IP-based backend pool without service downtime.
// If the API is not used, the migration will be done by decoupling all nodes on the backend pool and then re-attaching
// node IPs, which will introduce service downtime. The downtime increases with the number of nodes in the backend pool.
EnableMigrateToIPBasedBackendPoolAPI bool `json:"enableMigrateToIPBasedBackendPoolAPI" yaml:"enableMigrateToIPBasedBackendPoolAPI"`
}

type InitSecretConfig struct {
Expand Down
53 changes: 53 additions & 0 deletions pkg/provider/azure_backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package provider

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2022-07-01/network"
Expand Down Expand Up @@ -652,3 +654,54 @@ func (az *Cloud) CreateOrUpdateSubnet(service *v1.Service, subnet network.Subnet

return nil
}

// MigrateToIPBasedBackendPoolAndWaitForCompletion use the migration API to migrate from
// NIC-based to IP-based LB backend pools. It also makes sure the number of IP addresses
// in the backend pools is expected.
func (az *Cloud) MigrateToIPBasedBackendPoolAndWaitForCompletion(
lbName string, backendPoolNames []string, nicsCountMap map[string]int,
) error {
if rerr := az.LoadBalancerClient.MigrateToIPBasedBackendPool(context.Background(), az.ResourceGroup, lbName, backendPoolNames); rerr != nil {
backendPoolNamesStr := strings.Join(backendPoolNames, ",")
klog.Errorf("MigrateToIPBasedBackendPoolAndWaitForCompletion: Failed to migrate to IP based backend pool for lb %s, backend pool %s: %s", lbName, backendPoolNamesStr, rerr.Error().Error())
return rerr.Error()
}

succeeded := make(map[string]bool)
for bpName := range nicsCountMap {
succeeded[bpName] = false
}

err := wait.PollImmediate(5*time.Second, 10*time.Minute, func() (done bool, err error) {
for bpName, nicsCount := range nicsCountMap {
if succeeded[bpName] {
continue
}

bp, rerr := az.LoadBalancerClient.GetLBBackendPool(context.Background(), az.ResourceGroup, lbName, bpName, "")
if rerr != nil {
klog.Errorf("MigrateToIPBasedBackendPoolAndWaitForCompletion: Failed to get backend pool %s for lb %s: %s", bpName, lbName, rerr.Error().Error())
return false, rerr.Error()
}

if countIPsOnBackendPool(bp) != nicsCount {
klog.V(4).Infof("MigrateToIPBasedBackendPoolAndWaitForCompletion: Expected IPs %s, current IPs %d, will retry in 5s", nicsCount, countIPsOnBackendPool(bp))
return false, nil
}
succeeded[bpName] = true
}
return true, nil
})

if err != nil {
if errors.Is(err, wait.ErrWaitTimeout) {
klog.Warningf("MigrateToIPBasedBackendPoolAndWaitForCompletion: Timeout waiting for migration to IP based backend pool for lb %s, backend pool %s", lbName, strings.Join(backendPoolNames, ","))
return nil
}

klog.Errorf("MigrateToIPBasedBackendPoolAndWaitForCompletion: Failed to wait for migration to IP based backend pool for lb %s, backend pool %s: %s", lbName, strings.Join(backendPoolNames, ","), err.Error())
return err
}

return nil
}
Loading

0 comments on commit 84c6462

Please sign in to comment.