support SV API for CRDs (using workqueue and wait channel) #124543
base: master
Conversation
Skipping CI for Draft Pull Request.
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
This issue is currently awaiting triage. If a SIG or subproject determines this is a relevant issue, they will accept it by applying the appropriate triage label.
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: richabanker. The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing `/approve` in a comment.
cc @roycaihw
Adding some context here for the new proposal. The proposal mainly addresses the race condition in "handling CR writes" caught by @deads2k.
"Handling CR writes" is equivalent to "unblocking one `crdInfo`", or "unblocking one handler".
In an ideal world, each new handler would have exactly one corresponding SV update. In reality, each new handler has zero or one corresponding SV update: the SV update may never happen, because a newer handler can arrive and short-circuit the old handler's SV update.
Essentially, what we are trying to achieve here is "unblock one handler when its corresponding SV update succeeds".
I think instead of introducing intermediate concepts (e.g. comparing the CRD cache version and the SV cache version) that only indirectly represent the ordering between "SV update" and "handler serving CR writes", we should track the 1-to-1 relationship between a handler and its corresponding SV update directly. An indirect tracking representation is hard to get right under the various race conditions.
That means that when the SV update worker successfully performs an SV update, the corresponding handler should just know that its SV update succeeded.
To achieve this, I think we can do the following:
- In the `createCustomResourceDefinition` and `updateCustomResourceDefinition` methods, create a new `crdInfo` (i.e. the new handler) when an old `crdInfo` is removed from the map. Create a corresponding `svUpdatedSucceeded` channel and store it inside the `crdInfo`.
- Have a "pending SV update" cache, which is a map from "CRD name" to "SV + channel". The cache should be protected by a lock. Every time we write to the map for an existing key, simply overwrite the value (i.e. short-circuit older versions).
- Instead of letting the SV update worker look at the CRD cache to decide what SV to update to, let it look at the new "pending SV update" cache. The worker grabs the "SV + channel", performs the SV update, and upon success closes the channel.
```diff
@@ -290,8 +300,6 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 		return
 	}

 	terminating := apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating)

 	crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
```
We create handlers and track their corresponding SV updates in the `createCustomResourceDefinition` and `updateCustomResourceDefinition` methods. I think in the `ServeHTTP` method we should only get handlers, not create them, since we don't schedule SV updates here. This is because we want to make sure the 1-to-1 relationship between a handler and its corresponding SV update is tracked.
```diff
@@ -453,14 +532,31 @@ func (r *crdHandler) createCustomResourceDefinition(obj interface{}) {
 	storageMap := r.customStorage.Load().(crdStorageMap)
 	oldInfo, found := storageMap[crd.UID]
 	if !found {
 		crdInfo, err := r.createServingInfoFor(crd.Name)
 		if err != nil {
 			klog.Errorf("createCustomResourceDefinition failed, error creating new handler: %v", err)
```
nit: Previously you mentioned that `createServingInfoFor` may fail if the CRD was just created and hasn't been approved by the naming controller yet. When the naming controller approves the name and updates the CRD status, a new watch event will trigger a successful handler creation. An optimization here could be to detect whether the name has been approved, and log at a different level (instead of `Errorf`) when it has not.
```go
}

// updateStorageVersion updates a StorageVersion for the given CRD.
func (m *Manager) updateStorageVersion(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition) error {
```
`updateStorageVersion` and `updateCRDStorageVersion` are duplicates.
```go
}

// UpdateActiveTeardownsCount updates the teardown count of the CRD in sync.Map.
func (m *Manager) UpdateActiveTeardownsCount(crdName string, value int) error {
```
Can we unit test this method?
```go
func (m *Manager) alreadyPublishedLatestSV(svUpdateInfo *storageVersionUpdateInfo) bool {
	select {
	case <-svUpdateInfo.processedCh:
		klog.V(4).Infof("Storageversion is already updated to the latest value for crd: %s, returning", svUpdateInfo.crd.Name)
		return true
	default:
		return false
	}
}
```
Is this situation possible?
```go
svInfo := val.(*storageVersionUpdateInfo)
svInfo.crd = crd
if processedCh != nil {
	svInfo.processedCh = processedCh
```
Can we use the `Store` method to update the `sync.Map`, to make sure the update is atomic?
What type of PR is this?
/kind feature
What this PR does / why we need it:
This PR is based off of #120582.
Pseudocode:
- In the `createCustomResourceDefinition` and `updateCustomResourceDefinition` methods, create a new `crdInfo` (i.e. the new handler) when an old `crdInfo` is removed from the map. Create a corresponding `svUpdatedSucceeded` channel and store it inside the `crdInfo`.

Which issue(s) this PR fixes:
Fixes kubernetes/enhancements#2339
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: