Skip to content

Commit

Permalink
Merge pull request #5920 from nilo19/fix/retry-preempted
Browse files Browse the repository at this point in the history
fix: Retry put vmss vm on preempted error
  • Loading branch information
k8s-ci-robot committed Apr 16, 2024
2 parents edf9898 + 97f9e40 commit 76d3b12
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 30 deletions.
78 changes: 56 additions & 22 deletions pkg/azureclients/vmssvmclient/azure_vmssvmclient.go
Expand Up @@ -509,7 +509,48 @@ func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VM
}

responses := c.armClient.PutResourcesInBatches(ctx, resources, batchSize)
errors := make([]*retry.Error, 0)
errors, retryIDs := c.parseResp(ctx, responses, true)
if len(retryIDs) > 0 {
retryResources := make(map[string]interface{})
for _, id := range retryIDs {
retryResources[id] = resources[id]
}
resps := c.armClient.PutResourcesInBatches(ctx, retryResources, batchSize)
errs, _ := c.parseResp(ctx, resps, false)
errors = append(errors, errs...)
}

// Aggregate errors.
if len(errors) > 0 {
rerr := &retry.Error{}
errs := make([]error, 0)
for _, err := range errors {
if !err.Retriable && strings.Contains(err.Error().Error(), consts.ConcurrentRequestConflictMessage) {
err.Retriable = true
err.RetryAfter = time.Now().Add(5 * time.Second)
}

if err.IsThrottled() && err.RetryAfter.After(rerr.RetryAfter) {
rerr.RetryAfter = err.RetryAfter
}
errs = append(errs, err.Error())
}
rerr.RawError = utilerrors.Flatten(utilerrors.NewAggregate(errs))
return rerr
}

return nil
}

func (c *Client) parseResp(
ctx context.Context,
responses map[string]*armclient.PutResourcesResponse,
shouldRetry bool,
) ([]*retry.Error, []string) {
var (
errors []*retry.Error
retryIDs []string
)
for resourceID, resp := range responses {
if resp == nil {
continue
Expand All @@ -534,6 +575,19 @@ func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VM
continue
}

if retry.IsSuccessHTTPResponse(resp.Response) &&
strings.Contains(
strings.ToLower(errMsg),
strings.ToLower(consts.OperationPreemptedErrorMessage),
) {
if shouldRetry {
klog.V(2).Infof("The operation on VM %s is preempted, will retry.", resourceID)
retryIDs = append(retryIDs, resourceID)
continue
}
klog.V(2).Infof("The operation on VM %s is preempted, will not retry.", resourceID)
}

errors = append(errors, resp.Error)
continue
}
Expand All @@ -546,25 +600,5 @@ func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VM
}
}
}

// Aggregate errors.
if len(errors) > 0 {
rerr := &retry.Error{}
errs := make([]error, 0)
for _, err := range errors {
if !err.Retriable && strings.Contains(err.Error().Error(), consts.ConcurrentRequestConflictMessage) {
err.Retriable = true
err.RetryAfter = time.Now().Add(5 * time.Second)
}

if err.IsThrottled() && err.RetryAfter.After(rerr.RetryAfter) {
rerr.RetryAfter = err.RetryAfter
}
errs = append(errs, err.Error())
}
rerr.RawError = utilerrors.Flatten(utilerrors.NewAggregate(errs))
return rerr
}

return nil
return errors, retryIDs
}
58 changes: 58 additions & 0 deletions pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go
Expand Up @@ -744,6 +744,64 @@ func TestUpdateVMsWithUpdateVMsResponderError(t *testing.T) {
assert.NotNil(t, rerr)
}

