Skip to content

Commit

Permalink
fix: update resources in batches asynchronously
Browse files Browse the repository at this point in the history
A regression was introduced by #1687 where the behavior of updating resources in batches changed from sending requests asynchronously to sending them synchronously. This leads to latency when updating VMSS VMs, especially when the cluster is very large. This fix reverts that unintended change.
  • Loading branch information
nilo19 committed Apr 19, 2023
1 parent 4134650 commit 5b0be8f
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 7 deletions.
58 changes: 51 additions & 7 deletions pkg/azureclients/armclient/azure_armclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,40 @@ func (c *Client) PutResource(ctx context.Context, resourceID string, parameters
return response, nil
}

// waitAsync waits for every future in futures to finish, polling all of them
// concurrently with one goroutine per resource, and returns once the last
// goroutine is done.
//
// Only failures are recorded: when waiting on a future returns an error, the
// entry for that resource ID in previousResponses is overwritten with a
// PutResourcesResponse carrying the error. Successful waits leave
// previousResponses untouched. An error that is not already retriable is
// marked retriable when its message contains "InternalServerError"
// (case-insensitive).
//
// Writes to previousResponses made by this function are serialized with an
// internal mutex.
func (c *Client) waitAsync(ctx context.Context, futures map[string]*azure.Future, previousResponses map[string]*PutResourcesResponse) {
	var (
		pending sync.WaitGroup
		mu      sync.Mutex
	)

	// poll waits on a single future and records its failure, if any.
	poll := func(id string, f *azure.Future) {
		defer pending.Done()

		resp, err := c.WaitForAsyncOperationResult(ctx, f, "armclient.PutResource")
		if err == nil {
			return
		}

		if resp == nil {
			klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', no response", err.Error())
		} else {
			klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', response code %d", err.Error(), resp.StatusCode)
		}

		rerr := retry.GetError(resp, err)
		if !rerr.Retriable {
			// Internal server errors are treated as transient even when the
			// generic classification says otherwise.
			if strings.Contains(strings.ToUpper(err.Error()), strings.ToUpper("InternalServerError")) {
				klog.V(5).Infof("Received InternalServerError in WaitForAsyncOperationResult: '%s', setting error retriable", err.Error())
				rerr.Retriable = true
			}
		}

		mu.Lock()
		previousResponses[id] = &PutResourcesResponse{Error: rerr}
		mu.Unlock()
	}

	for id, f := range futures {
		pending.Add(1)
		go poll(id, f)
	}
	pending.Wait()
}

