Storageversion API for CRDs (using workqueue) #123999
base: master
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
Force-pushed from 16d7158 to 36c0eb1 (Compare)
staging/src/k8s.io/apiextensions-apiserver/pkg/storageversion/manager.go
m.updateStorageVersion(ctx, latestSVUpdateInfo.crd, latestSVUpdateInfo.updateChannels.teardownFinishedCh, latestSVUpdateInfo.updateChannels.processedCh, latestSVUpdateInfo.updateChannels.errCh)
markUpdateAsProcessed(latestSVUpdateInfo)
return true
}
Should we re-enqueue? I'm unclear on how the SV update is coordinated when the SV update request times out.
Loosely related: If the CRD update succeeds, and the cluster has only 1 apiserver, and that apiserver crashes after the CRD update but before the SV update, what happens?
Should we re-enqueue?
That might be a good suggestion. We do retries on just the storageversion update here, but there can be failures outside of this function. For example, if the teardown of the old storage times out for any reason, do we want to try the SV update (along with the teardown) again? Open to suggestions. If we re-enqueue, then for how long after the first failure do we keep doing that?
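For reference, a minimal sketch of one way to bound the retries discussed here. The constant, the handleErr helper, and m.queue being a rate-limiting workqueue are assumptions for illustration, not this PR's code (imports of fmt and k8s.io/apimachinery/pkg/util/runtime assumed):

```go
const maxSVUpdateRetries = 5 // illustrative retry budget

// handleErr is a hypothetical helper invoked after each sync attempt.
func (m *Manager) handleErr(err error, key string) {
	if err == nil {
		m.queue.Forget(key) // clear the rate limiter's history on success
		return
	}
	if m.queue.NumRequeues(key) < maxSVUpdateRetries {
		m.queue.AddRateLimited(key) // re-enqueue with backoff
		return
	}
	// Retry budget exhausted: drop the key instead of retrying forever.
	m.queue.Forget(key)
	utilruntime.HandleError(fmt.Errorf("giving up on storageversion update for CRD %q: %v", key, err))
}
```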
If the CRD update succeeds, and the cluster has only 1 apiserver, and that apiserver crashes after the CRD update but before the SV update, what happens?
From my understanding, the old handler and old storage would be torn down. When the apiserver comes up again:
- new handler (using new storageversion) is created when a CR request arrives
- the new CR request triggers waitForStorageVersionUpdate() which tries to fetch the latest entry from the storageVersionUpdateInfoMap
- finds nothing there, and returns nil
- CR is served using new storage but the storageversion API doesn't reflect that. The storageversion migrator wouldn't trigger the migration unless a new CRD update happens.
Maybe we should change the behavior at #3: we can queue the SV update here instead of returning nil. Though the SV update might not be processed instantaneously, we will at least have it tracked (a rough sketch follows below). But we should also be careful that we don't queue an SV update for a deleted CRD.
cc @roycaihw for verifying my understanding and his thoughts on the same.
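A rough sketch of that idea; all names are hypothetical except latestStorageVersionUpdateInfo, m.crdInformer, and m.Enqueue, which appear elsewhere in this PR:

```go
// Inside the CR-write wait path, when no pending SV update is tracked
// (e.g. the first CR write after a server restart):
if _, ok := latestStorageVersionUpdateInfo.Load(crd.UID); !ok {
	if _, err := m.crdInformer.Lister().Get(crd.Name); apierrors.IsNotFound(err) {
		return nil // CRD was deleted; do not queue an SV update
	}
	m.Enqueue(crd.Name) // track the SV update instead of silently returning nil
	return nil
}
```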
If we do the above, it also means that whenever a CR write arrives after the server is restarted, we will queue the relevant CRD for SV update regardless of whether its SV was already updated (i.e. before the server crashed)
When a server reboots, it reconciles all the CRDs and updates SVs accordingly. New CR write requests should be blocked until the corresponding SV is up-to-date.
I added requeuing for whenever a CRD is not found in the storageversionUpdateInfoMap, to accommodate server bootstrap.
Note: The requeuing would only happen when a CR write is observed after the server restarts, which means that the SV for a CRD would not be reconciled until a corresponding CR write is made. This doesn't seem very ideal to me. Maybe we should instead have a poststarthook that lists all available CRDs and queues the SV updates for every CRD found in the list? @roycaihw @jpbetz
I have modified the existing poststarthook (which previously only waited for the CRD informer to sync) to also wait until all CRDs are enqueued for SV updates.
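Roughly, the bootstrap described above could look like the sketch below; the hook name, variable wiring, and svManager are assumptions for illustration:

```go
server.AddPostStartHookOrDie("crd-storageversion-bootstrap", func(hookCtx genericapiserver.PostStartHookContext) error {
	// Wait for the CRD informer to sync before listing.
	if !cache.WaitForCacheSync(hookCtx.StopCh, crdInformer.Informer().HasSynced) {
		return fmt.Errorf("CRD informer did not sync")
	}
	crds, err := crdInformer.Lister().List(labels.Everything())
	if err != nil {
		return err
	}
	// Enqueue an SV update for every existing CRD, not only those that
	// later receive a CR write.
	for _, crd := range crds {
		svManager.Enqueue(crd.Name)
	}
	return nil
})
```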
}

func (m *Manager) updateLatestStorageVersionUpdateInfo(crdUID types.UID, svUpdateInfo *storageVersionUpdateInfo) {
	latestStorageVersionUpdateInfo.Store(crdUID, svUpdateInfo)
Should entries be deleted from this map? It looks like it only ever grows.
Yeah, that part is still pending. Entries are made per CRD (UID) and should be deleted only when a CRD is deleted, I think. Are there any other cases where you think we can delete the entry?
Added functionality to delete entries from the map.
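For context, deleting an entry is a single call on the sync.Map; a sketch of wiring it to CRD deletion follows (the method and handler shown are illustrative, not necessarily what the PR does):

```go
// Remove the tracked SV update info when its CRD goes away.
func (m *Manager) deleteStorageVersionUpdateInfo(crdUID types.UID) {
	latestStorageVersionUpdateInfo.Delete(crdUID)
}

// Invoked from the CRD informer's delete handler, e.g.:
//   DeleteFunc: func(obj interface{}) {
//       if crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition); ok {
//           m.deleteStorageVersionUpdateInfo(crd.UID)
//       }
//   },
```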
Force-pushed from 61765b0 to 6370889 (Compare)
Force-pushed from 44312ba to c0f169e (Compare)
// alwaysServe is set to true when a CRD's storageversion is published.
// All CR writes for a CRD are allowed if this is set to true.
alwaysServe bool
Shall we use atomic.Bool?
Done. Thanks for the suggestion!
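For readers following along, the change amounts to swapping the plain bool for sync/atomic's Bool (Go 1.19+); the struct below only shows this one field, with the PR's other fields elided:

```go
import "sync/atomic"

type storageVersionUpdateInfo struct {
	// alwaysServe is set to true once the CRD's storageversion is published;
	// CR writes for the CRD are allowed while it is true.
	alwaysServe atomic.Bool
}

// Writer (after a successful SV publish): svInfo.alwaysServe.Store(true)
// Reader (on the CR write path):          if svInfo.alwaysServe.Load() { ... }
```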
func isAllowedToServe(crdHandler *crdHandler, crdInfo *crdInfo, crd *apiextensionsv1.CustomResourceDefinition, requestInfo *apirequest.RequestInfo, w http.ResponseWriter, req *http.Request) error {
	if !utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) ||
		!utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
		klog.V(4).Infof("not checking if handler is allowed to serve since StorageVersionAPI and/or APIServerIdentity feature are disabled.")
This would be a lot of log lines (one for every CR request) if the verbosity is set to 4 or above.
Can we only log once at server startup?
Removed this logline.
	return false, activeTeardownsCount
}

func (m *Manager) AlreadyPublishedLatestSV(crd *apiextensionsv1.CustomResourceDefinition) (bool, error) {
This function is not checking the "latest". It's checking whether the storage version in the given CRD is reflected in SV. Could you rename the function and document it?
Renamed to StorageVersionFoundInCacheFor()
@@ -571,6 +664,35 @@ func (r *crdHandler) tearDown(oldInfo *crdInfo) {
		// destroy only the main storage. Those for the subresources share cacher and etcd clients.
		storage.CustomResource.DestroyFunc()
	}

	r.updateActiveTeardownsCount(oldInfo.name, -1)
Consider moving this to the top of the function and using defer.
Done
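The suggested shape, as a sketch (assuming the increment happens when teardown begins, as it does in this PR):

```go
func (r *crdHandler) tearDown(oldInfo *crdInfo) {
	r.updateActiveTeardownsCount(oldInfo.name, 1)
	// Decrement on every return path, including early returns.
	defer r.updateActiveTeardownsCount(oldInfo.name, -1)

	// ... existing teardown logic (destroying storage, etc.) ...
}
```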
crdName := key.(string)
crd, err := m.crdInformer.Lister().Get(crdName)
if err != nil {
	klog.V(4).Infof("crd: %s not found in CRD cache, abandoning storageversion update", crdName)
Will there be errors other than NotFound? Can we distinguish them? If the CRD doesn't exist, we don't need to write the SV.
Added separate handling for NotFound err - i.e. do not requeue SV update when this happens
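A sketch of what that distinction can look like with apierrors.IsNotFound; the error-propagation style (returning the error so the caller requeues) is an assumption, not necessarily how the PR is structured:

```go
crd, err := m.crdInformer.Lister().Get(crdName)
if err != nil {
	if apierrors.IsNotFound(err) {
		// The CRD no longer exists, so there is no SV to publish; drop the key.
		klog.V(4).Infof("crd %s not found, abandoning storageversion update", crdName)
		return nil
	}
	// Any other lister error is treated as transient: surface it so the key is requeued.
	return err
}
// ... continue with the SV update for crd ...
```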
Force-pushed from 745f498 to 94e9a45 (Compare)
}

// TODO: indefinitely requeue on error?
m.Enqueue(crd.Name)
This smells like a hot loop. What prevents this from immediately de-queuing on the m.queue.Get()? Perhaps structuring with the separate syncHandler like our example in https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md#rough-structure will help.
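For reference, the rough structure from that doc adapted to this manager; method and field names such as syncHandler are assumptions:

```go
func (m *Manager) runWorker(ctx context.Context) {
	for m.processNextWorkItem(ctx) {
	}
}

func (m *Manager) processNextWorkItem(ctx context.Context) bool {
	key, quit := m.queue.Get()
	if quit {
		return false
	}
	defer m.queue.Done(key)

	if err := m.syncHandler(ctx, key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("syncing CRD %q: %v", key, err))
		m.queue.AddRateLimited(key) // backoff instead of an immediate re-Add
		return true
	}
	m.queue.Forget(key)
	return true
}
```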
// alwaysServe is set to true when a CRD's storageversion is published.
// All CR writes for a CRD are allowed if this is set to true.
alwaysServe atomic.Bool
nit: is this allowMutationRequests?
	err := apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb)
	err.ErrStatus.Message = fmt.Sprintf("%v not allowed while custom resource definition is terminating", requestInfo.Verb)
	responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
	return nil
}

if err := isAllowedToServe(r, crdInfo, crd, requestInfo, w, req); err != nil {
	return nil
What writes the response to the client?
What writes the response to the client?
Oh, having an isFoo function write a response is unexpected. Can it be refactored to make it practical to write the response here?
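One possible shape for that refactor (function and caller names are illustrative): the check only returns an error, and the caller owns writing the response.

```go
// canServe only decides; it never touches the ResponseWriter.
func canServe(r *crdHandler, crdInfo *crdInfo, crd *apiextensionsv1.CustomResourceDefinition, requestInfo *apirequest.RequestInfo) error {
	// ... feature-gate and storageversion checks, returning an apierror on failure ...
	return nil
}

// Caller, where the response is actually written:
//   if err := canServe(r, crdInfo, crd, requestInfo); err != nil {
//       responsewriters.ErrorNegotiated(err, Codecs,
//           schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
//       return nil
//   }
```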
ok, err := crdHandler.storageVersionManager.StorageVersionFoundInCacheFor(crd)
if err != nil {
	klog.V(4).Infof("handler not allowed to serve this request, err: %v", err)
	err = apierrors.NewServiceUnavailable(err.Error())
Seems like a 429 would be more seamless.
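For what it's worth, apierrors has a ready-made constructor for that; a sketch, with the message and the 1-second Retry-After chosen purely for illustration:

```go
// 429 with a Retry-After hint instead of a 503:
err = apierrors.NewTooManyRequests(
	fmt.Sprintf("storage version for %s is not yet up to date, please retry", crd.Name), 1)
```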
	return nil
}

ok, err := crdHandler.storageVersionManager.StorageVersionFoundInCacheFor(crd)
Given an arbitrarily latent list/watch for StorageVersions, how does this prevent serving in a case of
- steady state CRD storing v1
- steady state StorageVersion encoding as v1
- update CRD to storing v2
- update StorageVersion encoding as v2, BUT watch event is delayed by five minutes
- update CRD to storing v1
- CR create arrives and is stored as v1, but StorageVersion indicates v2.
Good catch. I think this is not handled today.
Here's my understanding of this case:
- SV was written to v1 correctly, SV cache reflects that.
- CRD updated to v2, SV updated to v2, but SV cache is still at v1.
- CRD updated to v1 again, CR request received - finds CRD SV to match SV cache (v1) - stores in v1.
- the SV worker, after step 3, will find that the CRD cache is at v1 and the SV cache is at v1, so it will not process the v1 update again. Thus the SV will be stuck at v2.
So to confirm, is the concern that
- the storageversion API will show v2 which is wrong since it should show v1?
- CR was allowed to be written in v1 before the SV was updated to v1?
So to confirm, is the concern that
1. the storageversion API will show v2 which is wrong since it should show v1?
2. CR was allowed to be written in v1 before the SV was updated to v1?
Well a fix could come in either direction, but I guess the more specific problem is CR was allowed to be written in v1 before the storageversion was updated to v1.
Had a proposal to address this issue (#124543 (review), doc). @deads2k Interested to hear your opinion on it.
func (r *crdHandler) tearDown(oldInfo *crdInfo) {
// TODO: Do better than multiple bool params.
func (r *crdHandler) tearDownAndQueueSVUpdate(oldInfo *crdInfo, queueSVUpdate bool, deleteTeardownCountInfo bool) {
	r.updateActiveTeardownsCount(oldInfo.name, 1)
Why is this update done inside of a gofunc instead of under the lock?
Force-pushed from a3253d7 to 9f7d63b (Compare)
Pausing efforts on this support currently. The approach I have followed in this PR is summarized in this doc. Hope it benefits anyone who picks this up at a later point!
Force-pushed from 9f7d63b to 751f8f3 (Compare)
[APPROVALNOTIFIER] This PR is NOT APPROVED
This pull-request has been approved by: richabanker. The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
Force-pushed from 9c5aa95 to 9aaaa67 (Compare)
…st SV updates async Signed-off-by: Andrew Sy Kim <andrewsy@google.com> Co-authored-by: Haowei Cai (Roy) <haoweic@google.com>
Force-pushed from 9aaaa67 to 926c77c (Compare)
…informer cache to serve CR writes
…ow whether to serve a CR write
Force-pushed from 926c77c to 729c800 (Compare)
@richabanker: The following tests failed, say
Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.
Also posting a summary of all the approaches we have tried so far for this issue here.
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR is based off of #120582.
This is an attempt to drive the StorageVersion API graduation toward beta by addressing the outstanding requirements and concerns.
Which issue(s) this PR fixes:
Fixes kubernetes/enhancements#2339
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: