From 910b90fca305dcf807b24cd1562de28e55f7e84d Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 25 Apr 2024 10:49:41 +0200 Subject: [PATCH 1/3] scheduler: move assume cache to utils, part 1 This is a verbatim move resp. copy of the files. They don't build in their new location yet. --- .../assumecache}/assume_cache.go | 0 .../util/assumecache/assume_cache_test.go | 470 ++++++++++++++++++ 2 files changed, 470 insertions(+) rename pkg/scheduler/{framework/plugins/volumebinding => util/assumecache}/assume_cache.go (100%) create mode 100644 pkg/scheduler/util/assumecache/assume_cache_test.go diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go similarity index 100% rename from pkg/scheduler/framework/plugins/volumebinding/assume_cache.go rename to pkg/scheduler/util/assumecache/assume_cache.go diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go new file mode 100644 index 0000000000000..7391a412f8905 --- /dev/null +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -0,0 +1,470 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumebinding + +import ( + "fmt" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/component-helpers/storage/volume" + "k8s.io/klog/v2/ktesting" +) + +func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { + pvList := cache.ListPVs(storageClassName) + if len(pvList) != len(expectedPVs) { + t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) + } + for _, pv := range pvList { + expectedPV, ok := expectedPVs[pv.Name] + if !ok { + t.Errorf("ListPVs() returned unexpected PV %q", pv.Name) + } + if expectedPV != pv { + t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV) + } + } +} + +func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { + pv, err := cache.GetPV(name) + if err != nil { + return err + } + if pv != expectedPV { + return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV) + } + return nil +} + +func TestAssumePV(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + scenarios := map[string]struct { + oldPV *v1.PersistentVolume + newPV *v1.PersistentVolume + shouldSucceed bool + }{ + "success-same-version": { + oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, + newPV: makePV("pv1", "").withVersion("5").PersistentVolume, + shouldSucceed: true, + }, + "success-storageclass-same-version": { + oldPV: makePV("pv1", "class1").withVersion("5").PersistentVolume, + newPV: makePV("pv1", "class1").withVersion("5").PersistentVolume, + shouldSucceed: true, + }, + "success-new-higher-version": { + oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, + newPV: makePV("pv1", "").withVersion("6").PersistentVolume, + shouldSucceed: true, + }, + "fail-old-not-found": { + oldPV: makePV("pv2", "").withVersion("5").PersistentVolume, + newPV: makePV("pv1", "").withVersion("5").PersistentVolume, + shouldSucceed: false, + }, + "fail-new-lower-version": { + oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, + newPV: makePV("pv1", "").withVersion("4").PersistentVolume, + shouldSucceed: false, + }, + "fail-new-bad-version": { + oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, + newPV: makePV("pv1", "").withVersion("a").PersistentVolume, + shouldSucceed: false, + }, + "fail-old-bad-version": { + oldPV: makePV("pv1", "").withVersion("a").PersistentVolume, + newPV: makePV("pv1", "").withVersion("5").PersistentVolume, + shouldSucceed: false, + }, + } + + for name, scenario := range scenarios { + cache := NewPVAssumeCache(logger, nil) + internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + // Add oldPV to cache + internalCache.add(scenario.oldPV) + if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { + t.Errorf("Failed to GetPV() after initial update: %v", err) + continue + } + + // Assume newPV + err := cache.Assume(scenario.newPV) + if scenario.shouldSucceed && err != nil { + t.Errorf("Test %q failed: Assume() returned error %v", name, err) + } + if !scenario.shouldSucceed && err == nil { + t.Errorf("Test %q failed: Assume() returned success but expected error", name) + } + + // Check that GetPV returns correct PV + expectedPV := scenario.newPV + if !scenario.shouldSucceed { + expectedPV = scenario.oldPV + } + if err := verifyPV(cache, scenario.oldPV.Name, expectedPV); err != nil { + t.Errorf("Failed to GetPV() after initial update: %v", err) + } + } +} + +func TestRestorePV(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) + internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + oldPV := makePV("pv1", "").withVersion("5").PersistentVolume + newPV := makePV("pv1", "").withVersion("5").PersistentVolume + + // Restore PV that doesn't exist + cache.Restore("nothing") + + // Add oldPV to cache + internalCache.add(oldPV) + if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { + t.Fatalf("Failed to GetPV() after initial update: %v", err) + } + + // Restore PV + cache.Restore(oldPV.Name) + if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { + t.Fatalf("Failed to GetPV() after initial restore: %v", err) + } + + // Assume newPV + if err := cache.Assume(newPV); err != nil { + t.Fatalf("Assume() returned error %v", err) + } + if err := verifyPV(cache, oldPV.Name, newPV); err != nil { + t.Fatalf("Failed to GetPV() after Assume: %v", err) + } + + // Restore PV + cache.Restore(oldPV.Name) + if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { + t.Fatalf("Failed to GetPV() after restore: %v", err) + } +} + +func TestBasicPVCache(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) + internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + // Get object that doesn't exist + pv, err := cache.GetPV("nothere") + if err == nil { + t.Errorf("GetPV() returned unexpected success") + } + if pv != nil { + t.Errorf("GetPV() returned unexpected PV %q", pv.Name) + } + + // Add a bunch of PVs + pvs := map[string]*v1.PersistentVolume{} + for i := 0; i < 10; i++ { + pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume + pvs[pv.Name] = pv + internalCache.add(pv) + } + + // List them + verifyListPVs(t, cache, pvs, "") + + // Update a PV + updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume + pvs[updatedPV.Name] = updatedPV + internalCache.update(nil, updatedPV) + + // List them + verifyListPVs(t, cache, pvs, "") + + // Delete a PV + deletedPV := pvs["test-pv7"] + delete(pvs, deletedPV.Name) + internalCache.delete(deletedPV) + + // List them + verifyListPVs(t, cache, pvs, "") +} + +func TestPVCacheWithStorageClasses(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) + internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + // Add a bunch of PVs + pvs1 := map[string]*v1.PersistentVolume{} + for i := 0; i < 10; i++ { + pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume + pvs1[pv.Name] = pv + internalCache.add(pv) + } + + // Add a bunch of PVs + pvs2 := map[string]*v1.PersistentVolume{} + for i := 0; i < 10; i++ { + pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume + pvs2[pv.Name] = pv + internalCache.add(pv) + } + + // List them + verifyListPVs(t, cache, pvs1, "class1") + verifyListPVs(t, cache, pvs2, "class2") + + // Update a PV + updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume + pvs1[updatedPV.Name] = updatedPV + internalCache.update(nil, updatedPV) + + // List them + verifyListPVs(t, cache, pvs1, "class1") + verifyListPVs(t, cache, pvs2, "class2") + + // Delete a PV + deletedPV := pvs1["test-pv7"] + delete(pvs1, deletedPV.Name) + internalCache.delete(deletedPV) + + // List them + verifyListPVs(t, cache, pvs1, "class1") + verifyListPVs(t, cache, pvs2, "class2") +} + +func TestAssumeUpdatePVCache(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) + internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + pvName := "test-pv0" + + // Add a PV + pv := makePV(pvName, "").withVersion("1").PersistentVolume + internalCache.add(pv) + if err := verifyPV(cache, pvName, pv); err != nil { + t.Fatalf("failed to get PV: %v", err) + } + + // Assume PV + newPV := pv.DeepCopy() + newPV.Spec.ClaimRef = &v1.ObjectReference{Name: "test-claim"} + if err := cache.Assume(newPV); err != nil { + t.Fatalf("failed to assume PV: %v", err) + } + if err := verifyPV(cache, pvName, newPV); err != nil { + t.Fatalf("failed to get PV after assume: %v", err) + } + + // Add old PV + internalCache.add(pv) + if err := verifyPV(cache, pvName, newPV); err != nil { + t.Fatalf("failed to get PV after old PV added: %v", err) + } +} + +func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: version, + Annotations: map[string]string{}, + }, + } +} + +func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { + pvc, err := cache.GetPVC(pvcKey) + if err != nil { + return err + } + if pvc != expectedPVC { + return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC) + } + return nil +} + +func TestAssumePVC(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + scenarios := map[string]struct { + oldPVC *v1.PersistentVolumeClaim + newPVC *v1.PersistentVolumeClaim + shouldSucceed bool + }{ + "success-same-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "5", "ns1"), + shouldSucceed: true, + }, + "success-new-higher-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "6", "ns1"), + shouldSucceed: true, + }, + "fail-old-not-found": { + oldPVC: makeClaim("pvc2", "5", "ns1"), + newPVC: makeClaim("pvc1", "5", "ns1"), + shouldSucceed: false, + }, + "fail-new-lower-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "4", "ns1"), + shouldSucceed: false, + }, + "fail-new-bad-version": { + oldPVC: makeClaim("pvc1", "5", "ns1"), + newPVC: makeClaim("pvc1", "a", "ns1"), + shouldSucceed: false, + }, + "fail-old-bad-version": { + oldPVC: makeClaim("pvc1", "a", "ns1"), + newPVC: makeClaim("pvc1", "5", "ns1"), + shouldSucceed: false, + }, + } + + for name, scenario := range scenarios { + cache := NewPVCAssumeCache(logger, nil) + internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + // Add oldPVC to cache + internalCache.add(scenario.oldPVC) + if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { + t.Errorf("Failed to GetPVC() after initial update: %v", err) + continue + } + + // Assume newPVC + err := cache.Assume(scenario.newPVC) + if scenario.shouldSucceed && err != nil { + t.Errorf("Test %q failed: Assume() returned error %v", name, err) + } + if !scenario.shouldSucceed && err == nil { + t.Errorf("Test %q failed: Assume() returned success but expected error", name) + } + + // Check that GetPVC returns correct PVC + expectedPV := scenario.newPVC + if !scenario.shouldSucceed { + expectedPV = scenario.oldPVC + } + if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPV); err != nil { + t.Errorf("Failed to GetPVC() after initial update: %v", err) + } + } +} + +func TestRestorePVC(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cache := NewPVCAssumeCache(logger, nil) + internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + oldPVC := makeClaim("pvc1", "5", "ns1") + newPVC := makeClaim("pvc1", "5", "ns1") + + // Restore PVC that doesn't exist + cache.Restore("nothing") + + // Add oldPVC to cache + internalCache.add(oldPVC) + if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { + t.Fatalf("Failed to GetPVC() after initial update: %v", err) + } + + // Restore PVC + cache.Restore(getPVCName(oldPVC)) + if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { + t.Fatalf("Failed to GetPVC() after initial restore: %v", err) + } + + // Assume newPVC + if err := cache.Assume(newPVC); err != nil { + t.Fatalf("Assume() returned error %v", err) + } + if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil { + t.Fatalf("Failed to GetPVC() after Assume: %v", err) + } + + // Restore PVC + cache.Restore(getPVCName(oldPVC)) + if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { + t.Fatalf("Failed to GetPVC() after restore: %v", err) + } +} + +func TestAssumeUpdatePVCCache(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cache := NewPVCAssumeCache(logger, nil) + internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + pvcName := "test-pvc0" + pvcNamespace := "test-ns" + + // Add a PVC + pvc := makeClaim(pvcName, "1", pvcNamespace) + internalCache.add(pvc) + if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { + t.Fatalf("failed to get PVC: %v", err) + } + + // Assume PVC + newPVC := pvc.DeepCopy() + newPVC.Annotations[volume.AnnSelectedNode] = "test-node" + if err := cache.Assume(newPVC); err != nil { + t.Fatalf("failed to assume PVC: %v", err) + } + if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { + t.Fatalf("failed to get PVC after assume: %v", err) + } + + // Add old PVC + internalCache.add(pvc) + if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { + t.Fatalf("failed to get PVC after old PVC added: %v", err) + } +} From 26e0409c36836c5be1264c166a7e8a9306c4c8c4 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 28 Mar 2024 20:28:29 +0100 Subject: [PATCH 2/3] scheduler: move assume cache to utils, part 2 This is now used by both the volumebinding and dynamicresources plugin, so promoting it to a common helper package is better. In terms of functionality, nothing was changed. Documentation got updated (warns about storing locally modified objects, clarifies what the Get parameters are). Code coverage should be a bit better than before (tested with and without indexer, exercises event handlers, more error paths). Checking for specific errors can now be done via errors.Is. --- .../dynamicresources/dynamicresources.go | 6 +- .../plugins/volumebinding/assume_cache.go | 149 +++++ .../volumebinding/assume_cache_test.go | 63 +- .../framework/plugins/volumebinding/binder.go | 6 +- .../plugins/volumebinding/binder_test.go | 31 +- .../util/assumecache/assume_cache.go | 275 ++++---- .../util/assumecache/assume_cache_test.go | 603 +++++++----------- 7 files changed, 518 insertions(+), 615 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/volumebinding/assume_cache.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index e39bfa5763652..ab5a187d4236c 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -46,8 +46,8 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/utils/ptr" ) @@ -302,7 +302,7 @@ type dynamicResources struct { // When implementing cluster autoscaler support, this assume cache or // something like it (see https://github.com/kubernetes/kubernetes/pull/112202) // might have to be managed by the cluster autoscaler. - claimAssumeCache volumebinding.AssumeCache + claimAssumeCache assumecache.AssumeCache // inFlightAllocations is map from claim UUIDs to claim objects for those claims // for which allocation was triggered during a scheduling cycle and the @@ -355,7 +355,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(), claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), - claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), + claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), } return pl, nil diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go new file mode 100644 index 0000000000000..bfbd322af766e --- /dev/null +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go @@ -0,0 +1,149 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumebinding + +import ( + "fmt" + + "k8s.io/klog/v2" + + v1 "k8s.io/api/core/v1" + storagehelpers "k8s.io/component-helpers/storage/volume" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" +) + +// PVAssumeCache is a AssumeCache for PersistentVolume objects +type PVAssumeCache interface { + assumecache.AssumeCache + + GetPV(pvName string) (*v1.PersistentVolume, error) + GetAPIPV(pvName string) (*v1.PersistentVolume, error) + ListPVs(storageClassName string) []*v1.PersistentVolume +} + +type pvAssumeCache struct { + assumecache.AssumeCache + logger klog.Logger +} + +func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { + if pv, ok := obj.(*v1.PersistentVolume); ok { + return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil + } + return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj) +} + +// NewPVAssumeCache creates a PV assume cache. +func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) PVAssumeCache { + logger = klog.LoggerWithName(logger, "PV Cache") + return &pvAssumeCache{ + AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), + logger: logger, + } +} + +func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { + obj, err := c.Get(pvName) + if err != nil { + return nil, err + } + + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj} + } + return pv, nil +} + +func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { + obj, err := c.GetAPIObj(pvName) + if err != nil { + return nil, err + } + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj} + } + return pv, nil +} + +func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { + objs := c.List(&v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + StorageClassName: storageClassName, + }, + }) + pvs := []*v1.PersistentVolume{} + for _, obj := range objs { + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + c.logger.Error(&assumecache.WrongTypeError{TypeName: "v1.PersistentVolume", Object: obj}, "ListPVs") + continue + } + pvs = append(pvs, pv) + } + return pvs +} + +// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects +type PVCAssumeCache interface { + assumecache.AssumeCache + + // GetPVC returns the PVC from the cache with given pvcKey. + // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj + GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) + GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) +} + +type pvcAssumeCache struct { + assumecache.AssumeCache + logger klog.Logger +} + +// NewPVCAssumeCache creates a PVC assume cache. +func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) PVCAssumeCache { + logger = klog.LoggerWithName(logger, "PVC Cache") + return &pvcAssumeCache{ + AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), + logger: logger, + } +} + +func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { + obj, err := c.Get(pvcKey) + if err != nil { + return nil, err + } + + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj} + } + return pvc, nil +} + +func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { + obj, err := c.GetAPIObj(pvcKey) + if err != nil { + return nil, err + } + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return nil, &assumecache.WrongTypeError{TypeName: "v1.PersistentVolumeClaim", Object: obj} + } + return pvc, nil +} diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go index 7391a412f8905..92f047fba06ac 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { @@ -99,13 +100,9 @@ func TestAssumePV(t *testing.T) { for name, scenario := range scenarios { cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add oldPV to cache - internalCache.add(scenario.oldPV) + assumecache.AddTestObject(cache, scenario.oldPV) if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { t.Errorf("Failed to GetPV() after initial update: %v", err) continue @@ -134,10 +131,6 @@ func TestAssumePV(t *testing.T) { func TestRestorePV(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } oldPV := makePV("pv1", "").withVersion("5").PersistentVolume newPV := makePV("pv1", "").withVersion("5").PersistentVolume @@ -146,7 +139,7 @@ func TestRestorePV(t *testing.T) { cache.Restore("nothing") // Add oldPV to cache - internalCache.add(oldPV) + assumecache.AddTestObject(cache, oldPV) if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { t.Fatalf("Failed to GetPV() after initial update: %v", err) } @@ -175,10 +168,6 @@ func TestRestorePV(t *testing.T) { func TestBasicPVCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Get object that doesn't exist pv, err := cache.GetPV("nothere") @@ -194,7 +183,7 @@ func TestBasicPVCache(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume pvs[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) } // List them @@ -203,7 +192,7 @@ func TestBasicPVCache(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume pvs[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + assumecache.UpdateTestObject(cache, updatedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -211,7 +200,7 @@ func TestBasicPVCache(t *testing.T) { // Delete a PV deletedPV := pvs["test-pv7"] delete(pvs, deletedPV.Name) - internalCache.delete(deletedPV) + assumecache.DeleteTestObject(cache, deletedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -220,17 +209,13 @@ func TestBasicPVCache(t *testing.T) { func TestPVCacheWithStorageClasses(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add a bunch of PVs pvs1 := map[string]*v1.PersistentVolume{} for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume pvs1[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) } // Add a bunch of PVs @@ -238,7 +223,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume pvs2[pv.Name] = pv - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) } // List them @@ -248,7 +233,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume pvs1[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + assumecache.UpdateTestObject(cache, updatedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -257,7 +242,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Delete a PV deletedPV := pvs1["test-pv7"] delete(pvs1, deletedPV.Name) - internalCache.delete(deletedPV) + assumecache.DeleteTestObject(cache, deletedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -267,16 +252,12 @@ func TestPVCacheWithStorageClasses(t *testing.T) { func TestAssumeUpdatePVCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } pvName := "test-pv0" // Add a PV pv := makePV(pvName, "").withVersion("1").PersistentVolume - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) if err := verifyPV(cache, pvName, pv); err != nil { t.Fatalf("failed to get PV: %v", err) } @@ -292,7 +273,7 @@ func TestAssumeUpdatePVCache(t *testing.T) { } // Add old PV - internalCache.add(pv) + assumecache.AddTestObject(cache, pv) if err := verifyPV(cache, pvName, newPV); err != nil { t.Fatalf("failed to get PV after old PV added: %v", err) } @@ -361,13 +342,9 @@ func TestAssumePVC(t *testing.T) { for name, scenario := range scenarios { cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } // Add oldPVC to cache - internalCache.add(scenario.oldPVC) + assumecache.AddTestObject(cache, scenario.oldPVC) if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { t.Errorf("Failed to GetPVC() after initial update: %v", err) continue @@ -396,10 +373,6 @@ func TestAssumePVC(t *testing.T) { func TestRestorePVC(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } oldPVC := makeClaim("pvc1", "5", "ns1") newPVC := makeClaim("pvc1", "5", "ns1") @@ -408,7 +381,7 @@ func TestRestorePVC(t *testing.T) { cache.Restore("nothing") // Add oldPVC to cache - internalCache.add(oldPVC) + assumecache.AddTestObject(cache, oldPVC) if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { t.Fatalf("Failed to GetPVC() after initial update: %v", err) } @@ -437,17 +410,13 @@ func TestRestorePVC(t *testing.T) { func TestAssumeUpdatePVCCache(t *testing.T) { logger, _ := ktesting.NewTestContext(t) cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } pvcName := "test-pvc0" pvcNamespace := "test-ns" // Add a PVC pvc := makeClaim(pvcName, "1", pvcNamespace) - internalCache.add(pvc) + assumecache.AddTestObject(cache, pvc) if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { t.Fatalf("failed to get PVC: %v", err) } @@ -463,7 +432,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) { } // Add old PVC - internalCache.add(pvc) + assumecache.AddTestObject(cache, pvc) if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { t.Fatalf("failed to get PVC after old PVC added: %v", err) } diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder.go b/pkg/scheduler/framework/plugins/volumebinding/binder.go index f6ce916c6bfe1..ac1031da0e999 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder.go @@ -18,6 +18,7 @@ package volumebinding import ( "context" + "errors" "fmt" "sort" "strings" @@ -45,6 +46,7 @@ import ( v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/volume/util" ) @@ -720,7 +722,7 @@ func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings [ if pvc.Spec.VolumeName != "" { pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName) if err != nil { - if _, ok := err.(*errNotFound); ok { + if errors.Is(err, assumecache.ErrNotFound) { // We tolerate NotFound error here, because PV is possibly // not found because of API delay, we can check next time. // And if PV does not exist because it's deleted, PVC will @@ -873,7 +875,7 @@ func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.Persist pvName := pvc.Spec.VolumeName pv, err := b.pvCache.GetPV(pvName) if err != nil { - if _, ok := err.(*errNotFound); ok { + if errors.Is(err, assumecache.ErrNotFound) { err = nil } return true, false, err diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go index 1746780ce2ebc..b9497d089aa54 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go @@ -47,6 +47,7 @@ import ( _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/controller" pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) var ( @@ -138,8 +139,6 @@ type testEnv struct { internalPodInformer coreinformers.PodInformer internalNodeInformer coreinformers.NodeInformer internalCSINodeInformer storageinformers.CSINodeInformer - internalPVCache *assumeCache - internalPVCCache *assumeCache // For CSIStorageCapacity feature testing: internalCSIDriverInformer storageinformers.CSIDriverInformer @@ -258,18 +257,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv { t.Fatalf("Failed to convert to internal binder") } - pvCache := internalBinder.pvCache - internalPVCache, ok := pvCache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to convert to internal PV cache") - } - - pvcCache := internalBinder.pvcCache - internalPVCCache, ok := pvcCache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to convert to internal PVC cache") - } - return &testEnv{ client: client, reactor: reactor, @@ -278,8 +265,6 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv { internalPodInformer: podInformer, internalNodeInformer: nodeInformer, internalCSINodeInformer: csiNodeInformer, - internalPVCache: internalPVCache, - internalPVCCache: internalPVCCache, internalCSIDriverInformer: csiDriverInformer, internalCSIStorageCapacityInformer: csiStorageCapacityInformer, @@ -313,9 +298,8 @@ func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1.CSIStorageCa } func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) { - internalPVCCache := env.internalPVCCache for _, pvc := range cachedPVCs { - internalPVCCache.add(pvc) + assumecache.AddTestObject(env.internalBinder.pvcCache, pvc) if apiPVCs == nil { env.reactor.AddClaim(pvc) } @@ -326,9 +310,8 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [ } func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) { - internalPVCache := env.internalPVCache for _, pv := range cachedPVs { - internalPVCache.add(pv) + assumecache.AddTestObject(env.internalBinder.pvCache, pv) if apiPVs == nil { env.reactor.AddVolume(pv) } @@ -349,7 +332,7 @@ func (env *testEnv) updateVolumes(ctx context.Context, pvs []*v1.PersistentVolum } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) { for _, pv := range pvs { - obj, err := env.internalPVCache.GetAPIObj(pv.Name) + obj, err := env.internalBinder.pvCache.GetAPIObj(pv.Name) if obj == nil || err != nil { return false, nil } @@ -375,7 +358,7 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum } return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (bool, error) { for _, pvc := range pvcs { - obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc)) + obj, err := env.internalBinder.pvcCache.GetAPIObj(getPVCName(pvc)) if obj == nil || err != nil { return false, nil } @@ -393,13 +376,13 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) { for _, pv := range pvs { - env.internalPVCache.delete(pv) + assumecache.DeleteTestObject(env.internalBinder.pvCache, pv) } } func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) { for _, pvc := range pvcs { - env.internalPVCCache.delete(pvc) + assumecache.DeleteTestObject(env.internalBinder.pvcCache, pvc) } } diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 945c7a3efff1b..1fd1354c768b5 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -14,66 +14,125 @@ See the License for the specific language governing permissions and limitations under the License. */ -package volumebinding +package assumecache import ( + "errors" "fmt" "strconv" "sync" "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/tools/cache" - storagehelpers "k8s.io/component-helpers/storage/volume" ) // AssumeCache is a cache on top of the informer that allows for updating // objects outside of informer events and also restoring the informer -// cache's version of the object. Objects are assumed to be -// Kubernetes API objects that implement meta.Interface +// cache's version of the object. Objects are assumed to be +// Kubernetes API objects that are supported by [meta.Accessor]. +// +// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc] +// as key function. type AssumeCache interface { - // Assume updates the object in-memory only + // Assume updates the object in-memory only. + // + // The version of the object must be greater or equal to + // the current object, otherwise an error is returned. + // + // Storing an object with the same version is supported + // by the assume cache, but suffers from a race: if an + // update is received via the informer while such an + // object is assumed, it gets dropped in favor of the + // newer object from the apiserver. + // + // Only assuming objects that were returned by an apiserver + // operation (Update, Patch) is safe. Assume(obj interface{}) error - // Restore the informer cache's version of the object - Restore(objName string) + // Restore the informer cache's version of the object. + Restore(key string) - // Get the object by name - Get(objName string) (interface{}, error) + // Get the object by its key. + Get(key string) (interface{}, error) - // GetAPIObj gets the API object by name - GetAPIObj(objName string) (interface{}, error) + // GetAPIObj gets the informer cache's version by its key. + GetAPIObj(key string) (interface{}, error) - // List all the objects in the cache + // List all the objects in the cache. List(indexObj interface{}) []interface{} + + // getImplementation is used internally by [AddTestObject], [UpdateTestObject], [DeleteTestObject]. + getImplementation() *assumeCache +} + +// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. +type Informer interface { + AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) } -type errWrongType struct { - typeName string - object interface{} +// AddTestObject adds an object to the assume cache. +// Only use this for unit testing! +func AddTestObject(cache AssumeCache, obj interface{}) { + cache.getImplementation().add(obj) } -func (e *errWrongType) Error() string { - return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object) +// UpdateTestObject updates an object in the assume cache. +// Only use this for unit testing! +func UpdateTestObject(cache AssumeCache, obj interface{}) { + cache.getImplementation().update(nil, obj) } -type errNotFound struct { - typeName string - objectName string +// DeleteTestObject deletes object in the assume cache. +// Only use this for unit testing! +func DeleteTestObject(cache AssumeCache, obj interface{}) { + cache.getImplementation().delete(obj) } -func (e *errNotFound) Error() string { - return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName) +// Sentinel errors that can be checked for with errors.Is. +var ( + ErrWrongType = errors.New("object has wrong type") + ErrNotFound = errors.New("object not found") + ErrObjectName = errors.New("cannot determine object name") +) + +type WrongTypeError struct { + TypeName string + Object interface{} } -type errObjectName struct { - detailedErr error +func (e WrongTypeError) Error() string { + return fmt.Sprintf("could not convert object to type %v: %+v", e.TypeName, e.Object) } -func (e *errObjectName) Error() string { - return fmt.Sprintf("failed to get object name: %v", e.detailedErr) +func (e WrongTypeError) Is(err error) bool { + return err == ErrWrongType +} + +type NotFoundError struct { + TypeName string + ObjectKey string +} + +func (e NotFoundError) Error() string { + return fmt.Sprintf("could not find %v %q", e.TypeName, e.ObjectKey) +} + +func (e NotFoundError) Is(err error) bool { + return err == ErrNotFound +} + +type ObjectNameError struct { + DetailedErr error +} + +func (e ObjectNameError) Error() string { + return fmt.Sprintf("failed to get object name: %v", e.DetailedErr) +} + +func (e ObjectNameError) Is(err error) bool { + return err == ErrObjectName } // assumeCache stores two pointers to represent a single object: @@ -119,7 +178,7 @@ type objInfo struct { func objInfoKeyFunc(obj interface{}) (string, error) { objInfo, ok := obj.(*objInfo) if !ok { - return "", &errWrongType{"objInfo", obj} + return "", &WrongTypeError{TypeName: "objInfo", Object: obj} } return objInfo.name, nil } @@ -127,13 +186,13 @@ func objInfoKeyFunc(obj interface{}) (string, error) { func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { objInfo, ok := obj.(*objInfo) if !ok { - return []string{""}, &errWrongType{"objInfo", obj} + return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj} } return c.indexFunc(objInfo.latestObj) } // NewAssumeCache creates an assume cache for general objects. -func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { +func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { c := &assumeCache{ logger: logger, description: description, @@ -148,7 +207,8 @@ func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, desc // Unit tests don't use informers if informer != nil { - informer.AddEventHandler( + // Cannot fail in practice?! No-one bothers checking the error. + _, _ = informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: c.add, UpdateFunc: c.update, @@ -159,6 +219,10 @@ func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, desc return c } +func (c *assumeCache) getImplementation() *assumeCache { + return c +} + func (c *assumeCache) add(obj interface{}) { if obj == nil { return @@ -166,7 +230,7 @@ func (c *assumeCache) add(obj interface{}) { name, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { - c.logger.Error(&errObjectName{err}, "Add failed") + c.logger.Error(&ObjectNameError{err}, "Add failed") return } @@ -213,7 +277,7 @@ func (c *assumeCache) delete(obj interface{}) { name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { - c.logger.Error(&errObjectName{err}, "Failed to delete") + c.logger.Error(&ObjectNameError{err}, "Failed to delete") return } @@ -235,43 +299,44 @@ func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) if err != nil { - return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err) + //nolint:errorlint // Intentionally not wrapping the error, the underlying error is an implementation detail. + return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %v", objAccessor.GetResourceVersion(), c.description, name, err) } return objResourceVersion, nil } -func (c *assumeCache) getObjInfo(name string) (*objInfo, error) { - obj, ok, err := c.store.GetByKey(name) +func (c *assumeCache) getObjInfo(key string) (*objInfo, error) { + obj, ok, err := c.store.GetByKey(key) if err != nil { return nil, err } if !ok { - return nil, &errNotFound{c.description, name} + return nil, &NotFoundError{TypeName: c.description, ObjectKey: key} } objInfo, ok := obj.(*objInfo) if !ok { - return nil, &errWrongType{"objInfo", obj} + return nil, &WrongTypeError{"objInfo", obj} } return objInfo, nil } -func (c *assumeCache) Get(objName string) (interface{}, error) { +func (c *assumeCache) Get(key string) (interface{}, error) { c.rwMutex.RLock() defer c.rwMutex.RUnlock() - objInfo, err := c.getObjInfo(objName) + objInfo, err := c.getObjInfo(key) if err != nil { return nil, err } return objInfo.latestObj, nil } -func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) { +func (c *assumeCache) GetAPIObj(key string) (interface{}, error) { c.rwMutex.RLock() defer c.rwMutex.RUnlock() - objInfo, err := c.getObjInfo(objName) + objInfo, err := c.getObjInfo(key) if err != nil { return nil, err } @@ -298,7 +363,7 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} { for _, obj := range objs { objInfo, ok := obj.(*objInfo) if !ok { - c.logger.Error(&errWrongType{"objInfo", obj}, "List error") + c.logger.Error(&WrongTypeError{TypeName: "objInfo", Object: obj}, "List error") continue } allObjs = append(allObjs, objInfo.latestObj) @@ -309,7 +374,7 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} { func (c *assumeCache) Assume(obj interface{}) error { name, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { - return &errObjectName{err} + return &ObjectNameError{err} } c.rwMutex.Lock() @@ -353,125 +418,3 @@ func (c *assumeCache) Restore(objName string) { c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) } } - -// PVAssumeCache is a AssumeCache for PersistentVolume objects -type PVAssumeCache interface { - AssumeCache - - GetPV(pvName string) (*v1.PersistentVolume, error) - GetAPIPV(pvName string) (*v1.PersistentVolume, error) - ListPVs(storageClassName string) []*v1.PersistentVolume -} - -type pvAssumeCache struct { - AssumeCache - logger klog.Logger -} - -func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { - if pv, ok := obj.(*v1.PersistentVolume); ok { - return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil - } - return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj) -} - -// NewPVAssumeCache creates a PV assume cache. -func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache { - logger = klog.LoggerWithName(logger, "PV Cache") - return &pvAssumeCache{ - AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), - logger: logger, - } -} - -func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { - obj, err := c.Get(pvName) - if err != nil { - return nil, err - } - - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - return nil, &errWrongType{"v1.PersistentVolume", obj} - } - return pv, nil -} - -func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { - obj, err := c.GetAPIObj(pvName) - if err != nil { - return nil, err - } - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - return nil, &errWrongType{"v1.PersistentVolume", obj} - } - return pv, nil -} - -func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { - objs := c.List(&v1.PersistentVolume{ - Spec: v1.PersistentVolumeSpec{ - StorageClassName: storageClassName, - }, - }) - pvs := []*v1.PersistentVolume{} - for _, obj := range objs { - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs") - continue - } - pvs = append(pvs, pv) - } - return pvs -} - -// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects -type PVCAssumeCache interface { - AssumeCache - - // GetPVC returns the PVC from the cache with given pvcKey. - // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj - GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) - GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) -} - -type pvcAssumeCache struct { - AssumeCache - logger klog.Logger -} - -// NewPVCAssumeCache creates a PVC assume cache. -func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache { - logger = klog.LoggerWithName(logger, "PVC Cache") - return &pvcAssumeCache{ - AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), - logger: logger, - } -} - -func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { - obj, err := c.Get(pvcKey) - if err != nil { - return nil, err - } - - pvc, ok := obj.(*v1.PersistentVolumeClaim) - if !ok { - return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} - } - return pvc, nil -} - -func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { - obj, err := c.GetAPIObj(pvcKey) - if err != nil { - return nil, err - } - pvc, ok := obj.(*v1.PersistentVolumeClaim) - if !ok { - return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} - } - return pvc, nil -} diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index 7391a412f8905..febcb9e0a011d 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -14,457 +14,314 @@ See the License for the specific language governing permissions and limitations under the License. */ -package volumebinding +package assumecache import ( "fmt" + "slices" "testing" - v1 "k8s.io/api/core/v1" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/component-helpers/storage/volume" - "k8s.io/klog/v2/ktesting" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/test/utils/ktesting" ) -func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { - pvList := cache.ListPVs(storageClassName) - if len(pvList) != len(expectedPVs) { - t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) +// testInformer implements [Informer] and can be used to feed changes into an assume +// cache during unit testing. Only a single event handler is supported, which is +// sufficient for one assume cache. +type testInformer struct { + handler cache.ResourceEventHandler +} + +func (i *testInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + i.handler = handler + return nil, nil +} + +func (i *testInformer) add(obj interface{}) { + if i.handler == nil { + return } - for _, pv := range pvList { - expectedPV, ok := expectedPVs[pv.Name] - if !ok { - t.Errorf("ListPVs() returned unexpected PV %q", pv.Name) - } - if expectedPV != pv { - t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV) - } + i.handler.OnAdd(obj, false) +} + +func (i *testInformer) update(obj interface{}) { + if i.handler == nil { + return + } + i.handler.OnUpdate(nil, obj) +} + +func (i *testInformer) delete(obj interface{}) { + if i.handler == nil { + return } + i.handler.OnDelete(obj) } -func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { - pv, err := cache.GetPV(name) +func makeObj(name, version, namespace string) metav1.Object { + return &metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: version, + } +} + +func newTest(t *testing.T) (ktesting.TContext, AssumeCache, *testInformer) { + return newTestWithIndexer(t, "", nil) +} + +func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, AssumeCache, *testInformer) { + tCtx := ktesting.Init(t) + informer := new(testInformer) + cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc) + return tCtx, cache, informer +} + +func verify(tCtx ktesting.TContext, cache AssumeCache, key string, expectedObject, expectedAPIObject interface{}) { + tCtx.Helper() + actualObject, err := cache.Get(key) + if err != nil { + tCtx.Fatalf("unexpected error retrieving object for key %s: %v", key, err) + } + if actualObject != expectedObject { + tCtx.Fatalf("Get() returned %v, expected %v", actualObject, expectedObject) + } + actualAPIObject, err := cache.GetAPIObj(key) if err != nil { - return err + tCtx.Fatalf("unexpected error retrieving API object for key %s: %v", key, err) } - if pv != expectedPV { - return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV) + if actualAPIObject != expectedAPIObject { + tCtx.Fatalf("GetAPIObject() returned %v, expected %v", actualAPIObject, expectedAPIObject) } - return nil } -func TestAssumePV(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) +func verifyList(tCtx ktesting.TContext, assumeCache AssumeCache, expectedObjs []interface{}, indexObj interface{}) { + actualObjs := assumeCache.List(indexObj) + diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool { + xKey, err := cache.MetaNamespaceKeyFunc(x) + if err != nil { + tCtx.Fatalf("unexpected error determining key for %v: %v", x, err) + } + yKey, err := cache.MetaNamespaceKeyFunc(y) + if err != nil { + tCtx.Fatalf("unexpected error determining key for %v: %v", y, err) + } + return xKey < yKey + })) + if diff != "" { + tCtx.Fatalf("List() result differs (- expected, + actual):\n%s", diff) + } +} + +func TestAssume(t *testing.T) { scenarios := map[string]struct { - oldPV *v1.PersistentVolume - newPV *v1.PersistentVolume - shouldSucceed bool + oldObj metav1.Object + newObj interface{} + expectErr error }{ "success-same-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("5").PersistentVolume, - shouldSucceed: true, - }, - "success-storageclass-same-version": { - oldPV: makePV("pv1", "class1").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "class1").withVersion("5").PersistentVolume, - shouldSucceed: true, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "5", ""), }, "success-new-higher-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("6").PersistentVolume, - shouldSucceed: true, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "6", ""), }, "fail-old-not-found": { - oldPV: makePV("pv2", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("5").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc2", "5", ""), + newObj: makeObj("pvc1", "5", ""), + expectErr: ErrNotFound, }, "fail-new-lower-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("4").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "4", ""), + expectErr: cmpopts.AnyError, }, "fail-new-bad-version": { - oldPV: makePV("pv1", "").withVersion("5").PersistentVolume, - newPV: makePV("pv1", "").withVersion("a").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc1", "5", ""), + newObj: makeObj("pvc1", "a", ""), + expectErr: cmpopts.AnyError, }, "fail-old-bad-version": { - oldPV: makePV("pv1", "").withVersion("a").PersistentVolume, - newPV: makePV("pv1", "").withVersion("5").PersistentVolume, - shouldSucceed: false, + oldObj: makeObj("pvc1", "a", ""), + newObj: makeObj("pvc1", "5", ""), + expectErr: cmpopts.AnyError, + }, + "fail-new-bad-object": { + oldObj: makeObj("pvc1", "5", ""), + newObj: 1, + expectErr: ErrObjectName, }, } for name, scenario := range scenarios { - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - // Add oldPV to cache - internalCache.add(scenario.oldPV) - if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { - t.Errorf("Failed to GetPV() after initial update: %v", err) - continue - } - - // Assume newPV - err := cache.Assume(scenario.newPV) - if scenario.shouldSucceed && err != nil { - t.Errorf("Test %q failed: Assume() returned error %v", name, err) - } - if !scenario.shouldSucceed && err == nil { - t.Errorf("Test %q failed: Assume() returned success but expected error", name) - } - - // Check that GetPV returns correct PV - expectedPV := scenario.newPV - if !scenario.shouldSucceed { - expectedPV = scenario.oldPV - } - if err := verifyPV(cache, scenario.oldPV.Name, expectedPV); err != nil { - t.Errorf("Failed to GetPV() after initial update: %v", err) - } + t.Run(name, func(t *testing.T) { + tCtx, cache, informer := newTest(t) + + // Add old object to cache. + informer.add(scenario.oldObj) + verify(tCtx, cache, scenario.oldObj.GetName(), scenario.oldObj, scenario.oldObj) + + // Assume new object. + err := cache.Assume(scenario.newObj) + if diff := cmp.Diff(scenario.expectErr, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Assume() returned error: %v\ndiff (- expected, + actual):\n%s", err, diff) + } + + // Check that Get returns correct object. + expectedObj := scenario.newObj + if scenario.expectErr != nil { + expectedObj = scenario.oldObj + } + verify(tCtx, cache, scenario.oldObj.GetName(), expectedObj, scenario.oldObj) + }) } } -func TestRestorePV(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } +func TestRestore(t *testing.T) { + tCtx, cache, informer := newTest(t) - oldPV := makePV("pv1", "").withVersion("5").PersistentVolume - newPV := makePV("pv1", "").withVersion("5").PersistentVolume + // This test assumes an object with the same version as the API object. + // The assume cache supports that, but doing so in real code suffers from + // a race: if an unrelated update is received from the apiserver while + // such an object is assumed, the local modification gets dropped. + oldObj := makeObj("pvc1", "5", "") + newObj := makeObj("pvc1", "5", "") - // Restore PV that doesn't exist + // Restore object that doesn't exist cache.Restore("nothing") - // Add oldPV to cache - internalCache.add(oldPV) - if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { - t.Fatalf("Failed to GetPV() after initial update: %v", err) - } + // Add old object to cache. + informer.add(oldObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, oldObj.GetName(), oldObj, oldObj) - // Restore PV - cache.Restore(oldPV.Name) - if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { - t.Fatalf("Failed to GetPV() after initial restore: %v", err) - } + // Restore object. + cache.Restore(oldObj.GetName()) + verify(ktesting.WithStep(tCtx, "after initial Restore"), cache, oldObj.GetName(), oldObj, oldObj) - // Assume newPV - if err := cache.Assume(newPV); err != nil { + // Assume new object. + if err := cache.Assume(newObj); err != nil { t.Fatalf("Assume() returned error %v", err) } - if err := verifyPV(cache, oldPV.Name, newPV); err != nil { - t.Fatalf("Failed to GetPV() after Assume: %v", err) - } + verify(ktesting.WithStep(tCtx, "after Assume"), cache, oldObj.GetName(), newObj, oldObj) - // Restore PV - cache.Restore(oldPV.Name) - if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { - t.Fatalf("Failed to GetPV() after restore: %v", err) - } + // Restore object. + cache.Restore(oldObj.GetName()) + verify(ktesting.WithStep(tCtx, "after second Restore"), cache, oldObj.GetName(), oldObj, oldObj) } -func TestBasicPVCache(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") +func TestEvents(t *testing.T) { + tCtx, cache, informer := newTest(t) + + oldObj := makeObj("pvc1", "5", "") + newObj := makeObj("pvc1", "6", "") + key := oldObj.GetName() + + // Add old object to cache. + informer.add(oldObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, oldObj, oldObj) + + // Update object. + informer.update(newObj) + verify(ktesting.WithStep(tCtx, "after initial update"), cache, key, newObj, newObj) + + // Some error cases (don't occur in practice). + informer.add(1) + verify(ktesting.WithStep(tCtx, "after nop add"), cache, key, newObj, newObj) + informer.add(nil) + verify(ktesting.WithStep(tCtx, "after nil add"), cache, key, newObj, newObj) + informer.update(oldObj) + verify(ktesting.WithStep(tCtx, "after nop update"), cache, key, newObj, newObj) + informer.update(nil) + verify(ktesting.WithStep(tCtx, "after nil update"), cache, key, newObj, newObj) + informer.delete(nil) + verify(ktesting.WithStep(tCtx, "after nop delete"), cache, key, newObj, newObj) + + // Delete object. + informer.delete(oldObj) + _, err := cache.Get(key) + if diff := cmp.Diff(ErrNotFound, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Get did not return expected error: %v\ndiff (- expected, + actual):\n%s", err, diff) } +} - // Get object that doesn't exist - pv, err := cache.GetPV("nothere") - if err == nil { - t.Errorf("GetPV() returned unexpected success") - } - if pv != nil { - t.Errorf("GetPV() returned unexpected PV %q", pv.Name) - } +func TestListNoIndexer(t *testing.T) { + tCtx, cache, informer := newTest(t) - // Add a bunch of PVs - pvs := map[string]*v1.PersistentVolume{} + // Add a bunch of objects. + objs := make([]interface{}, 0, 10) for i := 0; i < 10; i++ { - pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume - pvs[pv.Name] = pv - internalCache.add(pv) + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "") + objs = append(objs, obj) + informer.add(obj) } // List them - verifyListPVs(t, cache, pvs, "") + verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, "") - // Update a PV - updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume - pvs[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + // Update an object. + updatedObj := makeObj("test-pvc3", "2", "") + objs[3] = updatedObj + informer.update(updatedObj) // List them - verifyListPVs(t, cache, pvs, "") + verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, "") // Delete a PV - deletedPV := pvs["test-pv7"] - delete(pvs, deletedPV.Name) - internalCache.delete(deletedPV) + deletedObj := objs[7] + objs = slices.Delete(objs, 7, 8) + informer.delete(deletedObj) // List them - verifyListPVs(t, cache, pvs, "") + verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, "") } -func TestPVCacheWithStorageClasses(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") +func TestListWithIndexer(t *testing.T) { + namespaceIndexer := func(obj interface{}) ([]string, error) { + objAccessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + return []string{objAccessor.GetNamespace()}, nil } + tCtx, cache, informer := newTestWithIndexer(t, "myNamespace", namespaceIndexer) - // Add a bunch of PVs - pvs1 := map[string]*v1.PersistentVolume{} + // Add a bunch of objects. + ns := "ns1" + objs := make([]interface{}, 0, 10) for i := 0; i < 10; i++ { - pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume - pvs1[pv.Name] = pv - internalCache.add(pv) + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", ns) + objs = append(objs, obj) + informer.add(obj) } - // Add a bunch of PVs - pvs2 := map[string]*v1.PersistentVolume{} + // Add a bunch of other objects. for i := 0; i < 10; i++ { - pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume - pvs2[pv.Name] = pv - internalCache.add(pv) + obj := makeObj(fmt.Sprintf("test-pvc%v", i), "1", "ns2") + informer.add(obj) } // List them - verifyListPVs(t, cache, pvs1, "class1") - verifyListPVs(t, cache, pvs2, "class2") + verifyList(ktesting.WithStep(tCtx, "after add"), cache, objs, objs[0]) - // Update a PV - updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume - pvs1[updatedPV.Name] = updatedPV - internalCache.update(nil, updatedPV) + // Update an object. + updatedObj := makeObj("test-pvc3", "2", ns) + objs[3] = updatedObj + informer.update(updatedObj) // List them - verifyListPVs(t, cache, pvs1, "class1") - verifyListPVs(t, cache, pvs2, "class2") + verifyList(ktesting.WithStep(tCtx, "after update"), cache, objs, objs[0]) // Delete a PV - deletedPV := pvs1["test-pv7"] - delete(pvs1, deletedPV.Name) - internalCache.delete(deletedPV) + deletedObj := objs[7] + objs = slices.Delete(objs, 7, 8) + informer.delete(deletedObj) // List them - verifyListPVs(t, cache, pvs1, "class1") - verifyListPVs(t, cache, pvs2, "class2") -} - -func TestAssumeUpdatePVCache(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVAssumeCache(logger, nil) - internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - pvName := "test-pv0" - - // Add a PV - pv := makePV(pvName, "").withVersion("1").PersistentVolume - internalCache.add(pv) - if err := verifyPV(cache, pvName, pv); err != nil { - t.Fatalf("failed to get PV: %v", err) - } - - // Assume PV - newPV := pv.DeepCopy() - newPV.Spec.ClaimRef = &v1.ObjectReference{Name: "test-claim"} - if err := cache.Assume(newPV); err != nil { - t.Fatalf("failed to assume PV: %v", err) - } - if err := verifyPV(cache, pvName, newPV); err != nil { - t.Fatalf("failed to get PV after assume: %v", err) - } - - // Add old PV - internalCache.add(pv) - if err := verifyPV(cache, pvName, newPV); err != nil { - t.Fatalf("failed to get PV after old PV added: %v", err) - } -} - -func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim { - return &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - ResourceVersion: version, - Annotations: map[string]string{}, - }, - } -} - -func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { - pvc, err := cache.GetPVC(pvcKey) - if err != nil { - return err - } - if pvc != expectedPVC { - return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC) - } - return nil -} - -func TestAssumePVC(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - scenarios := map[string]struct { - oldPVC *v1.PersistentVolumeClaim - newPVC *v1.PersistentVolumeClaim - shouldSucceed bool - }{ - "success-same-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "5", "ns1"), - shouldSucceed: true, - }, - "success-new-higher-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "6", "ns1"), - shouldSucceed: true, - }, - "fail-old-not-found": { - oldPVC: makeClaim("pvc2", "5", "ns1"), - newPVC: makeClaim("pvc1", "5", "ns1"), - shouldSucceed: false, - }, - "fail-new-lower-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "4", "ns1"), - shouldSucceed: false, - }, - "fail-new-bad-version": { - oldPVC: makeClaim("pvc1", "5", "ns1"), - newPVC: makeClaim("pvc1", "a", "ns1"), - shouldSucceed: false, - }, - "fail-old-bad-version": { - oldPVC: makeClaim("pvc1", "a", "ns1"), - newPVC: makeClaim("pvc1", "5", "ns1"), - shouldSucceed: false, - }, - } - - for name, scenario := range scenarios { - cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - // Add oldPVC to cache - internalCache.add(scenario.oldPVC) - if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { - t.Errorf("Failed to GetPVC() after initial update: %v", err) - continue - } - - // Assume newPVC - err := cache.Assume(scenario.newPVC) - if scenario.shouldSucceed && err != nil { - t.Errorf("Test %q failed: Assume() returned error %v", name, err) - } - if !scenario.shouldSucceed && err == nil { - t.Errorf("Test %q failed: Assume() returned success but expected error", name) - } - - // Check that GetPVC returns correct PVC - expectedPV := scenario.newPVC - if !scenario.shouldSucceed { - expectedPV = scenario.oldPVC - } - if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPV); err != nil { - t.Errorf("Failed to GetPVC() after initial update: %v", err) - } - } -} - -func TestRestorePVC(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - oldPVC := makeClaim("pvc1", "5", "ns1") - newPVC := makeClaim("pvc1", "5", "ns1") - - // Restore PVC that doesn't exist - cache.Restore("nothing") - - // Add oldPVC to cache - internalCache.add(oldPVC) - if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { - t.Fatalf("Failed to GetPVC() after initial update: %v", err) - } - - // Restore PVC - cache.Restore(getPVCName(oldPVC)) - if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { - t.Fatalf("Failed to GetPVC() after initial restore: %v", err) - } - - // Assume newPVC - if err := cache.Assume(newPVC); err != nil { - t.Fatalf("Assume() returned error %v", err) - } - if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil { - t.Fatalf("Failed to GetPVC() after Assume: %v", err) - } - - // Restore PVC - cache.Restore(getPVCName(oldPVC)) - if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { - t.Fatalf("Failed to GetPVC() after restore: %v", err) - } -} - -func TestAssumeUpdatePVCCache(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) - cache := NewPVCAssumeCache(logger, nil) - internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) - if !ok { - t.Fatalf("Failed to get internal cache") - } - - pvcName := "test-pvc0" - pvcNamespace := "test-ns" - - // Add a PVC - pvc := makeClaim(pvcName, "1", pvcNamespace) - internalCache.add(pvc) - if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { - t.Fatalf("failed to get PVC: %v", err) - } - - // Assume PVC - newPVC := pvc.DeepCopy() - newPVC.Annotations[volume.AnnSelectedNode] = "test-node" - if err := cache.Assume(newPVC); err != nil { - t.Fatalf("failed to assume PVC: %v", err) - } - if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { - t.Fatalf("failed to get PVC after assume: %v", err) - } - - // Add old PVC - internalCache.add(pvc) - if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { - t.Fatalf("failed to get PVC after old PVC added: %v", err) - } + verifyList(ktesting.WithStep(tCtx, "after delete"), cache, objs, objs[0]) } From 7f54c5dfec3e95a2751156a3a1ae4407be68297a Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 25 Apr 2024 11:36:17 +0200 Subject: [PATCH 3/3] scheduler: remove AssumeCache interface There's no reason for having the interface because there is only one implementation. Makes the implementation of the test functions a bit simpler (no casting). They are still stand-alone functions instead of methods because they should not be considered part of the "normal" API. --- .../dynamicresources/dynamicresources.go | 2 +- .../plugins/volumebinding/assume_cache.go | 43 +++---- .../volumebinding/assume_cache_test.go | 36 +++--- .../framework/plugins/volumebinding/binder.go | 4 +- .../plugins/volumebinding/binder_test.go | 8 +- .../util/assumecache/assume_cache.go | 110 ++++++++---------- .../util/assumecache/assume_cache_test.go | 8 +- 7 files changed, 88 insertions(+), 123 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index ab5a187d4236c..f54cf0d2beaf8 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -302,7 +302,7 @@ type dynamicResources struct { // When implementing cluster autoscaler support, this assume cache or // something like it (see https://github.com/kubernetes/kubernetes/pull/112202) // might have to be managed by the cluster autoscaler. - claimAssumeCache assumecache.AssumeCache + claimAssumeCache *assumecache.AssumeCache // inFlightAllocations is map from claim UUIDs to claim objects for those claims // for which allocation was triggered during a scheduling cycle and the diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go index bfbd322af766e..ee5a5d79b58b6 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go @@ -27,16 +27,8 @@ import ( ) // PVAssumeCache is a AssumeCache for PersistentVolume objects -type PVAssumeCache interface { - assumecache.AssumeCache - - GetPV(pvName string) (*v1.PersistentVolume, error) - GetAPIPV(pvName string) (*v1.PersistentVolume, error) - ListPVs(storageClassName string) []*v1.PersistentVolume -} - -type pvAssumeCache struct { - assumecache.AssumeCache +type PVAssumeCache struct { + *assumecache.AssumeCache logger klog.Logger } @@ -48,15 +40,15 @@ func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { } // NewPVAssumeCache creates a PV assume cache. -func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) PVAssumeCache { +func NewPVAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVAssumeCache { logger = klog.LoggerWithName(logger, "PV Cache") - return &pvAssumeCache{ + return &PVAssumeCache{ AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), logger: logger, } } -func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { +func (c *PVAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { obj, err := c.Get(pvName) if err != nil { return nil, err @@ -69,7 +61,7 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { return pv, nil } -func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { +func (c *PVAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { obj, err := c.GetAPIObj(pvName) if err != nil { return nil, err @@ -81,7 +73,7 @@ func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { return pv, nil } -func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { +func (c *PVAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { objs := c.List(&v1.PersistentVolume{ Spec: v1.PersistentVolumeSpec{ StorageClassName: storageClassName, @@ -100,30 +92,21 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume } // PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects -type PVCAssumeCache interface { - assumecache.AssumeCache - - // GetPVC returns the PVC from the cache with given pvcKey. - // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj - GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) - GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) -} - -type pvcAssumeCache struct { - assumecache.AssumeCache +type PVCAssumeCache struct { + *assumecache.AssumeCache logger klog.Logger } // NewPVCAssumeCache creates a PVC assume cache. -func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) PVCAssumeCache { +func NewPVCAssumeCache(logger klog.Logger, informer assumecache.Informer) *PVCAssumeCache { logger = klog.LoggerWithName(logger, "PVC Cache") - return &pvcAssumeCache{ + return &PVCAssumeCache{ AssumeCache: assumecache.NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), logger: logger, } } -func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { +func (c *PVCAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { obj, err := c.Get(pvcKey) if err != nil { return nil, err @@ -136,7 +119,7 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error return pvc, nil } -func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { +func (c *PVCAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { obj, err := c.GetAPIObj(pvcKey) if err != nil { return nil, err diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go index 92f047fba06ac..4256789a2f047 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go @@ -27,7 +27,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) -func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { +func verifyListPVs(t *testing.T, cache *PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { pvList := cache.ListPVs(storageClassName) if len(pvList) != len(expectedPVs) { t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) @@ -43,7 +43,7 @@ func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1 } } -func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { +func verifyPV(cache *PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error { pv, err := cache.GetPV(name) if err != nil { return err @@ -102,7 +102,7 @@ func TestAssumePV(t *testing.T) { cache := NewPVAssumeCache(logger, nil) // Add oldPV to cache - assumecache.AddTestObject(cache, scenario.oldPV) + assumecache.AddTestObject(cache.AssumeCache, scenario.oldPV) if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil { t.Errorf("Failed to GetPV() after initial update: %v", err) continue @@ -139,7 +139,7 @@ func TestRestorePV(t *testing.T) { cache.Restore("nothing") // Add oldPV to cache - assumecache.AddTestObject(cache, oldPV) + assumecache.AddTestObject(cache.AssumeCache, oldPV) if err := verifyPV(cache, oldPV.Name, oldPV); err != nil { t.Fatalf("Failed to GetPV() after initial update: %v", err) } @@ -183,7 +183,7 @@ func TestBasicPVCache(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "").withVersion("1").PersistentVolume pvs[pv.Name] = pv - assumecache.AddTestObject(cache, pv) + assumecache.AddTestObject(cache.AssumeCache, pv) } // List them @@ -192,7 +192,7 @@ func TestBasicPVCache(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "").withVersion("2").PersistentVolume pvs[updatedPV.Name] = updatedPV - assumecache.UpdateTestObject(cache, updatedPV) + assumecache.UpdateTestObject(cache.AssumeCache, updatedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -200,7 +200,7 @@ func TestBasicPVCache(t *testing.T) { // Delete a PV deletedPV := pvs["test-pv7"] delete(pvs, deletedPV.Name) - assumecache.DeleteTestObject(cache, deletedPV) + assumecache.DeleteTestObject(cache.AssumeCache, deletedPV) // List them verifyListPVs(t, cache, pvs, "") @@ -215,7 +215,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test-pv%v", i), "class1").withVersion("1").PersistentVolume pvs1[pv.Name] = pv - assumecache.AddTestObject(cache, pv) + assumecache.AddTestObject(cache.AssumeCache, pv) } // Add a bunch of PVs @@ -223,7 +223,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { for i := 0; i < 10; i++ { pv := makePV(fmt.Sprintf("test2-pv%v", i), "class2").withVersion("1").PersistentVolume pvs2[pv.Name] = pv - assumecache.AddTestObject(cache, pv) + assumecache.AddTestObject(cache.AssumeCache, pv) } // List them @@ -233,7 +233,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Update a PV updatedPV := makePV("test-pv3", "class1").withVersion("2").PersistentVolume pvs1[updatedPV.Name] = updatedPV - assumecache.UpdateTestObject(cache, updatedPV) + assumecache.UpdateTestObject(cache.AssumeCache, updatedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -242,7 +242,7 @@ func TestPVCacheWithStorageClasses(t *testing.T) { // Delete a PV deletedPV := pvs1["test-pv7"] delete(pvs1, deletedPV.Name) - assumecache.DeleteTestObject(cache, deletedPV) + assumecache.DeleteTestObject(cache.AssumeCache, deletedPV) // List them verifyListPVs(t, cache, pvs1, "class1") @@ -257,7 +257,7 @@ func TestAssumeUpdatePVCache(t *testing.T) { // Add a PV pv := makePV(pvName, "").withVersion("1").PersistentVolume - assumecache.AddTestObject(cache, pv) + assumecache.AddTestObject(cache.AssumeCache, pv) if err := verifyPV(cache, pvName, pv); err != nil { t.Fatalf("failed to get PV: %v", err) } @@ -273,7 +273,7 @@ func TestAssumeUpdatePVCache(t *testing.T) { } // Add old PV - assumecache.AddTestObject(cache, pv) + assumecache.AddTestObject(cache.AssumeCache, pv) if err := verifyPV(cache, pvName, newPV); err != nil { t.Fatalf("failed to get PV after old PV added: %v", err) } @@ -290,7 +290,7 @@ func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim { } } -func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { +func verifyPVC(cache *PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error { pvc, err := cache.GetPVC(pvcKey) if err != nil { return err @@ -344,7 +344,7 @@ func TestAssumePVC(t *testing.T) { cache := NewPVCAssumeCache(logger, nil) // Add oldPVC to cache - assumecache.AddTestObject(cache, scenario.oldPVC) + assumecache.AddTestObject(cache.AssumeCache, scenario.oldPVC) if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil { t.Errorf("Failed to GetPVC() after initial update: %v", err) continue @@ -381,7 +381,7 @@ func TestRestorePVC(t *testing.T) { cache.Restore("nothing") // Add oldPVC to cache - assumecache.AddTestObject(cache, oldPVC) + assumecache.AddTestObject(cache.AssumeCache, oldPVC) if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil { t.Fatalf("Failed to GetPVC() after initial update: %v", err) } @@ -416,7 +416,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) { // Add a PVC pvc := makeClaim(pvcName, "1", pvcNamespace) - assumecache.AddTestObject(cache, pvc) + assumecache.AddTestObject(cache.AssumeCache, pvc) if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil { t.Fatalf("failed to get PVC: %v", err) } @@ -432,7 +432,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) { } // Add old PVC - assumecache.AddTestObject(cache, pvc) + assumecache.AddTestObject(cache.AssumeCache, pvc) if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil { t.Fatalf("failed to get PVC after old PVC added: %v", err) } diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder.go b/pkg/scheduler/framework/plugins/volumebinding/binder.go index ac1031da0e999..4f078ea338537 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder.go @@ -220,8 +220,8 @@ type volumeBinder struct { nodeLister corelisters.NodeLister csiNodeLister storagelisters.CSINodeLister - pvcCache PVCAssumeCache - pvCache PVAssumeCache + pvcCache *PVCAssumeCache + pvCache *PVAssumeCache // Amount of time to wait for the bind operation to succeed bindTimeout time.Duration diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go index b9497d089aa54..60ebdc4598426 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go @@ -299,7 +299,7 @@ func (env *testEnv) addCSIStorageCapacities(capacities []*storagev1.CSIStorageCa func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) { for _, pvc := range cachedPVCs { - assumecache.AddTestObject(env.internalBinder.pvcCache, pvc) + assumecache.AddTestObject(env.internalBinder.pvcCache.AssumeCache, pvc) if apiPVCs == nil { env.reactor.AddClaim(pvc) } @@ -311,7 +311,7 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) { for _, pv := range cachedPVs { - assumecache.AddTestObject(env.internalBinder.pvCache, pv) + assumecache.AddTestObject(env.internalBinder.pvCache.AssumeCache, pv) if apiPVs == nil { env.reactor.AddVolume(pv) } @@ -376,13 +376,13 @@ func (env *testEnv) updateClaims(ctx context.Context, pvcs []*v1.PersistentVolum func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) { for _, pv := range pvs { - assumecache.DeleteTestObject(env.internalBinder.pvCache, pv) + assumecache.DeleteTestObject(env.internalBinder.pvCache.AssumeCache, pv) } } func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) { for _, pvc := range pvcs { - assumecache.DeleteTestObject(env.internalBinder.pvcCache, pvc) + assumecache.DeleteTestObject(env.internalBinder.pvcCache.AssumeCache, pvc) } } diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 1fd1354c768b5..69ec1175f0344 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -28,45 +28,6 @@ import ( "k8s.io/client-go/tools/cache" ) -// AssumeCache is a cache on top of the informer that allows for updating -// objects outside of informer events and also restoring the informer -// cache's version of the object. Objects are assumed to be -// Kubernetes API objects that are supported by [meta.Accessor]. -// -// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc] -// as key function. -type AssumeCache interface { - // Assume updates the object in-memory only. - // - // The version of the object must be greater or equal to - // the current object, otherwise an error is returned. - // - // Storing an object with the same version is supported - // by the assume cache, but suffers from a race: if an - // update is received via the informer while such an - // object is assumed, it gets dropped in favor of the - // newer object from the apiserver. - // - // Only assuming objects that were returned by an apiserver - // operation (Update, Patch) is safe. - Assume(obj interface{}) error - - // Restore the informer cache's version of the object. - Restore(key string) - - // Get the object by its key. - Get(key string) (interface{}, error) - - // GetAPIObj gets the informer cache's version by its key. - GetAPIObj(key string) (interface{}, error) - - // List all the objects in the cache. - List(indexObj interface{}) []interface{} - - // getImplementation is used internally by [AddTestObject], [UpdateTestObject], [DeleteTestObject]. - getImplementation() *assumeCache -} - // Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon. type Informer interface { AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) @@ -74,20 +35,20 @@ type Informer interface { // AddTestObject adds an object to the assume cache. // Only use this for unit testing! -func AddTestObject(cache AssumeCache, obj interface{}) { - cache.getImplementation().add(obj) +func AddTestObject(cache *AssumeCache, obj interface{}) { + cache.add(obj) } // UpdateTestObject updates an object in the assume cache. // Only use this for unit testing! -func UpdateTestObject(cache AssumeCache, obj interface{}) { - cache.getImplementation().update(nil, obj) +func UpdateTestObject(cache *AssumeCache, obj interface{}) { + cache.update(nil, obj) } // DeleteTestObject deletes object in the assume cache. // Only use this for unit testing! -func DeleteTestObject(cache AssumeCache, obj interface{}) { - cache.getImplementation().delete(obj) +func DeleteTestObject(cache *AssumeCache, obj interface{}) { + cache.delete(obj) } // Sentinel errors that can be checked for with errors.Is. @@ -135,7 +96,15 @@ func (e ObjectNameError) Is(err error) bool { return err == ErrObjectName } -// assumeCache stores two pointers to represent a single object: +// AssumeCache is a cache on top of the informer that allows for updating +// objects outside of informer events and also restoring the informer +// cache's version of the object. Objects are assumed to be +// Kubernetes API objects that are supported by [meta.Accessor]. +// +// Objects can referenced via their key, with [cache.MetaNamespaceKeyFunc] +// as key function. +// +// AssumeCache stores two pointers to represent a single object: // - The pointer to the informer object. // - The pointer to the latest object, which could be the same as // the informer object, or an in-memory object. @@ -145,7 +114,7 @@ func (e ObjectNameError) Is(err error) bool { // Assume() only updates the latest object pointer. // Restore() sets the latest object pointer back to the informer object. // Get/List() always returns the latest object pointer. -type assumeCache struct { +type AssumeCache struct { // The logger that was chosen when setting up the cache. // Will be used for all operations. logger klog.Logger @@ -183,7 +152,7 @@ func objInfoKeyFunc(obj interface{}) (string, error) { return objInfo.name, nil } -func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { +func (c *AssumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { objInfo, ok := obj.(*objInfo) if !ok { return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj} @@ -192,8 +161,8 @@ func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { } // NewAssumeCache creates an assume cache for general objects. -func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { - c := &assumeCache{ +func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) *AssumeCache { + c := &AssumeCache{ logger: logger, description: description, indexFunc: indexFunc, @@ -219,11 +188,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam return c } -func (c *assumeCache) getImplementation() *assumeCache { - return c -} - -func (c *assumeCache) add(obj interface{}) { +func (c *AssumeCache) add(obj interface{}) { if obj == nil { return } @@ -266,11 +231,11 @@ func (c *assumeCache) add(obj interface{}) { } } -func (c *assumeCache) update(oldObj interface{}, newObj interface{}) { +func (c *AssumeCache) update(oldObj interface{}, newObj interface{}) { c.add(newObj) } -func (c *assumeCache) delete(obj interface{}) { +func (c *AssumeCache) delete(obj interface{}) { if obj == nil { return } @@ -291,7 +256,7 @@ func (c *assumeCache) delete(obj interface{}) { } } -func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) { +func (c *AssumeCache) getObjVersion(name string, obj interface{}) (int64, error) { objAccessor, err := meta.Accessor(obj) if err != nil { return -1, err @@ -305,7 +270,7 @@ func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) return objResourceVersion, nil } -func (c *assumeCache) getObjInfo(key string) (*objInfo, error) { +func (c *AssumeCache) getObjInfo(key string) (*objInfo, error) { obj, ok, err := c.store.GetByKey(key) if err != nil { return nil, err @@ -321,7 +286,8 @@ func (c *assumeCache) getObjInfo(key string) (*objInfo, error) { return objInfo, nil } -func (c *assumeCache) Get(key string) (interface{}, error) { +// Get the object by its key. +func (c *AssumeCache) Get(key string) (interface{}, error) { c.rwMutex.RLock() defer c.rwMutex.RUnlock() @@ -332,7 +298,8 @@ func (c *assumeCache) Get(key string) (interface{}, error) { return objInfo.latestObj, nil } -func (c *assumeCache) GetAPIObj(key string) (interface{}, error) { +// GetAPIObj gets the informer cache's version by its key. +func (c *AssumeCache) GetAPIObj(key string) (interface{}, error) { c.rwMutex.RLock() defer c.rwMutex.RUnlock() @@ -343,7 +310,8 @@ func (c *assumeCache) GetAPIObj(key string) (interface{}, error) { return objInfo.apiObj, nil } -func (c *assumeCache) List(indexObj interface{}) []interface{} { +// List all the objects in the cache. +func (c *AssumeCache) List(indexObj interface{}) []interface{} { c.rwMutex.RLock() defer c.rwMutex.RUnlock() @@ -371,7 +339,20 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} { return allObjs } -func (c *assumeCache) Assume(obj interface{}) error { +// Assume updates the object in-memory only. +// +// The version of the object must be greater or equal to +// the current object, otherwise an error is returned. +// +// Storing an object with the same version is supported +// by the assume cache, but suffers from a race: if an +// update is received via the informer while such an +// object is assumed, it gets dropped in favor of the +// newer object from the apiserver. +// +// Only assuming objects that were returned by an apiserver +// operation (Update, Patch) is safe. +func (c *AssumeCache) Assume(obj interface{}) error { name, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { return &ObjectNameError{err} @@ -405,7 +386,8 @@ func (c *assumeCache) Assume(obj interface{}) error { return nil } -func (c *assumeCache) Restore(objName string) { +// Restore the informer cache's version of the object. +func (c *AssumeCache) Restore(objName string) { c.rwMutex.Lock() defer c.rwMutex.Unlock() diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index febcb9e0a011d..6c11ac275fa71 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -71,18 +71,18 @@ func makeObj(name, version, namespace string) metav1.Object { } } -func newTest(t *testing.T) (ktesting.TContext, AssumeCache, *testInformer) { +func newTest(t *testing.T) (ktesting.TContext, *AssumeCache, *testInformer) { return newTestWithIndexer(t, "", nil) } -func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, AssumeCache, *testInformer) { +func newTestWithIndexer(t *testing.T, indexName string, indexFunc cache.IndexFunc) (ktesting.TContext, *AssumeCache, *testInformer) { tCtx := ktesting.Init(t) informer := new(testInformer) cache := NewAssumeCache(tCtx.Logger(), informer, "TestObject", indexName, indexFunc) return tCtx, cache, informer } -func verify(tCtx ktesting.TContext, cache AssumeCache, key string, expectedObject, expectedAPIObject interface{}) { +func verify(tCtx ktesting.TContext, cache *AssumeCache, key string, expectedObject, expectedAPIObject interface{}) { tCtx.Helper() actualObject, err := cache.Get(key) if err != nil { @@ -100,7 +100,7 @@ func verify(tCtx ktesting.TContext, cache AssumeCache, key string, expectedObjec } } -func verifyList(tCtx ktesting.TContext, assumeCache AssumeCache, expectedObjs []interface{}, indexObj interface{}) { +func verifyList(tCtx ktesting.TContext, assumeCache *AssumeCache, expectedObjs []interface{}, indexObj interface{}) { actualObjs := assumeCache.List(indexObj) diff := cmp.Diff(expectedObjs, actualObjs, cmpopts.SortSlices(func(x, y interface{}) bool { xKey, err := cache.MetaNamespaceKeyFunc(x)