Skip to content

Commit

Permalink
Merge pull request #105653 from p0lyn0mial/crd-503-refactor
Browse files Browse the repository at this point in the history
apiextentionserver: refactor returning 503 for custom resource requests during server start
  • Loading branch information
k8s-ci-robot committed Nov 2, 2021
2 parents 2a821d7 + 86d8458 commit 07d3a92
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, err
}

// hasCRDInformerSyncedSignal is closed when the CRD informer this server uses has been fully synchronized.
// It ensures that requests to potential custom resource endpoints while the server hasn't installed all known HTTP paths get a 503 error instead of a 404
hasCRDInformerSyncedSignal := make(chan struct{})
if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {
return nil, err
}

s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
Expand Down Expand Up @@ -245,7 +252,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// but we won't go healthy until we can handle the ones already present.
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
return s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced(), nil
if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
close(hasCRDInformerSyncedSignal)
return true, nil
}
return false, nil
}, context.StopCh)
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ type crdHandler struct {
customStorage atomic.Value

crdLister listers.CustomResourceDefinitionLister
hasSynced func() bool

delegate http.Handler
restOptionsGetter generic.RESTOptionsGetter
Expand Down Expand Up @@ -192,7 +191,6 @@ func NewCustomResourceDefinitionHandler(
groupDiscoveryHandler: groupDiscoveryHandler,
customStorage: atomic.Value{},
crdLister: crdInformer.Lister(),
hasSynced: crdInformer.Informer().HasSynced,
delegate: delegate,
restOptionsGetter: restOptionsGetter,
admission: admission,
Expand Down Expand Up @@ -246,19 +244,11 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// only match /apis/<group>/<version>
// only registered under /apis
if len(pathParts) == 3 {
if !r.hasSynced() {
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return
}
r.versionDiscoveryHandler.ServeHTTP(w, req)
return
}
// only match /apis/<group>
if len(pathParts) == 2 {
if !r.hasSynced() {
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return
}
r.groupDiscoveryHandler.ServeHTTP(w, req)
return
}
Expand All @@ -270,11 +260,6 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
crdName := requestInfo.Resource + "." + requestInfo.APIGroup
crd, err := r.crdLister.Get(crdName)
if apierrors.IsNotFound(err) {
if !r.hasSynced() {
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return
}

r.delegate.ServeHTTP(w, req)
return
}
Expand Down Expand Up @@ -1362,18 +1347,6 @@ func hasServedCRDVersion(spec *apiextensionsv1.CustomResourceDefinitionSpec, ver
return false
}

// serverStartingError returns a ServiceUnavailble error with a retry-after time
func serverStartingError() error {
err := apierrors.NewServiceUnavailable("server is starting")
if err.ErrStatus.Details == nil {
err.ErrStatus.Details = &metav1.StatusDetails{}
}
if err.ErrStatus.Details.RetryAfterSeconds == 0 {
err.ErrStatus.Details.RetryAfterSeconds = int32(10)
}
return err
}

// buildOpenAPIModelsForApply constructs openapi models from any validation schemas specified in the custom resource,
// and merges it with the models defined in the static OpenAPI spec.
// Returns nil models if the ServerSideApply feature is disabled, or the static spec is nil, or an error is encountered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,25 @@ func TestRouting(t *testing.T) {
crdIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
crdLister := listers.NewCustomResourceDefinitionLister(crdIndexer)

// note that in production we delegate to the special handler that is attached at the end of the delegation chain that checks if the server has installed all known HTTP paths before replying to the client.
// it returns 503 if not all registered signals have been ready (closed) otherwise it simply replies with 404.
// the apiextentionserver is considered to be initialized once hasCRDInformerSyncedSignal is closed.
//
// here, in this test the delegate represent the special handler and hasSync represents the signal.
// primarily we just want to make sure that the delegate has been called.
// the behaviour of the real delegate is tested elsewhere.
delegateCalled := false
delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
delegateCalled = true
if !hasSynced {
http.Error(w, "", 503)
return
}
http.Error(w, "", 418)
})
customV1 := schema.GroupVersion{Group: "custom", Version: "v1"}
handler := &crdHandler{
crdLister: crdLister,
hasSynced: func() bool { return hasSynced },
delegate: delegate,
versionDiscoveryHandler: &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{
Expand Down Expand Up @@ -209,7 +219,7 @@ func TestRouting(t *testing.T) {
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 503,
ExpectStatus: 200,
},
{
Name: "existing group discovery",
Expand All @@ -231,7 +241,7 @@ func TestRouting(t *testing.T) {
APIVersion: "",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectDelegateCalled: true,
ExpectStatus: 503,
},
{
Expand All @@ -255,7 +265,7 @@ func TestRouting(t *testing.T) {
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 503,
ExpectStatus: 200,
},
{
Name: "existing group version discovery",
Expand All @@ -277,7 +287,7 @@ func TestRouting(t *testing.T) {
APIVersion: "v1",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectDelegateCalled: true,
ExpectStatus: 503,
},
{
Expand All @@ -300,7 +310,7 @@ func TestRouting(t *testing.T) {
APIVersion: "v2",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectDelegateCalled: true,
ExpectStatus: 503,
},
{
Expand All @@ -325,7 +335,7 @@ func TestRouting(t *testing.T) {
Resource: "foos",
HasSynced: false,
IsResourceRequest: true,
ExpectDelegateCalled: false,
ExpectDelegateCalled: true,
ExpectStatus: 503,
},
{
Expand Down

0 comments on commit 07d3a92

Please sign in to comment.