Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick of #93107: Azure: use per-vmss vmssvm incremental cache #93469

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
114 changes: 99 additions & 15 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
Expand Up @@ -60,6 +60,13 @@ const (
vmssVMInstanceUpdateDelay = 3 * time.Second
)

// nodeIdentity identifies a node within a subscription.
type nodeIdentity struct {
resourceGroup string
vmssName string
nodeName string
}

// scaleSet implements VMSet interface for Azure scale set.
type scaleSet struct {
*Cloud
Expand All @@ -69,7 +76,7 @@ type scaleSet struct {
availabilitySet VMSet

vmssCache *timedCache
vmssVMCache *timedCache
vmssVMCache *sync.Map // [resourcegroup/vmssname]*timedCache
availabilitySetNodesCache *timedCache
}

Expand All @@ -79,6 +86,7 @@ func newScaleSet(az *Cloud) (VMSet, error) {
ss := &scaleSet{
Cloud: az,
availabilitySet: newAvailabilitySet(az),
vmssVMCache: &sync.Map{},
}

ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
Expand All @@ -91,11 +99,6 @@ func newScaleSet(az *Cloud) (VMSet, error) {
return nil, err
}

ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
if err != nil {
return nil, err
}

return ss, nil
}

Expand Down Expand Up @@ -136,12 +139,17 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua
return vmss, nil
}

// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
// getVmssVMByNodeIdentity find virtualMachineScaleSetVM by nodeIdentity, using node's parent VMSS cache.
// Returns cloudprovider.InstanceNotFound if the node does not belong to the scale set named in nodeIdentity.
func (ss *scaleSet) getVmssVMByNodeIdentity(node *nodeIdentity, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
cacheKey, cache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName)
if err != nil {
return "", "", nil, err
}

getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
var found bool
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
cached, err := cache.Get(cacheKey, crt)
if err != nil {
return "", "", nil, found, err
}
Expand All @@ -156,19 +164,19 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin
return "", "", nil, found, nil
}

_, err := getScaleSetVMInstanceID(nodeName)
_, err = getScaleSetVMInstanceID(node.nodeName)
if err != nil {
return "", "", nil, err
}

vmssName, instanceID, vm, found, err := getter(nodeName, crt)
vmssName, instanceID, vm, found, err := getter(node.nodeName, crt)
if err != nil {
return "", "", nil, err
}

if !found {
klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", node.nodeName)
vmssName, instanceID, vm, found, err = getter(node.nodeName, cacheReadTypeForceRefresh)
if err != nil {
return "", "", nil, err
}
Expand All @@ -184,6 +192,17 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin
return vmssName, instanceID, vm, nil
}

// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// Returns cloudprovider.InstanceNotFound if nodeName does not belong to any scale set.
func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
node, err := ss.getNodeIdentityByNodeName(nodeName, crt)
if err != nil {
return "", "", nil, err
}

return ss.getVmssVMByNodeIdentity(node, crt)
}

// GetPowerStatusByNodeName returns the power state of the specified node.
func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe)
Expand Down Expand Up @@ -219,8 +238,13 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
// The node must belong to one of scale sets.
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
cacheKey, cache, err := ss.getVMSSVMCache(resourceGroup, scaleSetName)
if err != nil {
return nil, err
}

getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
cached, err := cache.Get(cacheKey, crt)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -591,6 +615,66 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
return ssNames, nil
}

// getNodeIdentityByNodeName use the VMSS cache to find a node's resourcegroup and vmss, returned in a nodeIdentity.
func (ss *scaleSet) getNodeIdentityByNodeName(nodeName string, crt cacheReadType) (*nodeIdentity, error) {
getter := func(nodeName string, crt cacheReadType) (*nodeIdentity, error) {
node := &nodeIdentity{
nodeName: nodeName,
}

cached, err := ss.vmssCache.Get(vmssKey, crt)
if err != nil {
return nil, err
}

vmsses := cached.(*sync.Map)
vmsses.Range(func(key, value interface{}) bool {
v := value.(*vmssEntry)
if v.vmss.Name == nil {
return true
}

vmssPrefix := *v.vmss.Name
if v.vmss.VirtualMachineProfile != nil &&
v.vmss.VirtualMachineProfile.OsProfile != nil &&
v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix != nil {
vmssPrefix = *v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix
}

if strings.EqualFold(vmssPrefix, nodeName[:len(nodeName)-6]) {
node.vmssName = *v.vmss.Name
node.resourceGroup = v.resourceGroup
return false
}

return true
})
return node, nil
}

if _, err := getScaleSetVMInstanceID(nodeName); err != nil {
return nil, err
}

node, err := getter(nodeName, crt)
if err != nil {
return nil, err
}
if node.vmssName != "" {
return node, nil
}

klog.V(2).Infof("Couldn't find VMSS for node %s, refreshing the cache", nodeName)
node, err = getter(nodeName, cacheReadTypeForceRefresh)
if err != nil {
return nil, err
}
if node.vmssName == "" {
return nil, cloudprovider.InstanceNotFound
}
return node, nil
}

// listScaleSetVMs lists VMs belonging to the specified scale set.
func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) {
var err error
Expand Down