Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #118104: Fix waiting for CRD sync at server start #118113

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"k8s.io/klog/v2"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -125,7 +124,7 @@ func createAggregatorConfig(
return aggregatorConfig, nil
}

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
Expand All @@ -148,8 +147,12 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
// we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
if aggregatorConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")) {
if crdAPIEnabled {
klog.Infof("waiting for initial CRD sync...")
crdRegistrationController.WaitForInitialSync()
klog.Infof("initial CRD sync complete...")
} else {
klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
}
autoRegistrationController.Run(5, context.StopCh)
}()
Expand Down
4 changes: 3 additions & 1 deletion cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

oteltrace "go.opentelemetry.io/otel/trace"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
Expand Down Expand Up @@ -188,6 +189,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
if err != nil {
return nil, err
}
crdAPIEnabled := apiExtensionsConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
Expand All @@ -205,7 +207,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
if err != nil {
return nil, err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
Expand Down
91 changes: 48 additions & 43 deletions cmd/kube-apiserver/app/testing/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type TearDownFunc func()

// TestServerInstanceOptions Instance options the TestServer
type TestServerInstanceOptions struct {
// SkipHealthzCheck returns without waiting for the server to become healthy.
// Useful for testing server configurations expected to prevent /healthz from completing.
SkipHealthzCheck bool
// Enable cert-auth for the kube-apiserver
EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server.
Expand Down Expand Up @@ -231,60 +234,62 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
}(stopCh)

t.Logf("Waiting for /healthz to be ok...")

client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}

// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
if !instanceOptions.SkipHealthzCheck {
t.Logf("Waiting for /healthz to be ok...")

req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
// The storage version bootstrap test wraps the storage version post-start
// hook, so the hook won't become health when the server bootstraps
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
req.Param("exclude", storageVersionCheck)
}
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}
// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}

// wait until default namespace is created
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
// The storage version bootstrap test wraps the storage version post-start
// hook, so the hook won't become health when the server bootstraps
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
req.Param("exclude", storageVersionCheck)
}
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}

if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil {
if !errors.IsNotFound(err) {
t.Logf("Unable to get default namespace: %v", err)
// wait until default namespace is created
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
return false, nil

if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil {
if !errors.IsNotFound(err) {
t.Logf("Unable to get default namespace: %v", err)
}
return false, nil
}
return true, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
}
return true, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
}

tlsInfo := transport.TLSInfo{
Expand Down
172 changes: 172 additions & 0 deletions test/integration/examples/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,185 @@ import (
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
sampleserver "k8s.io/sample-apiserver/pkg/cmd/server"
netutils "k8s.io/utils/net"
)

func TestAPIServiceWaitOnStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)

stopCh := make(chan struct{})
defer close(stopCh)

etcdConfig := framework.SharedEtcd()

etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { etcd3Client.Close() })

// Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync
bogusCRDEtcdPath := path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/bogus")
if _, err := etcd3Client.KV.Put(ctx, bogusCRDEtcdPath, `bogus data`); err != nil {
t.Fatal(err)
}

// Populate a valid CRD and managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/widgets.valid.example.com"),
`{
"apiVersion":"apiextensions.k8s.io/v1beta1",
"kind":"CustomResourceDefinition",
"metadata":{
"name":"widgets.valid.example.com",
"uid":"mycrd",
"creationTimestamp": "2022-06-08T23:46:32Z"
},
"spec":{
"scope": "Namespaced",
"group":"valid.example.com",
"version":"v1",
"names":{
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
}
},
"status": {
"acceptedNames": {
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
},
"conditions": [
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "no conflicts found",
"reason": "NoConflicts",
"status": "True",
"type": "NamesAccepted"
},
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "the initial names have been accepted",
"reason": "InitialNamesAccepted",
"status": "True",
"type": "Established"
}
],
"storedVersions": [
"v1"
]
}
}`); err != nil {
t.Fatal(err)
}
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.valid.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.valid.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "valid.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}

// Populate a stale managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.stale.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.stale.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "stale.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}

// Start server
options := kastesting.NewDefaultTestServerOptions()
options.SkipHealthzCheck = true
testServer := kastesting.StartTestServerOrDie(t, options, nil, etcdConfig)
defer testServer.TearDownFn()

kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)

// ensure both APIService objects remain
for i := 0; i < 10; i++ {
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}

// Clear the bogus CRD data so the informer can sync
if _, err := etcd3Client.KV.Delete(ctx, bogusCRDEtcdPath); err != nil {
t.Fatal(err)
}
t.Log("cleaned up bogus CRD data")

// ensure the stale APIService object is cleaned up
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
_, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{})
if err == nil {
t.Log("stale APIService still exists, waiting...")
return false, nil
}
if !apierrors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
t.Fatal(err)
}

// ensure the valid APIService object remains
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
}
}

func TestAggregatedAPIServer(t *testing.T) {
// makes the kube-apiserver very responsive. it's normally a minute
dynamiccertificates.FileRefreshDuration = 1 * time.Second
Expand Down