diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
index 42f6a426c460..45440f4f7de3 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
@@ -60,6 +60,13 @@ const (
 	vmssVMInstanceUpdateDelay = 3 * time.Second
 )
 
+// nodeIdentity identifies a node within a subscription.
+type nodeIdentity struct {
+	resourceGroup string
+	vmssName      string
+	nodeName      string
+}
+
 // scaleSet implements VMSet interface for Azure scale set.
 type scaleSet struct {
 	*Cloud
@@ -69,7 +76,7 @@ type scaleSet struct {
 	availabilitySet VMSet
 
 	vmssCache                 *timedCache
-	vmssVMCache               *timedCache
+	vmssVMCache               *sync.Map // [resourcegroup/vmssname]*timedCache
 	availabilitySetNodesCache *timedCache
 }
 
@@ -79,6 +86,7 @@ func newScaleSet(az *Cloud) (VMSet, error) {
 	ss := &scaleSet{
 		Cloud:           az,
 		availabilitySet: newAvailabilitySet(az),
+		vmssVMCache:     &sync.Map{},
 	}
 
 	ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
@@ -91,11 +99,6 @@ func newScaleSet(az *Cloud) (VMSet, error) {
 		return nil, err
 	}
 
-	ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
-	if err != nil {
-		return nil, err
-	}
-
 	return ss, nil
 }
 
@@ -136,12 +139,17 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua
 	return vmss, nil
 }
 
-// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
-// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
-func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+// getVmssVMByNodeIdentity finds virtualMachineScaleSetVM by nodeIdentity, using the node's parent VMSS cache.
+// It returns cloudprovider.InstanceNotFound if the node does not belong to the scale set named in the nodeIdentity.
+func (ss *scaleSet) getVmssVMByNodeIdentity(node *nodeIdentity, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+	cacheKey, cache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName)
+	if err != nil {
+		return "", "", nil, err
+	}
+
 	getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
 		var found bool
-		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
+		cached, err := cache.Get(cacheKey, crt)
 		if err != nil {
 			return "", "", nil, found, err
 		}
@@ -156,19 +164,19 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin
 		return "", "", nil, found, nil
 	}
 
-	_, err := getScaleSetVMInstanceID(nodeName)
+	_, err = getScaleSetVMInstanceID(node.nodeName)
 	if err != nil {
 		return "", "", nil, err
 	}
 
-	vmssName, instanceID, vm, found, err := getter(nodeName, crt)
+	vmssName, instanceID, vm, found, err := getter(node.nodeName, crt)
 	if err != nil {
 		return "", "", nil, err
 	}
 
 	if !found {
-		klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
-		vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
+		klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", node.nodeName)
+		vmssName, instanceID, vm, found, err = getter(node.nodeName, cacheReadTypeForceRefresh)
 		if err != nil {
 			return "", "", nil, err
 		}
@@ -184,6 +192,17 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin
 	return vmssName, instanceID, vm, nil
 }
 
+// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
+// It returns cloudprovider.InstanceNotFound if nodeName does not belong to any scale set.
+func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+	node, err := ss.getNodeIdentityByNodeName(nodeName, crt)
+	if err != nil {
+		return "", "", nil, err
+	}
+
+	return ss.getVmssVMByNodeIdentity(node, crt)
+}
+
 // GetPowerStatusByNodeName returns the power state of the specified node.
 func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe)
@@ -219,8 +238,13 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
 // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
 // The node must belong to one of scale sets.
 func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
+	cacheKey, cache, err := ss.getVMSSVMCache(resourceGroup, scaleSetName)
+	if err != nil {
+		return nil, err
+	}
+
 	getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
-		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
+		cached, err := cache.Get(cacheKey, crt)
 		if err != nil {
 			return nil, false, err
 		}
@@ -591,6 +615,66 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
 	return ssNames, nil
 }
 
+// getNodeIdentityByNodeName uses the VMSS cache to find a node's resource group and VMSS, returned in a nodeIdentity.
+func (ss *scaleSet) getNodeIdentityByNodeName(nodeName string, crt cacheReadType) (*nodeIdentity, error) {
+	getter := func(nodeName string, crt cacheReadType) (*nodeIdentity, error) {
+		node := &nodeIdentity{
+			nodeName: nodeName,
+		}
+
+		cached, err := ss.vmssCache.Get(vmssKey, crt)
+		if err != nil {
+			return nil, err
+		}
+
+		vmsses := cached.(*sync.Map)
+		vmsses.Range(func(key, value interface{}) bool {
+			v := value.(*vmssEntry)
+			if v.vmss.Name == nil {
+				return true
+			}
+
+			vmssPrefix := *v.vmss.Name
+			if v.vmss.VirtualMachineProfile != nil &&
+				v.vmss.VirtualMachineProfile.OsProfile != nil &&
+				v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix != nil {
+				vmssPrefix = *v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix
+			}
+
+			if strings.EqualFold(vmssPrefix, nodeName[:len(nodeName)-6]) {
+				node.vmssName = *v.vmss.Name
+				node.resourceGroup = v.resourceGroup
+				return false
+			}
+
+			return true
+		})
+		return node, nil
+	}
+
+	if _, err := getScaleSetVMInstanceID(nodeName); err != nil {
+		return nil, err
+	}
+
+	node, err := getter(nodeName, crt)
+	if err != nil {
+		return nil, err
+	}
+	if node.vmssName != "" {
+		return node, nil
+	}
+
+	klog.V(2).Infof("Couldn't find VMSS for node %s, refreshing the cache", nodeName)
+	node, err = getter(nodeName, cacheReadTypeForceRefresh)
+	if err != nil {
+		return nil, err
+	}
+	if node.vmssName == "" {
+		return nil, cloudprovider.InstanceNotFound
+	}
+	return node, nil
+}
+
 // listScaleSetVMs lists VMs belonging to the specified scale set.
 func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) {
 	var err error
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
index fd30af6c2209..210dfee60303 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
@@ -20,6 +20,7 @@ package azure
 
 import (
 	"context"
+	"fmt"
 	"strings"
 	"sync"
 	"time"
@@ -36,7 +37,6 @@ var (
 	vmssCacheSeparator = "#"
 
 	vmssKey                 = "k8svmssKey"
-	vmssVirtualMachinesKey  = "k8svmssVirtualMachinesKey"
 	availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
 
 	availabilitySetNodesCacheTTL = 15 * time.Minute
@@ -55,8 +55,9 @@ type vmssVirtualMachinesEntry struct {
 }
 
 type vmssEntry struct {
-	vmss       *compute.VirtualMachineScaleSet
-	lastUpdate time.Time
+	vmss          *compute.VirtualMachineScaleSet
+	resourceGroup string
+	lastUpdate    time.Time
 }
 
 func (ss *scaleSet) newVMSSCache() (*timedCache, error) {
@@ -82,8 +83,9 @@ func (ss *scaleSet) newVMSSCache() (*timedCache, error) {
 				continue
 			}
 			localCache.Store(*scaleSet.Name, &vmssEntry{
-				vmss:       &scaleSet,
-				lastUpdate: time.Now().UTC(),
+				vmss:          &scaleSet,
+				resourceGroup: resourceGroup,
+				lastUpdate:    time.Now().UTC(),
 			})
 		}
 	}
@@ -108,20 +110,58 @@ func extractVmssVMName(name string) (string, string, error) {
 	return ssName, instanceID, nil
 }
 
-func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
-	if ss.Config.VmssVirtualMachinesCacheTTLInSeconds == 0 {
-		ss.Config.VmssVirtualMachinesCacheTTLInSeconds = vmssVirtualMachinesCacheTTLDefaultInSeconds
+// getVMSSVMCache returns a *timedCache and the cache key for a VMSS (creating that cache if new).
+func (ss *scaleSet) getVMSSVMCache(resourceGroup, vmssName string) (string, *timedCache, error) {
+	cacheKey := strings.ToLower(fmt.Sprintf("%s/%s", resourceGroup, vmssName))
+	if entry, ok := ss.vmssVMCache.Load(cacheKey); ok {
+		cache := entry.(*timedCache)
+		return cacheKey, cache, nil
 	}
-	vmssVirtualMachinesCacheTTL := time.Duration(ss.Config.VmssVirtualMachinesCacheTTLInSeconds) * time.Second
 
+	cache, err := ss.newVMSSVirtualMachinesCache(resourceGroup, vmssName, cacheKey)
+	if err != nil {
+		return "", nil, err
+	}
+	ss.vmssVMCache.Store(cacheKey, cache)
+	return cacheKey, cache, nil
+}
+
+// gcVMSSVMCache deletes stale VMSS VM caches left behind by deleted VMSSes.
+func (ss *scaleSet) gcVMSSVMCache() error {
+	cached, err := ss.vmssCache.Get(vmssKey, cacheReadTypeUnsafe)
+	if err != nil {
+		return err
+	}
+
+	vmsses := cached.(*sync.Map)
+	removed := map[string]bool{}
+	ss.vmssVMCache.Range(func(key, value interface{}) bool {
+		cacheKey := key.(string)
+		vlistIdx := cacheKey[strings.LastIndex(cacheKey, "/")+1:]
+		if _, ok := vmsses.Load(vlistIdx); !ok {
+			removed[cacheKey] = true
+		}
+		return true
+	})
+
+	for key := range removed {
+		ss.vmssVMCache.Delete(key)
+	}
+
+	return nil
+}
+
+// newVMSSVirtualMachinesCache instantiates a new VMs cache for VMs belonging to the provided VMSS.
+func (ss *scaleSet) newVMSSVirtualMachinesCache(resourceGroupName, vmssName, cacheKey string) (*timedCache, error) {
 	getter := func(key string) (interface{}, error) {
 		localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
 
 		oldCache := make(map[string]vmssVirtualMachinesEntry)
 
-		if ss.vmssVMCache != nil {
+		if vmssCache, ok := ss.vmssVMCache.Load(cacheKey); ok {
 			// get old cache before refreshing the cache
-			entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey)
+			cache := vmssCache.(*timedCache)
+			entry, exists, err := cache.store.GetByKey(cacheKey)
 			if err != nil {
 				return nil, err
 			}
@@ -137,70 +177,61 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 			}
 		}
 
-		allResourceGroups, err := ss.GetResourceGroups()
+		vms, err := ss.listScaleSetVMs(vmssName, resourceGroupName)
 		if err != nil {
 			return nil, err
 		}
 
-		for _, resourceGroup := range allResourceGroups.List() {
-			scaleSetNames, err := ss.listScaleSets(resourceGroup)
-			if err != nil {
-				return nil, err
+		for i := range vms {
+			vm := vms[i]
+			if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
+				klog.Warningf("failed to get computerName for vmssVM (%q)", vmssName)
+				continue
 			}
 
-			for _, ssName := range scaleSetNames {
-				vms, err := ss.listScaleSetVMs(ssName, resourceGroup)
-				if err != nil {
-					return nil, err
-				}
-
-				for i := range vms {
-					vm := vms[i]
-					if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
-						klog.Warningf("failed to get computerName for vmssVM (%q)", ssName)
-						continue
-					}
-
-					computerName := strings.ToLower(*vm.OsProfile.ComputerName)
-					localCache.Store(computerName, &vmssVirtualMachinesEntry{
-						resourceGroup:  resourceGroup,
-						vmssName:       ssName,
-						instanceID:     to.String(vm.InstanceID),
-						virtualMachine: &vm,
-						lastUpdate:     time.Now().UTC(),
-					})
-
-					if _, exists := oldCache[computerName]; exists {
-						delete(oldCache, computerName)
-					}
-				}
+			computerName := strings.ToLower(*vm.OsProfile.ComputerName)
+			vmssVMCacheEntry := &vmssVirtualMachinesEntry{
+				resourceGroup:  resourceGroupName,
+				vmssName:       vmssName,
+				instanceID:     to.String(vm.InstanceID),
+				virtualMachine: &vm,
+				lastUpdate:     time.Now().UTC(),
+			}
+			// set the cache entry to nil when the VM is being deleted.
+			if vm.VirtualMachineScaleSetVMProperties != nil &&
+				strings.EqualFold(to.String(vm.VirtualMachineScaleSetVMProperties.ProvisioningState), string(compute.ProvisioningStateDeleting)) {
+				klog.V(4).Infof("VMSS virtualMachine %q is being deleted, setting its cache to nil", computerName)
+				vmssVMCacheEntry.virtualMachine = nil
 			}
+			localCache.Store(computerName, vmssVMCacheEntry)
 
-			// add old missing cache data with nil entries to prevent aggressive
-			// ARM calls during cache invalidation
-			for name, vmEntry := range oldCache {
-				// if the nil cache entry has existed for vmssVirtualMachinesCacheTTL in the cache
-				// then it should not be added back to the cache
-				if vmEntry.virtualMachine == nil || time.Since(vmEntry.lastUpdate) > vmssVirtualMachinesCacheTTL {
-					klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
-					continue
-				}
-				lastUpdate := time.Now().UTC()
-				if vmEntry.virtualMachine == nil {
-					// if this is already a nil entry then keep the time the nil
-					// entry was first created, so we can cleanup unwanted entries
-					lastUpdate = vmEntry.lastUpdate
-				}
+			delete(oldCache, computerName)
+		}
 
-				klog.V(5).Infof("adding old entries to new cache for %s", name)
-				localCache.Store(name, &vmssVirtualMachinesEntry{
-					resourceGroup:  vmEntry.resourceGroup,
-					vmssName:       vmEntry.vmssName,
-					instanceID:     vmEntry.instanceID,
-					virtualMachine: nil,
-					lastUpdate:     lastUpdate,
-				})
+		// add old missing cache data with nil entries to prevent aggressive
+		// ARM calls during cache invalidation
+		for name, vmEntry := range oldCache {
+			// if the nil cache entry has existed for 15 minutes in the cache
+			// then it should not be added back to the cache
+			if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute {
+				klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
+				continue
+			}
+			lastUpdate := time.Now().UTC()
+			if vmEntry.virtualMachine == nil {
+				// if this is already a nil entry then keep the time the nil
+				// entry was first created, so we can cleanup unwanted entries
+				lastUpdate = vmEntry.lastUpdate
 			}
+
+			klog.V(5).Infof("adding old entries to new cache for %s", name)
+			localCache.Store(name, &vmssVirtualMachinesEntry{
+				resourceGroup:  vmEntry.resourceGroup,
+				vmssName:       vmEntry.vmssName,
+				instanceID:     vmEntry.instanceID,
+				virtualMachine: nil,
+				lastUpdate:     lastUpdate,
+			})
 		}
 
 		return localCache, nil
@@ -210,14 +241,30 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 }
 
 func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
-	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cacheReadTypeUnsafe)
+	node, err := ss.getNodeIdentityByNodeName(nodeName, cacheReadTypeUnsafe)
+	if err != nil {
+		klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err)
+		return err
+	}
+
+	cacheKey, timedcache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName)
 	if err != nil {
 		klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err)
 		return err
 	}
 
-	virtualMachines := cached.(*sync.Map)
+	vmcache, err := timedcache.Get(cacheKey, cacheReadTypeUnsafe)
+	if err != nil {
+		klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err)
+		return err
+	}
+	virtualMachines := vmcache.(*sync.Map)
 	virtualMachines.Delete(nodeName)
+
+	if err := ss.gcVMSSVMCache(); err != nil {
+		klog.Errorf("deleteCacheForNode(%s) failed to gc stale vmss caches: %v", nodeName, err)
+	}
+
 	return nil
 }
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
index cfd71e4a5557..83ecb2995265 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
@@ -99,7 +99,9 @@ func TestVMSSVMCache(t *testing.T) {
 	assert.NoError(t, err)
 
 	// the VM should be removed from cache after deleteCacheForNode().
-	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cacheReadTypeDefault)
+	cacheKey, cache, err := ss.getVMSSVMCache("rg", vmssName)
+	assert.NoError(t, err)
+	cached, err := cache.Get(cacheKey, cacheReadTypeDefault)
 	assert.NoError(t, err)
 	cachedVirtualMachines := cached.(*sync.Map)
 	_, ok := cachedVirtualMachines.Load(vmName)
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_test.go
index f3b1ce07564d..b938542c66d6 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_test.go
@@ -56,6 +56,13 @@ func setTestVirtualMachineCloud(ss *Cloud, scaleSetName, zone string, faultDomai
 	scaleSets["rg"] = map[string]compute.VirtualMachineScaleSet{
 		scaleSetName: {
 			Name: &scaleSetName,
+			VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{
+				VirtualMachineProfile: &compute.VirtualMachineScaleSetVMProfile{
+					OsProfile: &compute.VirtualMachineScaleSetOSProfile{
+						ComputerNamePrefix: to.StringPtr("vmssee6c2"),
+					},
+				},
+			},
 		},
 	}
 	virtualMachineScaleSetsClient.setFakeStore(scaleSets)
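
Note (annotation, not part of the patch): the patch replaces the single shared VM cache with one timedCache per scale set, stored in a sync.Map keyed by the lowercased "resourcegroup/vmssname" pair. The keying and lazy creation are compact enough to show in isolation. A minimal, self-contained Go sketch of the same pattern follows; vmCache is a hypothetical stand-in for the provider's unexported timedCache, and everything else is standard library:

package main

import (
	"fmt"
	"strings"
	"sync"
)

// vmCache stands in for the provider's *timedCache; one instance holds the
// VMs of a single scale set.
type vmCache struct {
	vmssName string
}

// perVMSSCaches mimics scaleSet.vmssVMCache: a sync.Map keyed by
// "resourcegroup/vmssname", holding one cache per VMSS.
var perVMSSCaches sync.Map

// getVMSSVMCache returns the cache for one VMSS, creating it on first use,
// mirroring the lazy creation in the patch's getVMSSVMCache.
func getVMSSVMCache(resourceGroup, vmssName string) (string, *vmCache) {
	cacheKey := strings.ToLower(fmt.Sprintf("%s/%s", resourceGroup, vmssName))
	if entry, ok := perVMSSCaches.Load(cacheKey); ok {
		return cacheKey, entry.(*vmCache)
	}
	cache := &vmCache{vmssName: vmssName}
	// The patch uses Store rather than LoadOrStore, so two racing callers may
	// both build a cache; at worst one of the builds is discarded.
	perVMSSCaches.Store(cacheKey, cache)
	return cacheKey, cache
}

func main() {
	key1, c1 := getVMSSVMCache("RG", "vmss-1")
	key2, c2 := getVMSSVMCache("rg", "VMSS-1") // same VMSS, different casing
	fmt.Println(key1 == key2, c1 == c2)        // true true: keys are case-normalized
}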
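
The node-to-VMSS resolution in getNodeIdentityByNodeName leans on a naming convention: a VMSS VM's computer name is typically its scale set's ComputerNamePrefix (or the VMSS name, when no prefix is set) followed by a six-character instance suffix, which is why the code compares each prefix against nodeName[:len(nodeName)-6]. A small sketch of that matching rule, using a plain map in place of the vmssEntry cache (the length guard here is an addition; the patch relies on getScaleSetVMInstanceID validating the name first):

package main

import (
	"fmt"
	"strings"
)

// vmssNameByNodeName resolves a node to its scale set by stripping the
// six-character instance suffix from the node name and comparing the rest,
// case-insensitively, against each VMSS's computer-name prefix.
func vmssNameByNodeName(nodeName string, prefixes map[string]string) (string, bool) {
	if len(nodeName) <= 6 {
		return "", false // too short to carry a prefix plus a suffix
	}
	nodePrefix := nodeName[:len(nodeName)-6]
	for vmssName, computerNamePrefix := range prefixes {
		if strings.EqualFold(computerNamePrefix, nodePrefix) {
			return vmssName, true
		}
	}
	return "", false
}

func main() {
	// Prefix taken from the test fixture above ("vmssee6c2").
	prefixes := map[string]string{"vmss": "vmssee6c2"}
	fmt.Println(vmssNameByNodeName("vmssee6c2000000", prefixes)) // vmss true
}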
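
Finally, the rebuilt getter keeps a back-off for disappeared nodes: names present in the previous cache but missing from the fresh list are re-added with a nil virtualMachine, and a nil entry is only dropped once it has been nil for 15 minutes (the patch hard-codes this window where the removed code referred to the configurable vmssVirtualMachinesCacheTTL). A sketch of that retention rule with plain maps, assuming a simplified entry type:

package main

import (
	"fmt"
	"time"
)

// entry mirrors vmssVirtualMachinesEntry in spirit: a nil vm marks a node
// that disappeared from the most recent list call.
type entry struct {
	vm         *string
	lastUpdate time.Time
}

const nilEntryRetention = 15 * time.Minute // matches the hard-coded window above

// carryOverMissing re-adds recently-missing nodes as nil entries so repeated
// lookups for them do not trigger a fresh ARM list on every cache miss;
// entries that have been nil longer than the retention window age out.
func carryOverMissing(fresh, old map[string]entry) {
	for name, e := range old {
		if _, ok := fresh[name]; ok {
			continue // still present; nothing to carry over
		}
		if e.vm == nil && time.Since(e.lastUpdate) > nilEntryRetention {
			continue // expired nil entry: drop it for good
		}
		lastUpdate := time.Now().UTC()
		if e.vm == nil {
			// keep the time the entry first went nil so it can expire later
			lastUpdate = e.lastUpdate
		}
		fresh[name] = entry{vm: nil, lastUpdate: lastUpdate}
	}
}

func main() {
	old := map[string]entry{
		"vmssee6c2000001": {vm: nil, lastUpdate: time.Now().Add(-time.Minute)},
	}
	fresh := map[string]entry{}
	carryOverMissing(fresh, old)
	fmt.Println(len(fresh)) // 1: the recently-missing node is kept as a nil entry
}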