func TestUpdateVMsPreemptedRetry(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

vmssVM1 := getTestVMSSVM("vmss1", "1")
vmssVM2 := getTestVMSSVM("vmss1", "2")
instances := map[string]compute.VirtualMachineScaleSetVM{
"1": vmssVM1,
"2": vmssVM2,
}
testvmssVMs1 := map[string]interface{}{
pointer.StringDeref(vmssVM1.ID, ""): vmssVM1,
pointer.StringDeref(vmssVM2.ID, ""): vmssVM2,
}
testvmssVMs2 := map[string]interface{}{
pointer.StringDeref(vmssVM2.ID, ""): vmssVM2,
}
resp1 := &http.Response{
StatusCode: http.StatusNotFound,
Body: io.NopCloser(bytes.NewReader([]byte(""))),
}
resp2 := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(""))),
}
preemptErr := retry.NewError(false, errors.New(consts.OperationPreemptedErrorMessage))
resps1 := map[string]*armclient.PutResourcesResponse{
pointer.StringDeref(vmssVM1.ID, ""): {Response: resp1},
pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2, Error: preemptErr},
}
resps2 := map[string]*armclient.PutResourcesResponse{
pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2, Error: preemptErr},
}
armClient := mockarmclient.NewMockInterface(ctrl)
firstPut := armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs1, 0).Return(resps1)
armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs2, 0).Return(resps2).After(firstPut)
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(3)

vmssvmClient := getTestVMSSVMClient(armClient)
rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0)
assert.NotNil(t, rerr)
assert.Contains(t, rerr.RawError.Error(), consts.OperationPreemptedErrorMessage)

resps1 = map[string]*armclient.PutResourcesResponse{
pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2, Error: preemptErr},
}
resps2 = map[string]*armclient.PutResourcesResponse{
pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2},
}
firstPut = armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs1, 0).Return(resps1)
armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs2, 0).Return(resps2).After(firstPut)
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(2)

vmssvmClient = getTestVMSSVMClient(armClient)
rerr = vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0)
assert.Nil(t, rerr)
}

func TestUpdateVMsNeverRateLimiter(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
4 changes: 2 additions & 2 deletions pkg/consts/consts.go
Expand Up @@ -413,8 +413,8 @@ const (
CannotUpdateVMBeingDeletedMessagePrefix = "'Put on Virtual Machine Scale Set VM Instance' is not allowed on Virtual Machine Scale Set"
// CannotUpdateVMBeingDeletedMessageSuffix is the suffix of the error message that the request failed due to delete a VM that is being deleted
CannotUpdateVMBeingDeletedMessageSuffix = "since it is marked for deletion"
// OperationPreemptedErrorCode is the error code returned for vm operation preempted errors
OperationPreemptedErrorCode = "OperationPreempted"
// OperationPreemptedErrorMessage is the error message returned for vm operation preempted errors
OperationPreemptedErrorMessage = "Operation execution has been preempted by a more recent operation"
)

// node ipam controller
Expand Down
8 changes: 4 additions & 4 deletions pkg/retry/azure_error.go
Expand Up @@ -149,7 +149,7 @@ func GetError(resp *http.Response, err error) *Error {
return nil
}

if err == nil && resp != nil && isSuccessHTTPResponse(resp) {
if err == nil && resp != nil && IsSuccessHTTPResponse(resp) {
// HTTP 2xx suggests a successful response
return nil
}
Expand All @@ -166,8 +166,8 @@ func GetError(resp *http.Response, err error) *Error {
}
}

// isSuccessHTTPResponse determines if the response from an HTTP request suggests success
func isSuccessHTTPResponse(resp *http.Response) bool {
// IsSuccessHTTPResponse determines if the response from an HTTP request suggests success
func IsSuccessHTTPResponse(resp *http.Response) bool {
if resp == nil {
return false
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
}

// should retry on <200, error>.
if isSuccessHTTPResponse(resp) && err != nil {
if IsSuccessHTTPResponse(resp) && err != nil {
return true
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/retry/azure_error_test.go
Expand Up @@ -303,15 +303,15 @@ func TestIsSuccessResponse(t *testing.T) {
resp := http.Response{
StatusCode: test.code,
}
res := isSuccessHTTPResponse(&resp)
res := IsSuccessHTTPResponse(&resp)
if res != test.expected {
t.Errorf("expected: %v, saw: %v", test.expected, res)
}
}
}

func TestIsSuccessResponseNil(t *testing.T) {
res := isSuccessHTTPResponse(nil)
res := IsSuccessHTTPResponse(nil)
assert.Equal(t, false, res)
}

Expand Down

0 comments on commit 76d3b12

Please sign in to comment.