Skip to content

Commit

Permalink
unify processedCh and abortedCh
Browse files Browse the repository at this point in the history
When a storage version update is aborted because the CRD was deleted, we
don't need to reject in-flight CR requests. We can safely allow these
requests to proceed, because the CRD finalizer guarantees no write
requests can succeed after a CRD is deleted-- CREATE requests are 405
NOT ALLOWED, other requests will get 404 NOT FOUND.
  • Loading branch information
roycaihw committed Feb 17, 2021
1 parent 35d9ffc commit 8661f73
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
// If the StorageVersionAPI feature gate is enabled, after a CRD storage
// (a.k.a. serving info) is created, the API server will block CR write
// requests that hit this storage until the corresponding storage
// version update gets processed/aborted by the storage version manager.
// version update gets processed by the storage version manager.
// The API server will unblock CR write requests if the storage version
// update takes longer than storageVersionUpdateTimeout after the
// storage is created.
Expand Down Expand Up @@ -193,20 +193,16 @@ type crdInfo struct {
}

// storageVersionUpdate holds information about a storage version update,
// indicating whether the update gets processed, aborted, or timed-out.
// indicating whether the update gets processed, or timed-out.
type storageVersionUpdate struct {
// processedCh is closed by the storage version manager after the
// storage version update gets processed (either succeeded or failed).
// The API server will unblock and allow CR write requests if this
// channel is closed.
processedCh <-chan struct{}
// abortedCh is closed by the storage version manager if the storage
// version update gets aborted. The API server will unblock and reject
// CR write requests if this channel is closed.
abortedCh <-chan struct{}
// timeout is the time when the API server will unblock and allow CR
// write requests even if the storage version update hasn't been
// processed nor aborted.
// processed.
timeout time.Time
// timeoutLogOnce allows the API server to log once when the timeout is
// hit.
Expand Down Expand Up @@ -368,22 +364,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
(requestInfo.Verb == "create" ||
requestInfo.Verb == "update" ||
requestInfo.Verb == "patch") {
var aborted bool
crdInfo, aborted, err = r.getOrCreateServingInfoForWrite(crd.UID, crd.Name)
// The server aborted storage version update due to the CRD being deleted.
// The CR request is forbidden.
if aborted {
err = apierrors.NewForbidden(schema.GroupResource{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
}, requestInfo.Name, fmt.Errorf("storage version update aborted due to CRD deletion"))
utilruntime.HandleError(err)
responsewriters.ErrorNegotiated(
err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup,
Version: requestInfo.APIVersion}, w, req,
)
return
}
crdInfo, err = r.getOrCreateServingInfoForWrite(crd.UID, crd.Name)
} else {
crdInfo, err = r.getOrCreateServingInfoFor(crd.UID, crd.Name)
}
Expand Down Expand Up @@ -557,7 +538,7 @@ func (r *crdHandler) createCustomResourceDefinition(obj interface{}) {
}
// Update storage version with the latest info in the watch event
if r.storageVersionManager != nil {
r.storageVersionManager.EnqueueStorageVersionUpdate(crd, tearDownFinishedCh, nil, nil)
r.storageVersionManager.EnqueueStorageVersionUpdate(crd, tearDownFinishedCh, nil)
}
}

Expand Down Expand Up @@ -607,7 +588,7 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
}
// Update storage version with the latest info in the watch event
if r.storageVersionManager != nil {
r.storageVersionManager.EnqueueStorageVersionUpdate(newCRD, tearDownFinishedCh, nil, nil)
r.storageVersionManager.EnqueueStorageVersionUpdate(newCRD, tearDownFinishedCh, nil)
}
}

Expand Down Expand Up @@ -703,17 +684,11 @@ func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensions
}

// getOrCreateServingInfoForWrite is getOrCreateServingInfoFor, but blocks CR
// write requests until the storage version update is processed, aborted, or
// write requests until the storage version update is processed, or
// timed-out.
// If the storage version update is processed, unblock and allow the write request.
// If the storage version update is aborted, unblock and reject the write request.
// If the storage version update timed-out, unblock and allow the write request.
//
// If aborted is true, the returned error (403) should be surfaced to the client,
// indicating that the server forbids the CR request due to the CRD deletion.
// Otherwise, any non-nil error returned by getOrCreateServingInfoFor may be
// wrapped by the server as an internal error (500) before surfacing to the client.
func (r *crdHandler) getOrCreateServingInfoForWrite(uid types.UID, name string) (ret *crdInfo, aborted bool, err error) {
func (r *crdHandler) getOrCreateServingInfoForWrite(uid types.UID, name string) (ret *crdInfo, err error) {
ret, err = r.getOrCreateServingInfoFor(uid, name)

// Surface non-nil error early.
Expand All @@ -722,7 +697,7 @@ func (r *crdHandler) getOrCreateServingInfoForWrite(uid types.UID, name string)
r.storageVersionManager == nil ||
ret.storageVersionUpdate == nil ||
ret.storageVersionUpdate.processedCh == nil {
return ret, false, err
return ret, err
}

// NOTE: currently the graceful CRD deletion waits 1s for in-flight requests
Expand All @@ -732,10 +707,6 @@ func (r *crdHandler) getOrCreateServingInfoForWrite(uid types.UID, name string)
// first CR request establishes the underlying storage).
select {
case <-ret.storageVersionUpdate.processedCh:
case <-ret.storageVersionUpdate.abortedCh:
// Storage version update aborted. The caller knows which error
// to return to the client (403).
return nil, true, nil
// Unblock the requests if the storage version update takes a long time, otherwise
// CR requests may stack up and overwhelm the API server.
// TODO(roycaihw): benchmark the storage version update latency to adjust the timeout.
Expand All @@ -745,7 +716,7 @@ func (r *crdHandler) getOrCreateServingInfoForWrite(uid types.UID, name string)
ret.spec.Group, ret.spec.Names.Plural)
})
}
return ret, false, nil
return ret, nil
}

