scheduler: move assume cache to utils #124102
Please note that we're already in Test Freeze for the Fast forwards are scheduled to happen every 6 hours, whereas the most recent run was: Thu Mar 28 14:11:21 UTC 2024.
	// by the assume cache, but suffers from a race: if an
	// update is received via the informer while such an
	// object is assumed, it gets dropped in favor of the
	// newer object from the apiserver.
I think the volumebinding plugin is affected by this race.
err = b.pvCache.Assume(newPV)
stores an object that is locally modified:
kubernetes/staging/src/k8s.io/component-helpers/storage/volume/pv_helpers.go
Lines 128 to 152 in f4e246b
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
volumeClone := volume.DeepCopy()

// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
	volume.Spec.ClaimRef.Name != claim.Name ||
	volume.Spec.ClaimRef.Namespace != claim.Namespace ||
	volume.Spec.ClaimRef.UID != claim.UID {

	claimRef, err := reference.GetReference(scheme.Scheme, claim)
	if err != nil {
		return nil, false, fmt.Errorf("unexpected error getting claim reference: %w", err)
	}
	volumeClone.Spec.ClaimRef = claimRef
	dirty = true
}

// Set AnnBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, AnnBoundByController) {
	metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, AnnBoundByController, "yes")
	dirty = true
}

return volumeClone, dirty, nil
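To make the race concrete, here is a toy model of the version rules described in the Assume doc comment. It is only a sketch: `toyCache`, `object`, `assume`, and `informerUpdate` are hypothetical names, versions are plain integers, and the "ignore updates that are not newer" check is an assumption of this sketch rather than a quote of the real assume cache code.

```go
package main

import "fmt"

// object is a toy stand-in for an API object: a resource version plus
// one field that a caller might modify locally.
type object struct {
	version int64
	claim   string
}

// toyCache keeps only the "latest" view (assumed object or informer object).
type toyCache struct {
	latestObj object
}

func newToyCache(initial object) *toyCache {
	return &toyCache{latestObj: initial}
}

// assume accepts an object whose version is greater than or equal to the
// stored one, mirroring the rule stated in the doc comment.
func (c *toyCache) assume(o object) error {
	if o.version < c.latestObj.version {
		return fmt.Errorf("version %d is older than stored version %d", o.version, c.latestObj.version)
	}
	c.latestObj = o
	return nil
}

// informerUpdate models an update event from the apiserver. A newer
// object replaces whatever was assumed (assumption: anything that is
// not newer is ignored).
func (c *toyCache) informerUpdate(o object) {
	if o.version <= c.latestObj.version {
		return
	}
	c.latestObj = o
}

func main() {
	c := newToyCache(object{version: 5})

	// A locally modified clone keeps the informer's version.
	if err := c.assume(object{version: 5, claim: "ns/claim-a"}); err != nil {
		fmt.Println("assume failed:", err)
	}
	fmt.Printf("after assume: %+v\n", c.latestObj)

	// An unrelated update arrives via the informer with a newer version.
	// It knows nothing about the local modification, which is now lost.
	c.informerUpdate(object{version: 6})
	fmt.Printf("after update: %+v\n", c.latestObj)
}
```

In this model the problem disappears when the assumed object is the one returned by an apiserver Update or Patch, because any later informer update is then based on a state that already contains the change.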
@Huang-Wei: can you perhaps help review this?
GitHub doesn't recognize the copied file, so here's a diff:
*** /dev/fd/63 2024-03-29 10:47:47.508859302 +0100
--- pkg/scheduler/util/assumecache/assume_cache.go 2024-03-29 10:46:52.973476773 +0100
***************
*** 14,79 ****
limitations under the License.
*/
! package volumebinding
import (
"fmt"
"strconv"
"sync"
"k8s.io/klog/v2"
- v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
- storagehelpers "k8s.io/component-helpers/storage/volume"
)
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
! // cache's version of the object. Objects are assumed to be
! // Kubernetes API objects that implement meta.Interface
type AssumeCache interface {
! // Assume updates the object in-memory only
Assume(obj interface{}) error
! // Restore the informer cache's version of the object
! Restore(objName string)
! // Get the object by name
! Get(objName string) (interface{}, error)
! // GetAPIObj gets the API object by name
! GetAPIObj(objName string) (interface{}, error)
! // List all the objects in the cache
List(indexObj interface{}) []interface{}
}
! type errWrongType struct {
! typeName string
! object interface{}
}
! func (e *errWrongType) Error() string {
! return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
}
! type errNotFound struct {
! typeName string
! objectName string
}
! func (e *errNotFound) Error() string {
! return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
}
! type errObjectName struct {
! detailedErr error
}
! func (e *errObjectName) Error() string {
! return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
}
// assumeCache stores two pointers to represent a single object:
--- 14,138 ----
limitations under the License.
*/
! package assumecache
import (
+ "errors"
"fmt"
"strconv"
"sync"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
)
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
! // cache's version of the object. Objects are assumed to be
! // Kubernetes API objects that are supported by [meta.Accessor].
! //
! // Objects can be referenced via their key, with [cache.MetaNamespaceKeyFunc]
! // as key function.
type AssumeCache interface {
! // Assume updates the object in-memory only.
! //
! // The version of the object must be greater or equal to
! // the current object, otherwise an error is returned.
! //
! // Storing an object with the same version is supported
! // by the assume cache, but suffers from a race: if an
! // update is received via the informer while such an
! // object is assumed, it gets dropped in favor of the
! // newer object from the apiserver.
! //
! // Only assuming objects that were returned by an apiserver
! // operation (Update, Patch) is safe.
Assume(obj interface{}) error
! // Restore the informer cache's version of the object.
! Restore(key string)
! // Get the object by its key.
! Get(key string) (interface{}, error)
! // GetAPIObj gets the informer cache's version by its key.
! GetAPIObj(key string) (interface{}, error)
! // List all the objects in the cache.
List(indexObj interface{}) []interface{}
+
+ // getImplementation is used internally by [AddTestObject], [UpdateTestObject], [DeleteTestObject].
+ getImplementation() *assumeCache
+ }
+
+ // Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon.
+ type Informer interface {
+ AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}
! // AddTestObject adds an object to the assume cache.
! // Only use this for unit testing!
! func AddTestObject(cache AssumeCache, obj interface{}) {
! cache.getImplementation().add(obj)
}
! // UpdateTestObject updates an object in the assume cache.
! // Only use this for unit testing!
! func UpdateTestObject(cache AssumeCache, obj interface{}) {
! cache.getImplementation().update(nil, obj)
}
! // DeleteTestObject deletes object in the assume cache.
! // Only use this for unit testing!
! func DeleteTestObject(cache AssumeCache, obj interface{}) {
! cache.getImplementation().delete(obj)
}
! // Sentinel errors that can be checked for with errors.Is.
! var (
! ErrWrongType = errors.New("object has wrong type")
! ErrNotFound = errors.New("object not found")
! ErrObjectName = errors.New("cannot determine object name")
! )
!
! type WrongTypeError struct {
! TypeName string
! Object interface{}
}
! func (e WrongTypeError) Error() string {
! return fmt.Sprintf("could not convert object to type %v: %+v", e.TypeName, e.Object)
}
! func (e WrongTypeError) Is(err error) bool {
! return err == ErrWrongType
! }
!
! type NotFoundError struct {
! TypeName string
! ObjectKey string
! }
!
! func (e NotFoundError) Error() string {
! return fmt.Sprintf("could not find %v %q", e.TypeName, e.ObjectKey)
! }
!
! func (e NotFoundError) Is(err error) bool {
! return err == ErrNotFound
! }
!
! type ObjectNameError struct {
! DetailedErr error
! }
!
! func (e ObjectNameError) Error() string {
! return fmt.Sprintf("failed to get object name: %v", e.DetailedErr)
! }
!
! func (e ObjectNameError) Is(err error) bool {
! return err == ErrObjectName
}
// assumeCache stores two pointers to represent a single object:
***************
*** 119,125 ****
func objInfoKeyFunc(obj interface{}) (string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
! return "", &errWrongType{"objInfo", obj}
}
return objInfo.name, nil
}
--- 178,184 ----
func objInfoKeyFunc(obj interface{}) (string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
! return "", &WrongTypeError{TypeName: "objInfo", Object: obj}
}
return objInfo.name, nil
}
***************
*** 127,139 ****
func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
! return []string{""}, &errWrongType{"objInfo", obj}
}
return c.indexFunc(objInfo.latestObj)
}
// NewAssumeCache creates an assume cache for general objects.
! func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
c := &assumeCache{
logger: logger,
description: description,
--- 186,198 ----
func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
! return []string{""}, &WrongTypeError{TypeName: "objInfo", Object: obj}
}
return c.indexFunc(objInfo.latestObj)
}
// NewAssumeCache creates an assume cache for general objects.
! func NewAssumeCache(logger klog.Logger, informer Informer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
c := &assumeCache{
logger: logger,
description: description,
***************
*** 148,154 ****
// Unit tests don't use informers
if informer != nil {
! informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.add,
UpdateFunc: c.update,
--- 207,214 ----
// Unit tests don't use informers
if informer != nil {
! // Cannot fail in practice?! No-one bothers checking the error.
! _, _ = informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.add,
UpdateFunc: c.update,
***************
*** 159,164 ****
--- 219,228 ----
return c
}
+ func (c *assumeCache) getImplementation() *assumeCache {
+ return c
+ }
+
func (c *assumeCache) add(obj interface{}) {
if obj == nil {
return
***************
*** 166,172 ****
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
! c.logger.Error(&errObjectName{err}, "Add failed")
return
}
--- 230,236 ----
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
! c.logger.Error(&ObjectNameError{err}, "Add failed")
return
}
***************
*** 213,219 ****
name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
! c.logger.Error(&errObjectName{err}, "Failed to delete")
return
}
--- 277,283 ----
name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
! c.logger.Error(&ObjectNameError{err}, "Failed to delete")
return
}
***************
*** 235,277 ****
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
! return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
}
return objResourceVersion, nil
}
! func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
! obj, ok, err := c.store.GetByKey(name)
if err != nil {
return nil, err
}
if !ok {
! return nil, &errNotFound{c.description, name}
}
objInfo, ok := obj.(*objInfo)
if !ok {
! return nil, &errWrongType{"objInfo", obj}
}
return objInfo, nil
}
! func (c *assumeCache) Get(objName string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
! objInfo, err := c.getObjInfo(objName)
if err != nil {
return nil, err
}
return objInfo.latestObj, nil
}
! func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
! objInfo, err := c.getObjInfo(objName)
if err != nil {
return nil, err
}
--- 299,342 ----
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
! //nolint:errorlint // Intentionally not wrapping the error, the underlying error is an implementation detail.
! return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %v", objAccessor.GetResourceVersion(), c.description, name, err)
}
return objResourceVersion, nil
}
! func (c *assumeCache) getObjInfo(key string) (*objInfo, error) {
! obj, ok, err := c.store.GetByKey(key)
if err != nil {
return nil, err
}
if !ok {
! return nil, &NotFoundError{TypeName: c.description, ObjectKey: key}
}
objInfo, ok := obj.(*objInfo)
if !ok {
! return nil, &WrongTypeError{"objInfo", obj}
}
return objInfo, nil
}
! func (c *assumeCache) Get(key string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
! objInfo, err := c.getObjInfo(key)
if err != nil {
return nil, err
}
return objInfo.latestObj, nil
}
! func (c *assumeCache) GetAPIObj(key string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
! objInfo, err := c.getObjInfo(key)
if err != nil {
return nil, err
}
***************
*** 298,304 ****
for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
! c.logger.Error(&errWrongType{"objInfo", obj}, "List error")
continue
}
allObjs = append(allObjs, objInfo.latestObj)
--- 363,369 ----
for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
! c.logger.Error(&WrongTypeError{TypeName: "objInfo", Object: obj}, "List error")
continue
}
allObjs = append(allObjs, objInfo.latestObj)
***************
*** 309,315 ****
func (c *assumeCache) Assume(obj interface{}) error {
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
! return &errObjectName{err}
}
c.rwMutex.Lock()
--- 374,380 ----
func (c *assumeCache) Assume(obj interface{}) error {
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
! return &ObjectNameError{err}
}
c.rwMutex.Lock()
7181446 to 4620ca2
/triage accepted
/priority important-soon
This is blocking next steps that are on the critical path for autoscaler support in 1.31.
/approve
4620ca2 to 44df029
Several comments. Maybe we should split this into two commits next time, as it's hard to tell which part was added.
	List(indexObj interface{}) []interface{}

	// getImplementation is used internally by [AddTestObject], [UpdateTestObject], [DeleteTestObject].
	getImplementation() *assumeCache
If we return assumeCache specifically, then this method makes less sense as part of the interface, because assumeCache is just one implementation. Alternatively, let's type-assert directly in the Add/Update/Delete functions, like cache.(*assumeCache); I think we don't have other implementations right now.
So basically eliminate the entire AssumeCache interface? Makes sense to me.
I've pushed an update. The only real diff is this removal of the interface: https://github.com/kubernetes/kubernetes/compare/44df029ebdb6738bc724ba0d4d3e540c1c819405..7f54c5dfec3e95a2751156a3a1ae4407be68297a
But I also reorganized the commits as suggested in #124102 (review), so the second and third commit show the changes on top of the original assume_cache.go.
Sorry, I just meant getting rid of getImplementation(). Actually, I think it's better to keep the interface for reference, but if it LGTM to @jsafrane, it also LGTM to me. It's an internal interface for the volumebinding plugin, so I think users wouldn't use it. I'm putting my approval here so as not to block this PR from merging.
/approve
/hold for @jsafrane
I don't have a strong preference myself. Keeping the interface minimizes overall changes, but the internal method on it is ugly and the interface itself isn't needed in the first place.
@jsafrane: do you prefer this with or without the change in the last commit? I can easily drop it - I kept it separate for a reason! 😄
I think the interface is not really useful. There is no other implementation of it and if we need one, we can create an interface for it later.
/hold cancel
/lgtm
	getImplementation() *assumeCache
}

// Informer is the subset of [cache.SharedInformer] that NewAssumeCache depends upon.
Do we have the SharedInformer now?
This might belong in my follow-up PR. I'll remove it if it's not needed here.
This doesn't strictly need to be in this PR, but I think it's worthwhile to merge it now because then my follow-up PR doesn't need to touch pkg/scheduler/framework/plugins/volumebinding again. Basically, this interface is part of the revised assume cache API, which is what this PR is about.
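The idea behind accepting a narrow Informer interface can be sketched without client-go. In the sketch below, `EventHandler`, `Informer`, `fakeInformer`, `recorder`, and `newRecorder` are all hypothetical stand-ins (the real interface in the diff uses `cache.ResourceEventHandler` and returns a registration handle as well); the point is only that a constructor depending on a single method is easy to feed with a fake, or with nil as the unit tests do.

```go
package main

import "fmt"

// EventHandler is a hypothetical stand-in for cache.ResourceEventHandler,
// reduced to a single callback.
type EventHandler interface {
	OnAdd(obj any)
}

// Informer lists only what the consumer needs from an informer.
type Informer interface {
	AddEventHandler(h EventHandler) error
}

// fakeInformer satisfies Informer without any apiserver machinery.
type fakeInformer struct {
	handlers []EventHandler
}

func (f *fakeInformer) AddEventHandler(h EventHandler) error {
	f.handlers = append(f.handlers, h)
	return nil
}

// emitAdd delivers an add event to all registered handlers.
func (f *fakeInformer) emitAdd(obj any) {
	for _, h := range f.handlers {
		h.OnAdd(obj)
	}
}

// recorder plays the role of the cache: it records what the informer sends.
type recorder struct {
	seen []any
}

func (r *recorder) OnAdd(obj any) {
	r.seen = append(r.seen, obj)
}

// newRecorder registers with the informer if one is given.
// A nil informer is allowed, as in tests that feed objects directly.
func newRecorder(informer Informer) (*recorder, error) {
	r := &recorder{}
	if informer != nil {
		if err := informer.AddEventHandler(r); err != nil {
			return nil, err
		}
	}
	return r, nil
}

func main() {
	inf := &fakeInformer{}
	r, err := newRecorder(inf)
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	inf.emitAdd("pv-1")
	fmt.Println(len(r.seen)) // 1
}
```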
See #124102 (comment), but I get your point. I'll change this into one commit for "git mv" and then changes on top of that.
This is a verbatim move or copy of the files. They don't build in their new location yet.
This is now used by both the volumebinding and dynamicresources plugins, so promoting it to a common helper package is better. In terms of functionality, nothing was changed. Documentation got updated (warns about storing locally modified objects, clarifies what the Get parameters are). Code coverage should be a bit better than before (tested with and without indexer, exercises event handlers, more error paths). Checking for specific errors can now be done via errors.Is.
There's no reason for having the interface because there is only one implementation. Makes the implementation of the test functions a bit simpler (no casting). They are still stand-alone functions instead of methods because they should not be considered part of the "normal" API.
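A rough sketch of that design, using a deliberately simplified cache: the concrete type is exported directly, and the test helper is a package-level function that calls an unexported method, so it does not appear in the type's method set. The names `AssumeCache`, `NewAssumeCache`, and the string-based `Get`/`add` signatures here are illustrative assumptions, not the signatures from the PR.

```go
package main

import "fmt"

// AssumeCache is a simplified concrete type (no interface in front of it).
type AssumeCache struct {
	objs map[string]string
}

// NewAssumeCache returns an empty cache.
func NewAssumeCache() *AssumeCache {
	return &AssumeCache{objs: map[string]string{}}
}

// Get is part of the normal API.
func (c *AssumeCache) Get(key string) (string, bool) {
	v, ok := c.objs[key]
	return v, ok
}

// add is the unexported path that informer events would normally take.
func (c *AssumeCache) add(key, value string) {
	c.objs[key] = value
}

// AddTestObject is a stand-alone function rather than a method, so it
// stays out of the "normal" API. Because the cache is a concrete type,
// no type assertion or internal interface method is needed to reach add.
// Only meant for unit tests.
func AddTestObject(c *AssumeCache, key, value string) {
	c.add(key, value)
}

func main() {
	c := NewAssumeCache()
	AddTestObject(c, "default/pv-1", "bound")
	v, ok := c.Get("default/pv-1")
	fmt.Println(v, ok) // bound true
}
```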
44df029 to 7f54c5d
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: jsafrane, kerthcet, pohly The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
LGTM label has been added. Git tree hash: 04bd312f9a21d7345c29a566748eb5177bf0dccf
What type of PR is this?
/kind cleanup
What this PR does / why we need it:
The assume cache is now used by both the volumebinding and dynamicresources plugins, so promoting it to a common helper package is better.
Which issue(s) this PR fixes:
Related-to: #123698
More changes will follow.
Special notes for your reviewer:
In terms of functionality, nothing was changed. Documentation got updated (warns about storing locally modified objects, clarifies what the Get parameters are). Code coverage should be a bit better than before (tested with and without indexer, exercises event handlers, more error paths).
Checking for specific errors can now be done via errors.Is.
Does this PR introduce a user-facing change?
/cc @jsafrane @msau42