Skip to content

Commit

Permalink
Merge pull request kubernetes#118112 from liggitt/automated-cherry-pi…
Browse files Browse the repository at this point in the history
…ck-of-#118104-upstream-release-1.26

Automated cherry pick of kubernetes#118104: Fix waiting for CRD sync at server start
  • Loading branch information
k8s-ci-robot committed May 19, 2023
2 parents 8ee0781 + 3ec283e commit 26dc477
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 47 deletions.
9 changes: 6 additions & 3 deletions cmd/kube-apiserver/app/aggregator.go
Expand Up @@ -27,7 +27,6 @@ import (

"k8s.io/klog/v2"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -117,7 +116,7 @@ func createAggregatorConfig(
return aggregatorConfig, nil
}

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
Expand Down Expand Up @@ -147,8 +146,12 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
// we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
if aggregatorConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")) {
if crdAPIEnabled {
klog.Infof("waiting for initial CRD sync...")
crdRegistrationController.WaitForInitialSync()
klog.Infof("initial CRD sync complete...")
} else {
klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
}
autoRegistrationController.Run(5, context.StopCh)
}()
Expand Down
4 changes: 3 additions & 1 deletion cmd/kube-apiserver/app/server.go
Expand Up @@ -33,6 +33,7 @@ import (

oteltrace "go.opentelemetry.io/otel/trace"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
Expand Down Expand Up @@ -190,6 +191,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
if err != nil {
return nil, err
}
crdAPIEnabled := apiExtensionsConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
Expand All @@ -207,7 +209,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
if err != nil {
return nil, err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
Expand Down
91 changes: 48 additions & 43 deletions cmd/kube-apiserver/app/testing/testserver.go
Expand Up @@ -62,6 +62,9 @@ type TearDownFunc func()

// TestServerInstanceOptions Instance options the TestServer
type TestServerInstanceOptions struct {
// SkipHealthzCheck returns without waiting for the server to become healthy.
// Useful for testing server configurations expected to prevent /healthz from completing.
SkipHealthzCheck bool
// Enable cert-auth for the kube-apiserver
EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server.
Expand Down Expand Up @@ -263,60 +266,62 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
}(stopCh)

t.Logf("Waiting for /healthz to be ok...")

client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}

// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
if !instanceOptions.SkipHealthzCheck {
t.Logf("Waiting for /healthz to be ok...")

req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
// The storage version bootstrap test wraps the storage version post-start
// hook, so the hook won't become health when the server bootstraps
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
req.Param("exclude", storageVersionCheck)
}
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}
// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}

// wait until default namespace is created
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
// The storage version bootstrap test wraps the storage version post-start
// hook, so the hook won't become health when the server bootstraps
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
req.Param("exclude", storageVersionCheck)
}
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}

if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil {
if !errors.IsNotFound(err) {
t.Logf("Unable to get default namespace: %v", err)
// wait until default namespace is created
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
return false, nil

if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil {
if !errors.IsNotFound(err) {
t.Logf("Unable to get default namespace: %v", err)
}
return false, nil
}
return true, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
}
return true, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
}

tlsInfo := transport.TLSInfo{
Expand Down
173 changes: 173 additions & 0 deletions test/integration/examples/apiserver_test.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/assert"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -48,6 +49,7 @@ import (
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
Expand All @@ -56,6 +58,177 @@ import (
netutils "k8s.io/utils/net"
)

func TestAPIServiceWaitOnStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)

stopCh := make(chan struct{})
defer close(stopCh)

etcdConfig := framework.SharedEtcd()

etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { etcd3Client.Close() })

// Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync
bogusCRDEtcdPath := path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/bogus")
if _, err := etcd3Client.KV.Put(ctx, bogusCRDEtcdPath, `bogus data`); err != nil {
t.Fatal(err)
}

// Populate a valid CRD and managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/widgets.valid.example.com"),
`{
"apiVersion":"apiextensions.k8s.io/v1beta1",
"kind":"CustomResourceDefinition",
"metadata":{
"name":"widgets.valid.example.com",
"uid":"mycrd",
"creationTimestamp": "2022-06-08T23:46:32Z"
},
"spec":{
"scope": "Namespaced",
"group":"valid.example.com",
"version":"v1",
"names":{
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
}
},
"status": {
"acceptedNames": {
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
},
"conditions": [
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "no conflicts found",
"reason": "NoConflicts",
"status": "True",
"type": "NamesAccepted"
},
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "the initial names have been accepted",
"reason": "InitialNamesAccepted",
"status": "True",
"type": "Established"
}
],
"storedVersions": [
"v1"
]
}
}`); err != nil {
t.Fatal(err)
}
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.valid.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.valid.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "valid.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}

// Populate a stale managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.stale.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.stale.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "stale.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}

// Start server
options := kastesting.NewDefaultTestServerOptions()
options.SkipHealthzCheck = true
testServer := kastesting.StartTestServerOrDie(t, options, nil, etcdConfig)
defer testServer.TearDownFn()

kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)

// ensure both APIService objects remain
for i := 0; i < 10; i++ {
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}

// Clear the bogus CRD data so the informer can sync
if _, err := etcd3Client.KV.Delete(ctx, bogusCRDEtcdPath); err != nil {
t.Fatal(err)
}
t.Log("cleaned up bogus CRD data")

// ensure the stale APIService object is cleaned up
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
_, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{})
if err == nil {
t.Log("stale APIService still exists, waiting...")
return false, nil
}
if !apierrors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
t.Fatal(err)
}

// ensure the valid APIService object remains
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
}
}

func TestAggregatedAPIServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)
Expand Down

0 comments on commit 26dc477

Please sign in to comment.