From 3be3997193af824eba52c8d789d0a36cb08692be Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Thu, 18 May 2023 11:07:53 -0400 Subject: [PATCH 1/2] Fix waiting for CRD sync at server start --- cmd/kube-apiserver/app/aggregator.go | 9 ++++++--- cmd/kube-apiserver/app/server.go | 4 +++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 397c151e66a4..cf2347bfccc7 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -27,7 +27,6 @@ import ( "k8s.io/klog/v2" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -117,7 +116,7 @@ func createAggregatorConfig( return aggregatorConfig, nil } -func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) { +func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) { aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer) if err != nil { return nil, err @@ -147,8 +146,12 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega // let the CRD controller process the initial set of CRDs before starting the autoregistration controller. // this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist. // we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery. - if aggregatorConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")) { + if crdAPIEnabled { + klog.Infof("waiting for initial CRD sync...") crdRegistrationController.WaitForInitialSync() + klog.Infof("initial CRD sync complete...") + } else { + klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync") } autoRegistrationController.Run(5, context.StopCh) }() diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 7f7a58fbd6c3..da420e82162d 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -34,6 +34,7 @@ import ( oteltrace "go.opentelemetry.io/otel/trace" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -193,6 +194,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora if err != nil { return nil, err } + crdAPIEnabled := apiExtensionsConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")) notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey) apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler)) @@ -210,7 +212,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora if err != nil { return nil, err } - aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers) + aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines return nil, err From e4102d5e300cad3189c649962a04f862e18310bd Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Thu, 18 May 2023 11:09:03 -0400 Subject: [PATCH 2/2] Test APIService safe handling at startup --- cmd/kube-apiserver/app/testing/testserver.go | 59 ++++--- test/integration/examples/apiserver_test.go | 173 +++++++++++++++++++ 2 files changed, 205 insertions(+), 27 deletions(-) diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 56b23788d13a..2fe17fb3e345 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -62,6 +62,9 @@ type TearDownFunc func() // TestServerInstanceOptions Instance options the TestServer type TestServerInstanceOptions struct { + // SkipHealthzCheck returns without waiting for the server to become healthy. + // Useful for testing server configurations expected to prevent /healthz from completing. + SkipHealthzCheck bool // Enable cert-auth for the kube-apiserver EnableCertAuth bool // Wrap the storage version interface of the created server's generic server. @@ -262,40 +265,42 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } }(stopCh) - t.Logf("Waiting for /healthz to be ok...") - client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) if err != nil { return result, fmt.Errorf("failed to create a client: %v", err) } - // wait until healthz endpoint returns ok - err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) { - select { - case err := <-errCh: - return false, err - default: - } + if !instanceOptions.SkipHealthzCheck { + t.Logf("Waiting for /healthz to be ok...") - req := client.CoreV1().RESTClient().Get().AbsPath("/healthz") - // The storage version bootstrap test wraps the storage version post-start - // hook, so the hook won't become health when the server bootstraps - if instanceOptions.StorageVersionWrapFunc != nil { - // We hardcode the param instead of having a new instanceOptions field - // to avoid confusing users with more options. - storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName) - req.Param("exclude", storageVersionCheck) - } - result := req.Do(context.TODO()) - status := 0 - result.StatusCode(&status) - if status == 200 { - return true, nil + // wait until healthz endpoint returns ok + err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) { + select { + case err := <-errCh: + return false, err + default: + } + + req := client.CoreV1().RESTClient().Get().AbsPath("/healthz") + // The storage version bootstrap test wraps the storage version post-start + // hook, so the hook won't become health when the server bootstraps + if instanceOptions.StorageVersionWrapFunc != nil { + // We hardcode the param instead of having a new instanceOptions field + // to avoid confusing users with more options. + storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName) + req.Param("exclude", storageVersionCheck) + } + result := req.Do(context.TODO()) + status := 0 + result.StatusCode(&status) + if status == 200 { + return true, nil + } + return false, nil + }) + if err != nil { + return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err) } - return false, nil - }) - if err != nil { - return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err) } // wait until default namespace is created diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 0ce4944f65eb..ac2ed7cfc165 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -35,6 +35,7 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/dynamiccertificates" @@ -48,6 +49,7 @@ import ( aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/cmd/kube-apiserver/app" kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/framework" wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1" wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1" @@ -56,6 +58,177 @@ import ( netutils "k8s.io/utils/net" ) +func TestAPIServiceWaitOnStart(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + t.Cleanup(cancel) + + stopCh := make(chan struct{}) + defer close(stopCh) + + etcdConfig := framework.SharedEtcd() + + etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { etcd3Client.Close() }) + + // Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync + bogusCRDEtcdPath := path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/bogus") + if _, err := etcd3Client.KV.Put(ctx, bogusCRDEtcdPath, `bogus data`); err != nil { + t.Fatal(err) + } + + // Populate a valid CRD and managed APIService in etcd + if _, err := etcd3Client.KV.Put( + ctx, + path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/widgets.valid.example.com"), + `{ + "apiVersion":"apiextensions.k8s.io/v1beta1", + "kind":"CustomResourceDefinition", + "metadata":{ + "name":"widgets.valid.example.com", + "uid":"mycrd", + "creationTimestamp": "2022-06-08T23:46:32Z" + }, + "spec":{ + "scope": "Namespaced", + "group":"valid.example.com", + "version":"v1", + "names":{ + "kind": "Widget", + "listKind": "WidgetList", + "plural": "widgets", + "singular": "widget" + } + }, + "status": { + "acceptedNames": { + "kind": "Widget", + "listKind": "WidgetList", + "plural": "widgets", + "singular": "widget" + }, + "conditions": [ + { + "lastTransitionTime": "2023-05-18T15:03:57Z", + "message": "no conflicts found", + "reason": "NoConflicts", + "status": "True", + "type": "NamesAccepted" + }, + { + "lastTransitionTime": "2023-05-18T15:03:57Z", + "message": "the initial names have been accepted", + "reason": "InitialNamesAccepted", + "status": "True", + "type": "Established" + } + ], + "storedVersions": [ + "v1" + ] + } + }`); err != nil { + t.Fatal(err) + } + if _, err := etcd3Client.KV.Put( + ctx, + path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.valid.example.com"), + `{ + "apiVersion":"apiregistration.k8s.io/v1", + "kind":"APIService", + "metadata": { + "name": "v1.valid.example.com", + "uid":"foo", + "creationTimestamp": "2022-06-08T23:46:32Z", + "labels":{"kube-aggregator.kubernetes.io/automanaged":"true"} + }, + "spec": { + "group": "valid.example.com", + "version": "v1", + "groupPriorityMinimum":100, + "versionPriority":10 + } + }`, + ); err != nil { + t.Fatal(err) + } + + // Populate a stale managed APIService in etcd + if _, err := etcd3Client.KV.Put( + ctx, + path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.stale.example.com"), + `{ + "apiVersion":"apiregistration.k8s.io/v1", + "kind":"APIService", + "metadata": { + "name": "v1.stale.example.com", + "uid":"foo", + "creationTimestamp": "2022-06-08T23:46:32Z", + "labels":{"kube-aggregator.kubernetes.io/automanaged":"true"} + }, + "spec": { + "group": "stale.example.com", + "version": "v1", + "groupPriorityMinimum":100, + "versionPriority":10 + } + }`, + ); err != nil { + t.Fatal(err) + } + + // Start server + options := kastesting.NewDefaultTestServerOptions() + options.SkipHealthzCheck = true + testServer := kastesting.StartTestServerOrDie(t, options, nil, etcdConfig) + defer testServer.TearDownFn() + + kubeClientConfig := rest.CopyConfig(testServer.ClientConfig) + aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig) + + // ensure both APIService objects remain + for i := 0; i < 10; i++ { + if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil { + t.Fatal(err) + } + if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{}); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + } + + // Clear the bogus CRD data so the informer can sync + if _, err := etcd3Client.KV.Delete(ctx, bogusCRDEtcdPath); err != nil { + t.Fatal(err) + } + t.Log("cleaned up bogus CRD data") + + // ensure the stale APIService object is cleaned up + if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) { + _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{}) + if err == nil { + t.Log("stale APIService still exists, waiting...") + return false, nil + } + if !apierrors.IsNotFound(err) { + return false, err + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + // ensure the valid APIService object remains + for i := 0; i < 5; i++ { + time.Sleep(time.Second) + if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil { + t.Fatal(err) + } + } +} + func TestAggregatedAPIServer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) t.Cleanup(cancel)