Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix waiting for CRD sync at server start #118104

Merged
merged 2 commits into from May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/kube-apiserver/app/aggregator.go
Expand Up @@ -27,7 +27,6 @@ import (

"k8s.io/klog/v2"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -117,7 +116,7 @@ func createAggregatorConfig(
return aggregatorConfig, nil
}

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
Expand Down Expand Up @@ -147,8 +146,12 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
// we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
if aggregatorConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")) {
if crdAPIEnabled {
klog.Infof("waiting for initial CRD sync...")
crdRegistrationController.WaitForInitialSync()
klog.Infof("initial CRD sync complete...")
} else {
klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
}
autoRegistrationController.Run(5, context.StopCh)
}()
Expand Down
4 changes: 3 additions & 1 deletion cmd/kube-apiserver/app/server.go
Expand Up @@ -34,6 +34,7 @@ import (

oteltrace "go.opentelemetry.io/otel/trace"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
Expand Down Expand Up @@ -193,6 +194,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
if err != nil {
return nil, err
}
crdAPIEnabled := apiExtensionsConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
Expand All @@ -210,7 +212,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
if err != nil {
return nil, err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
Expand Down
59 changes: 32 additions & 27 deletions cmd/kube-apiserver/app/testing/testserver.go
Expand Up @@ -62,6 +62,9 @@ type TearDownFunc func()

// TestServerInstanceOptions Instance options the TestServer
type TestServerInstanceOptions struct {
// SkipHealthzCheck returns without waiting for the server to become healthy.
// Useful for testing server configurations expected to prevent /healthz from completing.
SkipHealthzCheck bool
// Enable cert-auth for the kube-apiserver
EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server.
Expand Down Expand Up @@ -262,40 +265,42 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
}(stopCh)

t.Logf("Waiting for /healthz to be ok...")

client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}

// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
if !instanceOptions.SkipHealthzCheck {
t.Logf("Waiting for /healthz to be ok...")

req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
// The storage version bootstrap test wraps the storage version post-start
// hook, so the hook won't become health when the server bootstraps
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
req.Param("exclude", storageVersionCheck)
}
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}

req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
// The storage version bootstrap test wraps the storage version post-start
// hook, so the hook won't become health when the server bootstraps
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
req.Param("exclude", storageVersionCheck)
}
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}

// wait until default namespace is created
Expand Down
173 changes: 173 additions & 0 deletions test/integration/examples/apiserver_test.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/assert"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -48,6 +49,7 @@ import (
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
Expand All @@ -56,6 +58,177 @@ import (
netutils "k8s.io/utils/net"
)

func TestAPIServiceWaitOnStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)

stopCh := make(chan struct{})
defer close(stopCh)

etcdConfig := framework.SharedEtcd()

etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { etcd3Client.Close() })

// Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync
bogusCRDEtcdPath := path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/bogus")
if _, err := etcd3Client.KV.Put(ctx, bogusCRDEtcdPath, `bogus data`); err != nil {
t.Fatal(err)
}

// Populate a valid CRD and managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/widgets.valid.example.com"),
`{
"apiVersion":"apiextensions.k8s.io/v1beta1",
"kind":"CustomResourceDefinition",
"metadata":{
"name":"widgets.valid.example.com",
"uid":"mycrd",
"creationTimestamp": "2022-06-08T23:46:32Z"
},
"spec":{
"scope": "Namespaced",
"group":"valid.example.com",
"version":"v1",
"names":{
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
}
},
"status": {
"acceptedNames": {
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
},
"conditions": [
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "no conflicts found",
"reason": "NoConflicts",
"status": "True",
"type": "NamesAccepted"
},
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "the initial names have been accepted",
"reason": "InitialNamesAccepted",
"status": "True",
"type": "Established"
}
],
"storedVersions": [
"v1"
]
}
}`); err != nil {
t.Fatal(err)
}
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.valid.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.valid.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "valid.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}

// Populate a stale managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.stale.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.stale.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "stale.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}

// Start server
options := kastesting.NewDefaultTestServerOptions()
options.SkipHealthzCheck = true
testServer := kastesting.StartTestServerOrDie(t, options, nil, etcdConfig)
defer testServer.TearDownFn()

kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)

// ensure both APIService objects remain
for i := 0; i < 10; i++ {
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}

// Clear the bogus CRD data so the informer can sync
if _, err := etcd3Client.KV.Delete(ctx, bogusCRDEtcdPath); err != nil {
t.Fatal(err)
}
t.Log("cleaned up bogus CRD data")

// ensure the stale APIService object is cleaned up
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
_, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{})
if err == nil {
t.Log("stale APIService still exists, waiting...")
return false, nil
}
if !apierrors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
t.Fatal(err)
}

// ensure the valid APIService object remains
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
}
}

func TestAggregatedAPIServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)
Expand Down