Skip to content

Commit

Permalink
Merge pull request #100110 from CecileRobertMichon/azure-vm-cache
Browse files Browse the repository at this point in the history
Cherry pick #537 from cloud provider azure: Refresh VM cache when node is not found
  • Loading branch information
k8s-ci-robot committed Apr 9, 2021
2 parents b15859b + 8850c8c commit 8300553
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
25 changes: 25 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
Expand Up @@ -274,6 +274,8 @@ type Cloud struct {
ipv6DualStackEnabled bool
// Lock for access to node caches, includes nodeNames, nodeZones, nodeResourceGroups, and unmanagedNodes.
nodeCachesLock sync.RWMutex
// nodeNames holds current nodes for tracking added nodes in VM caches.
nodeNames sets.String
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
// it is updated by the nodeInformer
nodeZones map[string]sets.String
Expand Down Expand Up @@ -342,6 +344,7 @@ func NewCloudWithoutFeatureGates(configReader io.Reader) (*Cloud, error) {
}

az := &Cloud{
nodeNames: sets.NewString(),
nodeZones: map[string]sets.String{},
nodeResourceGroups: map[string]string{},
unmanagedNodes: sets.NewString(),
Expand Down Expand Up @@ -782,6 +785,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
defer az.nodeCachesLock.Unlock()

if prevNode != nil {
// Remove from nodeNames cache.
az.nodeNames.Delete(prevNode.ObjectMeta.Name)

// Remove from nodeZones cache.
prevZone, ok := prevNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
if ok && az.isAvailabilityZone(prevZone) {
Expand All @@ -805,6 +811,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
}

if newNode != nil {
// Add to nodeNames cache.
az.nodeNames.Insert(newNode.ObjectMeta.Name)

// Add to nodeZones cache.
newZone, ok := newNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
if ok && az.isAvailabilityZone(newZone) {
Expand Down Expand Up @@ -876,6 +885,22 @@ func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) {
return az.ResourceGroup, nil
}

// GetNodeNames returns a copy of the set of all node names in the k8s cluster,
// as tracked in az.nodeNames.
//
// It returns (nil, nil) when az.nodeInformerSynced is unset (the kubelet does
// not set it), and a non-nil error when the node informer has not synced yet.
// The returned set is a fresh copy, so callers may modify it without
// affecting the cached az.nodeNames.
func (az *Cloud) GetNodeNames() (sets.String, error) {
	// Kubelet won't set az.nodeInformerSynced, return nil.
	if az.nodeInformerSynced == nil {
		return nil, nil
	}

	az.nodeCachesLock.RLock()
	defer az.nodeCachesLock.RUnlock()
	if !az.nodeInformerSynced() {
		return nil, fmt.Errorf("node informer is not synced when trying to GetNodeNames")
	}

	// Copy while holding the read lock. UnsortedList skips the sort that List
	// performs; ordering is irrelevant because the names only seed a new set.
	return sets.NewString(az.nodeNames.UnsortedList()...), nil
}

// GetResourceGroups returns a set of resource groups that all nodes are running on.
func (az *Cloud) GetResourceGroups() (sets.String, error) {
// Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go
Expand Up @@ -3244,6 +3244,7 @@ func TestUpdateNodeCaches(t *testing.T) {
az.nodeZones = map[string]sets.String{zone: nodesInZone}
az.nodeResourceGroups = map[string]string{"prevNode": "rg"}
az.unmanagedNodes = sets.NewString("prevNode")
az.nodeNames = sets.NewString("prevNode")

prevNode := v1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -3260,6 +3261,7 @@ func TestUpdateNodeCaches(t *testing.T) {
assert.Equal(t, 0, len(az.nodeZones[zone]))
assert.Equal(t, 0, len(az.nodeResourceGroups))
assert.Equal(t, 0, len(az.unmanagedNodes))
assert.Equal(t, 0, len(az.nodeNames))

newNode := v1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -3276,6 +3278,7 @@ func TestUpdateNodeCaches(t *testing.T) {
assert.Equal(t, 1, len(az.nodeZones[zone]))
assert.Equal(t, 1, len(az.nodeResourceGroups))
assert.Equal(t, 1, len(az.unmanagedNodes))
assert.Equal(t, 1, len(az.nodeNames))
}

func TestGetActiveZones(t *testing.T) {
Expand Down
Expand Up @@ -74,7 +74,8 @@ type scaleSet struct {
*Cloud

// availabilitySet is also required for scaleSet because some instances
// (e.g. master nodes) may not belong to any scale sets.
// (e.g. control plane nodes) may not belong to any scale sets.
// this also allows for clusters with both VM and VMSS nodes.
availabilitySet VMSet

vmssCache *azcache.TimedCache
Expand Down
Expand Up @@ -58,6 +58,11 @@ type vmssEntry struct {
lastUpdate time.Time
}

// availabilitySetEntry is the value stored in the availability set nodes cache.
// It pairs the VM names collected when the entry was built with the node names
// known at that same moment, so a reader can tell whether a node was already
// known when the cache data was created.
type availabilitySetEntry struct {
// vmNames holds the names of the VMs listed when the entry was built.
vmNames sets.String
// nodeNames holds the result of GetNodeNames at the time the entry was built.
nodeNames sets.String
}

func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) {
localCache := &sync.Map{} // [vmssName]*vmssEntry
Expand Down Expand Up @@ -273,7 +278,7 @@ func (ss *scaleSet) deleteCacheForNode(nodeName string) error {

func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) {
localCache := sets.NewString()
vmNames := sets.NewString()
resourceGroups, err := ss.GetResourceGroups()
if err != nil {
return nil, err
Expand All @@ -287,11 +292,22 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error)

for _, vm := range vmList {
if vm.Name != nil {
localCache.Insert(*vm.Name)
vmNames.Insert(*vm.Name)
}
}
}

// store all the node names in the cluster when the cache data was created.
nodeNames, err := ss.GetNodeNames()
if err != nil {
return nil, err
}

localCache := availabilitySetEntry{
vmNames: vmNames,
nodeNames: nodeNames,
}

return localCache, nil
}

Expand All @@ -313,6 +329,16 @@ func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt azcache.
return false, err
}

availabilitySetNodes := cached.(sets.String)
return availabilitySetNodes.Has(nodeName), nil
cachedNodes := cached.(availabilitySetEntry).nodeNames
// if the node is not in the cache, assume the node has joined after the last cache refresh and attempt to refresh the cache.
if !cachedNodes.Has(nodeName) {
klog.V(2).Infof("Node %s has joined the cluster since the last VM cache refresh, refreshing the cache", nodeName)
cached, err = ss.availabilitySetNodesCache.Get(availabilitySetNodesKey, azcache.CacheReadTypeForceRefresh)
if err != nil {
return false, err
}
}

cachedVMs := cached.(availabilitySetEntry).vmNames
return cachedVMs.Has(nodeName), nil
}

0 comments on commit 8300553

Please sign in to comment.