From dc8efd078f5a6f65f2c95deaed921a5c6f663b16 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Thu, 7 Mar 2024 16:02:01 +0100
Subject: [PATCH] dra scheduler: preserve allocation in assume cache

Storing a modified claim with allocation and the original resource version
was not reliable: if an update was received, it replaced the modified claim,
and the resource that had been reserved for the claim might then be used for
some other claim.

To fix this, the assume cache gets copied and modified slightly so that it
keeps the allocation until the process of writing it back or dropping it is
complete. This is then not a general-purpose cache anymore.

Logging got extended to diagnose this problem better. The problem started to
occur in E2E tests after splitting the claim update so that first the
finalizer is set and then the status, because setting the finalizer triggered
an update.
---
 .../plugins/dynamicresources/assume_cache.go  | 368 ++++++++++++++++++
 .../dynamicresources/dynamicresources.go      |  17 +-
 .../namedresources/namedresourcesmodel.go     |  31 +-
 .../dynamicresources/structuredparameters.go  |  20 +-
 4 files changed, 407 insertions(+), 29 deletions(-)
 create mode 100644 pkg/scheduler/framework/plugins/dynamicresources/assume_cache.go
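The behavior that this commit establishes can be summarized as: an informer
update that does not carry an allocation must not wipe out an allocation which
the scheduler has assumed but not yet written back. A rough, test-style sketch
of that behavior (illustrative only, not part of this patch; the test name, the
claim contents and the ktesting setup are placeholders):

package dynamicresources

import (
	"testing"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2/ktesting"
)

func TestAssumedAllocationSurvivesInformerUpdate(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	// Unit tests don't use informers, so the informer path is driven by
	// calling the internal add() handler directly.
	c := NewAssumeCache(logger, nil, "claim", "", nil).(*assumeCache)

	// The informer delivers the original claim, without an allocation.
	original := &resourcev1alpha2.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-claim", ResourceVersion: "1"},
	}
	c.add(original)

	// Reserve assumes a modified copy with an allocation while that change
	// is still being written to the apiserver.
	assumed := original.DeepCopy()
	assumed.Status.DriverName = "some-driver"
	assumed.Status.Allocation = &resourcev1alpha2.AllocationResult{}
	if err := c.Assume(assumed); err != nil {
		t.Fatal(err)
	}

	// An informer update with a newer resource version but still without
	// the allocation arrives, e.g. because only the finalizer was added.
	updated := original.DeepCopy()
	updated.ResourceVersion = "2"
	c.add(updated)

	// The cache copies the assumed allocation into the newer object instead
	// of dropping it, so the reserved resources cannot be handed out twice.
	obj, err := c.Get("default/my-claim")
	if err != nil {
		t.Fatal(err)
	}
	if obj.(*resourcev1alpha2.ResourceClaim).Status.Allocation == nil {
		t.Error("assumed allocation was lost on informer update")
	}
}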
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/assume_cache.go b/pkg/scheduler/framework/plugins/dynamicresources/assume_cache.go
new file mode 100644
index 0000000000000..e8e2e7f23e317
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/dynamicresources/assume_cache.go
@@ -0,0 +1,368 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dynamicresources
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+
+	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
+	"k8s.io/klog/v2"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/client-go/tools/cache"
+)
+
+// AssumeCache is a cache on top of the informer that allows for updating
+// objects outside of informer events and also restoring the informer
+// cache's version of the object. Objects are assumed to be
+// Kubernetes API objects that implement meta.Interface
+type AssumeCache interface {
+	// Assume updates the object in-memory only
+	Assume(obj interface{}) error
+
+	// Restore the informer cache's version of the object
+	Restore(objName string)
+
+	// Get the object by name
+	Get(objName string) (interface{}, error)
+
+	// GetAPIObj gets the API object by name
+	GetAPIObj(objName string) (interface{}, error)
+
+	// List all the objects in the cache
+	List(indexObj interface{}) []interface{}
+}
+
+type errWrongType struct {
+	typeName string
+	object   interface{}
+}
+
+func (e *errWrongType) Error() string {
+	return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
+}
+
+type errNotFound struct {
+	typeName   string
+	objectName string
+}
+
+func (e *errNotFound) Error() string {
+	return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
+}
+
+type errObjectName struct {
+	detailedErr error
+}
+
+func (e *errObjectName) Error() string {
+	return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
+}
+
+// assumeCache stores two pointers to represent a single object:
+// - The pointer to the informer object.
+// - The pointer to the latest object, which could be the same as
+//   the informer object, or an in-memory object.
+//
+// An informer update always overrides the latest object pointer.
+//
+// Assume() only updates the latest object pointer.
+// Restore() sets the latest object pointer back to the informer object.
+// Get/List() always returns the latest object pointer.
+type assumeCache struct {
+	// The logger that was chosen when setting up the cache.
+	// Will be used for all operations.
+	logger klog.Logger
+
+	// Synchronizes updates to store
+	rwMutex sync.RWMutex
+
+	// describes the object stored
+	description string
+
+	// Stores objInfo pointers
+	store cache.Indexer
+
+	// Index function for object
+	indexFunc cache.IndexFunc
+	indexName string
+}
+
+type objInfo struct {
+	// name of the object
+	name string
+
+	// Latest version of object could be cached-only or from informer
+	latestObj interface{}
+
+	// Latest object from informer
+	apiObj interface{}
+}
+
+func objInfoKeyFunc(obj interface{}) (string, error) {
+	objInfo, ok := obj.(*objInfo)
+	if !ok {
+		return "", &errWrongType{"objInfo", obj}
+	}
+	return objInfo.name, nil
+}
+
+func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
+	objInfo, ok := obj.(*objInfo)
+	if !ok {
+		return []string{""}, &errWrongType{"objInfo", obj}
+	}
+	return c.indexFunc(objInfo.latestObj)
+}
+
+// NewAssumeCache creates an assume cache for general objects.
+func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
+	c := &assumeCache{
+		logger:      logger,
+		description: description,
+		indexFunc:   indexFunc,
+		indexName:   indexName,
+	}
+	indexers := cache.Indexers{}
+	if indexName != "" && indexFunc != nil {
+		indexers[indexName] = c.objInfoIndexFunc
+	}
+	c.store = cache.NewIndexer(objInfoKeyFunc, indexers)
+
+	// Unit tests don't use informers
+	if informer != nil {
+		informer.AddEventHandler(
+			cache.ResourceEventHandlerFuncs{
+				AddFunc:    c.add,
+				UpdateFunc: c.update,
+				DeleteFunc: c.delete,
+			},
+		)
+	}
+	return c
+}
+
+func (c *assumeCache) add(obj interface{}) {
+	if obj == nil {
+		return
+	}
+
+	name, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		c.logger.Error(&errObjectName{err}, "Add failed")
+		return
+	}
+
+	c.rwMutex.Lock()
+	defer c.rwMutex.Unlock()
+
+	latestObj := obj
+	if objInfo, _ := c.getObjInfo(name); objInfo != nil {
+		newVersion, err := c.getObjVersion(name, obj)
+		if err != nil {
+			c.logger.Error(err, "Add failed: couldn't get object version")
+			return
+		}
+
+		storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
+		if err != nil {
+			c.logger.Error(err, "Add failed: couldn't get stored object version")
+			return
+		}
+
+		// Only update object if version is newer.
+		// This is so we don't override assumed objects due to informer resync.
+		if newVersion <= storedVersion {
+			c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
+			return
+		}
+
+		// If we have an assumed object, then it always has an
+		// allocation. If the new object doesn't, then we have to keep
+		// overriding the allocation because the scheduler plugin is still
+		// in the process of writing out that change. Eventually, the
+		// plugin will store the actual written claim or restore the
+		// original claim.
+		if claim := obj.(*resourcev1alpha2.ResourceClaim); claim.Status.Allocation == nil {
+			claim := claim.DeepCopy()
+			claim.Status.DriverName = objInfo.latestObj.(*resourcev1alpha2.ResourceClaim).Status.DriverName
+			claim.Status.Allocation = objInfo.latestObj.(*resourcev1alpha2.ResourceClaim).Status.Allocation
+			latestObj = claim
+		}
+	}
+
+	objInfo := &objInfo{name: name, latestObj: latestObj, apiObj: obj}
+	if err = c.store.Update(objInfo); err != nil {
+		c.logger.Info("Error occurred while updating stored object", "err", err)
+	} else {
+		c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
+	}
+}
+
+func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
+	c.add(newObj)
+}
+
+func (c *assumeCache) delete(obj interface{}) {
+	if obj == nil {
+		return
+	}
+
+	name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		c.logger.Error(&errObjectName{err}, "Failed to delete")
+		return
+	}
+
+	c.rwMutex.Lock()
+	defer c.rwMutex.Unlock()
+
+	objInfo := &objInfo{name: name}
+	err = c.store.Delete(objInfo)
+	if err != nil {
+		c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
+	}
+}
+
+func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
+	objAccessor, err := meta.Accessor(obj)
+	if err != nil {
+		return -1, err
+	}
+
+	objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
+	if err != nil {
+		return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
+	}
+	return objResourceVersion, nil
+}
+
+func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
+	obj, ok, err := c.store.GetByKey(name)
+	if err != nil {
+		return nil, err
+	}
+	if !ok {
+		return nil, &errNotFound{c.description, name}
+	}
+
+	objInfo, ok := obj.(*objInfo)
+	if !ok {
+		return nil, &errWrongType{"objInfo", obj}
+	}
+	return objInfo, nil
+}
+
+func (c *assumeCache) Get(objName string) (interface{}, error) {
+	c.rwMutex.RLock()
+	defer c.rwMutex.RUnlock()
+
+	objInfo, err := c.getObjInfo(objName)
+	if err != nil {
+		return nil, err
+	}
+	return objInfo.latestObj, nil
+}
+
+func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
+	c.rwMutex.RLock()
+	defer c.rwMutex.RUnlock()
+
+	objInfo, err := c.getObjInfo(objName)
+	if err != nil {
+		return nil, err
+	}
+	return objInfo.apiObj, nil
+}
+
+func (c *assumeCache) List(indexObj interface{}) []interface{} {
+	c.rwMutex.RLock()
+	defer c.rwMutex.RUnlock()
+
+	allObjs := []interface{}{}
+	var objs []interface{}
+	if c.indexName != "" {
+		o, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
+		if err != nil {
+			c.logger.Error(err, "List index error")
+			return nil
+		}
+		objs = o
+	} else {
+		objs = c.store.List()
+	}
+
+	for _, obj := range objs {
+		objInfo, ok := obj.(*objInfo)
+		if !ok {
+			c.logger.Error(&errWrongType{"objInfo", obj}, "List error")
+			continue
+		}
+		allObjs = append(allObjs, objInfo.latestObj)
+	}
+	return allObjs
+}
+
+func (c *assumeCache) Assume(obj interface{}) error {
+	name, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		return &errObjectName{err}
+	}
+
+	c.rwMutex.Lock()
+	defer c.rwMutex.Unlock()
+
+	objInfo, err := c.getObjInfo(name)
+	if err != nil {
+		return err
+	}
+
+	newVersion, err := c.getObjVersion(name, obj)
+	if err != nil {
+		return err
+	}
+
+	storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
+	if err != nil {
+		return err
+	}
+
+	if newVersion < storedVersion {
+		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
+	}
+
+	// Only update the cached object
+	objInfo.latestObj = obj
+	c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
+	return nil
+}
+
+func (c *assumeCache) Restore(objName string) {
+	c.rwMutex.Lock()
+	defer c.rwMutex.Unlock()
+
+	objInfo, err := c.getObjInfo(objName)
+	if err != nil {
+		// This could be expected if object got deleted
+		c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
+	} else {
+		objInfo.latestObj = objInfo.apiObj
+		c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
+	}
+}
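The hunks below wire the new cache into the plugin. The resulting life cycle of
an assumed claim is roughly the following sketch (simplified, not literal plugin
code; writeStatus stands in for the blocking apiserver calls that the plugin
really makes):

// Sketch only, not code from this patch: how the plugin drives the cache
// for one claim with structured parameters.
func reserveAndWriteBack(ctx context.Context, pl *dynamicResources, claim *resourcev1alpha2.ResourceClaim,
	driverName string, allocation *resourcev1alpha2.AllocationResult,
	writeStatus func(context.Context, *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaim, error)) {
	// Reserve: record the allocation locally so that subsequent scheduling
	// attempts already treat the resources as used.
	claim = claim.DeepCopy()
	claim.Status.DriverName = driverName
	claim.Status.Allocation = allocation
	if err := pl.claimAssumeCache.Assume(claim); err != nil {
		return
	}

	// PreBind: store the allocation in the apiserver.
	written, err := writeStatus(ctx, claim)
	if err != nil {
		// Writing failed: fall back to the informer's version of the claim.
		pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
		return
	}
	// Writing succeeded: remember the claim exactly as it was stored.
	pl.claimAssumeCache.Assume(written)
}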
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index ed6a87a20d5bc..b26e7ac6b447b 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -46,7 +46,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 	"k8s.io/utils/ptr"
 )
@@ -302,7 +301,7 @@ type dynamicResources struct {
 	// When implementing cluster autoscaler support, this assume cache or
 	// something like it (see https://github.com/kubernetes/kubernetes/pull/112202)
 	// might have to be managed by the cluster autoscaler.
-	claimAssumeCache volumebinding.AssumeCache
+	claimAssumeCache AssumeCache
 
 	// inFlightAllocations is map from claim UUIDs to true for those claims
 	// for which allocation was triggered during a scheduling cycle and the
@@ -355,7 +354,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
 		resourceSliceLister:   fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
 		claimNameLookup:       resourceclaim.NewNameLookup(fh.ClientSet()),
-		claimAssumeCache:      volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
+		claimAssumeCache:      NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
 	}
 
 	return pl, nil
@@ -944,6 +943,7 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 	// this step here turns out to be expensive, we may have to
 	// maintain and update state more persistently.
 	resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache)
+	logger.V(5).Info("Resource usage", "resources", klog.Format(resources))
 	if err != nil {
 		return nil, statusError(logger, err)
 	}
@@ -1386,10 +1386,15 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 			claim = claim.DeepCopy()
 			claim.Status.DriverName = driverName
 			claim.Status.Allocation = allocation
+			// This assumed object is not one which was actually stored in
+			// the apiserver yet! If we receive an update, the assumed
+			// object will get overwritten. The claimAssumeCache handles
+			// this case for us by copying the allocation into the updated
+			// object to produce a newer assumed object.
 			if err := pl.claimAssumeCache.Assume(claim); err != nil {
 				return statusError(logger, fmt.Errorf("update claim assume cache: %v", err))
 			}
-			logger.V(5).Info("prepared allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", allocation)
+			logger.V(5).Info("prepared allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", klog.Format(allocation))
 		}
 
 		// When there is only one pending resource, we can go ahead with
@@ -1556,7 +1561,7 @@ func (pl *dynamicResources) reserveClaim(ctx context.Context, state *stateData,
 	allocationPatch := ""
 
 	allocation := state.informationsForClaim[index].allocation
-	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", allocation)
+	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
 
 	// Do we need to store an allocation result from Reserve?
 	if allocation != nil {
@@ -1610,6 +1615,8 @@ func (pl *dynamicResources) reserveClaim(ctx context.Context, state *stateData,
 			// completed, either successfully or with a failure.
 			if err != nil {
 				pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
+			} else {
+				pl.claimAssumeCache.Assume(claim)
 			}
 			pl.inFlightAllocations.Delete(claim.UID)
 		}
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go b/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go
index 08ba8a32dd6cc..c65265c83803e 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go
@@ -27,13 +27,16 @@ import (
 	"k8s.io/dynamic-resource-allocation/structured/namedresources/cel"
 )
 
+// These types and fields are all exported to allow logging them with
+// pretty-printed JSON.
+
 type Model struct {
-	instances []instanceAllocation
+	Instances []InstanceAllocation
 }
 
-type instanceAllocation struct {
-	allocated bool
-	instance  *resourceapi.NamedResourcesInstance
+type InstanceAllocation struct {
+	Allocated bool
+	Instance  *resourceapi.NamedResourcesInstance
 }
 
 // AddResources must be called first to create entries for all existing
@@ -44,7 +47,7 @@ func AddResources(m *Model, resources *resourceapi.NamedResourcesResources) {
 	}
 
 	for i := range resources.Instances {
-		m.instances = append(m.instances, instanceAllocation{instance: &resources.Instances[i]})
+		m.Instances = append(m.Instances, InstanceAllocation{Instance: &resources.Instances[i]})
 	}
 }
 
@@ -54,9 +57,9 @@
 	if result == nil {
 		return
 	}
-	for i := range m.instances {
-		if m.instances[i].instance.Name == result.Name {
-			m.instances[i].allocated = true
+	for i := range m.Instances {
+		if m.Instances[i].Instance.Name == result.Name {
+			m.Instances[i].Allocated = true
 			break
 		}
 	}
@@ -103,23 +106,23 @@ func (c *Controller) Allocate(ctx context.Context, model Model) ([]*resourceapi.
 	}
 
 	results := make([]*resourceapi.NamedResourcesAllocationResult, len(c.requests))
 	for i := range c.requests {
-		results[i] = &resourceapi.NamedResourcesAllocationResult{Name: model.instances[indices[i]].instance.Name}
+		results[i] = &resourceapi.NamedResourcesAllocationResult{Name: model.Instances[indices[i]].Instance.Name}
 	}
 	return results, nil
 }
 
 func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 	// Shallow copy, we need to modify the allocated boolean.
-	instances := slices.Clone(model.instances)
+	instances := slices.Clone(model.Instances)
 	indices := make([]int, 0, len(c.requests))
 	for _, request := range c.requests {
 		for i, instance := range instances {
-			if instance.allocated {
+			if instance.Allocated {
 				continue
 			}
 			if c.filter != nil {
-				okay, err := c.filter.Evaluate(ctx, instance.instance.Attributes)
+				okay, err := c.filter.Evaluate(ctx, instance.Instance.Attributes)
 				if err != nil {
 					return nil, fmt.Errorf("evaluate filter CEL expression: %w", err)
 				}
@@ -127,7 +130,7 @@ func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 					continue
 				}
 			}
-			okay, err := request.Evaluate(ctx, instance.instance.Attributes)
+			okay, err := request.Evaluate(ctx, instance.Instance.Attributes)
 			if err != nil {
 				return nil, fmt.Errorf("evaluate request CEL expression: %w", err)
 			}
@@ -140,7 +143,7 @@
 			// allocating one "large" instances for a "small" request may
 			// make a following "large" request impossible to satisfy when
 			// only "small" instances are left.
-			instances[i].allocated = true
+			instances[i].Allocated = true
 			indices = append(indices, i)
 			break
 		}
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go
index 6ac1d263f164c..40a84e8d596b3 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go
@@ -32,12 +32,12 @@ import (
 
 // resources is a map "node name" -> "driver name" -> available and
 // allocated resources per structured parameter model.
-type resources map[string]map[string]resourceModels
+type resources map[string]map[string]ResourceModels
 
-// resourceModels may have more than one entry because it is valid for a driver to
+// ResourceModels may have more than one entry because it is valid for a driver to
 // use more than one structured parameter model.
-type resourceModels struct {
-	namedresources namedresourcesmodel.Model
+type ResourceModels struct {
+	NamedResources namedresourcesmodel.Model
 }
 
 // newResourceModel parses the available information about resources. Objects
@@ -53,10 +53,10 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 	}
 	for _, slice := range slices {
 		if model[slice.NodeName] == nil {
-			model[slice.NodeName] = make(map[string]resourceModels)
+			model[slice.NodeName] = make(map[string]ResourceModels)
 		}
 		resource := model[slice.NodeName][slice.DriverName]
-		namedresourcesmodel.AddResources(&resource.namedresources, slice.NamedResources)
+		namedresourcesmodel.AddResources(&resource.NamedResources, slice.NamedResources)
 		model[slice.NodeName][slice.DriverName] = resource
 	}
 
@@ -75,12 +75,12 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 				continue
 			}
 			if model[structured.NodeName] == nil {
-				model[structured.NodeName] = make(map[string]resourceModels)
+				model[structured.NodeName] = make(map[string]ResourceModels)
 			}
 			resource := model[structured.NodeName][handle.DriverName]
 			for _, result := range structured.Results {
 				// Call AddAllocation for each known model. Each call itself needs to check for nil.
-				namedresourcesmodel.AddAllocation(&resource.namedresources, result.NamedResources)
+				namedresourcesmodel.AddAllocation(&resource.NamedResources, result.NamedResources)
 			}
 		}
 	}
@@ -159,7 +159,7 @@ type perDriverController struct {
 func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) {
 	nodeResources := resources[nodeName]
 	for driverName, perDriver := range c.namedresources {
-		okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].namedresources)
+		okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].NamedResources)
 		if err != nil {
 			// This is an error in the CEL expression which needs
 			// to be fixed. Better fail very visibly instead of
@@ -191,7 +191,7 @@ func (c claimController) allocate(ctx context.Context, nodeName string, resource
 	for driverName, perDriver := range c.namedresources {
 		// Must return one entry for each request. The entry may be nil. This way,
 		// the result can be correlated with the per-request parameters.
-		results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].namedresources)
+		results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].NamedResources)
 		if err != nil {
 			return "", nil, fmt.Errorf("allocating via named resources structured model: %w", err)
 		}
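A closing note on the namedresourcesmodel.go changes: exporting the model
fields is what makes the new V(5) "Resource usage" message in PreFilter useful,
because klog.Format renders the value as JSON and encoding/json only emits
exported fields. A standalone sketch of the difference (hypothetical type
names, unrelated to the scheduler code):

package main

import (
	"encoding/json"
	"fmt"
)

type unexportedModel struct {
	instances []string
}

type exportedModel struct {
	Instances []string
}

func main() {
	a, _ := json.Marshal(unexportedModel{instances: []string{"gpu-0"}})
	b, _ := json.Marshal(exportedModel{Instances: []string{"gpu-0"}})
	fmt.Println(string(a)) // {} -> nothing useful would appear in the log
	fmt.Println(string(b)) // {"Instances":["gpu-0"]}
}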