// getOrCreateServingInfoFor gets the CRD serving info for the given CRD UID if the key exists in the storage map.
Expand Down Expand Up @@ -1092,11 +1063,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
if r.storageVersionManager != nil {
// spawn storage version update in background and use channels to make handlers wait
processedCh := make(chan struct{})
abortedCh := make(chan struct{})
r.storageVersionManager.EnqueueStorageVersionUpdate(crd, nil, processedCh, abortedCh)
r.storageVersionManager.EnqueueStorageVersionUpdate(crd, nil, processedCh)
ret.storageVersionUpdate = &storageVersionUpdate{
processedCh: processedCh,
abortedCh: abortedCh,
timeout: time.Now().Add(storageVersionUpdateTimeout),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,14 @@ const (
type Manager interface {
// EnqueueStorageVersionUpdate queues a StorageVesrion update for the given
// CRD and returns immediately. Optionally, the caller may specify a
// non-nil waitCh and/or a non-nil processedCh and a non-nil abortedCh.
// non-nil waitCh and/or a non-nil processedCh.
// A non-nil waitCh will block the StorageVersion update until waitCh is
// closed.
// The manager will close the non-nil processedCh if it finished
// processing the StorageVersion update (note that the update can either
// succeeded or failed), or close the non-nil abortedCh if it aborted
// processing the update.
// succeeded or failed).
EnqueueStorageVersionUpdate(crd *apiextensionsv1.CustomResourceDefinition,
waitCh <-chan struct{}, processedCh, abortedCh chan<- struct{})
waitCh <-chan struct{}, processedCh chan<- struct{})
// TeardownFor aborts all pending updates for the given CRD UID, and
// stops the corresponding goroutine.
TeardownFor(uid types.UID)
Expand All @@ -69,8 +68,6 @@ type update struct {
waitCh <-chan struct{}
// If non-nil, close the channel after the update process is finished.
processedCh chan<- struct{}
// If non-nil, close the channel if the update process is aborted.
abortedCh chan<- struct{}
}

// updateQueue is a queue of StorageVersion updates. Upon creation, a goroutine
Expand Down Expand Up @@ -117,15 +114,14 @@ func NewManager(client genericstorageversion.Client, apiserverID string) Manager

// EnqueueStorageVersionUpdate queues a StorageVesrion update for the given
// CRD and returns immediately. Optionally, the caller may specify a
// non-nil waitCh and/or a non-nil processedCh and a non-nil abortedCh.
// non-nil waitCh and/or a non-nil processedCh.
// A non-nil waitCh will block the StorageVersion update until waitCh is
// closed.
// The manager will close the non-nil processedCh if it finished
// processing the StorageVersion update (note that the update can either
// succeeded or failed), or close the non-nil abortedCh if it aborted
// processing the update.
// succeeded or failed).
func (m *manager) EnqueueStorageVersionUpdate(crd *apiextensionsv1.CustomResourceDefinition,
waitCh <-chan struct{}, processedCh, abortedCh chan<- struct{}) {
waitCh <-chan struct{}, processedCh chan<- struct{}) {
m.lock.Lock()
defer m.lock.Unlock()
q := m.getOrCreateUpdateQueueLocked(crd.UID)
Expand All @@ -148,7 +144,6 @@ func (m *manager) EnqueueStorageVersionUpdate(crd *apiextensionsv1.CustomResourc
crd: crd,
waitCh: waitCh,
processedCh: processedCh,
abortedCh: abortedCh,
}
}

Expand Down Expand Up @@ -180,17 +175,17 @@ func (m *manager) getOrCreateUpdateQueueLocked(uid types.UID) chan<- *update {
select {
case <-ctx.Done():
// The queue was cancelled. Abort the update.
if update.abortedCh != nil {
close(update.abortedCh)
if update.processedCh != nil {
close(update.processedCh)
}
continue
default:
}

// TODO(roycaihw): there are two types of updates:
// 1) the ones with nil processedCh and abortedCh, requested by
// 1) the ones with nil processedCh, requested by
// watch events handler
// 2) the ones with non-nil processedCh and abortedCh, requested
// 2) the ones with non-nil processedCh, requested
// by newly-created CRD storage
// An update of type 1) can be merged with a consecutive update,
// where the latter update's storage version is honored, and both
Expand All @@ -202,16 +197,7 @@ func (m *manager) getOrCreateUpdateQueueLocked(uid types.UID) chan<- *update {
utilruntime.HandleError(err)
}
if update.processedCh != nil {
select {
case <-ctx.Done():
// The queue was cancelled. Potentially we didn't finish this
// storage version update.
if update.abortedCh != nil {
close(update.abortedCh)
}
default:
close(update.processedCh)
}
close(update.processedCh)
}
}
}()
Expand Down

0 comments on commit 8661f73

Please sign in to comment.