Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiextensions apiserver: update storage version for custom resources #96403

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiextensions-apiserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ filegroup(
"//staging/src/k8s.io/apiextensions-apiserver/pkg/generated/openapi:all-srcs",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresource:all-srcs",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition:all-srcs",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/storageversion:all-srcs",
"//staging/src/k8s.io/apiextensions-apiserver/test/integration:all-srcs",
],
tags = ["automanaged"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresource:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresource/tableconvertor:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/storageversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
Expand Down Expand Up @@ -84,6 +85,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/warning:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/scale/scheme/autoscalingv1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,22 @@ import (
openapicontroller "k8s.io/apiextensions-apiserver/pkg/controller/openapi"
"k8s.io/apiextensions-apiserver/pkg/controller/status"
"k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition"
"k8s.io/apiextensions-apiserver/pkg/storageversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/discovery"
genericfeatures "k8s.io/apiserver/pkg/features"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/kubernetes"
)

var (
Expand Down Expand Up @@ -188,6 +192,16 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
delegate: delegateHandler,
}
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
var storageVersionManager storageversion.Manager
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
sttts marked this conversation as resolved.
Show resolved Hide resolved
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
kubeclientset, err := kubernetes.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create clientset for storage versions: %v", err)
}
sc := kubeclientset.InternalV1alpha1().StorageVersions()
storageVersionManager = storageversion.NewManager(sc, c.GenericConfig.APIServerID)
}
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
Expand All @@ -204,6 +218,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
apiGroupInfo.StaticOpenAPISpec,
c.GenericConfig.MaxRequestBodyBytes,
storageVersionManager,
)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"k8s.io/apiextensions-apiserver/pkg/crdserverscheme"
"k8s.io/apiextensions-apiserver/pkg/registry/customresource"
"k8s.io/apiextensions-apiserver/pkg/registry/customresource/tableconvertor"
"k8s.io/apiextensions-apiserver/pkg/storageversion"

apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -89,6 +90,17 @@ import (
"k8s.io/kube-openapi/pkg/validation/validate"
)

const (
// If the StorageVersionAPI feature gate is enabled, after a CRD storage
// (a.k.a. serving info) is created, the API server will block CR write
// requests that hit this storage until the corresponding storage
// version update gets processed/aborted by the storage version manager.
// The API server will unblock CR write requests if the storage version
// update takes longer than storageVersionUpdateTimeout after the
// storage is created.
storageVersionUpdateTimeout = 15 * time.Second
)

