Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support sending sync requests concurrently in batches when putting vmss vm #959

Merged
merged 1 commit into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
142 changes: 96 additions & 46 deletions pkg/azureclients/armclient/azure_armclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,46 +395,7 @@ func (c *Client) PutResource(ctx context.Context, resourceID string, parameters
return c.PutResourceWithDecorators(ctx, resourceID, parameters, putDecorators)
}

// PutResources puts a list of resources from resources map[resourceID]parameters.
// Those resources sync requests are sequential while async requests are concurrent. It's especially
// useful when the ARM API doesn't support concurrent requests.
func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse {
if len(resources) == 0 {
return nil
}

// Sequential sync requests.
futures := make(map[string]*azure.Future)
responses := make(map[string]*PutResourcesResponse)
for resourceID, parameters := range resources {
decorators := []autorest.PrepareDecorator{
autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}),
autorest.WithJSON(parameters),
}
request, err := c.PreparePutRequest(ctx, decorators...)
if err != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.prepare", resourceID, err)
responses[resourceID] = &PutResourcesResponse{
Error: retry.NewError(false, err),
}
continue
}
dumpRequest(request, 10)

future, resp, clientErr := c.SendAsync(ctx, request)
defer c.CloseResponse(ctx, resp)
if clientErr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.send", resourceID, clientErr.Error())
responses[resourceID] = &PutResourcesResponse{
Error: clientErr,
}
continue
}

futures[resourceID] = future
}

// Concurrent async requests.
func (c *Client) waitAsync(ctx context.Context, futures map[string]*azure.Future, previousResponses map[string]*PutResourcesResponse) {
wg := sync.WaitGroup{}
var responseLock sync.Mutex
for resourceID, future := range futures {
Expand All @@ -457,22 +418,111 @@ func (c *Client) PutResources(ctx context.Context, resources map[string]interfac
}

responseLock.Lock()
responses[resourceID] = &PutResourcesResponse{
previousResponses[resourceID] = &PutResourcesResponse{
Error: retriableErr,
}
responseLock.Unlock()
return
}
}(resourceID, future)
}
wg.Wait()
}

// PutResources puts a list of resources from resources map[resourceID]parameters.
// Those resources sync requests are sequential while async requests are concurrent. It's especially
// useful when the ARM API doesn't support concurrent requests.
func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse {
if len(resources) == 0 {
return nil
}

responseLock.Lock()
// Sequential sync requests.
futures := make(map[string]*azure.Future)
responses := make(map[string]*PutResourcesResponse)
for resourceID, parameters := range resources {
future, rerr := c.PutResourceAsync(ctx, resourceID, parameters)
if rerr != nil {
responses[resourceID] = &PutResourcesResponse{
Response: response,
Error: rerr,
}
responseLock.Unlock()
}(resourceID, future)
continue
}
futures[resourceID] = future
}

wg.Wait()
c.waitAsync(ctx, futures, responses)

return responses
}

// PutResourcesInBatches is similar with PutResources, but it sends sync request concurrently in batches.
func (c *Client) PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse {
if len(resources) == 0 {
return nil
}

if batchSize <= 0 {
klog.V(4).Infof("PutResourcesInBatches: batch size %d, put resources in sequence", batchSize)
return c.PutResources(ctx, resources)
}

if batchSize > len(resources) {
klog.V(4).Infof("PutResourcesInBatches: batch size %d, but the number of the resources is %d", batchSize, resources)
batchSize = len(resources)
}
klog.V(4).Infof("PutResourcesInBatches: send sync requests in parallel with the batch size %d", batchSize)

// Convert map to slice because it is more straightforward to
// loop over slice in batches than map.
type resourcesMeta struct {
resourceID string
parameters interface{}
}
resourcesList := make([]resourcesMeta, 0)
for resourceID, parameters := range resources {
resourcesList = append(resourcesList, resourcesMeta{
resourceID: resourceID,
parameters: parameters,
})
}

// Concurrent sync requests in batches.
futures := make(map[string]*azure.Future)
responses := make(map[string]*PutResourcesResponse)
wg := sync.WaitGroup{}
var responseLock, futuresLock sync.Mutex
for i := 0; i < len(resourcesList); i += batchSize {
j := i + batchSize
if j > len(resourcesList) {
j = len(resourcesList)
}

for k := i; k < j; k++ {
wg.Add(1)
go func(resourceID string, parameters interface{}) {
defer wg.Done()
future, rerr := c.PutResourceAsync(ctx, resourceID, parameters)
if rerr != nil {
responseLock.Lock()
responses[resourceID] = &PutResourcesResponse{
Error: rerr,
}
responseLock.Unlock()
return
}

futuresLock.Lock()
futures[resourceID] = future
futuresLock.Unlock()
}(resourcesList[k].resourceID, resourcesList[k].parameters)
}
wg.Wait()
}

// Concurrent async requests.
c.waitAsync(ctx, futures, responses)

return responses
}

