Skip to content

Commit

Permalink
[ARG] Add an option to disable API call cache
Browse files Browse the repository at this point in the history
New option: disableAPICallCache
When ARG is enabled, this option should be true.

Signed-off-by: Zhecheng Li <zhechengli@microsoft.com>
  • Loading branch information
lzhecheng committed Jun 27, 2023
1 parent cb04dcf commit 263fa0a
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 115 deletions.
99 changes: 87 additions & 12 deletions pkg/cache/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,54 @@ func cacheKeyFunc(obj interface{}) (string, error) {
return obj.(*AzureCacheEntry).Key, nil
}

// Cache operations
type Resource interface {
Get(key string, crt AzureCacheReadType) (interface{}, error)
GetWithDeepCopy(key string, crt AzureCacheReadType) (interface{}, error)
Delete(key string) error
Set(key string, data interface{})
Update(key string, data interface{})

GetStore() cache.Store
Lock()
Unlock()
}

// TimedCache is a cache with TTL.
type TimedCache struct {
Store cache.Store
Lock sync.Mutex
Store cache.Store
MutuxLock sync.Mutex
Getter GetFunc
TTL time.Duration
}

type NoCache struct {
Getter GetFunc
TTL time.Duration
}

// NewTimedcache creates a new TimedCache.
func NewTimedcache(ttl time.Duration, getter GetFunc) (*TimedCache, error) {
// NewAzureCache creates a new azcache.Resource.
func NewAzureCache(ttl time.Duration, getter GetFunc, disabled bool) (Resource, error) {
if getter == nil {
return nil, fmt.Errorf("getter is not provided")
}

return &TimedCache{
Getter: getter,
if disabled {
noCache := &NoCache{
Getter: getter,
}
return noCache, nil
}

timedCache := &TimedCache{
// switch to using NewStore instead of NewTTLStore so that we can
// reuse entries for calls that are fine with reading expired/stalled data.
// with NewTTLStore, entries are not returned if they have already expired.
Store: cache.NewStore(cacheKeyFunc),
TTL: ttl,
}, nil
Store: cache.NewStore(cacheKeyFunc),
MutuxLock: sync.Mutex{},
TTL: ttl,
Getter: getter,
}
return timedCache, nil
}

// getInternal returns AzureCacheEntry by key. If the key is not cached yet,
Expand All @@ -100,8 +126,8 @@ func (t *TimedCache) getInternal(key string) (*AzureCacheEntry, error) {

// lock here to ensure if entry doesn't exist, we add a new entry
// avoiding overwrites
t.Lock.Lock()
defer t.Lock.Unlock()
t.Lock()
defer t.Unlock()

// Another goroutine might have written the same key.
entry, exists, err = t.Store.GetByKey(key)
Expand All @@ -127,13 +153,21 @@ func (t *TimedCache) Get(key string, crt AzureCacheReadType) (interface{}, error
return t.get(key, crt)
}

func (n *NoCache) Get(key string, crt AzureCacheReadType) (interface{}, error) {
return n.Getter(key)
}

// Get returns the requested item by key with deep copy.
func (t *TimedCache) GetWithDeepCopy(key string, crt AzureCacheReadType) (interface{}, error) {
data, err := t.get(key, crt)
copied := deepcopy.Copy(data)
return copied, err
}

func (n *NoCache) GetWithDeepCopy(key string, crt AzureCacheReadType) (interface{}, error) {
return n.Getter(key)
}

func (t *TimedCache) get(key string, crt AzureCacheReadType) (interface{}, error) {
entry, err := t.getInternal(key)
if err != nil {
Expand Down Expand Up @@ -177,6 +211,10 @@ func (t *TimedCache) Delete(key string) error {
})
}

func (n *NoCache) Delete(key string) error {
return nil
}

// Set sets the data cache for the key.
// It is only used for testing.
func (t *TimedCache) Set(key string, data interface{}) {
Expand All @@ -187,6 +225,9 @@ func (t *TimedCache) Set(key string, data interface{}) {
})
}

func (n *NoCache) Set(key string, data interface{}) {
}

// Update updates the data cache for the key.
func (t *TimedCache) Update(key string, data interface{}) {
if entry, err := t.getInternal(key); err == nil {
Expand All @@ -202,3 +243,37 @@ func (t *TimedCache) Update(key string, data interface{}) {
})
}
}

func (n *NoCache) Update(key string, data interface{}) {
}

func (t *TimedCache) GetStore() cache.Store {
if t == nil {
return nil
}
return t.Store
}

func (n *NoCache) GetStore() cache.Store {
return nil
}

func (t *TimedCache) Lock() {
if t == nil {
return
}
t.MutuxLock.Lock()
}

func (t *TimedCache) Unlock() {
if t == nil {
return
}
t.MutuxLock.Unlock()
}

func (n *NoCache) Lock() {
}

