Skip to content

Commit

Permalink
feat: support attaching node private IP to the load balancer backend …
Browse files Browse the repository at this point in the history
…pools

(cherry picked from commit 46091eb)
  • Loading branch information
nilo19 committed Dec 30, 2021
1 parent d47c4b4 commit 24fa0e0
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 153 deletions.
142 changes: 96 additions & 46 deletions pkg/azureclients/armclient/azure_armclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,46 +395,7 @@ func (c *Client) PutResource(ctx context.Context, resourceID string, parameters
return c.PutResourceWithDecorators(ctx, resourceID, parameters, putDecorators)
}

// PutResources puts a list of resources from resources map[resourceID]parameters.
// Sync requests for those resources are sent sequentially, while async requests run concurrently.
// It's especially useful when the ARM API doesn't support concurrent requests.
func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse {
if len(resources) == 0 {
return nil
}

// Sequential sync requests.
futures := make(map[string]*azure.Future)
responses := make(map[string]*PutResourcesResponse)
for resourceID, parameters := range resources {
decorators := []autorest.PrepareDecorator{
autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}),
autorest.WithJSON(parameters),
}
request, err := c.PreparePutRequest(ctx, decorators...)
if err != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.prepare", resourceID, err)
responses[resourceID] = &PutResourcesResponse{
Error: retry.NewError(false, err),
}
continue
}
dumpRequest(request, 10)

future, resp, clientErr := c.SendAsync(ctx, request)
defer c.CloseResponse(ctx, resp)
if clientErr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.send", resourceID, clientErr.Error())
responses[resourceID] = &PutResourcesResponse{
Error: clientErr,
}
continue
}

futures[resourceID] = future
}

// Concurrent async requests.
func (c *Client) waitAsync(ctx context.Context, futures map[string]*azure.Future, previousResponses map[string]*PutResourcesResponse) {
wg := sync.WaitGroup{}
var responseLock sync.Mutex
for resourceID, future := range futures {
Expand All @@ -457,22 +418,111 @@ func (c *Client) PutResources(ctx context.Context, resources map[string]interfac
}

responseLock.Lock()
responses[resourceID] = &PutResourcesResponse{
previousResponses[resourceID] = &PutResourcesResponse{
Error: retriableErr,
}
responseLock.Unlock()
return
}
}(resourceID, future)
}
wg.Wait()
}

// PutResources puts a list of resources from resources map[resourceID]parameters.
// Those resources sync requests are sequential while async requests are concurrent. It's especially
// useful when the ARM API doesn't support concurrent requests.
func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse {
if len(resources) == 0 {
return nil
}

responseLock.Lock()
// Sequential sync requests.
futures := make(map[string]*azure.Future)
responses := make(map[string]*PutResourcesResponse)
for resourceID, parameters := range resources {
future, rerr := c.PutResourceAsync(ctx, resourceID, parameters)
if rerr != nil {
responses[resourceID] = &PutResourcesResponse{
Response: response,
Error: rerr,
}
responseLock.Unlock()
}(resourceID, future)
continue
}
futures[resourceID] = future
}

wg.Wait()
c.waitAsync(ctx, futures, responses)

return responses
}

// PutResourcesInBatches is similar to PutResources, but it sends sync requests concurrently in batches.
// A batchSize <= 0 falls back to the fully sequential PutResources; a batchSize larger than
// len(resources) is clamped to len(resources). Returns nil when resources is empty.
func (c *Client) PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse {
	if len(resources) == 0 {
		return nil
	}

	if batchSize <= 0 {
		klog.V(4).Infof("PutResourcesInBatches: batch size %d, put resources in sequence", batchSize)
		return c.PutResources(ctx, resources)
	}

	if batchSize > len(resources) {
		// BUGFIX: the %d verb was previously given the map itself instead of its length.
		klog.V(4).Infof("PutResourcesInBatches: batch size %d, but the number of the resources is %d", batchSize, len(resources))
		batchSize = len(resources)
	}
	klog.V(4).Infof("PutResourcesInBatches: send sync requests in parallel with the batch size %d", batchSize)

	// Convert map to slice because it is more straightforward to
	// loop over slice in batches than map.
	type resourcesMeta struct {
		resourceID string
		parameters interface{}
	}
	resourcesList := make([]resourcesMeta, 0, len(resources))
	for resourceID, parameters := range resources {
		resourcesList = append(resourcesList, resourcesMeta{
			resourceID: resourceID,
			parameters: parameters,
		})
	}

	// Concurrent sync requests in batches: each batch issues up to batchSize PUTs in
	// parallel and waits for all of them before starting the next batch.
	futures := make(map[string]*azure.Future)
	responses := make(map[string]*PutResourcesResponse)
	wg := sync.WaitGroup{}
	var responseLock, futuresLock sync.Mutex
	for i := 0; i < len(resourcesList); i += batchSize {
		j := i + batchSize
		if j > len(resourcesList) {
			j = len(resourcesList)
		}

		for k := i; k < j; k++ {
			wg.Add(1)
			go func(resourceID string, parameters interface{}) {
				defer wg.Done()
				future, rerr := c.PutResourceAsync(ctx, resourceID, parameters)
				if rerr != nil {
					// Record the failure and skip polling for this resource.
					responseLock.Lock()
					responses[resourceID] = &PutResourcesResponse{
						Error: rerr,
					}
					responseLock.Unlock()
					return
				}

				futuresLock.Lock()
				futures[resourceID] = future
				futuresLock.Unlock()
			}(resourcesList[k].resourceID, resourcesList[k].parameters)
		}
		wg.Wait()
	}

	// Concurrent async requests: poll all successful futures and merge outcomes.
	c.waitAsync(ctx, futures, responses)

	return responses
}

