From f413e4694cb24124ac7429e8649c525bc7d8fef6 Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Thu, 8 Feb 2024 09:21:30 -0800 Subject: [PATCH] Move SV update poststarthook to genericapiserver --- cmd/kube-apiserver/app/server.go | 2 + cmd/kube-apiserver/app/testing/testserver.go | 7 +- pkg/controlplane/apiserver/config.go | 3 +- pkg/controlplane/instance.go | 5 +- .../pkg/apiserver/apiserver.go | 3 +- .../pkg/cmd/server/options/options.go | 3 +- .../pkg/cmd/server/testing/testserver.go | 4 +- .../src/k8s.io/apiserver/pkg/server/config.go | 99 +++++++++++++++++-- .../apiserver/pkg/server/options/feature.go | 2 +- .../pkg/server/options/server_run_options.go | 2 +- .../pkg/apiserver/apiserver.go | 56 +---------- .../kube-aggregator/pkg/cmd/server/start.go | 4 +- .../pkg/apiserver/apiserver.go | 3 +- .../sample-apiserver/pkg/cmd/server/start.go | 3 +- .../test/integration/fixtures/server.go | 38 +------ .../localhost_127.0.0.1_localhost.crt | 39 ++++++++ .../localhost_127.0.0.1_localhost.key | 27 +++++ test/e2e/apimachinery/aggregator.go | 6 ++ ...orage_version_aggregated_apiserver_test.go | 3 + 19 files changed, 199 insertions(+), 110 deletions(-) create mode 100644 staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.crt create mode 100644 staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.key diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index f8eefb7c7dd2..e63cb62793d5 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -36,6 +36,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/admission" genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" + "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -216,6 +217,7 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) ( proxyTransport := CreateProxyTransport() genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig( + server.KubeAPIServer, opts.CompletedOptions, []*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme}, generatedopenapi.GetOpenAPIDefinitions, diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 357f01b9227b..7bdeec8d48b0 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -43,6 +43,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" + sserver "k8s.io/apiserver/pkg/server" serveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storageversion" @@ -54,7 +55,6 @@ import ( "k8s.io/client-go/util/keyutil" logsapi "k8s.io/component-base/logs/api/v1" "k8s.io/klog/v2" - "k8s.io/kube-aggregator/pkg/apiserver" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/cmd/kube-apiserver/app" @@ -391,8 +391,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo if instanceOptions.StorageVersionWrapFunc != nil { // We hardcode the param instead of having a new instanceOptions field // to avoid confusing users with more options. - storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName) + storageVersionCheck := fmt.Sprintf("poststarthook/%s-%s", sserver.StorageVersionPostStartHookName, sserver.KubeAPIServer) req.Param("exclude", storageVersionCheck) + storageVersionCheck = fmt.Sprintf("poststarthook/%s-%s", sserver.StorageVersionPostStartHookName, sserver.KubeAggregator) + req.Param("exclude", storageVersionCheck) + } result := req.Do(context.TODO()) status := 0 diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index 07a3260f1f5c..8e571e007904 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -55,6 +55,7 @@ import ( // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it func BuildGenericConfig( + name string, s controlplaneapiserver.CompletedOptions, schemes []*runtime.Scheme, getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition, @@ -65,7 +66,7 @@ func BuildGenericConfig( lastErr error, ) { - genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) + genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs, name) genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource() if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil { diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 73aec574b189..3796e40504dc 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -63,6 +63,7 @@ import ( apiserverfeatures "k8s.io/apiserver/pkg/features" peerreconcilers "k8s.io/apiserver/pkg/reconcilers" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" serverstorage "k8s.io/apiserver/pkg/server/storage" @@ -312,7 +313,7 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler { // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. func (c *Config) Complete() CompletedConfig { cfg := completedConfig{ - c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers), + c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers, nil), &c.ExtraConfig, } @@ -369,7 +370,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig") } - s, err := c.GenericConfig.New("kube-apiserver", delegationTarget) + s, err := c.GenericConfig.New(server.KubeAPIServer, delegationTarget) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index 517fc9e531a7..468ca1a5c150 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -45,6 +45,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" genericregistry "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server" serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/util/webhook" @@ -130,7 +131,7 @@ func (cfg *Config) Complete() CompletedConfig { // New returns a new instance of CustomResourceDefinitions from the given config. func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) { - genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget) + genericServer, err := c.GenericConfig.New(server.KubeAPIExtensionsAPIServer, delegationTarget) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go index ef77ee42e91c..3046ed3ea655 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go @@ -34,6 +34,7 @@ import ( openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericregistry "k8s.io/apiserver/pkg/registry/generic" genericapiserver "k8s.io/apiserver/pkg/server" + server "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" storagevalue "k8s.io/apiserver/pkg/storage/value" flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" @@ -102,7 +103,7 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err return nil, fmt.Errorf("error creating self-signed certificates: %v", err) } - serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs) + serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs, server.KubeAPIExtensionsAPIServer) if err := o.ServerRunOptions.ApplyTo(&serverConfig.Config); err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go index e2340bc704d0..b3a7013835db 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go @@ -135,7 +135,9 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin } s.APIEnablement.RuntimeConfig.Set("api/all=true") - fs.Parse(customFlags) + if err := fs.Parse(customFlags); err != nil { + return result, err + } if err := s.Complete(); err != nil { return result, fmt.Errorf("failed to set default options: %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index ea0b82a350ad..161a51bf8426 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -33,15 +33,17 @@ import ( "time" jsonpatch "github.com/evanphx/json-patch" - "github.com/google/uuid" "golang.org/x/crypto/cryptobyte" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/admission" @@ -71,9 +73,10 @@ import ( utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/component-base/logs" - "k8s.io/component-base/metrics/features" + metricsfeatures "k8s.io/component-base/metrics/features" "k8s.io/component-base/metrics/prometheus/slis" "k8s.io/component-base/tracing" "k8s.io/klog/v2" @@ -100,6 +103,13 @@ const ( // APIGroupPrefix is where non-legacy API group will be located. APIGroupPrefix = "/apis" + + StorageVersionPostStartHookName = "built-in-resources-storage-version-updater" + + KubeAPIServer = "kube-apiserver" + KubeAggregator = "kube-aggregator" + KubeAggregatedAPIServer = "kube-aggregated-apiserver" + KubeAPIExtensionsAPIServer = "kube-apiextensions-apiserver" ) // Config is a structure used to configure a GenericAPIServer. @@ -119,6 +129,10 @@ type Config struct { // TODO: move into SecureServing(WithLoopback) as soon as insecure serving is gone LoopbackClientConfig *restclient.Config + // ClientConfig is a config for a loopback connection to the main kube-apiserver from an aggregated apiserver. + // This is an optional field. + ClientConfig *restclient.Config + // EgressSelector provides a lookup mechanism for dialing outbound connections. // It does so based on a EgressSelectorConfiguration which was read at startup. EgressSelector *egressselector.EgressSelector @@ -367,11 +381,11 @@ type AuthorizationInfo struct { } func init() { - utilruntime.Must(features.AddFeatureGates(utilfeature.DefaultMutableFeatureGate)) + utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate)) } // NewConfig returns a Config struct with the default values -func NewConfig(codecs serializer.CodecFactory) *Config { +func NewConfig(codecs serializer.CodecFactory, name string) *Config { defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz} var id string if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { @@ -389,7 +403,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { b.AddBytes([]byte(hostname)) }) b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) { - b.AddBytes([]byte("kube-apiserver")) + b.AddBytes([]byte(name)) }) hashData, err := b.Bytes() if err != nil { @@ -455,9 +469,9 @@ func NewConfig(codecs serializer.CodecFactory) *Config { } // NewRecommendedConfig returns a RecommendedConfig struct with the default values -func NewRecommendedConfig(codecs serializer.CodecFactory) *RecommendedConfig { +func NewRecommendedConfig(codecs serializer.CodecFactory, name string) *RecommendedConfig { return &RecommendedConfig{ - Config: *NewConfig(codecs), + Config: *NewConfig(codecs, name), } } @@ -670,7 +684,7 @@ func (c *Config) DrainedNotify() <-chan struct{} { // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. -func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { +func (c *Config) Complete(informers informers.SharedInformerFactory, clientConfig *restclient.Config) CompletedConfig { if len(c.ExternalAddress) == 0 && c.PublicAddress != nil { c.ExternalAddress = c.PublicAddress.String() } @@ -715,13 +729,17 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo } } + if c.ClientConfig == nil { + c.ClientConfig = clientConfig + } + return CompletedConfig{&completedConfig{c, informers}} } // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. func (c *RecommendedConfig) Complete() CompletedConfig { - return c.Config.Complete(c.SharedInformerFactory) + return c.Config.Complete(c.SharedInformerFactory, c.ClientConfig) } var allowedMediaTypes = []string{ @@ -934,6 +952,67 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { + // Spawn a goroutine in aggregator apiserver to update storage version for + // all built-in resources + if !s.isPostStartHookRegistered(fmt.Sprintf("%s-%s", StorageVersionPostStartHookName, name)) { + s.AddPostStartHookOrDie(fmt.Sprintf("%s-%s", StorageVersionPostStartHookName, name), func(hookContext PostStartHookContext) error { + // Wait for apiserver-identity to exist first before updating storage + // versions, to avoid storage version GC accidentally garbage-collecting + // storage versions. + kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig) + if err != nil { + return err + } + + // TODO: checking for identity leases this way seems not ideal. Wonder if need to have the lease controller moved here as well. + // Meaning: do we want identity leases for aggregated-apiservers / extension apiservers? + if name != KubeAggregatedAPIServer { + if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { + _, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get( + context.TODO(), s.APIServerID, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil + }, hookContext.StopCh); err != nil { + return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v", + s.APIServerID, err) + } + } + clientConfig := hookContext.LoopbackClientConfig + if c.ClientConfig != nil { + clientConfig = c.ClientConfig + } + // Technically an apiserver only needs to update storage version once during bootstrap. + // Reconcile StorageVersion objects every 10 minutes will help in the case that the + // StorageVersion objects get accidentally modified/deleted by a different agent. In that + // case, the reconciliation ensures future storage migration still works. If nothing gets + // changed, the reconciliation update is a noop and gets short-circuited by the apiserver, + // therefore won't change the resource version and trigger storage migration. + go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) { + // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver) + // share the same generic apiserver config. The same StorageVersion manager is used + // to register all built-in resources when the generic apiservers install APIs. + s.StorageVersionManager.UpdateStorageVersions(clientConfig, s.APIServerID) + return false, nil + }, hookContext.StopCh) + // Once the storage version updater finishes the first round of update, + // the PostStartHook will return to unblock /healthz. The handler chain + // won't block write requests anymore. Check every second since it's not + // expensive. + wait.PollImmediateUntil(1*time.Second, func() (bool, error) { + return s.StorageVersionManager.Completed(), nil + }, hookContext.StopCh) + return nil + }) + } + } + for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { @@ -1136,7 +1215,7 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati } privilegedLoopbackToken := loopback.BearerToken - var uid = uuid.New().String() + var uid = string(uuid.NewUUID()) tokens := make(map[string]*user.DefaultInfo) tokens[privilegedLoopbackToken] = &user.DefaultInfo{ Name: user.APIServerUser, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/feature.go b/staging/src/k8s.io/apiserver/pkg/server/options/feature.go index f01195560a4d..bd9693fec310 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/feature.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/feature.go @@ -36,7 +36,7 @@ type FeatureOptions struct { } func NewFeatureOptions() *FeatureOptions { - defaults := server.NewConfig(serializer.CodecFactory{}) + defaults := server.NewConfig(serializer.CodecFactory{}, server.KubeAPIServer) return &FeatureOptions{ EnableProfiling: defaults.EnableProfiling, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index 1373d8a4d730..3faefac266d0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -92,7 +92,7 @@ type ServerRunOptions struct { } func NewServerRunOptions() *ServerRunOptions { - defaults := server.NewConfig(serializer.CodecFactory{}) + defaults := server.NewConfig(serializer.CodecFactory{}, server.KubeAPIServer) return &ServerRunOptions{ MaxRequestsInFlight: defaults.MaxRequestsInFlight, MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index f0f2b7140b4e..c7bbffd10986 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -22,20 +22,18 @@ import ( "net/http" "time" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" genericfeatures "k8s.io/apiserver/pkg/features" peerreconcilers "k8s.io/apiserver/pkg/reconcilers" + "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/egressselector" serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/transport" "k8s.io/component-base/version" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -194,7 +192,7 @@ func (cfg *Config) Complete() CompletedConfig { // NewWithDelegate returns a new instance of APIAggregator from the given config. func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { - genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget) + genericServer, err := c.GenericConfig.New(server.KubeAggregator, delegationTarget) if err != nil { return nil, err } @@ -343,56 +341,6 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && - utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { - // Spawn a goroutine in aggregator apiserver to update storage version for - // all built-in resources - s.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error { - // Wait for apiserver-identity to exist first before updating storage - // versions, to avoid storage version GC accidentally garbage-collecting - // storage versions. - kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig) - if err != nil { - return err - } - if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { - _, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get( - context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - return false, nil - } - if err != nil { - return false, err - } - return true, nil - }, hookContext.StopCh); err != nil { - return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v", - s.GenericAPIServer.APIServerID, err) - } - // Technically an apiserver only needs to update storage version once during bootstrap. - // Reconcile StorageVersion objects every 10 minutes will help in the case that the - // StorageVersion objects get accidentally modified/deleted by a different agent. In that - // case, the reconciliation ensures future storage migration still works. If nothing gets - // changed, the reconciliation update is a noop and gets short-circuited by the apiserver, - // therefore won't change the resource version and trigger storage migration. - go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) { - // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver) - // share the same generic apiserver config. The same StorageVersion manager is used - // to register all built-in resources when the generic apiservers install APIs. - s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID) - return false, nil - }, hookContext.StopCh) - // Once the storage version updater finishes the first round of update, - // the PostStartHook will return to unblock /healthz. The handler chain - // won't block write requests anymore. Check every second since it's not - // expensive. - wait.PollImmediateUntil(1*time.Second, func() (bool, error) { - return s.GenericAPIServer.StorageVersionManager.Completed(), nil - }, hookContext.StopCh) - return nil - }) - } - return s, nil } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index 88abac45418c..872b1eb4173d 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -24,6 +24,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" + "k8s.io/apiserver/pkg/server" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" @@ -125,7 +126,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { return fmt.Errorf("error creating self-signed certificates: %v", err) } - serverConfig := genericapiserver.NewRecommendedConfig(aggregatorscheme.Codecs) + serverConfig := genericapiserver.NewRecommendedConfig(aggregatorscheme.Codecs, server.KubeAggregator) if err := o.ServerRunOptions.ApplyTo(&serverConfig.Config); err != nil { return err @@ -136,6 +137,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { if err := o.APIEnablement.ApplyTo(&serverConfig.Config, apiserver.DefaultAPIResourceConfigSource(), aggregatorscheme.Scheme); err != nil { return err } + serverConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck( sets.NewString("watch", "proxy"), sets.NewString("attach", "exec", "proxy", "log", "portforward"), diff --git a/staging/src/k8s.io/sample-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/sample-apiserver/pkg/apiserver/apiserver.go index 384c46d4ae31..c098f3e1cc3e 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/apiserver/apiserver.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/sample-apiserver/pkg/apis/wardle" @@ -101,7 +102,7 @@ func (cfg *Config) Complete() CompletedConfig { // New returns a new instance of WardleServer from the given config. func (c completedConfig) New() (*WardleServer, error) { - genericServer, err := c.GenericConfig.New("sample-apiserver", genericapiserver.NewEmptyDelegate()) + genericServer, err := c.GenericConfig.New(server.KubeAggregatedAPIServer, genericapiserver.NewEmptyDelegate()) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go index 16d73445f011..f3286d226e19 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go @@ -29,6 +29,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/endpoints/openapi" + "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -138,7 +139,7 @@ func (o *WardleServerOptions) Config() (*apiserver.Config, error) { return []admission.PluginInitializer{wardleinitializer.New(informerFactory)}, nil } - serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs) + serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs, server.KubeAggregatedAPIServer) serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(sampleopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme)) serverConfig.OpenAPIConfig.Info.Title = "Wardle" serverConfig.OpenAPIConfig.Info.Version = "0.1" diff --git a/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/server.go b/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/server.go index 30db5a4984f3..37d6201d38f2 100644 --- a/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/server.go +++ b/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/server.go @@ -30,24 +30,19 @@ import ( "github.com/spf13/pflag" clientv3 "go.etcd.io/etcd/client/v3" "k8s.io/apimachinery/pkg/util/wait" - genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" + sserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storageversion" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" - "k8s.io/kube-aggregator/pkg/apiserver" "k8s.io/kubernetes/test/utils/kubeconfig" "k8s.io/sample-apiserver/pkg/cmd/server" ) -// StorageVersionPostStartHookName is the name of the storage version updater post start hook. -const StorageVersionPostStartHookName = "built-in-resources-storage-version-updater" - // TearDownFunc is to be called to tear down a test server. type TearDownFunc func() @@ -203,6 +198,9 @@ func StartTestAggregatedServer(t Logger, kubeconfig *rest.Config, instanceOption return result, fmt.Errorf("failed to create config from options: %v", err) } completedConfig := config.Complete() + // check if there's a better way to pass KAS config here + completedConfig.GenericConfig.ClientConfig = kubeconfig + server, err := completedConfig.New() if err != nil { return result, fmt.Errorf("failed to create server: %v", err) @@ -218,31 +216,6 @@ func StartTestAggregatedServer(t Logger, kubeconfig *rest.Config, instanceOption return nil }) - // TODO: chek if this needs to be a part of genericapiserver flow?? - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && - utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { - // Spawn a goroutine in aggregator apiserver to update storage version for - // all built-in resources - server.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error { - // TODO: figure out how can we wait for apiserver identity lease to be created here before updating Svs - go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) { - // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver) - // share the same generic apiserver config. The same StorageVersion manager is used - // to register all built-in resources when the generic apiservers install APIs. - server.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(kubeconfig, server.GenericAPIServer.APIServerID) - return false, nil - }, hookContext.StopCh) - // Once the storage version updater finishes the first round of update, - // the PostStartHook will return to unblock /healthz. The handler chain - // won't block write requests anymore. Check every second since it's not - // expensive. - wait.PollImmediateUntil(1*time.Second, func() (bool, error) { - return server.GenericAPIServer.StorageVersionManager.Completed(), nil - }, hookContext.StopCh) - return nil - }) - } - errCh = make(chan error) go func(stopCh <-chan struct{}) { defer close(errCh) @@ -269,7 +242,7 @@ func StartTestAggregatedServer(t Logger, kubeconfig *rest.Config, instanceOption if instanceOptions.StorageVersionWrapFunc != nil { // We hardcode the param instead of having a new instanceOptions field // to avoid confusing users with more options. - storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName) + storageVersionCheck := fmt.Sprintf("poststarthook/%s-%s", sserver.StorageVersionPostStartHookName, sserver.KubeAggregatedAPIServer) req.Param("exclude", storageVersionCheck) } @@ -286,7 +259,6 @@ func StartTestAggregatedServer(t Logger, kubeconfig *rest.Config, instanceOption } // from here the caller must call tearDown - result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig result.ServerOpts = aggregatedServerOptions result.TearDownFn = tearDown diff --git a/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.crt b/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.crt new file mode 100644 index 000000000000..0d59f36991f8 --- /dev/null +++ b/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.crt @@ -0,0 +1,39 @@ +-----BEGIN CERTIFICATE----- +MIIDQjCCAiqgAwIBAgIIA4kIUQtc6P0wDQYJKoZIhvcNAQELBQAwIjEgMB4GA1UE +AwwXbG9jYWxob3N0LWNhQDE3MDczNTM1NTkwIBcNMjQwMjA3MjM1MjM5WhgPMjEy +NDAxMTQyMzUyMzlaMB8xHTAbBgNVBAMMFGxvY2FsaG9zdEAxNzA3MzUzNTU5MIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApLZ825W8Ip41HsFf8UA5tb+9 +JjAnjQ+Qn05oyqD92g1YjhjQccP3/vegX5PitU0OJ8B4JfNO128AJFAYdPB8DndP +7Do9G3JKL4tOdbSm0Q+NXbddY4OKNZtrKkmge1+FMLjAK8L81S1WRVBcTkWBevf2 +vwamg4SiMBn4O9tLwyFUHh0A5zE+U2Iy9WdXVEUo/BjyySxpItAjs2Fe22uc4VNE +sWupudAobidXRrvlV5MMcDLfKK9iqz5Z9In1WSexrkGIL0er9Cm116BsDRzdPqHl +yoQM6NI4hlZUKcOkug2eOGBwEyXjtSVcvY8F1N662+1mNQop7ql+29rJ5M7RmwID +AQABo30wezAOBgNVHQ8BAf8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDAYD +VR0TAQH/BAIwADAfBgNVHSMEGDAWgBTYql9WcZqGXacRCWxKo+V5o3sPhDAlBgNV +HREEHjAcgglsb2NhbGhvc3SCCWxvY2FsaG9zdIcEfwAAATANBgkqhkiG9w0BAQsF +AAOCAQEAttCKjapfEyNLbVsFA2rbfgNZgHJ02HeM+JRUBMZIbXf95mTQTKsCpmRi +udBrbGSgVVGQysPAI2Q14OpYaPvT8fnOjWEFLSCrroBIQoHeH4BRdZlL8m1TQmQH +z5S8nMBkKsj0+7UAO/cNElHvDh6KB1cUq3W1ef6xqrw8ANfqT1CLMjzOya+OitLS +9NHeajOMQH5bqhUnb1Pu4AeJ1lnOeu5LORwq9y0kbfRP1OdUWQDXzUehvIU+vLkC +mE+V6a/6K+Y3N6Unc/uB/S2ln9s7/NQLYikfprc9LW5v6nK5k7KWKR1NR5KhPffD +7MZO82B0JkzhPzNzVhQBA7QRLmHi6g== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIDCjCCAfKgAwIBAgIIHDiIpN+lfhowDQYJKoZIhvcNAQELBQAwIjEgMB4GA1UE +AwwXbG9jYWxob3N0LWNhQDE3MDczNTM1NTkwIBcNMjQwMjA3MjM1MjM5WhgPMjEy +NDAxMTQyMzUyMzlaMCIxIDAeBgNVBAMMF2xvY2FsaG9zdC1jYUAxNzA3MzUzNTU5 +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1C6b22YoZnF88Dq61Vwg +PQ1rAwg25A+QbHzGXkbrTslGXsr8ys7AClj2IDDw2j7KjKxHRztiUtXUDGYgSMom +Pbn7lTEFTXHZznx/3jBf+0UGEm5HB47Xby2/krkAoIsDk/1szznE0mUur0pTOjry +yH6tRw+U8PMY5WF1I+r3Kc4AKDXJp2AMu0hbC/oGCPhTB6A4VFOfyrY12C9yCvDx +9nG+OHy5UOFgzAuzAi/v9jRwrGP6xi6DWbISTRRvPOxrQgbdLbUxh5fQg1cPqFae +HuooMyRqIGRpuKsgDQnosviurKXScuSABH14WL0cskjCOH94NsF6PF3XaIrYbbXi +gQIDAQABo0IwQDAOBgNVHQ8BAf8EBAMCAqQwDwYDVR0TAQH/BAUwAwEB/zAdBgNV +HQ4EFgQU2KpfVnGahl2nEQlsSqPleaN7D4QwDQYJKoZIhvcNAQELBQADggEBAC0p +0se3OYkhxYIKxUGzh/0EPLyfZeZyLQUgCP2uVBlPK2R4LLF5hWzHrcDQvJoHu3eh +Hnmg9/HYMxENl+a9J9E+v5mLFRWeb++fp1tecjKUI366tZHiSU4KAM/r69uuRq7E +O431VlfXjdM2YM7u4ApvTSadf4xewcpS9kfQCe/RSJU1FmslK/HvwTDsfmhbG9Wr +4JKYPjaQ00+xo/E1ocv6Ki4Nu29aOF8pf4JHi5njOx7WWMweS0eLEj2k6vNMGVDj +EPjDUJGyWeplIEFSlrVpuJVNoWf3/XyzBxoRTv8DLCTu9HYdXPVLBqNnQvK90TS9 +6YUyN0czF5v/BosDpT0= +-----END CERTIFICATE----- diff --git a/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.key b/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.key new file mode 100644 index 000000000000..a65775829fd2 --- /dev/null +++ b/staging/src/k8s.io/sample-apiserver/test/integration/fixtures/testdata/localhost_127.0.0.1_localhost.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEApLZ825W8Ip41HsFf8UA5tb+9JjAnjQ+Qn05oyqD92g1YjhjQ +ccP3/vegX5PitU0OJ8B4JfNO128AJFAYdPB8DndP7Do9G3JKL4tOdbSm0Q+NXbdd +Y4OKNZtrKkmge1+FMLjAK8L81S1WRVBcTkWBevf2vwamg4SiMBn4O9tLwyFUHh0A +5zE+U2Iy9WdXVEUo/BjyySxpItAjs2Fe22uc4VNEsWupudAobidXRrvlV5MMcDLf +KK9iqz5Z9In1WSexrkGIL0er9Cm116BsDRzdPqHlyoQM6NI4hlZUKcOkug2eOGBw +EyXjtSVcvY8F1N662+1mNQop7ql+29rJ5M7RmwIDAQABAoIBACgU8FVPHrUd4rdV +P0+p3WFJA3JjjXxaKUta/U0x2BRT4xTRHQhSM+srvy8DAcw0sBkkURiLGMh8DoDH +rOvoCYhLqHtHwM8JBNyE/dbO2zlMm44OrP8mZ1+cLnvf5tme6P5uNvVvQa9ulOvR +HmU9rMCLztO1fpfKkt05tJp8Rvd4pMPET4PFslBotFN8BjueKBSicGgQTKHCGJge +Lkb/lxNVjQ6545pdnSmgJZYgxNON4i7NCtGjFoSYaqqtwb3soaaKjNNri3trb5e/ +2hlum1ZnrMROmBiAaZqlZBsyfZWyMdh4sUKFhfZv4Lx1cw2jiqYMPs0FKXN0Dz/Z +JfyF0uECgYEA1kXs+TjXhc904Pu3gEnWeApuL2zvIW8ioYqdLGXhjS6sQBRHvcv8 +7ToLhmq6Vlh47JqeFpEmGttv2rjgHznZWtyZd8FXPpFns1DP2mfE9qj9Tz9tASnu +nYC4RonyX1mU9vYj5i7cAUX2pKlwGzM877h89UPqyzB68qXq5K+Vg5cCgYEAxMna +EOz/qq06nZwxg5NBPH0m7Q5HWYTTawIyVztWFU0g4Tr7bsg+VsLjQrqk26gOSDXo +GbVtSHV7bD3nZ+GQOVxHogXqR+ePC0Wm3il5NnGjQMLE6qDlPb3plSjMDrt1YtVt +HefGgXrWxDVo9PvNtSdPO06s2KpRLU/OiqWXkp0CgYA+jVNMvidMzO9V7qX3MZcG +nCTme2qj3AnEFu8jUuqxiHqIimMXKUHBDzzmFKopItLKX6je9aimFoFk3Kuw8fZd +7CBxMWQCw/f+BQ1ouPpq95JbXCy6Nj+Og1FIZmT3KPOv1NNOE0fupzoFf+FWT3Lu +mHECOL2Ga7S4uW8FK6z+KQKBgFggYRS1XeVsQFS2zwGyX0OcdGMywscXEarZR/wC +qZFOggv7YwAAktI94pJOjWeFAihaY7+nnPIXEpetvUSrIfBeIAIyDfH3g/qzTYAu +KIc2KrrkvlSoiyC0PLe8lm1+i5ogJAaBdkcj4Kvrkz4qybt3iCbyG7vHwiS9maxD +bfaNAoGAf8sqzSxhcQ2lBn4VSiQE+xJ6nBdrmoEjKxEpZXFX74ysc/Tt2R+eOtsA +pFkB0ySF3DyJ6BkZ6XmmHz4ABLydy8epoGmlVWEZukcghjlLHQiaMWGU09OoLCDM +sXHqTKjTp68STCwDuHT9OymP7yj43KnakW0V82VxOE0nQEqq2PA= +-----END RSA PRIVATE KEY----- diff --git a/test/e2e/apimachinery/aggregator.go b/test/e2e/apimachinery/aggregator.go index 529c3353228b..cb0f90029bcd 100644 --- a/test/e2e/apimachinery/aggregator.go +++ b/test/e2e/apimachinery/aggregator.go @@ -726,6 +726,12 @@ func TestSampleAPIServer(ctx context.Context, f *framework.Framework, aggrclient apiServiceLabelSelector := labels.SelectorFromSet(updatedApiService.Labels).String() ginkgo.By(fmt.Sprintf("APIService deleteCollection with labelSelector: %q", apiServiceLabelSelector)) + ginkgo.By("List StorageVersions") + listStorageVersions, err := client.InternalV1alpha1().StorageVersions().List(ctx, metav1.ListOptions{}) + framework.Logf("Neww RICHAAA Found these SVs %s", listStorageVersions) + + framework.ExpectNoError(err, "No response for /apis/internal.apiserver.k8s.io/v1alpha1 Error: %v", err) + err = aggrclient.ApiregistrationV1().APIServices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: apiServiceLabelSelector}) diff --git a/test/integration/storageversion/storage_version_aggregated_apiserver_test.go b/test/integration/storageversion/storage_version_aggregated_apiserver_test.go index adfb03c61a72..93dbb75d5d6e 100644 --- a/test/integration/storageversion/storage_version_aggregated_apiserver_test.go +++ b/test/integration/storageversion/storage_version_aggregated_apiserver_test.go @@ -70,6 +70,9 @@ func TestStorageVersionAPI(t *testing.T) { StorageVersionWrapFunc: storageVersionManagerWrapperFunc(storageVersionManagerConfigAggregatedServer), }, ) + if err != nil { + t.Fatal(err) + } defer tearDown() signalStorageVersionUpdate(storageVersionManagerConfigAggregatedServer)