Expand Down
104 changes: 88 additions & 16 deletions pkg/azureclients/armclient/azure_armclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"net/http/cookiejar"
"net/http/httptest"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -340,6 +341,26 @@ func TestPutResource(t *testing.T) {
}

func TestPutResources(t *testing.T) {
total := 0
server := getTestServer(t, &total)

backoff := &retry.Backoff{Steps: 1}
armClient := New(nil, server.URL, "test", "2019-01-01", "eastus", backoff)
armClient.client.RetryDuration = time.Millisecond * 1

ctx := context.Background()
resources := map[string]interface{}{
"/id/1": nil,
"/id/2": nil,
}
responses := armClient.PutResources(ctx, nil)
assert.Nil(t, responses)
responses = armClient.PutResources(ctx, resources)
assert.NotNil(t, responses)
assert.Equal(t, 3, total)
}

func getTestServer(t *testing.T, counter *int) *httptest.Server {
serverFuncs := []func(rw http.ResponseWriter, req *http.Request){
func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "PUT", req.Method)
Expand Down Expand Up @@ -369,30 +390,81 @@ func TestPutResources(t *testing.T) {
},
}

i, total := 0, 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
i := 0
var l sync.Mutex
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
l.Lock()
serverFuncs[i](w, r)
i++
if i > 3 {
i = 3
}
total++
*counter++
l.Unlock()
}))
}

backoff := &retry.Backoff{Steps: 1}
armClient := New(nil, server.URL, "test", "2019-01-01", "eastus", backoff)
armClient.client.RetryDuration = time.Millisecond * 1

ctx := context.Background()
resources := map[string]interface{}{
"/id/1": nil,
"/id/2": nil,
func TestPutResourcesInBatches(t *testing.T) {
for _, testCase := range []struct {
description string
resources map[string]interface{}
batchSize, expectedCallTimes int
}{
{
description: "",
resources: map[string]interface{}{
"/id/1": nil,
"/id/2": nil,
},
batchSize: 2,
expectedCallTimes: 3,
},
{
description: "",
resources: map[string]interface{}{
"/id/1": nil,
"/id/2": nil,
},
batchSize: 1,
expectedCallTimes: 3,
},
{
description: "",
resources: nil,
},
{
description: "PutResourcesInBatches should set the batch size to the length of the resources if the batch size is larger than it",
resources: map[string]interface{}{
"/id/1": nil,
"/id/2": nil,
},
batchSize: 10,
expectedCallTimes: 3,
},
{
description: "PutResourcesInBatches should call PutResources if the batch size is smaller than or equal to zero",
resources: map[string]interface{}{
"/id/1": nil,
"/id/2": nil,
},
expectedCallTimes: 3,
},
} {
t.Run(testCase.description, func(t *testing.T) {
total := 0
server := getTestServer(t, &total)

backoff := &retry.Backoff{Steps: 1}
armClient := New(nil, server.URL, "test", "2019-01-01", "eastus", backoff)
armClient.client.RetryDuration = time.Millisecond * 1

ctx := context.Background()
responses := armClient.PutResourcesInBatches(ctx, testCase.resources, testCase.batchSize)
assert.Equal(t, testCase.resources == nil, responses == nil)
assert.Equal(t, testCase.expectedCallTimes, total)
})
}
responses := armClient.PutResources(ctx, nil)
assert.Nil(t, responses)
responses = armClient.PutResources(ctx, resources)
assert.NotNil(t, responses)
assert.Equal(t, 3, total)

}

func TestPutResourceAsync(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/azureclients/armclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type Interface interface {
// useful when the ARM API doesn't support concurrent requests.
PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse

// PutResourcesInBatches is similar with PutResources, but it sends sync request concurrently in batches.
PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse

// PutResourceWithDecorators puts a resource with decorators by resource ID
PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error)

Expand Down