// PutResourcesInBatches is similar to PutResources, but it sends sync requests concurrently in batches.
func (c *Client) PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse {
if len(resources) == 0 {
Expand All @@ -413,27 +447,37 @@ func (c *Client) PutResourcesInBatches(ctx context.Context, resources map[string
rateLimiter := make(chan struct{}, batchSize)

// Concurrent sync requests in batches.
futures := make(map[string]*azure.Future)
responses := make(map[string]*PutResourcesResponse)
wg := sync.WaitGroup{}
var responseLock sync.Mutex
var responseLock, futuresLock sync.Mutex
for resourceID, parameters := range resources {
rateLimiter <- struct{}{}
wg.Add(1)
go func(resourceID string, parameters interface{}) {
defer wg.Done()
defer func() { <-rateLimiter }()
resp, rerr := c.PutResource(ctx, resourceID, parameters)
responseLock.Lock()
defer responseLock.Unlock()
responses[resourceID] = &PutResourcesResponse{
Error: rerr,
Response: resp,
future, rerr := c.PutResourceAsync(ctx, resourceID, parameters)
if rerr != nil {
responseLock.Lock()
responses[resourceID] = &PutResourcesResponse{
Error: rerr,
}
responseLock.Unlock()
return
}

futuresLock.Lock()
futures[resourceID] = future
futuresLock.Unlock()
}(resourceID, parameters)
}
wg.Wait()
close(rateLimiter)

// Concurrent async requests.
c.waitAsync(ctx, futures, responses)

return responses
}

Expand Down
107 changes: 107 additions & 0 deletions pkg/azureclients/armclient/azure_armclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -421,6 +422,112 @@ func TestPutResource(t *testing.T) {
assert.Equal(t, true, rerr.Retriable)
}

// getTestServer starts an httptest.Server that answers requests from a fixed
// script of handlers, chosen by arrival order rather than by request path:
//
//  1. PUT -> 201 Created with an Azure-AsyncOperation header pointing at /id/1
//  2. PUT -> 500 Internal Server Error (header points at /id/2)
//  3. GET -> 200 OK with a body reporting status "Failed" / InternalServerError
//  4. GET -> same as 3; this last handler also serves every later request
//
// Each handled request increments *counter. Requests are served one at a time
// under a mutex, so both the script position and the counter are updated
// without races. The caller owns the returned server and should Close it.
func getTestServer(t *testing.T, counter *int) *httptest.Server {
	// failedStatus is the polling response: the operation finished with a failure.
	failedStatus := func(rw http.ResponseWriter, req *http.Request) {
		assert.Equal(t, "GET", req.Method)

		rw.WriteHeader(http.StatusOK)
		_, _ = rw.Write([]byte(`{"error":{"code":"InternalServerError"},"status":"Failed"}`))
	}

	serverFuncs := []func(rw http.ResponseWriter, req *http.Request){
		func(rw http.ResponseWriter, req *http.Request) {
			assert.Equal(t, "PUT", req.Method)

			rw.Header().Set("Azure-AsyncOperation",
				fmt.Sprintf("http://%s%s", req.Host, "/id/1?api-version=2019-01-01"))
			rw.WriteHeader(http.StatusCreated)
		},
		func(rw http.ResponseWriter, req *http.Request) {
			assert.Equal(t, "PUT", req.Method)

			rw.Header().Set("Azure-AsyncOperation",
				fmt.Sprintf("http://%s%s", req.Host, "/id/2?api-version=2019-01-01"))
			rw.WriteHeader(http.StatusInternalServerError)
		},
		failedStatus,
		failedStatus,
	}

	last := len(serverFuncs) - 1
	i := 0
	var l sync.Mutex
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		l.Lock()
		// Deferred so that a panic inside a handler (recovered by net/http)
		// cannot leave the mutex held and block every later request.
		defer l.Unlock()

		serverFuncs[i](w, r)
		// Advance through the script, then keep reusing the final handler.
		if i < last {
			i++
		}
		*counter++
	}))
}

// TestPutResourcesInBatches runs PutResourcesInBatches against the scripted
// server from getTestServer for several batch sizes and checks two things:
// the result is nil exactly when the input resources are nil, and the server
// received the expected number of HTTP requests.
//
// NOTE(review): the expected count of 3 for two resources presumably
// corresponds to two PUTs plus a single polling GET for the one PUT that was
// accepted (the other PUT is answered with 500) — confirm against
// PutResourceAsync/WaitForAsyncOperationResult.
func TestPutResourcesInBatches(t *testing.T) {
	for _, testCase := range []struct {
		description                  string
		resources                    map[string]interface{}
		batchSize, expectedCallTimes int
	}{
		{
			// Batch size equal to the number of resources.
			description: "",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			batchSize:         2,
			expectedCallTimes: 3,
		},
		{
			// Batch size of one.
			description: "",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			batchSize:         1,
			expectedCallTimes: 3,
		},
		{
			// Nil resources: no requests expected and a nil result.
			description: "",
			resources:   nil,
		},
		{
			description: "PutResourcesInBatches should set the batch size to the length of the resources if the batch size is larger than it",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			batchSize:         10,
			expectedCallTimes: 3,
		},
		{
			description: "PutResourcesInBatches should call PutResources if the batch size is smaller than or equal to zero",
			resources: map[string]interface{}{
				"/id/1": nil,
				"/id/2": nil,
			},
			expectedCallTimes: 3,
		},
	} {
		t.Run(testCase.description, func(t *testing.T) {
			total := 0
			server := getTestServer(t, &total)
			// Release the listener and its connections when the subtest ends;
			// otherwise every case leaks a running server.
			defer server.Close()

			azConfig := azureclients.ClientConfig{Backoff: &retry.Backoff{Steps: 1}, UserAgent: "test", Location: "eastus"}
			armClient := New(nil, azConfig, server.URL, "2019-01-01")
			// Keep the retry delay minimal so the test stays fast.
			armClient.client.RetryDuration = time.Millisecond * 1

			ctx := context.Background()
			responses := armClient.PutResourcesInBatches(ctx, testCase.resources, testCase.batchSize)
			assert.Equal(t, testCase.resources == nil, responses == nil)
			assert.Equal(t, testCase.expectedCallTimes, total)
		})
	}
}

func TestResourceAction(t *testing.T) {
for _, tc := range []struct {
description string
Expand Down

0 comments on commit 5b0be8f

Please sign in to comment.