diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index e39bfa5763652..f54cf0d2beaf8 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -46,8 +46,8 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/utils/ptr" ) @@ -302,7 +302,7 @@ type dynamicResources struct { // When implementing cluster autoscaler support, this assume cache or // something like it (see https://github.com/kubernetes/kubernetes/pull/112202) // might have to be managed by the cluster autoscaler. - claimAssumeCache volumebinding.AssumeCache + claimAssumeCache *assumecache.AssumeCache // inFlightAllocations is map from claim UUIDs to claim objects for those claims // for which allocation was triggered during a scheduling cycle and the @@ -355,7 +355,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(), claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), - claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), + claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), } return pl, nil diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go index 945c7a3efff1b..ee5a5d79b58b6 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go @@ -18,353 +18,17 @@ package volumebinding import ( "fmt" - "strconv" - "sync" "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/client-go/tools/cache" storagehelpers "k8s.io/component-helpers/storage/volume" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) -// AssumeCache is a cache on top of the informer that allows for updating -// objects outside of informer events and also restoring the informer -// cache's version of the object. Objects are assumed to be -// Kubernetes API objects that implement meta.Interface -type AssumeCache interface { - // Assume updates the object in-memory only - Assume(obj interface{}) error - - // Restore the informer cache's version of the object - Restore(objName string) - - // Get the object by name - Get(objName string) (interface{}, error) - - // GetAPIObj gets the API object by name - GetAPIObj(objName string) (interface{}, error) - - // List all the objects in the cache - List(indexObj interface{}) []interface{} -} - -type errWrongType struct { - typeName string - object interface{} -} - -func (e *errWrongType) Error() string { - return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object) -} - -type errNotFound struct { - typeName string - objectName string -} - -func (e *errNotFound) Error() string { - return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName) -} - -type errObjectName struct { - detailedErr error -} - -func (e *errObjectName) Error() string { - return fmt.Sprintf("failed to get object name: %v", e.detailedErr) -} - -// assumeCache stores two pointers to represent a single object: -// - The pointer to the informer object. -// - The pointer to the latest object, which could be the same as -// the informer object, or an in-memory object. -// -// An informer update always overrides the latest object pointer. -// -// Assume() only updates the latest object pointer. -// Restore() sets the latest object pointer back to the informer object. -// Get/List() always returns the latest object pointer. -type assumeCache struct { - // The logger that was chosen when setting up the cache. - // Will be used for all operations. - logger klog.Logger - - // Synchronizes updates to store - rwMutex sync.RWMutex - - // describes the object stored - description string - - // Stores objInfo pointers - store cache.Indexer - - // Index function for object - indexFunc cache.IndexFunc - indexName string -} - -type objInfo struct { - // name of the object - name string - - // Latest version of object could be cached-only or from informer - latestObj interface{} - - // Latest object from informer - apiObj interface{} -} - -func objInfoKeyFunc(obj interface{}) (string, error) { - objInfo, ok := obj.(*objInfo) - if !ok { - return "", &errWrongType{"objInfo", obj} - } - return objInfo.name, nil -} - -func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { - objInfo, ok := obj.(*objInfo) - if !ok { - return []string{""}, &errWrongType{"objInfo", obj} - } - return c.indexFunc(objInfo.latestObj) -} - -// NewAssumeCache creates an assume cache for general objects. -func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { - c := &assumeCache{ - logger: logger, - description: description, - indexFunc: indexFunc, - indexName: indexName, - } - indexers := cache.Indexers{} - if indexName != "" && indexFunc != nil { - indexers[indexName] = c.objInfoIndexFunc - } - c.store = cache.NewIndexer(objInfoKeyFunc, indexers) - - // Unit tests don't use informers - if informer != nil { - informer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.add, - UpdateFunc: c.update, - DeleteFunc: c.delete, - }, - ) - } - return c -} - -func (c *assumeCache) add(obj interface{}) { - if obj == nil { - return - } - - name, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - c.logger.Error(&errObjectName{err}, "Add failed") - return - } - - c.rwMutex.Lock() - defer c.rwMutex.Unlock() - - if objInfo, _ := c.getObjInfo(name); objInfo != nil { - newVersion, err := c.getObjVersion(name, obj) - if err != nil { - c.logger.Error(err, "Add failed: couldn't get object version") - return - } - - storedVersion, err := c.getObjVersion(name, objInfo.latestObj) - if err != nil { - c.logger.Error(err, "Add failed: couldn't get stored object version") - return - } - - // Only update object if version is newer. - // This is so we don't override assumed objects due to informer resync. - if newVersion <= storedVersion { - c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) - return - } - } - - objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} - if err = c.store.Update(objInfo); err != nil { - c.logger.Info("Error occurred while updating stored object", "err", err) - } else { - c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) - } -} - -func (c *assumeCache) update(oldObj interface{}, newObj interface{}) { - c.add(newObj) -} - -func (c *assumeCache) delete(obj interface{}) { - if obj == nil { - return - } - - name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - c.logger.Error(&errObjectName{err}, "Failed to delete") - return - } - - c.rwMutex.Lock() - defer c.rwMutex.Unlock() - - objInfo := &objInfo{name: name} - err = c.store.Delete(objInfo) - if err != nil { - c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) - } -} - -func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) { - objAccessor, err := meta.Accessor(obj) - if err != nil { - return -1, err - } - - objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) - if err != nil { - return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err) - } - return objResourceVersion, nil -} - -func (c *assumeCache) getObjInfo(name string) (*objInfo, error) { - obj, ok, err := c.store.GetByKey(name) - if err != nil { - return nil, err - } - if !ok { - return nil, &errNotFound{c.description, name} - } - - objInfo, ok := obj.(*objInfo) - if !ok { - return nil, &errWrongType{"objInfo", obj} - } - return objInfo, nil -} - -func (c *assumeCache) Get(objName string) (interface{}, error) { - c.rwMutex.RLock() - defer c.rwMutex.RUnlock() - - objInfo, err := c.getObjInfo(objName) - if err != nil { - return nil, err - } - return objInfo.latestObj, nil -} - -func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) { - c.rwMutex.RLock() - defer c.rwMutex.RUnlock() - - objInfo, err := c.getObjInfo(objName) - if err != nil { - return nil, err - } - return objInfo.apiObj, nil -} - -func (c *assumeCache) List(indexObj interface{}) []interface{} { - c.rwMutex.RLock() - defer c.rwMutex.RUnlock() - - allObjs := []interface{}{} - var objs []interface{} - if c.indexName != "" { - o, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj}) - if err != nil { - c.logger.Error(err, "List index error") - return nil - } - objs = o - } else { - objs = c.store.List() - } - - for _, obj := range objs { - objInfo, ok := obj.(*objInfo) - if !ok { - c.logger.Error(&errWrongType{"objInfo", obj}, "List error") - continue - } - allObjs = append(allObjs, objInfo.latestObj) - } - return allObjs -} - -func (c *assumeCache) Assume(obj interface{}) error { - name, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - return &errObjectName{err} - } - - c.rwMutex.Lock() - defer c.rwMutex.Unlock() - - objInfo, err := c.getObjInfo(name) - if err != nil { - return err - } - - newVersion, err := c.getObjVersion(name, obj) - if err != nil { - return err - } - - storedVersion, err := c.getObjVersion(name, objInfo.latestObj) - if err != nil { - return err - } - - if newVersion < storedVersion { - return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) - } - - // Only update the cached object - objInfo.latestObj = obj - c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion) - return nil -} - -func (c *assumeCache) Restore(objName string) { - c.rwMutex.Lock() - defer c.rwMutex.Unlock() - - objInfo, err := c.getObjInfo(objName) - if err != nil { - // This could be expected if object got deleted - c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) - } else { - objInfo.latestObj = objInfo.apiObj - c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) - } -} - // PVAssumeCache is a AssumeCache for PersistentVolume objects -type PVAssumeCache interface { - AssumeCache - - GetPV(pvName string) (*v1.PersistentVolume, error) - GetAPIPV(pvName string) (*v1.PersistentVolume, error) - ListPVs(storageClassName string) []*v1.PersistentVolume -} - -type pvAssumeCache struct { - AssumeCache +type PVAssumeCache struct { + *assumecache.AssumeCache logger klog.Logger } @@ -376,15 +40,15 @@ func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { } // NewPVAssumeCache creates a PV assume cache. -func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache { +func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVAssumeCache { logger = klog.LoggerWithName(logger, "PV Cache") - return &pvAssumeCache{ - AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), + return &PVAssumeCache{ + AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), logger: logger, } } -func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { +func (c *PVAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { obj, err := c.Get(pvName) if err != nil { return nil, err @@ -392,24 +56,24 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { pv, ok := obj.(*v1.PersistentVolume) if !ok { - return nil, &errWrongType{"v1.PersistentVolume", obj} + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj} } return pv, nil } -func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { +func (c *PVAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { obj, err := c.GetAPIObj(pvName) if err != nil { return nil, err } pv, ok := obj.(*v1.PersistentVolume) if !ok { - return nil, &errWrongType{"v1.PersistentVolume", obj} + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj} } return pv, nil } -func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { +func (c *PVAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { objs := c.List(&v1.PersistentVolume{ Spec: v1.PersistentVolumeSpec{ StorageClassName: storageClassName, @@ -419,7 +83,7 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume for _, obj := range objs { pv, ok := obj.(*v1.PersistentVolume) if !ok { - c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs") + c.logger.Error(&assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}, "ListPVs") continue } pvs = append(pvs, pv) @@ -428,30 +92,21 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume } // PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects -type PVCAssumeCache interface { - AssumeCache - - // GetPVC returns the PVC from the cache with given pvcKey. - // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj - GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) - GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) -} - -type pvcAssumeCache struct { - AssumeCache +type PVCAssumeCache struct { + *assumecache.AssumeCache logger klog.Logger } // NewPVCAssumeCache creates a PVC assume cache. -func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache { +func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVCAssumeCache { logger = klog.LoggerWithName(logger, "PVC Cache") - return &pvcAssumeCache{ - AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), + return &PVCAssumeCache{ + AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), logger: logger, } } -func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { +func (c *PVCAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { obj, err := c.Get(pvcKey) if err != nil { return nil, err @@ -459,19 +114,19 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error pvc, ok := obj.(*v1.PersistentVolumeClaim) if !ok { - return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj} } return pvc, nil } -func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { +func (c *PVCAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { obj, err := c.GetAPIObj(pvcKey) if err != nil { return nil, err } pvc, ok := obj.(*v1.PersistentVolumeClaim) if !ok { - return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj} } return pvc, nil } diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go index 7391a412f8905..4256789a2f047 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go @@ -24,9 +24,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) -func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { +func verifyListPVs(t *testing.T, cache *PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { pvList := cache.ListPVs(storageClassName) if len(pvList) != len(expectedPVs) { t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) @@ -42,7 +43,7 @@ func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1 } } -func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { +func verifyPV(cache *PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { pv, err := cache.GetPV(name) if err != nil { return err @@ -99,13 +100,9 @@ func TestAssumePV(t *testing.T) { for name, scenario := range scenarios { cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add oldPV to cache - internalCache.add(scenario.oldPV) + assumecache.AddTestObject(cache.AssumeCache, scenario.oldPV) if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { t.Errorf("Failed to GetPV() after initial update: %v", err) continue @@ -134,10 +131,6 @@ func TestAssumePV(t *testing.T) { func TestRestorePV(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } oldPV := makePV("pv1", "").withVersion("5").PersistentVolume newPV := makePV("pv1", "").withVersion("5").PersistentVolume @@ -146,7 +139,7 @@ func TestRestorePV(t *testing.T) { cache.Restore("nothing") // Add oldPV to cache - internalCache.add(oldPV) + assumecache.AddTestObject(cache.AssumeCache, oldPV) if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { t.Fatalf("Failed to GetPV() after initial update: %v", err) } @@ -175,10 +168,6 @@ func TestRestorePV(t *testing.T) { func TestBasicPVCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Get object that doesn't exist pv, err := cache.GetPV("nothere") @@ -194,7 +183,7 @@ func TestBasicPVCache(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume pvs[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache.AssumeCache, pv) } // List them @@ -203,7 +192,7 @@ func TestBasicPVCache(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume pvs[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + assumecache.UpdateTestObject(cache.AssumeCache, updatedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -211,7 +200,7 @@ func TestBasicPVCache(t *testing.T) { // Delete a PV deletedPV := pvs["test-pv7"] delete(pvs, deletedPV.Name) - internalCache.delete(deletedPV) + assumecache.DeleteTestObject(cache.AssumeCache, deletedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -220,17 +209,13 @@ func TestBasicPVCache(t *testing.T) { func TestPVCacheWithStorageClasses(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add a bunch of PVs pvs1 := map[string]*v1.PersistentVolume{} for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume pvs1[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache.AssumeCache, pv) } // Add a bunch of PVs @@ -238,7 +223,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume pvs2[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache.AssumeCache, pv) } // List them @@ -248,7 +233,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume pvs1[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + assumecache.UpdateTestObject(cache.AssumeCache, updatedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -257,7 +242,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Delete a PV deletedPV := pvs1["test-pv7"] delete(pvs1, deletedPV.Name) - internalCache.delete(deletedPV) + assumecache.DeleteTestObject(cache.AssumeCache, deletedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -267,16 +252,12 @@ func TestPVCacheWithStorageClasses(t *testing.T) { func TestAssumeUpdatePVCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } pvName := "test-pv0" // Add a PV pv := makePV(pvName, "").withVersion("1").PersistentVolume - internalCache.add(pv) + assumecache.AddTestObject(cache.AssumeCache, pv) if err := verifyPV(cache, pvName, pv); err != nil { t.Fatalf("failed to get PV: %v", err) } @@ -292,7 +273,7 @@ func TestAssumeUpdatePVCache(t *testing.T) { } // Add old PV - internalCache.add(pv) + assumecache.AddTestObject(cache.AssumeCache, pv) if err := verifyPV(cache, pvName, newPV); err != nil { t.Fatalf("failed to get PV after old PV added: %v", err) } @@ -309,7 +290,7 @@ func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim { } } -func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { +func verifyPVC(cache *PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { pvc, err := cache.GetPVC(pvcKey) if err != nil { return err @@ -361,13 +342,9 @@ func TestAssumePVC(t *testing.T) { for name, scenario := range scenarios { cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add oldPVC to cache - internalCache.add(scenario.oldPVC) + assumecache.AddTestObject(cache.AssumeCache, scenario.oldPVC) if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { t.Errorf("Failed to GetPVC() after initial update: %v", err) continue @@ -396,10 +373,6 @@ func TestAssumePVC(t *testing.T) { func TestRestorePVC(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } oldPVC := makeClaim("pvc1", "5", "ns1") newPVC := makeClaim("pvc1", "5", "ns1") @@ -408,7 +381,7 @@ func TestRestorePVC(t *testing.T) { cache.Restore("nothing") // Add oldPVC to cache - internalCache.add(oldPVC) + assumecache.AddTestObject(cache.AssumeCache, oldPVC) if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { t.Fatalf("Failed to GetPVC() after initial update: %v", err) } @@ -437,17 +410,13 @@ func TestRestorePVC(t *testing.T) { func TestAssumeUpdatePVCCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } pvcName := "test-pvc0" pvcNamespace := "test-ns" // Add a PVC pvc := makeClaim(pvcName, "1", pvcNamespace) - internalCache.add(pvc) + assumecache.AddTestObject(cache.AssumeCache, pvc) if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { t.Fatalf("failed to get PVC: %v", err) } @@ -463,7 +432,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) { } // Add old PVC - internalCache.add(pvc) + assumecache.AddTestObject(cache.AssumeCache, pvc) if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { t.Fatalf("failed to get PVC after old PVC added: %v", err) } diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder.go b/pkg/scheduler/framework/plugins/volumebinding/binder.go index f6ce916c6bfe1..4f078ea338537 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder.go @@ -18,6 +18,7 @@ package volumebinding import ( "context" + "errors" "fmt" "sort" "strings" @@ -45,6 +46,7 @@ import ( v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/volume/util" ) @@ -218,8 +220,8 @@ type volumeBinder struct { nodeLister corelisters.NodeLister csiNodeLister storagelisters.CSINodeLister - pvcCache PVCAssumeCache - pvCache PVAssumeCache + pvcCache *PVCAssumeCache + pvCache *PVAssumeCache // Amount of time to wait for the bind operation to succeed bindTimeout time.Duration @@ -720,7 +722,7 @@ func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings [ if pvc.Spec.VolumeName != "" { pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName) if err != nil { - if _, ok := err.(*errNotFound); ok { + if errors.Is(err, assumecache.ErrNotFound) { // We tolerate NotFound error here, because PV is possibly // not found because of API delay, we can check next time. // And if PV does not exist because it's deleted, PVC will @@ -873,7 +875,7 @@ func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.Persist pvName := pvc.Spec.VolumeName pv, err := b.pvCache.GetPV(pvName) if err != nil { - if _, ok := err.(*errNotFound); ok { + if errors.Is(err, assumecache.ErrNotFound) { err = nil } return true, false, err diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go index 1746780ce2ebc..60ebdc4598426 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go @@ -47,6 +47,7 @@ import ( _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/controller" pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) var ( @@ -138,8 +139,6 @@ type testEnv struct { internalPodInformer coreinformers.PodInformer internalNodeInformer coreinformers.NodeInformer internalCSINodeInformer storageinformers.CSINodeInformer - internalPVCache *assumeCache - internalPVCCache *assumeCache // For CSIStorageCapacity feature testing: internalCSIDriverInformer storageinformers.CSIDriverInformer @@ -258,18 +257,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv { t.Fatalf("Failed to convert to internal binder") } - pvCache := internalBinder.pvCache - internalPVCache, ok := pvCache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to convert to internal PV cache") - } - - pvcCache := internalBinder.pvcCache - internalPVCCache, ok := pvcCache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to convert to internal PVC cache") - } - return &testEnv{ client: client, reactor: reactor, @@ -278,8 +265,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv { internalPodInformer: podInformer, internalNodeInformer: nodeInformer, internalCSINodeInformer: csiNodeInformer, - internalPVCache: internalPVCache, - internalPVCCache: internalPVCCache, internalCSIDriverInformer: csiDriverInformer, internalCSIStorageCapacityInformer: csiStorageCapacityInformer, @@ -313,9 +298,8 @@ func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1.CSIStorageCa } func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) { - internalPVCCache := env.internalPVCCache for _, pvc := range cachedPVCs { - internalPVCCache.add(pvc) + assumecache.AddTestObject(env.internalBinder.pvcCache.AssumeCache, pvc) if apiPVCs == nil { env.reactor.AddClaim(pvc) } @@ -326,9 +310,8 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [ } func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) { - internalPVCache := env.internalPVCache for _, pv := range cachedPVs { - internalPVCache.add(pv) + assumecache.AddTestObject(env.internalBinder.pvCache.AssumeCache, pv) if apiPVs == nil { env.reactor.AddVolume(pv) } @@ -349,7 +332,7 @@ func (env *testEnv) updateVolumes(ctx context.Context, pvs []*v1.PersistentVolum } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) { for _, pv := range pvs { - obj, err := env.internalPVCache.GetAPIObj(pv.Name) + obj, err := env.internalBinder.pvCache.GetAPIObj(pv.Name) if obj == nil || err != nil { return false, nil } @@ -375,7 +358,7 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) { for _, pvc := range pvcs { - obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc)) + obj, err := env.internalBinder.pvcCache.GetAPIObj(getPVCName(pvc)) if obj == nil || err != nil { return false, nil } @@ -393,13 +376,13 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) { for _, pv := range pvs { - env.internalPVCache.delete(pv) + assumecache.DeleteTestObject(env.internalBinder.pvCache.AssumeCache, pv) } } func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) { for _, pvc := range pvcs { - env.internalPVCCache.delete(pvc) + assumecache.DeleteTestObject(env.internalBinder.pvcCache.AssumeCache, pvc) } } diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go new file mode 100644 index 0000000000000..69ec1175f0344 --- /dev/null +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -0,0 +1,402 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package assumecache + +import ( + "errors" + "fmt" + "strconv" + "sync" + + "k8s.io/klog/v2" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/tools/cache" +) + +// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. +type Informer interface { + AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) +} + +// AddTestObject adds an object to the assume cache. +// Only use this for unit testing! +func AddTestObject(cache *AssumeCache, obj interface{}) { + cache.add(obj) +} + +// UpdateTestObject updates an object in the assume cache. +// Only use this for unit testing! +func UpdateTestObject(cache *AssumeCache, obj interface{}) { + cache.update(nil, obj) +} + +// DeleteTestObject deletes object in the assume cache. +// Only use this for unit testing! +func DeleteTestObject(cache *AssumeCache, obj interface{}) { + cache.delete(obj) +} + +// Sentinel errors that can be checked for with errors.Is. +var ( + ErrWrongType = errors.New("object has wrong type") + ErrNotFound = errors.New("object not found") + ErrObjectName = errors.New("cannot determine object name") +) + +type WrongTypeError struct { + TypeName string + Object interface{} +} + +func (e WrongTypeError) Error() string { + return fmt.Sprintf("could not convert object to type %v: %+v", e.TypeName, e.Object) +} + +func (e WrongTypeError) Is(err error) bool { + return err == ErrWrongType +} + +type NotFoundError struct { + TypeName string + ObjectKey string +} + +func (e NotFoundError) Error() string { + return fmt.Sprintf("could not find %v %q", e.TypeName, e.ObjectKey) +} + +func (e NotFoundError) Is(err error) bool { + return err == ErrNotFound +} + +type ObjectNameError struct { + DetailedErr error +} + +func (e ObjectNameError) Error() string { + return fmt.Sprintf("failed to get object name: %v", e.DetailedErr) +} + +func (e ObjectNameError) Is(err error) bool { + return err == ErrObjectName +} + +// AssumeCache is a cache on top of the informer that allows for updating +// objects outside of informer events and also restoring the informer +// cache's version of the object. Objects are assumed to be +// Kubernetes API objects that are supported by [meta.Accessor]. +// +// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc] +// as key function. +// +// AssumeCache stores two pointers to represent a single object: +// - The pointer to the informer object. +// - The pointer to the latest object, which could be the same as +// the informer object, or an in-memory object. +// +// An informer update always overrides the latest object pointer. +// +// Assume() only updates the latest object pointer. +// Restore() sets the latest object pointer back to the informer object. +// Get/List() always returns the latest object pointer. +type AssumeCache struct { + // The logger that was chosen when setting up the cache. + // Will be used for all operations. + logger klog.Logger + + // Synchronizes updates to store + rwMutex sync.RWMutex + + // describes the object stored + description string + + // Stores objInfo pointers + store cache.Indexer + + // Index function for object + indexFunc cache.IndexFunc + indexName string +} + +type objInfo struct { + // name of the object + name string + + // Latest version of object could be cached-only or from informer + latestObj interface{} + + // Latest object from informer + apiObj interface{} +} + +func objInfoKeyFunc(obj interface{}) (string, error) { + objInfo, ok := obj.(*objInfo) + if !ok { + return "", &WrongTypeError{TypeName: "objInfo", Object: obj} + } + return objInfo.name, nil +} + +func (c *AssumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { + objInfo, ok := obj.(*objInfo) + if !ok { + return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj} + } + return c.indexFunc(objInfo.latestObj) +} + +// NewAssumeCache creates an assume cache for general objects. +func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) *AssumeCache { + c := &AssumeCache{ + logger: logger, + description: description, + indexFunc: indexFunc, + indexName: indexName, + } + indexers := cache.Indexers{} + if indexName != "" && indexFunc != nil { + indexers[indexName] = c.objInfoIndexFunc + } + c.store = cache.NewIndexer(objInfoKeyFunc, indexers) + + // Unit tests don't use informers + if informer != nil { + // Cannot fail in practice?! No-one bothers checking the error. + _, _ = informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.add, + UpdateFunc: c.update, + DeleteFunc: c.delete, + }, + ) + } + return c +} + +func (c *AssumeCache) add(obj interface{}) { + if obj == nil { + return + } + + name, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + c.logger.Error(&ObjectNameError{err}, "Add failed") + return + } + + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + + if objInfo, _ := c.getObjInfo(name); objInfo != nil { + newVersion, err := c.getObjVersion(name, obj) + if err != nil { + c.logger.Error(err, "Add failed: couldn't get object version") + return + } + + storedVersion, err := c.getObjVersion(name, objInfo.latestObj) + if err != nil { + c.logger.Error(err, "Add failed: couldn't get stored object version") + return + } + + // Only update object if version is newer. + // This is so we don't override assumed objects due to informer resync. + if newVersion <= storedVersion { + c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) + return + } + } + + objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} + if err = c.store.Update(objInfo); err != nil { + c.logger.Info("Error occurred while updating stored object", "err", err) + } else { + c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) + } +} + +func (c *AssumeCache) update(oldObj interface{}, newObj interface{}) { + c.add(newObj) +} + +func (c *AssumeCache) delete(obj interface{}) { + if obj == nil { + return + } + + name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + c.logger.Error(&ObjectNameError{err}, "Failed to delete") + return + } + + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + + objInfo := &objInfo{name: name} + err = c.store.Delete(objInfo) + if err != nil { + c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) + } +} + +func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) { + objAccessor, err := meta.Accessor(obj) + if err != nil { + return -1, err + } + + objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) + if err != nil { + //nolint:errorlint // Intentionally not wrapping the error, the underlying error is an implementation detail. + return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %v", objAccessor.GetResourceVersion(), c.description, name, err) + } + return objResourceVersion, nil +} + +func (c *AssumeCache) getObjInfo(key string) (*objInfo, error) { + obj, ok, err := c.store.GetByKey(key) + if err != nil { + return nil, err + } + if !ok { + return nil, &NotFoundError{TypeName: c.description, ObjectKey: key} + } + + objInfo, ok := obj.(*objInfo) + if !ok { + return nil, &WrongTypeError{"objInfo", obj} + } + return objInfo, nil +} + +// Get the object by its key. +func (c *AssumeCache) Get(key string) (interface{}, error) { + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() + + objInfo, err := c.getObjInfo(key) + if err != nil { + return nil, err + } + return objInfo.latestObj, nil +} + +// GetAPIObj gets the informer cache's version by its key. +func (c *AssumeCache) GetAPIObj(key string) (interface{}, error) { + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() + + objInfo, err := c.getObjInfo(key) + if err != nil { + return nil, err + } + return objInfo.apiObj, nil +} + +// List all the objects in the cache. +func (c *AssumeCache) List(indexObj interface{}) []interface{} { + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() + + allObjs := []interface{}{} + var objs []interface{} + if c.indexName != "" { + o, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj}) + if err != nil { + c.logger.Error(err, "List index error") + return nil + } + objs = o + } else { + objs = c.store.List() + } + + for _, obj := range objs { + objInfo, ok := obj.(*objInfo) + if !ok { + c.logger.Error(&WrongTypeError{TypeName: "objInfo", Object: obj}, "List error") + continue + } + allObjs = append(allObjs, objInfo.latestObj) + } + return allObjs +} + +// Assume updates the object in-memory only. +// +// The version of the object must be greater or equal to +// the current object, otherwise an error is returned. +// +// Storing an object with the same version is supported +// by the assume cache, but suffers from a race: if an +// update is received via the informer while such an +// object is assumed, it gets dropped in favor of the +// newer object from the apiserver. +// +// Only assuming objects that were returned by an apiserver +// operation (Update, Patch) is safe. +func (c *AssumeCache) Assume(obj interface{}) error { + name, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + return &ObjectNameError{err} + } + + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + + objInfo, err := c.getObjInfo(name) + if err != nil { + return err + } + + newVersion, err := c.getObjVersion(name, obj) + if err != nil { + return err + } + + storedVersion, err := c.getObjVersion(name, objInfo.latestObj) + if err != nil { + return err + } + + if newVersion < storedVersion { + return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) + } + + // Only update the cached object + objInfo.latestObj = obj + c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion) + return nil +} + +// Restore the informer cache's version of the object. +func (c *AssumeCache) Restore(objName string) { + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + + objInfo, err := c.getObjInfo(objName) + if err != nil { + // This could be expected if object got deleted + c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) + } else { + objInfo.latestObj = objInfo.apiObj + c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) + } +} diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go new file mode 100644 index 0000000000000..6c11ac275fa71 --- /dev/null +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -0,0 +1,327 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package assumecache + +import ( + "fmt" + "slices" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/test/utils/ktesting" +) + +// testInformer implements [Informer] and can be used to feed changes into an assume +// cache during unit testing. Only a single event handler is supported, which is +// sufficient for one assume cache. +type testInformer struct { + handler cache.ResourceEventHandler +} + +func (i *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + i.handler = handler + return nil, nil +} + +func (i *testInformer) add(obj interface{}) { + if i.handler == nil { + return + } + i.handler.OnAdd(obj, false) +} + +func (i *testInformer) update(obj interface{}) { + if i.handler == nil { + return + } + i.handler.OnUpdate(nil, obj) +} + +func (i *testInformer) delete(obj interface{}) { + if i.handler == nil { + return + } + i.handler.OnDelete(obj) +} + +func makeObj(name, version, namespace string) metav1.Object { + return &metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: version, + } +} + +func newTest(t *testing.T) (ktesting.TContext, *AssumeCache, *testInformer) { + return newTestWithIndexer(t, "", nil) +} + +func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, *AssumeCache, *testInformer) { + tCtx := ktesting.Init(t) + informer := new(testInformer) + cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc) + return tCtx, cache, informer +} + +func verify(tCtx ktesting.TContext, cache *AssumeCache, key string, expectedObject, expectedAPIObject interface{}) { + tCtx.Helper() + actualObject, err := cache.Get(key) + if err != nil { + tCtx.Fatalf("unexpected error retrieving object for key %s: %v", key, err) + } + if actualObject != expectedObject { + tCtx.Fatalf("Get() returned %v, expected %v", actualObject, expectedObject) + } + actualAPIObject, err := cache.GetAPIObj(key) + if err != nil { + tCtx.Fatalf("unexpected error retrieving API object for key %s: %v", key, err) + } + if actualAPIObject != expectedAPIObject { + tCtx.Fatalf("GetAPIObject() returned %v, expected %v", actualAPIObject, expectedAPIObject) + } +} + +func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs []interface{}, indexObj interface{}) { + actualObjs := assumeCache.List(indexObj) + diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool { + xKey, err := cache.MetaNamespaceKeyFunc(x) + if err != nil { + tCtx.Fatalf("unexpected error determining key for %v: %v", x, err) + } + yKey, err := cache.MetaNamespaceKeyFunc(y) + if err != nil { + tCtx.Fatalf("unexpected error determining key for %v: %v", y, err) + } + return xKey < yKey + })) + if diff != "" { + tCtx.Fatalf("List() result differs (- expected, + actual):\n%s", diff) + } +} + +func TestAssume(t *testing.T) { + scenarios := map[string]struct { + oldObj metav1.Object + newObj interface{} + expectErr error + }{ + "success-same-version": { + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "5", ""), + }, + "success-new-higher-version": { + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "6", ""), + }, + "fail-old-not-found": { + oldObj: makeObj("pvc2", "5", ""), + newObj: makeObj("pvc1", "5", ""), + expectErr: ErrNotFound, + }, + "fail-new-lower-version": { + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "4", ""), + expectErr: cmpopts.AnyError, + }, + "fail-new-bad-version": { + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "a", ""), + expectErr: cmpopts.AnyError, + }, + "fail-old-bad-version": { + oldObj: makeObj("pvc1", "a", ""), + newObj: makeObj("pvc1", "5", ""), + expectErr: cmpopts.AnyError, + }, + "fail-new-bad-object": { + oldObj: makeObj("pvc1", "5", ""), + newObj: 1, + expectErr: ErrObjectName, + }, + } + + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + tCtx, cache, informer := newTest(t) + + // Add old object to cache. + informer.add(scenario.oldObj) + verify(tCtx, cache, scenario.oldObj.GetName(), scenario.oldObj, scenario.oldObj) + + // Assume new object. + err := cache.Assume(scenario.newObj) + if diff := cmp.Diff(scenario.expectErr, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff) + } + + // Check that Get returns correct object. + expectedObj := scenario.newObj + if scenario.expectErr != nil { + expectedObj = scenario.oldObj + } + verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj) + }) + } +} + +func TestRestore(t *testing.T) { + tCtx, cache, informer := newTest(t) + + // This test assumes an object with the same version as the API object. + // The assume cache supports that, but doing so in real code suffers from + // a race: if an unrelated update is received from the apiserver while + // such an object is assumed, the local modification gets dropped. + oldObj := makeObj("pvc1", "5", "") + newObj := makeObj("pvc1", "5", "") + + // Restore object that doesn't exist + cache.Restore("nothing") + + // Add old object to cache. + informer.add(oldObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj) + + // Restore object. + cache.Restore(oldObj.GetName()) + verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj) + + // Assume new object. + if err := cache.Assume(newObj); err != nil { + t.Fatalf("Assume() returned error %v", err) + } + verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj) + + // Restore object. + cache.Restore(oldObj.GetName()) + verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj) +} + +func TestEvents(t *testing.T) { + tCtx, cache, informer := newTest(t) + + oldObj := makeObj("pvc1", "5", "") + newObj := makeObj("pvc1", "6", "") + key := oldObj.GetName() + + // Add old object to cache. + informer.add(oldObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj) + + // Update object. + informer.update(newObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj) + + // Some error cases (don't occur in practice). + informer.add(1) + verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj) + informer.add(nil) + verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj) + informer.update(oldObj) + verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj) + informer.update(nil) + verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj) + informer.delete(nil) + verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj) + + // Delete object. + informer.delete(oldObj) + _, err := cache.Get(key) + if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) + } +} + +func TestListNoIndexer(t *testing.T) { + tCtx, cache, informer := newTest(t) + + // Add a bunch of objects. + objs := make([]interface{}, 0, 10) + for i := 0; i < 10; i++ { + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "") + objs = append(objs, obj) + informer.add(obj) + } + + // List them + verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, "") + + // Update an object. + updatedObj := makeObj("test-pvc3", "2", "") + objs[3] = updatedObj + informer.update(updatedObj) + + // List them + verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, "") + + // Delete a PV + deletedObj := objs[7] + objs = slices.Delete(objs, 7, 8) + informer.delete(deletedObj) + + // List them + verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, "") +} + +func TestListWithIndexer(t *testing.T) { + namespaceIndexer := func(obj interface{}) ([]string, error) { + objAccessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + return []string{objAccessor.GetNamespace()}, nil + } + tCtx, cache, informer := newTestWithIndexer(t, "myNamespace", namespaceIndexer) + + // Add a bunch of objects. + ns := "ns1" + objs := make([]interface{}, 0, 10) + for i := 0; i < 10; i++ { + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns) + objs = append(objs, obj) + informer.add(obj) + } + + // Add a bunch of other objects. + for i := 0; i < 10; i++ { + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "ns2") + informer.add(obj) + } + + // List them + verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, objs[0]) + + // Update an object. + updatedObj := makeObj("test-pvc3", "2", ns) + objs[3] = updatedObj + informer.update(updatedObj) + + // List them + verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, objs[0]) + + // Delete a PV + deletedObj := objs[7] + objs = slices.Delete(objs, 7, 8) + informer.delete(deletedObj) + + // List them + verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, objs[0]) +}