Skip to content

Commit

Permalink
Merge pull request #2608 from mukhoakash/fix_vmss
Browse files Browse the repository at this point in the history
Fix: Updating vmssvmcache instead of invalidating vm entry on UpdateVM operation
  • Loading branch information
jwtty committed Nov 8, 2022
2 parents e9a699a + 6e788a4 commit e8e8bec
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 60 deletions.
42 changes: 27 additions & 15 deletions pkg/azureclients/vmssvmclient/azure_vmssvmclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,34 +231,32 @@ func (c *Client) listVMSSVM(ctx context.Context, resourceGroupName string, virtu
}

// Update updates a VirtualMachineScaleSetVM.
func (c *Client) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error {
func (c *Client) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) (*compute.VirtualMachineScaleSetVM, *retry.Error) {
mc := metrics.NewMetricContext("vmssvm", "update", resourceGroupName, c.subscriptionID, source)

// Report errors if the client is rate limited.
if !c.rateLimiterWriter.TryAccept() {
mc.RateLimitedCount()
return retry.GetRateLimitError(true, "VMSSVMUpdate")
return nil, retry.GetRateLimitError(true, "VMSSVMUpdate")
}

// Report errors if the client is throttled.
if c.RetryAfterWriter.After(time.Now()) {
mc.ThrottledCount()
rerr := retry.GetThrottlingError("VMSSVMUpdate", "client throttled", c.RetryAfterWriter)
return rerr
return nil, rerr
}

rerr := c.updateVMSSVM(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters)
result, rerr := c.updateVMSSVM(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters)
mc.Observe(rerr)
if rerr != nil {
if rerr.IsThrottled() {
// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
c.RetryAfterWriter = rerr.RetryAfter
}

return rerr
}

return nil
return result, rerr
}

// UpdateAsync updates a VirtualMachineScaleSetVM asynchronously
Expand Down Expand Up @@ -302,7 +300,7 @@ func (c *Client) UpdateAsync(ctx context.Context, resourceGroupName string, VMSc
}

// WaitForUpdateResult waits for the response of the update request
func (c *Client) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) *retry.Error {
func (c *Client) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) (*compute.VirtualMachineScaleSetVM, *retry.Error) {
mc := metrics.NewMetricContext("vmss", "wait_for_update_result", resourceGroupName, c.subscriptionID, source)
response, err := c.armClient.WaitForAsyncOperationResult(ctx, future, "VMSSWaitForUpdateResult")
mc.Observe(retry.NewErrorOrNil(false, err))
Expand All @@ -312,13 +310,25 @@ func (c *Client) WaitForUpdateResult(ctx context.Context, future *azure.Future,
} else {
klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', no response", err.Error())
}
return retry.GetError(response, err)
return nil, retry.GetError(response, err)
}
return nil

if response != nil && response.StatusCode != http.StatusNoContent {
result, rerr := c.updateResponder(response)
if rerr != nil {
klog.V(5).Infof("Received error in WaitForAsyncOperationResult updateResponder: '%s'", rerr.Error())
}

return result, rerr
}

result := &compute.VirtualMachineScaleSetVM{}
result.Response = autorest.Response{Response: response}
return result, nil
}

