Skip to content

Commit

Permalink
Move SV update poststarthook to genericapiserver
Browse files Browse the repository at this point in the history
  • Loading branch information
richabanker committed Feb 10, 2024
1 parent 986c5ae commit f413e46
Show file tree
Hide file tree
Showing 19 changed files with 199 additions and 110 deletions.
2 changes: 2 additions & 0 deletions cmd/kube-apiserver/app/server.go
Expand Up @@ -36,6 +36,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/admission"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/server"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -216,6 +217,7 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
proxyTransport := CreateProxyTransport()

genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
server.KubeAPIServer,
opts.CompletedOptions,
[]*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
generatedopenapi.GetOpenAPIDefinitions,
Expand Down
7 changes: 5 additions & 2 deletions cmd/kube-apiserver/app/testing/testserver.go
Expand Up @@ -43,6 +43,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
sserver "k8s.io/apiserver/pkg/server"
serveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storageversion"
Expand All @@ -54,7 +55,6 @@ import (
"k8s.io/client-go/util/keyutil"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"

"k8s.io/kubernetes/cmd/kube-apiserver/app"
Expand Down Expand Up @@ -391,8 +391,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
storageVersionCheck := fmt.Sprintf("poststarthook/%s-%s", sserver.StorageVersionPostStartHookName, sserver.KubeAPIServer)
req.Param("exclude", storageVersionCheck)
storageVersionCheck = fmt.Sprintf("poststarthook/%s-%s", sserver.StorageVersionPostStartHookName, sserver.KubeAggregator)
req.Param("exclude", storageVersionCheck)

}
result := req.Do(context.TODO())
status := 0
Expand Down
3 changes: 2 additions & 1 deletion pkg/controlplane/apiserver/config.go
Expand Up @@ -55,6 +55,7 @@ import (

// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(
name string,
s controlplaneapiserver.CompletedOptions,
schemes []*runtime.Scheme,
getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
Expand All @@ -65,7 +66,7 @@ func BuildGenericConfig(

lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs, name)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()

if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controlplane/instance.go
Expand Up @@ -63,6 +63,7 @@ import (
apiserverfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
Expand Down Expand Up @@ -312,7 +313,7 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
cfg := completedConfig{
c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers, nil),
&c.ExtraConfig,
}

Expand Down Expand Up @@ -369,7 +370,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}

s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
s, err := c.GenericConfig.New(server.KubeAPIServer, delegationTarget)
if err != nil {
return nil, err
}
Expand Down
Expand Up @@ -45,6 +45,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/util/webhook"
Expand Down Expand Up @@ -130,7 +131,7 @@ func (cfg *Config) Complete() CompletedConfig {

// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
genericServer, err := c.GenericConfig.New(server.KubeAPIExtensionsAPIServer, delegationTarget)
if err != nil {
return nil, err
}
Expand Down
Expand Up @@ -34,6 +34,7 @@ import (
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
server "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
storagevalue "k8s.io/apiserver/pkg/storage/value"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
Expand Down Expand Up @@ -102,7 +103,7 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}

serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)
serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs, server.KubeAPIExtensionsAPIServer)
if err := o.ServerRunOptions.ApplyTo(&serverConfig.Config); err != nil {
return nil, err
}
Expand Down
Expand Up @@ -135,7 +135,9 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
}
s.APIEnablement.RuntimeConfig.Set("api/all=true")

fs.Parse(customFlags)
if err := fs.Parse(customFlags); err != nil {
return result, err
}

