staging dra: use MutationCache in controller
Immediately after the controller modifies a ResourceClaim in the apiserver, the
locally cached copy is outdated until the informer receives the update. Any
operation that looks at the claim during that window acts on stale
information. For example, it might try to allocate again; if that succeeds
because of idempotency, the subsequent update operation fails with a conflict
error.

This is harmless, but leads to confusing log output. It can be avoided by
keeping a copy of the updated claim and using that instead of the one from the
informer cache.
pohly committed Nov 11, 2022
1 parent bb040ef commit b2c3979
Showing 2 changed files with 54 additions and 12 deletions.
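
A minimal, hypothetical sketch of the pattern the commit adopts, using the
v1alpha1 resource API that the diff targets. The exampleController type and the
helpers newClaimCache and updateClaim are invented for illustration; only the
cache calls mirror the diff below: layer a client-go MutationCache over the
claim informer's indexer, and push every freshly written claim back into it
with Mutation so that subsequent reads return the new copy until the informer
catches up.

package example

import (
	"context"
	"time"

	resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// exampleController is a stand-in for the DRA controller; only the cache
// handling mirrors the real change.
type exampleController struct {
	kubeClient kubernetes.Interface
	claimCache cache.MutationCache
}

// newClaimCache layers a MutationCache over the claim informer's indexer.
// Locally mutated entries expire after 60 seconds, by which time the informer
// is expected to have delivered the corresponding watch event.
func newClaimCache(claimInformer cache.SharedIndexInformer) cache.MutationCache {
	indexer := claimInformer.GetIndexer()
	return cache.NewIntegerResourceVersionMutationCache(indexer, indexer, 60*time.Second,
		false /* only cache updated claims that exist in the informer cache */)
}

// updateClaim writes the claim to the apiserver and stores the returned, newer
// copy in the mutation cache so that reads via GetByKey stop returning the
// stale informer copy.
func (c *exampleController) updateClaim(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error {
	updated, err := c.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
	if err != nil {
		return err
	}
	c.claimCache.Mutation(updated)
	return nil
}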
@@ -138,7 +138,7 @@ type controller struct {
eventRecorder record.EventRecorder
rcLister resourcev1alpha1listers.ResourceClassLister
rcSynced cache.InformerSynced
claimLister resourcev1alpha1listers.ResourceClaimLister
claimCache cache.MutationCache
podSchedulingLister resourcev1alpha1listers.PodSchedulingLister
claimSynced cache.InformerSynced
podSchedulingSynced cache.InformerSynced
@@ -177,6 +177,13 @@ func New(
queue := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s-queue", name))

// The mutation cache acts as an additional layer for the informer
// cache and after an update made by the controller returns a more
// recent copy until the informer catches up.
claimInformerCache := claimInformer.Informer().GetIndexer()
claimCache := cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache, 60*time.Second,
false /* only cache updated claims that exist in the informer cache */)

ctrl := &controller{
ctx: ctx,
logger: logger,
@@ -186,7 +193,7 @@ func New(
kubeClient: kubeClient,
rcLister: rcInformer.Lister(),
rcSynced: rcInformer.Informer().HasSynced,
claimLister: claimInformer.Lister(),
claimCache: claimCache,
claimSynced: claimInformer.Informer().HasSynced,
podSchedulingLister: podSchedulingInformer.Lister(),
podSchedulingSynced: podSchedulingInformer.Informer().HasSynced,
@@ -354,12 +361,8 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob

switch prefix {
case claimKeyPrefix:
claim, err := ctrl.claimLister.ResourceClaims(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.FromContext(ctx).V(5).Info("ResourceClaim was deleted, no need to process it")
return nil, nil
}
claim, err := ctrl.getCachedClaim(ctx, object)
if claim == nil || err != nil {
return nil, err
}
obj, finalErr = claim, ctrl.syncClaim(ctx, claim)
@@ -377,6 +380,22 @@ func (ctrl *controller) syncKey(ctx context.Context, key string) (obj runtime.Ob
return
}

func (ctrl *controller) getCachedClaim(ctx context.Context, key string) (*resourcev1alpha1.ResourceClaim, error) {
claimObj, exists, err := ctrl.claimCache.GetByKey(key)
if !exists || k8serrors.IsNotFound(err) {
klog.FromContext(ctx).V(5).Info("ResourceClaim not found, no need to process it")
return nil, nil
}
if err != nil {
return nil, err
}
claim, ok := claimObj.(*resourcev1alpha1.ResourceClaim)
if !ok {
return nil, fmt.Errorf("internal error: got %T instead of *resourcev1alpha1.ResourceClaim from claim cache", claimObj)
}
return claim, nil
}

// syncClaim determines which next action may be needed for a ResourceClaim
// and does it.
func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.ResourceClaim) error {
@@ -414,6 +433,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.R
if err != nil {
return fmt.Errorf("remove allocation: %v", err)
}
ctrl.claimCache.Mutation(claim)
} else {
// Ensure that there is no on-going allocation.
if err := ctrl.driver.Deallocate(ctx, claim); err != nil {
@@ -428,12 +448,15 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourcev1alpha1.R
if err != nil {
return fmt.Errorf("remove deallocation: %v", err)
}
ctrl.claimCache.Mutation(claim)
}

claim.Finalizers = ctrl.removeFinalizer(claim.Finalizers)
if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("remove finalizer: %v", err)
}
ctrl.claimCache.Mutation(claim)
}

// Nothing further to do. The apiserver should remove it shortly.
@@ -515,6 +538,7 @@ func (ctrl *controller) allocateClaim(ctx context.Context,
if err != nil {
return fmt.Errorf("add finalizer: %v", err)
}
ctrl.claimCache.Mutation(claim)
}

logger.V(5).Info("Allocating")
@@ -528,16 +552,19 @@ func (ctrl *controller) allocateClaim(ctx context.Context,
claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
}
logger.V(6).Info("Updating claim after allocation", "claim", claim)
if _, err := ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
claim, err = ctrl.kubeClient.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("add allocation: %v", err)
}
ctrl.claimCache.Mutation(claim)
return nil
}

func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) {
claimName := resourceclaim.Name(pod, &podClaim)
claim, err := ctrl.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
if err != nil {
key := pod.Namespace + "/" + claimName
claim, err := ctrl.getCachedClaim(ctx, key)
if claim == nil || err != nil {
return nil, err
}
if podClaim.Source.ResourceClaimTemplateName != nil {
Second changed file (the controller's test):
@@ -31,6 +31,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
)
@@ -378,6 +379,8 @@ func TestController(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)

initialObjects := []runtime.Object{}
for _, class := range test.classes {
initialObjects = append(initialObjects, class)
@@ -396,6 +399,10 @@
claimInformer := informerFactory.Resource().V1alpha1().ResourceClaims()
podInformer := informerFactory.Core().V1().Pods()
podSchedulingInformer := informerFactory.Resource().V1alpha1().PodSchedulings()
// Order is important: on function exit, we first must
// cancel, then wait (last-in-first-out).
defer informerFactory.Shutdown()
defer cancel()

for _, obj := range initialObjects {
switch obj.(type) {
@@ -416,6 +423,14 @@
driver.t = t

ctrl := New(ctx, driverName, driver, kubeClient, informerFactory)
informerFactory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(),
informerFactory.Resource().V1alpha1().ResourceClasses().Informer().HasSynced,
informerFactory.Resource().V1alpha1().ResourceClaims().Informer().HasSynced,
informerFactory.Resource().V1alpha1().PodSchedulings().Informer().HasSynced,
) {
t.Fatal("could not sync caches")
}
_, err := ctrl.(*controller).syncKey(ctx, test.key)
if err != nil && test.expectedError == "" {
t.Fatalf("unexpected error: %v", err)
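
For reference, a condensed, hypothetical version of the test lifecycle added
above. The helper name setupInformers, the use of t.Cleanup instead of defer,
and the fake clientset setup are invented for illustration; the teardown
ordering matches the diff: cancel first, then wait for the informers to shut
down.

package example

import (
	"context"
	"testing"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2/ktesting"
)

// setupInformers shows the lifecycle in isolation: create a cancellable test
// context, register teardown in the right order, start the informers, and
// wait for their caches before the test body runs.
func setupInformers(t *testing.T) (context.Context, informers.SharedInformerFactory) {
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)

	kubeClient := fake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
	claimInformer := informerFactory.Resource().V1alpha1().ResourceClaims().Informer()

	// Cleanups run last-in-first-out: cancel stops the informers first,
	// then Shutdown waits for their goroutines to finish.
	t.Cleanup(informerFactory.Shutdown)
	t.Cleanup(cancel)

	informerFactory.Start(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), claimInformer.HasSynced) {
		t.Fatal("could not sync caches")
	}
	return ctx, informerFactory
}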