// crdHandler serves the `/apis` endpoint.
// This is registered as a filter so that it never collides with any explicitly registered endpoints
type crdHandler struct {
Expand Down Expand Up @@ -134,6 +146,13 @@ type crdHandler struct {
// The limit on the request size that would be accepted and decoded in a write request
// 0 means no limit.
maxRequestBodyBytes int64

// storageVersionManager manages CRD StorageVersion updates.
// NOTE: since we want StorageVersion updates happen in the same order
// as the CRD spec updates / underlying storage changes, one must hold
// customStorageLock when calling methods in storageVersionManager, to
// make sure the updates get queued up in the right order.
storageVersionManager storageversion.Manager
}

// crdInfo stores enough information to serve the storage for the custom resource
Expand Down Expand Up @@ -165,6 +184,33 @@ type crdInfo struct {
storageVersion string

waitGroup *utilwaitgroup.SafeWaitGroup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

storageVersionUpdate struct {
   ...
}

makes code a little bit prettier everywhere.

// storageVersionUpdate holds information about the storage version
// update issued when the crdInfo was created. The API server uses the
// information to decide whether a CR write request should be allowed,
// rejected, or blocked.
storageVersionUpdate *storageVersionUpdate
}

// storageVersionUpdate tracks the outcome of the storage version update
// issued when a crdInfo was created: whether the update has been
// processed, aborted, or has timed out. The API server consults it to
// decide whether to allow, reject, or keep blocking a CR write request.
type storageVersionUpdate struct {
// processedCh is closed by the storage version manager after the
// storage version update gets processed (either succeeded or failed).
// The API server will unblock and allow CR write requests if this
// channel is closed.
processedCh <-chan struct{}
// abortedCh is closed by the storage version manager if the storage
// version update gets aborted. The API server will unblock and reject
// CR write requests if this channel is closed.
abortedCh <-chan struct{}
// timeout is the deadline after which the API server will unblock and
// allow CR write requests even if the storage version update has been
// neither processed nor aborted.
timeout time.Time
// timeoutLogOnce ensures the API server logs at most once when the
// timeout is hit.
timeoutLogOnce sync.Once
}

// crdStorageMap goes from customresourcedefinition to its storage
Expand All @@ -185,7 +231,8 @@ func NewCustomResourceDefinitionHandler(
requestTimeout time.Duration,
minRequestTimeout time.Duration,
staticOpenAPISpec *goopenapispec.Swagger,
maxRequestBodyBytes int64) (*crdHandler, error) {
maxRequestBodyBytes int64,
storageVersionManager storageversion.Manager) (*crdHandler, error) {
ret := &crdHandler{
versionDiscoveryHandler: versionDiscoveryHandler,
groupDiscoveryHandler: groupDiscoveryHandler,
Expand All @@ -202,6 +249,7 @@ func NewCustomResourceDefinitionHandler(
minRequestTimeout: minRequestTimeout,
staticOpenAPISpec: staticOpenAPISpec,
maxRequestBodyBytes: maxRequestBodyBytes,
storageVersionManager: storageVersionManager,
}
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ret.createCustomResourceDefinition,
Expand Down Expand Up @@ -315,7 +363,30 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

terminating := apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating)

crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
var crdInfo *crdInfo
if r.storageVersionManager != nil &&
(requestInfo.Verb == "create" ||
requestInfo.Verb == "update" ||
requestInfo.Verb == "patch") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • this checking here seems fragile... should we not intercept in the actual serveResource function?
  • delete and deletecollection can also write
  • the featuregates seem like they could be checked once at initialization time and compacted into a local boolean checked everywhere in this file instead of the double feature gate check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete and deletecollection can also write

the goal of blocking writes before StorageVersion update is to make sure we don't accidentally persist data in some version that the storage migrator is unaware of. Deleted CRs are not persisted, so I think it's okay to unblock these requests.

the featuregates seem like they could be checked once

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted CRs are not persisted, so I think it's okay to unblock these requests.

They are if they have finalizers on them (a delete request populates metadata.deletionTimestamp and re-persists)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a delete request populates metadata.deletionTimestamp and re-persists

Ack. I will double check and think about the deadlock

var aborted bool
crdInfo, aborted, err = r.getOrCreateServingInfoForWrite(crd.UID, crd.Name)
// The server aborted storage version update due to the CRD being deleted.
// The CR request is forbidden.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Requests to non-existent resources return 403

Are we sure the only case for abort==true is CRD deletion?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(other than the new "should not happen" panic handling) yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • 403 doesn't seem right
  • if the CRD is being deleted, but is pending cleanup of CRs, won't erroring here deadlock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't erroring here deadlock?

we don't block delete requests, and I'm not sure if the storage DestroyFunc goes through this code path.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we won't deadlock because the CRD finalizer doesn't go through this code path to cleanup CRs

if aborted {
err = apierrors.NewForbidden(schema.GroupResource{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
}, requestInfo.Name, fmt.Errorf("storage version update aborted due to CRD deletion"))
utilruntime.HandleError(err)
responsewriters.ErrorNegotiated(
err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup,
Version: requestInfo.APIVersion}, w, req,
)
return
}
} else {
crdInfo, err = r.getOrCreateServingInfoFor(crd.UID, crd.Name)
}
if apierrors.IsNotFound(err) {
r.delegate.ServeHTTP(w, req)
return
Expand Down Expand Up @@ -470,15 +541,24 @@ func (r *crdHandler) createCustomResourceDefinition(obj interface{}) {
// this could happen if the create event is merged from create-update events
storageMap := r.customStorage.Load().(crdStorageMap)
oldInfo, found := storageMap[crd.UID]
if !found {
return
}
if apiequality.Semantic.DeepEqual(&crd.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&crd.Status.AcceptedNames, oldInfo.acceptedNames) {
// If an old storage with the same spec and accepted names exists, skip
// tearing down the storage; also skip updating storage version because
// the same update has been done when the old storage was created.
if found && apiequality.Semantic.DeepEqual(&crd.Spec, oldInfo.spec) &&
apiequality.Semantic.DeepEqual(&crd.Status.AcceptedNames, oldInfo.acceptedNames) {
klog.V(6).Infof("Ignoring customresourcedefinition %s create event because a storage with the same spec and accepted names exists",
crd.Name)
return
}
r.removeStorage_locked(crd.UID)
// Tear down the old storage
var tearDownFinishedCh <-chan struct{}
if found {
tearDownFinishedCh = r.removeStorage_locked(crd.UID)
}
// Update storage version with the latest info in the watch event
if r.storageVersionManager != nil {
r.storageVersionManager.EnqueueStorageVersionUpdate(crd, tearDownFinishedCh, nil, nil)
}
}

// updateCustomResourceDefinition removes potentially stale storage so it gets re-created
Expand All @@ -504,26 +584,38 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})

if oldCRD.UID != newCRD.UID {
r.removeStorage_locked(oldCRD.UID)
if r.storageVersionManager != nil {
r.storageVersionManager.TeardownFor(oldCRD.UID)
}
}

storageMap := r.customStorage.Load().(crdStorageMap)
oldInfo, found := storageMap[newCRD.UID]
if !found {
return
}
if apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
// If an old storage with the same spec and accepted names exists, skip
// tearing down the storage; also skip updating storage version because
// the same update has been done when the old storage was created.
if found && apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) &&
apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
klog.V(6).Infof("Ignoring customresourcedefinition %s update because neither spec, nor accepted names changed", oldCRD.Name)
return
}

klog.V(4).Infof("Updating customresourcedefinition %s", newCRD.Name)
r.removeStorage_locked(newCRD.UID)
// Tear down the old storage
var tearDownFinishedCh <-chan struct{}
if found {
klog.V(4).Infof("Updating customresourcedefinition %s", newCRD.Name)
tearDownFinishedCh = r.removeStorage_locked(newCRD.UID)
}
// Update storage version with the latest info in the watch event
if r.storageVersionManager != nil {
r.storageVersionManager.EnqueueStorageVersionUpdate(newCRD, tearDownFinishedCh, nil, nil)
}
}

// removeStorage_locked removes the cached storage with the given uid as key from the storage map. This function
// updates r.customStorage with the cleaned-up storageMap and tears down the old storage.
// NOTE: Caller MUST hold r.customStorageLock to write r.customStorage thread-safely.
func (r *crdHandler) removeStorage_locked(uid types.UID) {
func (r *crdHandler) removeStorage_locked(uid types.UID) <-chan struct{} {
tearDownFinishedCh := make(chan struct{})
storageMap := r.customStorage.Load().(crdStorageMap)
if oldInfo, ok := storageMap[uid]; ok {
// Copy because we cannot write to storageMap without a race
Expand All @@ -535,8 +627,11 @@ func (r *crdHandler) removeStorage_locked(uid types.UID) {
r.customStorage.Store(storageMap2)

// Tear down the old storage
go r.tearDown(oldInfo)
go r.tearDown(oldInfo, tearDownFinishedCh)
} else {
close(tearDownFinishedCh)
}
return tearDownFinishedCh
}

// removeDeadStorage removes REST storage that isn't being used
Expand All @@ -563,13 +658,19 @@ func (r *crdHandler) removeDeadStorage() {
for uid, crdInfo := range storageMap {
if _, ok := storageMap2[uid]; !ok {
klog.V(4).Infof("Removing dead CRD storage for %s/%s", crdInfo.spec.Group, crdInfo.spec.Names.Kind)
go r.tearDown(crdInfo)
go r.tearDown(crdInfo, nil)
if r.storageVersionManager != nil {
r.storageVersionManager.TeardownFor(uid)
}
}
}
}

// Wait up to a minute for requests to drain, then tear down storage
func (r *crdHandler) tearDown(oldInfo *crdInfo) {
func (r *crdHandler) tearDown(oldInfo *crdInfo, finishedCh chan<- struct{}) {
if finishedCh != nil {
defer close(finishedCh)
}
requestsDrained := make(chan struct{})
go func() {
defer close(requestsDrained)
Expand Down Expand Up @@ -601,6 +702,52 @@ func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensions
return info.storages[info.storageVersion].CustomResource, nil
}

// getOrCreateServingInfoForWrite is getOrCreateServingInfoFor, but blocks CR
// write requests until the storage version update is processed, aborted, or
// timed-out.
// If the storage version update is processed, unblock and allow the write request.
// If the storage version update is aborted, unblock and reject the write request.
// If the storage version update timed-out, unblock and allow the write request.
//
// If aborted is true, the returned error (403) should be surfaced to the client,
// indicating that the server forbids the CR request due to the CRD deletion.
// Otherwise, any non-nil error returned by getOrCreateServingInfoFor may be
// wrapped by the server as an internal error (500) before surfacing to the client.
func (r *crdHandler) getOrCreateServingInfoForWrite(uid types.UID, name string) (ret *crdInfo, aborted bool, err error) {
ret, err = r.getOrCreateServingInfoFor(uid, name)

// Surface non-nil error early.
if err != nil ||
// Return early if the StorageVersionAPI feature gate is disabled.
r.storageVersionManager == nil ||
ret.storageVersionUpdate == nil ||
ret.storageVersionUpdate.processedCh == nil {
return ret, false, err
}

// NOTE: currently the graceful CRD deletion waits 1s for in-flight requests
// to register themselves to the wait group. Ideally the storage version update should
// not cause the requests to miss the 1s window; otherwise the requests may
// fail ungracefully (e.g. it may happen if the CRD was deleted immediately after the
// first CR request establishes the underlying storage).
select {
case <-ret.storageVersionUpdate.processedCh:
case <-ret.storageVersionUpdate.abortedCh:
// Storage version update aborted. The caller knows which error
// to return to the client (403).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • how does a server/user get out of this error state?
  • 403 does not seem like the correct error code... would "unavailable" or something similar be better?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does a server/user get out of this error state

the error state is transient and only for in-flight requests. A CR client can retry:

  • if the CRD is gone forever, the client will get a 404.
  • if the CRD gets re-created, things continue to work

would "unavailable" or something similar be better

had 503 originally (ref #96403 (comment)). I'm okay with 503. If we assume a CR client and a CRD client are the same user, then 403 (let the user fix the request/situation) also makes sense

return nil, true, nil
// Unblock the requests if the storage version update takes a long time, otherwise
// CR requests may stack up and overwhelm the API server.
// TODO(roycaihw): benchmark the storage version update latency to adjust the timeout.
case <-time.After(ret.storageVersionUpdate.timeout.Sub(time.Now())):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this timeout we risk an inconsistent storage version state, right? I am still unsure which is worse: a bad storage version object or failing CRD writes.

@liggitt would like to hear your opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm finding the handling of a CR really hard to reason about in this PR, especially with a long (O(seconds)) delay between constructing the serving info at the top of this method and returning to unblock serving

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad storage version object or failing CRD writes

@sttts I considered failing CRD writes a regression, but I'm okay with that direction if we think it's necessary. Let me add some change in that direction

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we cannot write the StorageVersion object, we have big problems anyway. Do we care about CRD writes in this case?

ret.storageVersionUpdate.timeoutLogOnce.Do(func() {
klog.V(4).Infof("timeout waiting for CRD storage version update, group: %v, resource: %v; unblocking write requests",
ret.spec.Group, ret.spec.Names.Plural)
})
}
return ret, false, nil
}

// getOrCreateServingInfoFor gets the CRD serving info for the given CRD UID if the key exists in the storage map.
// Otherwise the function fetches the up-to-date CRD using the given CRD name and creates CRD serving info.
func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crdInfo, error) {
Expand Down Expand Up @@ -942,6 +1089,17 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
storageVersion: storageVersion,
waitGroup: &utilwaitgroup.SafeWaitGroup{},
}
if r.storageVersionManager != nil {
// spawn storage version update in background and use channels to make handlers wait
processedCh := make(chan struct{})
abortedCh := make(chan struct{})
r.storageVersionManager.EnqueueStorageVersionUpdate(crd, nil, processedCh, abortedCh)
ret.storageVersionUpdate = &storageVersionUpdate{
processedCh: processedCh,
abortedCh: abortedCh,
timeout: time.Now().Add(storageVersionUpdateTimeout),
}
}

// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func testHandlerConversion(t *testing.T, enableWatchCache bool) {
func(r webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return r },
1,
dummyAuthorizerImpl{},
time.Minute, time.Minute, nil, 3*1024*1024)
time.Minute, time.Minute, nil, 3*1024*1024, nil)
if err != nil {
t.Fatal(err)
}
Expand Down