if err := s.Complete(); err != nil {
return result, fmt.Errorf("failed to set default options: %v", err)
Expand Down
99 changes: 89 additions & 10 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Expand Up @@ -33,15 +33,17 @@ import (
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"
"golang.org/x/crypto/cryptobyte"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/admission"
Expand Down Expand Up @@ -71,9 +73,10 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/logs"
"k8s.io/component-base/metrics/features"
metricsfeatures "k8s.io/component-base/metrics/features"
"k8s.io/component-base/metrics/prometheus/slis"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
Expand All @@ -100,6 +103,13 @@ const (

// APIGroupPrefix is where non-legacy API group will be located.
APIGroupPrefix = "/apis"

StorageVersionPostStartHookName = "built-in-resources-storage-version-updater"

KubeAPIServer = "kube-apiserver"
KubeAggregator = "kube-aggregator"
KubeAggregatedAPIServer = "kube-aggregated-apiserver"
KubeAPIExtensionsAPIServer = "kube-apiextensions-apiserver"
)

// Config is a structure used to configure a GenericAPIServer.
Expand All @@ -119,6 +129,10 @@ type Config struct {
// TODO: move into SecureServing(WithLoopback) as soon as insecure serving is gone
LoopbackClientConfig *restclient.Config

// ClientConfig is a config for a loopback connection to the main kube-apiserver from an aggregated apiserver.
// This is an optional field.
ClientConfig *restclient.Config

// EgressSelector provides a lookup mechanism for dialing outbound connections.
// It does so based on a EgressSelectorConfiguration which was read at startup.
EgressSelector *egressselector.EgressSelector
Expand Down Expand Up @@ -367,11 +381,11 @@ type AuthorizationInfo struct {
}

func init() {
utilruntime.Must(features.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
}

// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
func NewConfig(codecs serializer.CodecFactory, name string) *Config {
defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
var id string
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
Expand All @@ -389,7 +403,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
b.AddBytes([]byte(hostname))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte("kube-apiserver"))
b.AddBytes([]byte(name))
})
hashData, err := b.Bytes()
if err != nil {
Expand Down Expand Up @@ -455,9 +469,9 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
}

// NewRecommendedConfig returns a RecommendedConfig struct with the default values
func NewRecommendedConfig(codecs serializer.CodecFactory) *RecommendedConfig {
func NewRecommendedConfig(codecs serializer.CodecFactory, name string) *RecommendedConfig {
return &RecommendedConfig{
Config: *NewConfig(codecs),
Config: *NewConfig(codecs, name),
}
}

Expand Down Expand Up @@ -670,7 +684,7 @@ func (c *Config) DrainedNotify() <-chan struct{} {

// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
func (c *Config) Complete(informers informers.SharedInformerFactory, clientConfig *restclient.Config) CompletedConfig {
if len(c.ExternalAddress) == 0 && c.PublicAddress != nil {
c.ExternalAddress = c.PublicAddress.String()
}
Expand Down Expand Up @@ -715,13 +729,17 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
}
}

if c.ClientConfig == nil {
c.ClientConfig = clientConfig
}

return CompletedConfig{&completedConfig{c, informers}}
}

// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *RecommendedConfig) Complete() CompletedConfig {
return c.Config.Complete(c.SharedInformerFactory)
return c.Config.Complete(c.SharedInformerFactory, c.ClientConfig)
}

var allowedMediaTypes = []string{
Expand Down Expand Up @@ -934,6 +952,67 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
}
}

if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// Spawn a goroutine in aggregator apiserver to update storage version for
// all built-in resources
if !s.isPostStartHookRegistered(fmt.Sprintf("%s-%s", StorageVersionPostStartHookName, name)) {
s.AddPostStartHookOrDie(fmt.Sprintf("%s-%s", StorageVersionPostStartHookName, name), func(hookContext PostStartHookContext) error {
// Wait for apiserver-identity to exist first before updating storage
// versions, to avoid storage version GC accidentally garbage-collecting
// storage versions.
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}

// TODO: checking for identity leases this way seems not ideal. Wonder if need to have the lease controller moved here as well.
// Meaning: do we want identity leases for aggregated-apiservers / extension apiservers?
if name != KubeAggregatedAPIServer {
if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
context.TODO(), s.APIServerID, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}, hookContext.StopCh); err != nil {
return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
s.APIServerID, err)
}
}
clientConfig := hookContext.LoopbackClientConfig
if c.ClientConfig != nil {
clientConfig = c.ClientConfig
}
// Technically an apiserver only needs to update storage version once during bootstrap.
// Reconcile StorageVersion objects every 10 minutes will help in the case that the
// StorageVersion objects get accidentally modified/deleted by a different agent. In that
// case, the reconciliation ensures future storage migration still works. If nothing gets
// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
// therefore won't change the resource version and trigger storage migration.
go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
// share the same generic apiserver config. The same StorageVersion manager is used
// to register all built-in resources when the generic apiservers install APIs.
s.StorageVersionManager.UpdateStorageVersions(clientConfig, s.APIServerID)
return false, nil
}, hookContext.StopCh)
// Once the storage version updater finishes the first round of update,
// the PostStartHook will return to unblock /healthz. The handler chain
// won't block write requests anymore. Check every second since it's not
// expensive.
wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
return s.StorageVersionManager.Completed(), nil
}, hookContext.StopCh)
return nil
})
}
}

for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {
Expand Down Expand Up @@ -1136,7 +1215,7 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati
}

privilegedLoopbackToken := loopback.BearerToken
var uid = uuid.New().String()
var uid = string(uuid.NewUUID())
tokens := make(map[string]*user.DefaultInfo)
tokens[privilegedLoopbackToken] = &user.DefaultInfo{
Name: user.APIServerUser,
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/server/options/feature.go
Expand Up @@ -36,7 +36,7 @@ type FeatureOptions struct {
}

func NewFeatureOptions() *FeatureOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
defaults := server.NewConfig(serializer.CodecFactory{}, server.KubeAPIServer)

return &FeatureOptions{
EnableProfiling: defaults.EnableProfiling,
Expand Down
Expand Up @@ -92,7 +92,7 @@ type ServerRunOptions struct {
}

func NewServerRunOptions() *ServerRunOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
defaults := server.NewConfig(serializer.CodecFactory{}, server.KubeAPIServer)
return &ServerRunOptions{
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
Expand Down

0 comments on commit f413e46

Please sign in to comment.