Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick of #83102: Fix aggressive VM calls for Azure VMSS. #83656

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
defer cancel()

// Invalidate the cache right after updating
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(vmName); err != nil {
return err
}

klog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
_, err = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "attach_disk")
Expand Down Expand Up @@ -155,8 +156,9 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
defer cancel()

// Invalidate the cache right after updating
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(vmName); err != nil {
return nil, err
}

klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
return ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "detach_disk")
Expand Down
141 changes: 87 additions & 54 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-03-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-07-01/network"
Expand Down Expand Up @@ -55,10 +56,8 @@ type scaleSet struct {
// (e.g. master nodes) may not belong to any scale sets.
availabilitySet VMSet

vmssCache *timedCache
vmssVMCache *timedCache
nodeNameToScaleSetMappingCache *timedCache
availabilitySetNodesCache *timedCache
vmssVMCache *timedCache
availabilitySetNodesCache *timedCache
}

// newScaleSet creates a new scaleSet.
Expand All @@ -69,22 +68,12 @@ func newScaleSet(az *Cloud) (VMSet, error) {
availabilitySet: newAvailabilitySet(az),
}

ss.nodeNameToScaleSetMappingCache, err = ss.newNodeNameToScaleSetMappingCache()
if err != nil {
return nil, err
}

ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
if err != nil {
return nil, err
}

ss.vmssCache, err = ss.newVmssCache()
if err != nil {
return nil, err
}

ss.vmssVMCache, err = ss.newVmssVMCache()
ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
if err != nil {
return nil, err
}
Expand All @@ -94,39 +83,46 @@ func newScaleSet(az *Cloud) (VMSet, error) {

// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// It returns cloudprovider.InstanceNotFound if the node does not belong to any scale set.
func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm compute.VirtualMachineScaleSetVM, err error) {
instanceID, err = getScaleSetVMInstanceID(nodeName)
if err != nil {
return ssName, instanceID, vm, err
}
func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
if err != nil {
return "", "", nil, err
}

ssName, err = ss.getScaleSetNameByNodeName(nodeName)
if err != nil {
return ssName, instanceID, vm, err
}
virtualMachines := cached.(*sync.Map)
if vm, ok := virtualMachines.Load(nodeName); ok {
result := vm.(*vmssVirtualMachinesEntry)
return result.vmssName, result.instanceID, result.virtualMachine, nil
}

if ssName == "" {
return "", "", vm, cloudprovider.InstanceNotFound
return "", "", nil, nil
}

resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
_, err := getScaleSetVMInstanceID(nodeName)
if err != nil {
return "", "", vm, err
return "", "", nil, err
}

klog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
key := buildVmssCacheKey(resourceGroup, ss.makeVmssVMName(ssName, instanceID))
cachedVM, err := ss.vmssVMCache.Get(key)
vmssName, instanceID, vm, err := getter(nodeName)
if err != nil {
return ssName, instanceID, vm, err
return "", "", nil, err
}
if vm != nil {
return vmssName, instanceID, vm, nil
}

if cachedVM == nil {
klog.Errorf("Can't find node (%q) in any scale sets", nodeName)
return ssName, instanceID, vm, cloudprovider.InstanceNotFound
klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
vmssName, instanceID, vm, err = getter(nodeName)
if err != nil {
return "", "", nil, err
}

return ssName, instanceID, *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
if vm == nil {
return "", "", nil, cloudprovider.InstanceNotFound
}
return vmssName, instanceID, vm, nil
}

// GetPowerStatusByNodeName returns the power state of the specified node.
Expand All @@ -151,20 +147,49 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er

// getVmssVMByInstanceID gets a VMSS virtual machine from cache by resource group, scale set name and instance ID.
// The node must belong to one of the scale sets.
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
key := buildVmssCacheKey(resourceGroup, vmName)
cachedVM, err := ss.vmssVMCache.Get(key)
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (*compute.VirtualMachineScaleSetVM, error) {
getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
if err != nil {
return nil, false, err
}

virtualMachines := cached.(*sync.Map)
virtualMachines.Range(func(key, value interface{}) bool {
vmEntry := value.(*vmssVirtualMachinesEntry)
if strings.EqualFold(vmEntry.resourceGroup, resourceGroup) &&
strings.EqualFold(vmEntry.vmssName, scaleSetName) &&
strings.EqualFold(vmEntry.instanceID, instanceID) {
vm = vmEntry.virtualMachine
found = true
return false
}

return true
})

return vm, found, nil
}

vm, found, err := getter()
if err != nil {
return vm, err
return nil, err
}
if found {
return vm, nil
}

if cachedVM == nil {
klog.Errorf("couldn't find vmss virtual machine by scaleSetName (%s) and instanceID (%s)", scaleSetName, instanceID)
return vm, cloudprovider.InstanceNotFound
klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
vm, found, err = getter()
if err != nil {
return nil, err
}
if !found {
return nil, cloudprovider.InstanceNotFound
}

return *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
return vm, nil
}

// GetInstanceIDByNodeName gets the cloud provider ID by node name.
Expand Down Expand Up @@ -432,9 +457,15 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
return nil, err
}

ssNames := make([]string, len(allScaleSets))
for i := range allScaleSets {
ssNames[i] = *(allScaleSets[i].Name)
ssNames := make([]string, 0)
for _, vmss := range allScaleSets {
name := *vmss.Name
if vmss.Sku != nil && to.Int64(vmss.Sku.Capacity) == 0 {
klog.V(3).Infof("Capacity of VMSS %q is 0, skipping", name)
continue
}

ssNames = append(ssNames, name)
}

return ssNames, nil
Expand Down Expand Up @@ -469,7 +500,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
}

nodeName := nodes[nx].Name
ssName, err := ss.getScaleSetNameByNodeName(nodeName)
ssName, _, _, err := ss.getVmssVM(nodeName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -568,7 +599,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
return network.Interface{}, err
}

primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
primaryInterfaceID, err := ss.getPrimaryInterfaceID(*vm)
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getPrimaryInterfaceID(), err=%v", nodeName, err)
return network.Interface{}, err
Expand Down Expand Up @@ -748,8 +779,9 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
}

// Invalidate the cache since we would update it.
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(vmName); err != nil {
return err
}

// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
Expand Down Expand Up @@ -1022,8 +1054,9 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
}

// Invalidate the cache since we would update it.
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(nodeName); err != nil {
return err
}

// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
Expand Down