Add initial support for StorageVersion API for CRDs #120582
base: master
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
/assign @andrewsykim
/remove-area kubeadm
/remove-sig cluster-lifecycle
A few comments on the structure of the code.
	return fmt.Errorf("error while updating storage version for crd %v: %v", crd, err)
}
// close processedCh after the update is done
if processedCh != nil {
I'm not sure why processedCh is needed given it is closed synchronously, i.e. doesn't returning from the function achieve the same thing?
It's mostly needed while handling mutate requests for a CR, to check whether a CRD storage version update is in progress before serving a CR write.
But for the CRD requests themselves, there's no dependency on processedCh as such.
I guess I was asking why use a channel at all if it is always closed by the time the function returns? It's not asynchronous.
It's used for these invocations of waitForStorageVersionUpdate(), which have no knowledge of when UpdateStorageVersion() returns.
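To make that dependency concrete, here is a minimal, self-contained sketch of the pattern under discussion; the function names below are illustrative stand-ins, not the PR's actual code. One goroutine closes processedCh once the storage version update finishes, and a CR-write path that never sees the updater's return value waits on the channel instead.

package main

import (
	"context"
	"fmt"
	"time"
)

// doStorageVersionUpdate is a stand-in for the PR's UpdateStorageVersion: it
// closes processedCh once the (simulated) update has finished, so code that
// only holds the channel can observe completion.
func doStorageVersionUpdate(processedCh chan<- struct{}) {
	time.Sleep(50 * time.Millisecond) // simulate the storage version API call
	close(processedCh)
}

// waitForCRDStorageVersionUpdate is a stand-in for the CR-write path
// (waitForStorageVersionUpdate): it has no access to the updater's return
// value, only to the channel.
func waitForCRDStorageVersionUpdate(ctx context.Context, processedCh <-chan struct{}) error {
	select {
	case <-processedCh:
		return nil // update finished; safe to serve the CR write
	case <-ctx.Done():
		return fmt.Errorf("timed out waiting for storage version update: %w", ctx.Err())
	}
}

func main() {
	processedCh := make(chan struct{})
	go doStorageVersionUpdate(processedCh)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := waitForCRDStorageVersionUpdate(ctx, processedCh); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("serving CR write")
}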
func UpdateStorageVersionFor(ctx context.Context, c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string, servedVersions []string, postProcessFunc processStorageVersionFunc) error {
	err := singleUpdate(ctx, c, apiserverID, gr, encodingVersion, decodableVersions, servedVersions, postProcessFunc)
	if err != nil {
		time.Sleep(1 * time.Second)
Is this sleep intentional? I believe it was in the original code to space out the retries, but sleeping for 1s here before returning the error seems off...
Also curious why the retrying was removed.
Ah no, this was leftover from when I moved the retry logic from here to here.
But now that I look at it, the retry block didn't really need to be moved, since UpdateStorageVersionFor() already performs the retries. Reverted the retry block move.
After reading your comment I expected the sleep to be removed/reverted, but it's still here. Is that intentional?
I reverted the retry block so that it remains in place, and so does the time.Sleep. (Or am I not seeing the right changes on my end?)
The only change made to UpdateStorageVersionFor() now is adding the ctx and processStorageVersionFunc params.
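For readers following along, a rough, self-contained sketch of the retry-in-place shape being described, with singleUpdate simulated and the names and values illustrative rather than taken from the PR; the sleep spaces out attempts instead of delaying the final error return.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// singleUpdate simulates one storage version write attempt; here it fails on
// the first attempt and succeeds afterwards.
func singleUpdate(ctx context.Context, attempt int) error {
	if attempt < 2 {
		return errors.New("conflict, please retry")
	}
	return nil
}

// updateStorageVersionFor keeps the retries (and the 1s spacing between them)
// inside the function, roughly what "the retry block remains in place" means.
func updateStorageVersionFor(ctx context.Context) error {
	const maxRetries = 3
	var lastErr error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		lastErr = singleUpdate(ctx, attempt)
		if lastErr == nil {
			return nil
		}
		if attempt < maxRetries {
			time.Sleep(1 * time.Second) // space out retries; no sleep before the final return
		}
	}
	return fmt.Errorf("updating storage version failed after %d attempts: %w", maxRetries, lastErr)
}

func main() {
	if err := updateStorageVersionFor(context.Background()); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("storage version updated")
}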
	return nil
}
// Get the latest CRD to make sure it's not terminating or deleted
crd, err := r.crdLister.Get(crdName)
The CRD is passed as an argument, so it's not clear to me why we need to get it again.
Does the comment above explain why?
cc @roycaihw since this was originally authored by him
It seems to try to avoid races with the terminating condition being updated since it was fetched, but I am not sure it is necessary, since the caller of the function fetches the CRD not long before calling this.
I don't recall why I added this. It doesn't feel StorageVersion-specific. We can exclude this from the PR.
}
// Get the latest CRD to make sure it's not terminating or deleted
crd, err := r.crdLister.Get(crdName)
if err != nil || crd.UID != crdUID || apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating) {
This change has different conditions for when this terminating block gets executed, and I'm unclear why.
Hmm, yeah, maybe it makes more sense for the wait on the storage version update to occur after we have established whether the CRD is terminating/deleted. Made the change.
I'm not seeing the code change; was it pushed?
func (m *manager) UpdateStorageVersion(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition,
	waitCh <-chan struct{}, processedCh chan<- struct{}, errCh chan<- struct{}) error {

	if waitCh != nil {
I find it awkward that the caller must pass a channel they might like to wait on instead of just waiting on it themselves
I think the main purpose of passing the waitCh here was to ensure that we wait on the teardown of the old storage before proceeding with the CRD update. But yeah, it does look like it could just be a blocking call that doesn't require a waitCh to signal that the teardown has finished. I'll get back on this.
cc @roycaihw
Well, teardown() is always called in a goroutine, and this seems to be the case even today, so the waitCh is necessary to wait for the goroutine to finish.
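A minimal, runnable illustration of that point, with simplified stand-ins rather than the PR's actual signatures: teardown runs in its own goroutine and closes the channel when the old storage is gone, and the update blocks on that channel before publishing.

package main

import (
	"context"
	"fmt"
	"time"
)

// teardownOldStorage stands in for the real teardown(): it closes doneCh once
// the old storage has been destroyed.
func teardownOldStorage(doneCh chan<- struct{}) {
	defer close(doneCh)
	time.Sleep(100 * time.Millisecond) // simulate draining requests and destroying storage
}

// updateStorageVersion mirrors how the PR's UpdateStorageVersion uses waitCh:
// if a channel is supplied, wait for teardown before publishing.
func updateStorageVersion(ctx context.Context, waitCh <-chan struct{}) error {
	if waitCh != nil {
		select {
		case <-waitCh:
			// old storage torn down; safe to publish the new storage version
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	fmt.Println("storage version updated")
	return nil
}

func main() {
	tearDownFinishedCh := make(chan struct{})
	go teardownOldStorage(tearDownFinishedCh) // teardown always runs in a goroutine

	if err := updateStorageVersion(context.Background(), tearDownFinishedCh); err != nil {
		fmt.Println("update failed:", err)
	}
}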
// Manager provides methods for updating StorageVersion for CRDs. It does
// goroutine management to allow CRD storage version updates running in the
// background and not blocking the caller.
type Manager interface {
nit: This CRD storageversion.Manager type feels to me better suited as a standalone helper function in customresource_handler.go than as its own type + interface.
Possibly the idea was to keep the structure similar to the staging/k8s.io/apiserver storageversion manager? cc @roycaihw
IIRC the main purpose of having the manager is to implement the queueing logic, which is unique to CRD
ctx := apirequest.NewContext()
// customStorageLock will be released even if UpdateStorageVersion() fails. This is safe
// since we are deleting the old storage here and not creating a new one.
err := r.storageVersionManager.UpdateStorageVersion(ctx, crd, tearDownFinishedCh, nil, nil)
This call turns this method from non-blocking to blocking, since UpdateStorageVersion won't complete until tearDownFinishedCh is closed. The async approach looks worthwhile to avoid blocking and to make the code more responsive to connections that haven't closed.
So thinking through this and the UpdateStorageVersion call in getOrCreateServingInfoFor.
If we blindly make this async to unblock handling of other CRDs (probably required), what could happen:
1. standing CRD storage version is v1
2. CRD is updated with storage version v2
3. that update is observed and this cleanup method is called
4. a CR CREATE is in progress and extremely slow; it will store in v1. This (properly) stops UpdateStorageVersion from being called until the requests are drained from the old handler
5. a new CR request is made to create a new instance
6. getOrCreateServingInfoFor has a call to UpdateStorageVersion which does not wait (no wait channel passed)
7. the storage version is updated to indicate v2 instead of v1
8. very quickly we do a storage version migration because it's now safe, and it completes
9. the CR CREATE request from step 4 finishes; the result is an instance stored in v1
10. we now have data stored in v1 format in etcd. This isn't expected.
Trying to think through what conditions need to be correct in order to change the storage version, but I think this is racy.
idea for a solution here: https://github.com/kubernetes/kubernetes/pull/120582/files#r1490154633
Agreed, I see the potential for a resource in a long-running request to be written in an older storage version if we make the update async without any mitigations.
Doesn't the solution as proposed still leave the possibility of a write completing in a version that is not the currently active StorageVersion? (New writes would use the newer CRD version and complete without updating the SV.)
I suppose the distinction is that if you are stepping the CRD incrementally (not going backwards), any new resources or updates should always be stored at a version at least as high as the published storage version? That seems OK for SVM's use case. Is this a guarantee for this Storage Version API?
I suppose the distinction is that if you are stepping the CRD incrementally (not going backwards), any new resources or updates should always be stored at a version at least as high as the published storage version?
Yes, that's the distinction that I see.
Is this a guarantee for this Storage Version API?
The guarantee for in-tree resources is stronger. I think to do better, we would need to block writes to the CR until all previous storage has been destroyed. This could be built and may not be too bad to construct.
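To sketch what blocking CR writes until the previous storage is destroyed could look like, here is a hypothetical, self-contained gate keyed by CRD name; none of these names exist in the PR, and this is only an illustration of the idea under stated assumptions.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// crdGate is a hypothetical helper (not in the PR): CR writes for a CRD block
// until the previously active storage for that CRD reports it was destroyed.
type crdGate struct {
	mu      sync.Mutex
	pending map[string]chan struct{} // CRD name -> closed when old storage is destroyed
}

func newCRDGate() *crdGate {
	return &crdGate{pending: map[string]chan struct{}{}}
}

// beginTeardown registers that an old storage for crdName is being destroyed
// and returns a function to call once destruction has finished.
func (g *crdGate) beginTeardown(crdName string) (done func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	ch := make(chan struct{})
	g.pending[crdName] = ch
	return func() { close(ch) }
}

// waitForWrite blocks a CR write until any in-flight teardown for crdName completes.
func (g *crdGate) waitForWrite(ctx context.Context, crdName string) error {
	g.mu.Lock()
	ch := g.pending[crdName]
	g.mu.Unlock()
	if ch == nil {
		return nil // nothing is being torn down
	}
	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	gate := newCRDGate()
	done := gate.beginTeardown("foos.example.com")
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate destroying the old storage
		done()
	}()

	if err := gate.waitForWrite(context.Background(), "foos.example.com"); err != nil {
		fmt.Println("write not allowed:", err)
		return
	}
	fmt.Println("old storage destroyed; CR write can proceed")
}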
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
	utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
	ctx := apirequest.NewContext()
	err := r.storageVersionManager.UpdateStorageVersion(ctx, newCRD, tearDownFinishedCh, nil, nil)
Working through a potential solution to https://github.com/kubernetes/kubernetes/pull/120582/files#r1490138375
Observations
- Teardowns for ordered CRD updates do NOT complete in order because some requests can be slower than others
- Because storage version can move between v1 and v2 and back to v1, we should probably avoid writing in order of update and in order of completion because new writes using newer storage versions could invalidate it.
- Teardown has a maximum timeout of two minutes
- I don't think we really care how quickly the storage version updates.
Idea:
- What if we create a workqueue for handling StorageVersion writes, keyed by CRD name
- On CreateCRD and UpdateCRD we can call a "queue CRD for 125 seconds from now" and write into a map[crd.name] -> CRD+timeToWrite. This gives us only the latest instance of the CRD with the latest time to wait
- when the workqueue pops an item, we look up the map, see if timeToWrite has passed and, if so, write the storage version and remove the entry on success (requeue on error). If not ready, do nothing.
We would not need to have the CRD handler block reads or writes since all old writes would be over by the time we write the storage version.
Some future improvement (that I'd prefer to not have in this PR), could try to track "have all known storage versions completed teardown" and that could be sufficient instead of the two minute timer.
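A toy, self-contained sketch of the delayed-write idea above, using a plain map and mutex as a stand-in for a real delaying workqueue; the names, the requeue handling, and the 125-second figure are illustrative assumptions, not code from the PR.

package main

import (
	"fmt"
	"sync"
	"time"
)

// pendingSV records the earliest time at which it is safe to publish a CRD's
// storage version (after the teardown timeout for any older storage has expired).
type pendingSV struct {
	timeToWrite time.Time
}

// svQueue is a toy stand-in for a delaying workqueue keyed by CRD name: each
// enqueue overwrites the previous entry, so only the latest CRD update with
// the latest safe-write time survives.
type svQueue struct {
	mu      sync.Mutex
	pending map[string]pendingSV
}

func newSVQueue() *svQueue {
	return &svQueue{pending: map[string]pendingSV{}}
}

// enqueue is what CreateCRD/UpdateCRD would call: "queue this CRD for <delay> from now".
func (q *svQueue) enqueue(crdName string, delay time.Duration) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending[crdName] = pendingSV{timeToWrite: time.Now().Add(delay)}
}

// process pops one CRD and writes its storage version only if the delay has
// passed; a real workqueue would requeue the item when not ready or on error.
func (q *svQueue) process(crdName string, write func(string) error) {
	q.mu.Lock()
	item, ok := q.pending[crdName]
	q.mu.Unlock()
	if !ok || time.Now().Before(item.timeToWrite) {
		return // not ready yet
	}
	if err := write(crdName); err != nil {
		return // keep the entry so it can be retried
	}
	q.mu.Lock()
	delete(q.pending, crdName)
	q.mu.Unlock()
}

func main() {
	q := newSVQueue()
	q.enqueue("foos.example.com", 50*time.Millisecond) // the proposal suggests ~125s
	time.Sleep(100 * time.Millisecond)
	q.process("foos.example.com", func(name string) error {
		fmt.Println("publishing storage version for", name)
		return nil
	})
}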
Yes, this can guarantee that by the time we get to updating the storage version for any CRD, the teardown timeout has expired for any actively running requests against any older version of the CRD as of the most recent update, but won't there still be a race between the timeout expiring and the actual storage being destroyed?
kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
Lines 564 to 573 in d151f22
select {
case <-time.After(r.requestTimeout * 2):
	klog.Warningf("timeout waiting for requests to drain for %s/%s, tearing down storage", oldInfo.spec.Group, oldInfo.spec.Names.Kind)
case <-requestsDrained:
}
for _, storage := range oldInfo.storages {
	// destroy only the main storage. Those for the subresources share cacher and etcd clients.
	storage.CustomResource.DestroyFunc()
}
I do feel like this simplifies things a lot, e.g. the waiting and the many channel usages. I'm trying to think if there's a simple way to synchronize things directly.
Delaying StorageVersion API updates to avoid races between SVM and old-version CR writes sounds pragmatic. It just occurred to me that there might be a pathological delay when a partitioned apiserver in an HA configuration reconnects to etcd... so long term it would be ideal if SVM triggers after the apiservers agree on the storage version, but this seems like a step forward.
won't there still be a race between the timeout expiring and the actual storage being destroyed?
+1.
Also IIUC, if we want the new CRD writes to happen only after the old CR writes have completed, we do want to make the CRD writes blocking or delayed until the previous CR writes have drained? So I don't fully understand why not just use the current approach as it is. Is it because there's a possibility that we might time out on new CRD writes while waiting for UpdateStorageVersion() to return?
Meaning, I don't understand the difference between the approach the PR currently takes vs. the workqueue approach suggested here. The former blocks a CRD update till the previous CR writes have drained, which is guaranteed to finish within 2 minutes anyway because of the teardown timeout, and the latter just delays the CRD update by a fixed amount and relies on the same fact that all requests will either finish or get dropped because of the teardown timeout. So what's the advantage of using a workqueue here?
After my chat with @deads2k, I realized the race condition that the current PR has: UpdateStorageVersion() is called in 2 places:
1. within getOrCreateServingInfoFor(), which happens when a new CR request comes in
2. after a CRD update is issued
The updateSV from #1 does not wait for old CR writes to finish, and thus can result in the SV being updated to a newer version while the older CR writes are still in progress.
As next steps, I think we need to decide:
- whether we need UpdateStorageVersion() to be called in 2 places. Maybe just calling it after a CRD update suffices?
- as a part of a CRD update, have a way to preserve the status of the SV update (expressed using processedCh, errorCh) which can be referenced at the time of CR writes
Also, another thought on using the workqueue approach as suggested:
Because storage version can move between v1 and v2 and back to v1, we should probably avoid writing in order of update and in order of completion because new writes using newer storage versions could invalidate it.
There is a concern about missing intermediate SV writes in cases when a CRD is updated v1 -> v2 -> v1, which is expressed here. That suggests we in fact need to capture both the v1 -> v2 update and the v2 -> v3 update by updating the SV at both stages.
// StorageVersionAPI feature gate is disabled
if !utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) ||
	!utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
	klog.V(2).Infof("Skipped waiting for storage version to finish updating since StorageVersionAPI and/or APIServerIdentity feature are disabled.")
IIUC this line will be logged at V(2) for every CR write request when the feature is disabled. That seems like a lot of clusters and a lot of logs.
Signed-off-by: Andrew Sy Kim <andrewsy@google.com> Co-authored-by: Haowei Cai (Roy) <haoweic@google.com>
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: richabanker. The full list of commands accepted by this bot can be found here.
@richabanker: The following tests failed. Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.
Created a new PR which uses a workqueue to perform async SV updates here: #123999
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR is based on #113498, but rebased onto master. It is an attempt to drive the StorageVersion API graduation toward beta by addressing the requirements and concerns for it.
Which issue(s) this PR fixes:
Fixes kubernetes/enhancements#2339
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: