From 97f9e40079510002c59ff5b29c2160e2979437c5 Mon Sep 17 00:00:00 2001 From: Qi Ni Date: Tue, 16 Apr 2024 14:47:18 +0800 Subject: [PATCH] fix: Retry put vmss vm on preempted error --- .../vmssvmclient/azure_vmssvmclient.go | 78 +++++++++++++------ .../vmssvmclient/azure_vmssvmclient_test.go | 58 ++++++++++++++ pkg/consts/consts.go | 4 +- pkg/retry/azure_error.go | 8 +- pkg/retry/azure_error_test.go | 4 +- 5 files changed, 122 insertions(+), 30 deletions(-) diff --git a/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go b/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go index 4ad702caa0..6aba901b1f 100644 --- a/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go +++ b/pkg/azureclients/vmssvmclient/azure_vmssvmclient.go @@ -509,7 +509,48 @@ func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VM } responses := c.armClient.PutResourcesInBatches(ctx, resources, batchSize) - errors := make([]*retry.Error, 0) + errors, retryIDs := c.parseResp(ctx, responses, true) + if len(retryIDs) > 0 { + retryResources := make(map[string]interface{}) + for _, id := range retryIDs { + retryResources[id] = resources[id] + } + resps := c.armClient.PutResourcesInBatches(ctx, retryResources, batchSize) + errs, _ := c.parseResp(ctx, resps, false) + errors = append(errors, errs...) + } + + // Aggregate errors. + if len(errors) > 0 { + rerr := &retry.Error{} + errs := make([]error, 0) + for _, err := range errors { + if !err.Retriable && strings.Contains(err.Error().Error(), consts.ConcurrentRequestConflictMessage) { + err.Retriable = true + err.RetryAfter = time.Now().Add(5 * time.Second) + } + + if err.IsThrottled() && err.RetryAfter.After(rerr.RetryAfter) { + rerr.RetryAfter = err.RetryAfter + } + errs = append(errs, err.Error()) + } + rerr.RawError = utilerrors.Flatten(utilerrors.NewAggregate(errs)) + return rerr + } + + return nil +} + +func (c *Client) parseResp( + ctx context.Context, + responses map[string]*armclient.PutResourcesResponse, + shouldRetry bool, +) ([]*retry.Error, []string) { + var ( + errors []*retry.Error + retryIDs []string + ) for resourceID, resp := range responses { if resp == nil { continue @@ -534,6 +575,19 @@ func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VM continue } + if retry.IsSuccessHTTPResponse(resp.Response) && + strings.Contains( + strings.ToLower(errMsg), + strings.ToLower(consts.OperationPreemptedErrorMessage), + ) { + if shouldRetry { + klog.V(2).Infof("The operation on VM %s is preempted, will retry.", resourceID) + retryIDs = append(retryIDs, resourceID) + continue + } + klog.V(2).Infof("The operation on VM %s is preempted, will not retry.", resourceID) + } + errors = append(errors, resp.Error) continue } @@ -546,25 +600,5 @@ func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VM } } } - - // Aggregate errors. - if len(errors) > 0 { - rerr := &retry.Error{} - errs := make([]error, 0) - for _, err := range errors { - if !err.Retriable && strings.Contains(err.Error().Error(), consts.ConcurrentRequestConflictMessage) { - err.Retriable = true - err.RetryAfter = time.Now().Add(5 * time.Second) - } - - if err.IsThrottled() && err.RetryAfter.After(rerr.RetryAfter) { - rerr.RetryAfter = err.RetryAfter - } - errs = append(errs, err.Error()) - } - rerr.RawError = utilerrors.Flatten(utilerrors.NewAggregate(errs)) - return rerr - } - - return nil + return errors, retryIDs } diff --git a/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go b/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go index 85729fa20c..2d5a61583d 100644 --- a/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go +++ b/pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go @@ -744,6 +744,64 @@ func TestUpdateVMsWithUpdateVMsResponderError(t *testing.T) { assert.NotNil(t, rerr) } +func TestUpdateVMsPreemptedRetry(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + vmssVM1 := getTestVMSSVM("vmss1", "1") + vmssVM2 := getTestVMSSVM("vmss1", "2") + instances := map[string]compute.VirtualMachineScaleSetVM{ + "1": vmssVM1, + "2": vmssVM2, + } + testvmssVMs1 := map[string]interface{}{ + pointer.StringDeref(vmssVM1.ID, ""): vmssVM1, + pointer.StringDeref(vmssVM2.ID, ""): vmssVM2, + } + testvmssVMs2 := map[string]interface{}{ + pointer.StringDeref(vmssVM2.ID, ""): vmssVM2, + } + resp1 := &http.Response{ + StatusCode: http.StatusNotFound, + Body: io.NopCloser(bytes.NewReader([]byte(""))), + } + resp2 := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte(""))), + } + preemptErr := retry.NewError(false, errors.New(consts.OperationPreemptedErrorMessage)) + resps1 := map[string]*armclient.PutResourcesResponse{ + pointer.StringDeref(vmssVM1.ID, ""): {Response: resp1}, + pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2, Error: preemptErr}, + } + resps2 := map[string]*armclient.PutResourcesResponse{ + pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2, Error: preemptErr}, + } + armClient := mockarmclient.NewMockInterface(ctrl) + firstPut := armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs1, 0).Return(resps1) + armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs2, 0).Return(resps2).After(firstPut) + armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(3) + + vmssvmClient := getTestVMSSVMClient(armClient) + rerr := vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0) + assert.NotNil(t, rerr) + assert.Contains(t, rerr.RawError.Error(), consts.OperationPreemptedErrorMessage) + + resps1 = map[string]*armclient.PutResourcesResponse{ + pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2, Error: preemptErr}, + } + resps2 = map[string]*armclient.PutResourcesResponse{ + pointer.StringDeref(vmssVM2.ID, ""): {Response: resp2}, + } + firstPut = armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs1, 0).Return(resps1) + armClient.EXPECT().PutResourcesInBatches(gomock.Any(), testvmssVMs2, 0).Return(resps2).After(firstPut) + armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(2) + + vmssvmClient = getTestVMSSVMClient(armClient) + rerr = vmssvmClient.UpdateVMs(context.TODO(), "rg", "vmss1", instances, "test", 0) + assert.Nil(t, rerr) +} + func TestUpdateVMsNeverRateLimiter(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index f9692c2012..65cf631105 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -413,8 +413,8 @@ const ( CannotUpdateVMBeingDeletedMessagePrefix = "'Put on Virtual Machine Scale Set VM Instance' is not allowed on Virtual Machine Scale Set" // CannotUpdateVMBeingDeletedMessageSuffix is the suffix of the error message that the request failed due to delete a VM that is being deleted CannotUpdateVMBeingDeletedMessageSuffix = "since it is marked for deletion" - // OperationPreemptedErrorCode is the error code returned for vm operation preempted errors - OperationPreemptedErrorCode = "OperationPreempted" + // OperationPreemptedErrorMessage is the error message returned for vm operation preempted errors + OperationPreemptedErrorMessage = "Operation execution has been preempted by a more recent operation" ) // node ipam controller diff --git a/pkg/retry/azure_error.go b/pkg/retry/azure_error.go index 81691fc89f..923e33de0d 100644 --- a/pkg/retry/azure_error.go +++ b/pkg/retry/azure_error.go @@ -149,7 +149,7 @@ func GetError(resp *http.Response, err error) *Error { return nil } - if err == nil && resp != nil && isSuccessHTTPResponse(resp) { + if err == nil && resp != nil && IsSuccessHTTPResponse(resp) { // HTTP 2xx suggests a successful response return nil } @@ -166,8 +166,8 @@ func GetError(resp *http.Response, err error) *Error { } } -// isSuccessHTTPResponse determines if the response from an HTTP request suggests success -func isSuccessHTTPResponse(resp *http.Response) bool { +// IsSuccessHTTPResponse determines if the response from an HTTP request suggests success +func IsSuccessHTTPResponse(resp *http.Response) bool { if resp == nil { return false } @@ -219,7 +219,7 @@ func shouldRetryHTTPRequest(resp *http.Response, err error) bool { } // should retry on <200, error>. - if isSuccessHTTPResponse(resp) && err != nil { + if IsSuccessHTTPResponse(resp) && err != nil { return true } diff --git a/pkg/retry/azure_error_test.go b/pkg/retry/azure_error_test.go index 8e6d5020f8..294cc76fdd 100644 --- a/pkg/retry/azure_error_test.go +++ b/pkg/retry/azure_error_test.go @@ -303,7 +303,7 @@ func TestIsSuccessResponse(t *testing.T) { resp := http.Response{ StatusCode: test.code, } - res := isSuccessHTTPResponse(&resp) + res := IsSuccessHTTPResponse(&resp) if res != test.expected { t.Errorf("expected: %v, saw: %v", test.expected, res) } @@ -311,7 +311,7 @@ func TestIsSuccessResponse(t *testing.T) { } func TestIsSuccessResponseNil(t *testing.T) { - res := isSuccessHTTPResponse(nil) + res := IsSuccessHTTPResponse(nil) assert.Equal(t, false, res) }