// updateVMSSVM updates a VirtualMachineScaleSetVM.
func (c *Client) updateVMSSVM(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM) *retry.Error {
func (c *Client) updateVMSSVM(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM) (*compute.VirtualMachineScaleSetVM, *retry.Error) {
resourceID := armclient.GetChildResourceID(
c.subscriptionID,
resourceGroupName,
Expand All @@ -332,18 +342,20 @@ func (c *Client) updateVMSSVM(ctx context.Context, resourceGroupName string, VMS
defer c.armClient.CloseResponse(ctx, response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.request", resourceID, rerr.Error())
return rerr
return nil, rerr
}

if response != nil && response.StatusCode != http.StatusNoContent {
_, rerr = c.updateResponder(response)
result, rerr := c.updateResponder(response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.respond", resourceID, rerr.Error())
return rerr
}
return result, rerr
}

return nil
result := &compute.VirtualMachineScaleSetVM{}
result.Response = autorest.Response{Response: response}
return result, nil
}

func (c *Client) updateResponder(resp *http.Response) (*compute.VirtualMachineScaleSetVM, *retry.Error) {
Expand Down
31 changes: 25 additions & 6 deletions pkg/azureclients/vmssvmclient/azure_vmssvmclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,13 @@ func TestUpdate(t *testing.T) {
armClient.EXPECT().PutResource(gomock.Any(), to.String(vmssVM.ID), vmssVM).Return(response, nil).Times(1)
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(1)

expected := &compute.VirtualMachineScaleSetVM{}
expected.Response = autorest.Response{Response: response}

vmssClient := getTestVMSSVMClient(armClient)
rerr := vmssClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
result, rerr := vmssClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
assert.Nil(t, rerr)
assert.Equal(t, expected, result)
}

func TestUpdateAsync(t *testing.T) {
Expand Down Expand Up @@ -575,8 +579,14 @@ func TestWaitForUpdateResult(t *testing.T) {
armClient.EXPECT().WaitForAsyncOperationResult(gomock.Any(), gomock.Any(), "VMSSWaitForUpdateResult").Return(test.response, test.responseErr).Times(1)

vmssClient := getTestVMSSVMClient(armClient)
err := vmssClient.WaitForUpdateResult(context.TODO(), &azure.Future{}, "rg", "test")
response, err := vmssClient.WaitForUpdateResult(context.TODO(), &azure.Future{}, "rg", "test")
assert.Equal(t, err, test.expectedResult)
var output *compute.VirtualMachineScaleSetVM
if err == nil {
output = &compute.VirtualMachineScaleSetVM{}
output.Response = autorest.Response{Response: test.response}
}
assert.Equal(t, response, output)
}
}

Expand All @@ -592,10 +602,13 @@ func TestUpdateWithUpdateResponderError(t *testing.T) {
}
armClient.EXPECT().PutResource(gomock.Any(), to.String(vmssVM.ID), vmssVM).Return(response, nil).Times(1)
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(1)
expected := &compute.VirtualMachineScaleSetVM{}
expected.Response = autorest.Response{Response: response}

vmssvmClient := getTestVMSSVMClient(armClient)
rerr := vmssvmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
result, rerr := vmssvmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
assert.NotNil(t, rerr)
assert.Equal(t, expected, result)
}

func TestUpdateNeverRateLimiter(t *testing.T) {
Expand All @@ -610,9 +623,11 @@ func TestUpdateNeverRateLimiter(t *testing.T) {
armClient := mockarmclient.NewMockInterface(ctrl)
vmssvmClient := getTestVMSSVMClientWithNeverRateLimiter(armClient)
vmssVM := getTestVMSSVM("vmss1", "0")
rerr := vmssvmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
var expected *compute.VirtualMachineScaleSetVM
result, rerr := vmssvmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
assert.NotNil(t, rerr)
assert.Equal(t, vmssvmUpdateErr, rerr)
assert.Equal(t, expected, result)
}

func TestUpdateRetryAfterReader(t *testing.T) {
Expand All @@ -628,9 +643,11 @@ func TestUpdateRetryAfterReader(t *testing.T) {
vmssVM := getTestVMSSVM("vmss1", "0")
armClient := mockarmclient.NewMockInterface(ctrl)
vmClient := getTestVMSSVMClientWithRetryAfterReader(armClient)
rerr := vmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
var expected *compute.VirtualMachineScaleSetVM
result, rerr := vmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
assert.NotNil(t, rerr)
assert.Equal(t, vmssvmUpdateErr, rerr)
assert.Equal(t, expected, result)
}

func TestUpdateThrottle(t *testing.T) {
Expand All @@ -654,9 +671,11 @@ func TestUpdateThrottle(t *testing.T) {
armClient.EXPECT().CloseResponse(gomock.Any(), gomock.Any()).Times(1)

vmssvmClient := getTestVMSSVMClient(armClient)
rerr := vmssvmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
var expected *compute.VirtualMachineScaleSetVM
result, rerr := vmssvmClient.Update(context.TODO(), "rg", "vmss1", "0", vmssVM, "test")
assert.NotNil(t, rerr)
assert.Equal(t, throttleErr, rerr)
assert.Equal(t, expected, result)
}

func TestUpdateVMs(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/azureclients/vmssvmclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ type Interface interface {
List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, expand string) ([]compute.VirtualMachineScaleSetVM, *retry.Error)

// Update updates a VirtualMachineScaleSetVM.
Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error
Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) (*compute.VirtualMachineScaleSetVM, *retry.Error)

// UpdateAsync updates a VirtualMachineScaleSetVM asynchronously
UpdateAsync(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) (*azure.Future, *retry.Error)

// WaitForUpdateResult waits for the response of the update request
WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) *retry.Error
WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) (*compute.VirtualMachineScaleSetVM, *retry.Error)

// UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM.
UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string, batchSize int) *retry.Error
Expand Down
14 changes: 8 additions & 6 deletions pkg/azureclients/vmssvmclient/mockvmssvmclient/interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/cache/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,12 @@ func (t *TimedCache) Set(key string, data interface{}) {
CreatedOn: time.Now().UTC(),
})
}

// Update updates the data cache for the key.
func (t *TimedCache) Update(key string, data interface{}) {
_ = t.Store.Update(&AzureCacheEntry{
Key: key,
Data: data,
CreatedOn: time.Now().UTC(),
})
}
2 changes: 1 addition & 1 deletion pkg/provider/azure_controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (c *controllerCommon) AttachDisk(ctx context.Context, async bool, diskName,
if err != nil {
return -1, err
}
return lun, vmset.WaitForUpdateResult(ctx, future, resourceGroup, "attach_disk")
return lun, vmset.WaitForUpdateResult(ctx, future, nodeName, resourceGroup, "attach_disk")
}

func (c *controllerCommon) insertAttachDiskRequest(diskURI, nodeName string, options *AttachDiskOptions) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure_controller_standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (as *availabilitySet) DeleteCacheForNode(nodeName string) error {
}

// WaitForUpdateResult waits for the response of the update request
func (as *availabilitySet) WaitForUpdateResult(ctx context.Context, future *azure.Future, resourceGroupName, source string) error {
func (as *availabilitySet) WaitForUpdateResult(ctx context.Context, future *azure.Future, nodeName types.NodeName, resourceGroupName, source string) error {
if rerr := as.VirtualMachinesClient.WaitForUpdateResult(ctx, future, resourceGroupName, source); rerr != nil {
return rerr.Error()
}
Expand Down

0 comments on commit e8e8bec

Please sign in to comment.