func (n *NoCache) Unlock() {
}
6 changes: 3 additions & 3 deletions pkg/cache/azure_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func newFakeCache(t *testing.T) (*fakeDataSource, *TimedCache) {
sem: *semaphore.NewWeighted(1),
}
getter := dataSource.get
cache, err := NewTimedcache(fakeCacheTTL, getter)
cache, err := NewAzureCache(fakeCacheTTL, getter, false)
assert.NoError(t, err)
return dataSource, cache
return dataSource, cache.(*TimedCache)
}

func TestCacheGet(t *testing.T) {
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestCacheGetError(t *testing.T) {
getter := func(key string) (interface{}, error) {
return nil, getError
}
cache, err := NewTimedcache(fakeCacheTTL, getter)
cache, err := NewAzureCache(fakeCacheTTL, getter, false)
assert.NoError(t, err)

val, err := cache.GetWithDeepCopy("key", CacheReadTypeDefault)
Expand Down
19 changes: 13 additions & 6 deletions pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ type Config struct {
// If the length is not 0, it is assumed the multiple standard load balancers mode is on. In this case,
// there must be one configuration named “<clustername>” or an error will be reported.
MultipleStandardLoadBalancerConfigurations []MultipleStandardLoadBalancerConfiguration `json:"multipleStandardLoadBalancerConfigurations,omitempty" yaml:"multipleStandardLoadBalancerConfigurations,omitempty"`

// DisableAPICallCache disables the cache for Azure API calls. It is for ARG support and not all resources will be disabled.
DisableAPICallCache bool `json:"disableAPICallCache,omitempty" yaml:"disableAPICallCache,omitempty"`
}

// MultipleStandardLoadBalancerConfiguration stores the properties regarding multiple standard load balancers.
Expand Down Expand Up @@ -399,16 +402,16 @@ type Cloud struct {
eventRecorder record.EventRecorder
routeUpdater *delayedRouteUpdater

vmCache *azcache.TimedCache
lbCache *azcache.TimedCache
nsgCache *azcache.TimedCache
rtCache *azcache.TimedCache
vmCache azcache.Resource
lbCache azcache.Resource
nsgCache azcache.Resource
rtCache azcache.Resource
// public ip cache
// key: [resourceGroupName]
// Value: sync.Map of [pipName]*PublicIPAddress
pipCache *azcache.TimedCache
pipCache azcache.Resource
// use LB frontEndIpConfiguration ID as the key and search for PLS attached to the frontEnd
plsCache *azcache.TimedCache
plsCache azcache.Resource

// Add service lister to always get latest service
serviceLister corelisters.ServiceLister
Expand Down Expand Up @@ -742,6 +745,10 @@ func (az *Cloud) getPutVMSSVMBatchSize() int {
}

func (az *Cloud) initCaches() (err error) {
if az.Config.DisableAPICallCache {
klog.Infof("API call cache is disabled, ignore logs about cache operations")
}

az.vmCache, err = az.newVMCache()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/azure_instance_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type LoadBalancerMetadata struct {
// InstanceMetadataService knows how to query the Azure instance metadata server.
type InstanceMetadataService struct {
imdsServer string
imsCache *azcache.TimedCache
imsCache azcache.Resource
}

// NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object.
Expand All @@ -111,7 +111,7 @@ func NewInstanceMetadataService(imdsServer string) (*InstanceMetadataService, er
imdsServer: imdsServer,
}

imsCache, err := azcache.NewTimedcache(consts.MetadataCacheTTL, ims.getMetadata)
imsCache, err := azcache.NewAzureCache(consts.MetadataCacheTTL, ims.getMetadata, false)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/provider/azure_instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func TestInstanceID(t *testing.T) {
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
}
if test.useCustomImsCache {
cloud.Metadata.imsCache, err = azcache.NewTimedcache(consts.MetadataCacheTTL, func(key string) (interface{}, error) {
cloud.Metadata.imsCache, err = azcache.NewAzureCache(consts.MetadataCacheTTL, func(key string) (interface{}, error) {
return nil, fmt.Errorf("getError")
})
}, false)
if err != nil {
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
}
Expand Down Expand Up @@ -640,9 +640,9 @@ func TestNodeAddresses(t *testing.T) {
}

if test.useCustomImsCache {
cloud.Metadata.imsCache, err = azcache.NewTimedcache(consts.MetadataCacheTTL, func(key string) (interface{}, error) {
cloud.Metadata.imsCache, err = azcache.NewAzureCache(consts.MetadataCacheTTL, func(key string) (interface{}, error) {
return nil, fmt.Errorf("getError")
})
}, false)
if err != nil {
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/provider/azure_standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,15 +494,15 @@ func MakeCRC32(str string) string {
type availabilitySet struct {
*Cloud

vmasCache *azcache.TimedCache
vmasCache azcache.Resource
}

type AvailabilitySetEntry struct {
VMAS *compute.AvailabilitySet
ResourceGroup string
}

func (as *availabilitySet) newVMASCache() (*azcache.TimedCache, error) {
func (as *availabilitySet) newVMASCache() (azcache.Resource, error) {
getter := func(key string) (interface{}, error) {
localCache := &sync.Map{}

Expand Down Expand Up @@ -538,7 +538,7 @@ func (as *availabilitySet) newVMASCache() (*azcache.TimedCache, error) {
as.Config.AvailabilitySetsCacheTTLInSeconds = consts.VMASCacheTTLDefaultInSeconds
}

return azcache.NewTimedcache(time.Duration(as.Config.AvailabilitySetsCacheTTLInSeconds)*time.Second, getter)
return azcache.NewAzureCache(time.Duration(as.Config.AvailabilitySetsCacheTTLInSeconds)*time.Second, getter, as.Cloud.Config.DisableAPICallCache)
}

// newStandardSet creates a new availabilitySet.
Expand Down
8 changes: 4 additions & 4 deletions pkg/provider/azure_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,17 @@ func getVMSSVMCacheKey(resourceGroup, vmssName string) string {
}

// isNodeInVMSSVMCache check whether nodeName is in vmssVMCache
func isNodeInVMSSVMCache(nodeName string, vmssVMCache *azcache.TimedCache) bool {
func isNodeInVMSSVMCache(nodeName string, vmssVMCache azcache.Resource) bool {
if vmssVMCache == nil {
return false
}

var isInCache bool

vmssVMCache.Lock.Lock()
defer vmssVMCache.Lock.Unlock()
vmssVMCache.Lock()
defer vmssVMCache.Unlock()

for _, entry := range vmssVMCache.Store.List() {
for _, entry := range vmssVMCache.GetStore().List() {
if entry != nil {
e := entry.(*azcache.AzureCacheEntry)
e.Lock.Lock()
Expand Down
19 changes: 10 additions & 9 deletions pkg/provider/azure_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,22 +458,21 @@ func TestGetVMSSVMCacheKey(t *testing.T) {
}

func TestIsNodeInVMSSVMCache(t *testing.T) {

getter := func(key string) (interface{}, error) {
return nil, nil
}
emptyCacheEntryTimedCache, _ := azcache.NewTimedcache(fakeCacheTTL, getter)
emptyCacheEntryTimedCache, _ := azcache.NewAzureCache(fakeCacheTTL, getter, false)
emptyCacheEntryTimedCache.Set("key", nil)

cacheEntryTimedCache, _ := azcache.NewTimedcache(fakeCacheTTL, getter)
cacheEntryTimedCache, _ := azcache.NewAzureCache(fakeCacheTTL, getter, false)
syncMap := &sync.Map{}
syncMap.Store("node", nil)
cacheEntryTimedCache.Set("key", syncMap)

tests := []struct {
description string
nodeName string
vmssVMCache *azcache.TimedCache
vmssVMCache azcache.Resource
expectedResult bool
}{
{
Expand All @@ -483,26 +482,28 @@ func TestIsNodeInVMSSVMCache(t *testing.T) {
},
{
description: "empty CacheEntry timed cache",
vmssVMCache: emptyCacheEntryTimedCache,
vmssVMCache: emptyCacheEntryTimedCache.(*azcache.TimedCache),
expectedResult: false,
},
{
description: "node name in the cache",
nodeName: "node",
vmssVMCache: cacheEntryTimedCache,
vmssVMCache: cacheEntryTimedCache.(*azcache.TimedCache),
expectedResult: true,
},
{
description: "node name not in the cache",
nodeName: "node2",
vmssVMCache: cacheEntryTimedCache,
vmssVMCache: cacheEntryTimedCache.(*azcache.TimedCache),
expectedResult: false,
},
}

for _, test := range tests {
result := isNodeInVMSSVMCache(test.nodeName, test.vmssVMCache)
assert.Equal(t, test.expectedResult, result, test.description)
t.Run(test.description, func(t *testing.T) {
result := isNodeInVMSSVMCache(test.nodeName, test.vmssVMCache)
assert.Equal(t, test.expectedResult, result)
})
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/provider/azure_vmss.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,19 @@ type ScaleSet struct {
// vmssCache is timed cache where the Store in the cache is a map of
// Key: consts.VMSSKey
// Value: sync.Map of [vmssName]*VMSSEntry
vmssCache *azcache.TimedCache
vmssCache azcache.Resource

// vmssVMCache is timed cache where the Store in the cache is a map of
// Key: [resourcegroup/vmssName]
// Value: sync.Map of [vmName]*VMSSVirtualMachineEntry
vmssVMCache *azcache.TimedCache
vmssVMCache azcache.Resource

// nonVmssUniformNodesCache is used to store node names from non uniform vm.
// Currently, the nodes can from avset or vmss flex or individual vm.
// This cache contains an entry called nonVmssUniformNodesEntry.
// nonVmssUniformNodesEntry contains avSetVMNodeNames list, clusterNodeNames list
// and current clusterNodeNames.
nonVmssUniformNodesCache *azcache.TimedCache
nonVmssUniformNodesCache azcache.Resource

// lockMap in cache refresh
lockMap *lockMap
Expand Down

0 comments on commit 263fa0a

Please sign in to comment.