Expand Down
104 changes: 88 additions & 16 deletions pkg/azureclients/armclient/azure_armclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"net/http/cookiejar"
"net/http/httptest"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -340,6 +341,26 @@ func TestPutResource(t *testing.T) {
}

// TestPutResources verifies that PutResources returns nil for an empty input and
// that it issues the expected number of HTTP calls (including retries) otherwise.
func TestPutResources(t *testing.T) {
	callCount := 0
	server := getTestServer(t, &callCount)

	client := New(nil, server.URL, "test", "2019-01-01", "eastus", &retry.Backoff{Steps: 1})
	client.client.RetryDuration = time.Millisecond * 1

	ctx := context.Background()

	// A nil resource map must short-circuit to a nil response map.
	assert.Nil(t, client.PutResources(ctx, nil))

	// Two resources: expect non-nil responses and three server hits in total.
	got := client.PutResources(ctx, map[string]interface{}{
		"/id/1": nil,
		"/id/2": nil,
	})
	assert.NotNil(t, got)
	assert.Equal(t, 3, callCount)
}

func getTestServer(t *testing.T, counter *int) *httptest.Server {
serverFuncs := []func(rw http.ResponseWriter, req *http.Request){
func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "PUT", req.Method)
Expand Down Expand Up @@ -369,30 +390,81 @@ func TestPutResources(t *testing.T) {
},
}

i, total := 0, 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
i := 0
var l sync.Mutex
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
l.Lock()
serverFuncs[i](w, r)
i++
if i > 3 {
i = 3
}
total++
*counter++
l.Unlock()
}))
}

backoff := &retry.Backoff{Steps: 1}
armClient := New(nil, server.URL, "test", "2019-01-01", "eastus", backoff)
armClient.client.RetryDuration = time.Millisecond * 1

ctx := context.Background()
resources := map[string]interface{}{
"/id/1": nil,
"/id/2": nil,
// TestPutResourcesInBatches verifies batching behavior: the batch size is clamped to
// the number of resources, a non-positive batch size falls back to PutResources, and
// nil input yields a nil response map.
func TestPutResourcesInBatches(t *testing.T) {
	for _, testCase := range []struct {
		description                  string
		resources                    map[string]interface{}
		batchSize, expectedCallTimes int
	}{
		{
			description: "PutResourcesInBatches should send requests when the batch size equals the number of resources",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			batchSize:         2,
			expectedCallTimes: 3,
		},
		{
			description: "PutResourcesInBatches should send requests one by one when the batch size is 1",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			batchSize:         1,
			expectedCallTimes: 3,
		},
		{
			description: "PutResourcesInBatches should return nil when the resources map is nil",
			resources:   nil,
		},
		{
			description: "PutResourcesInBatches should set the batch size to the length of the resources if the batch size is larger than it",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			batchSize:         10,
			expectedCallTimes: 3,
		},
		{
			description: "PutResourcesInBatches should call PutResources if the batch size is smaller than or equal to zero",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			expectedCallTimes: 3,
		},
	} {
		t.Run(testCase.description, func(t *testing.T) {
			total := 0
			server := getTestServer(t, &total)

			backoff := &retry.Backoff{Steps: 1}
			armClient := New(nil, server.URL, "test", "2019-01-01", "eastus", backoff)
			armClient.client.RetryDuration = time.Millisecond * 1

			ctx := context.Background()
			responses := armClient.PutResourcesInBatches(ctx, testCase.resources, testCase.batchSize)
			// Responses are nil exactly when the input resources map is nil.
			assert.Equal(t, testCase.resources == nil, responses == nil)
			assert.Equal(t, testCase.expectedCallTimes, total)
		})
	}
}

func TestPutResourceAsync(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/azureclients/armclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type Interface interface {
// useful when the ARM API doesn't support concurrent requests.
PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse

// PutResourcesInBatches is similar to PutResources, but it sends sync requests concurrently in batches.
PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse

// PutResourceWithDecorators puts a resource with decorators by resource ID
PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error)

Expand Down

0 comments on commit 24fa0e0

Please sign in to comment.