From c252ebe50cc5cabd34789c069d3b081b03125227 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 26 Apr 2024 11:44:21 +0200 Subject: [PATCH] controlplane/apiserver: move peer proxy code to allow aggregator construction This reverts commit 0bb6ff5e454cfc99c0b611e22162391eaa31e103. Signed-off-by: Dr. Stefan Schimanski --- cmd/kube-apiserver/app/config.go | 2 +- cmd/kube-apiserver/app/server.go | 6 +- pkg/controlplane/apiserver/completion.go | 4 ++ pkg/controlplane/apiserver/config.go | 14 ++++ pkg/controlplane/apiserver/peer.go | 89 ++++++++++++++++++++++++ pkg/controlplane/instance.go | 78 +++------------------ 6 files changed, 122 insertions(+), 71 deletions(-) create mode 100644 pkg/controlplane/apiserver/peer.go diff --git a/cmd/kube-apiserver/app/config.go b/cmd/kube-apiserver/app/config.go index 931bfff5de0f..7a67875cb604 100644 --- a/cmd/kube-apiserver/app/config.go +++ b/cmd/kube-apiserver/app/config.go @@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } c.ApiExtensions = apiExtensions - aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.Extra.PeerProxy, pluginInitializer) + aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer) if err != nil { return nil, err } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 58b893312d4e..c37c778b220c 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -265,14 +265,14 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) ( } if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { - config.Extra.PeerEndpointLeaseReconciler, err = controlplane.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory) + config.ControlPlane.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory) if err != nil { return nil, nil, nil, err } // build peer proxy config only if peer ca file exists if opts.PeerCAFile != "" { - config.Extra.PeerProxy, err = controlplane.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile, - opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer) + config.ControlPlane.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile, + opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ControlPlane.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer) if err != nil { return nil, nil, nil, err } diff --git a/pkg/controlplane/apiserver/completion.go b/pkg/controlplane/apiserver/completion.go index e679f3f20441..7b44c3ae295f 100644 --- a/pkg/controlplane/apiserver/completion.go +++ b/pkg/controlplane/apiserver/completion.go @@ -40,5 +40,9 @@ func (c *Config) Complete() CompletedConfig { discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.Generic.ExternalAddress} cfg.Generic.DiscoveryAddresses = discoveryAddresses + if cfg.Extra.PeerEndpointReconcileInterval == 0 { + cfg.Extra.PeerEndpointReconcileInterval = DefaultPeerEndpointReconcileInterval + } + return CompletedConfig{&cfg} } diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index 437198189149..c28601c5728b 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -31,12 +31,14 @@ import ( "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericfeatures "k8s.io/apiserver/pkg/features" + peerreconcilers "k8s.io/apiserver/pkg/reconcilers" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/filters" serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/openapi" + utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" clientgoinformers "k8s.io/client-go/informers" clientgoclientset "k8s.io/client-go/kubernetes" "k8s.io/component-base/version" @@ -67,6 +69,18 @@ type Extra struct { EnableLogsSupport bool ProxyTransport *http.Transport + // PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests + // that can not be served locally + PeerProxy utilpeerproxy.Interface + // PeerEndpointReconcileInterval defines how often the endpoint leases are reconciled in etcd. + PeerEndpointReconcileInterval time.Duration + // PeerEndpointLeaseReconciler updates the peer endpoint leases + PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler + // PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request + // to this apiserver. This happens in cases where the peer is not able to serve the request due to + // version skew. If unset, AdvertiseAddress/BindAddress will be used. + PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress + ServiceAccountIssuer serviceaccount.TokenGenerator ServiceAccountMaxExpiration time.Duration ExtendExpiration bool diff --git a/pkg/controlplane/apiserver/peer.go b/pkg/controlplane/apiserver/peer.go new file mode 100644 index 000000000000..1d4d58441f95 --- /dev/null +++ b/pkg/controlplane/apiserver/peer.go @@ -0,0 +1,89 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/reconcilers" + genericapiserver "k8s.io/apiserver/pkg/server" + serverstorage "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/storageversion" + utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" + clientgoinformers "k8s.io/client-go/informers" + "k8s.io/client-go/transport" + "k8s.io/klog/v2" + api "k8s.io/kubernetes/pkg/apis/core" +) + +const ( + // DefaultPeerEndpointReconcileInterval is the default amount of time for how often + // the peer endpoint leases are reconciled. + DefaultPeerEndpointReconcileInterval = 10 * time.Second + // DefaultPeerEndpointReconcilerTTL is the default TTL timeout for peer endpoint + // leases on the storage layer + DefaultPeerEndpointReconcilerTTL = 15 * time.Second +) + +func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager, + proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress, + apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) { + if proxyClientCertFile == "" { + return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified") + } + if proxyClientKeyFile == "" { + return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified") + } + // create proxy client config + clientConfig := &transport.Config{ + TLS: transport.TLSConfig{ + Insecure: false, + CertFile: proxyClientCertFile, + KeyFile: proxyClientKeyFile, + CAFile: peerCAFile, + ServerName: "kubernetes.default.svc", + }} + + // build proxy transport + proxyRoundTripper, transportBuildingError := transport.New(clientConfig) + if transportBuildingError != nil { + klog.Error(transportBuildingError.Error()) + return nil, transportBuildingError + } + return utilpeerproxy.NewPeerProxyHandler( + versionedInformer, + svm, + proxyRoundTripper, + apiServerID, + reconciler, + serializer, + ), nil +} + +// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop +// The peer endpoint leases are used to find network locations of apiservers for peer proxy +func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) { + ttl := DefaultPeerEndpointReconcilerTTL + config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo")) + if err != nil { + return nil, fmt.Errorf("error creating storage factory config: %w", err) + } + reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl) + return reconciler, err +} diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 4089fdcd40c4..f94af8aec638 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -54,7 +54,6 @@ import ( storageapiv1beta1 "k8s.io/api/storage/v1beta1" svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" @@ -67,14 +66,10 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" serverstorage "k8s.io/apiserver/pkg/server/storage" - "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" - utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" - clientgoinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1" - "k8s.io/client-go/transport" "k8s.io/component-helpers/apimachinery/lease" "k8s.io/klog/v2" api "k8s.io/kubernetes/pkg/apis/core" @@ -157,16 +152,6 @@ type Extra struct { EndpointReconcilerConfig EndpointReconcilerConfig KubeletClientConfig kubeletclient.KubeletClientConfig - // PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests - // that can not be served locally - PeerProxy utilpeerproxy.Interface - // PeerEndpointLeaseReconciler updates the peer endpoint leases - PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler - // PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request - // to this apiserver. This happens in cases where the peer is not able to serve the request due to - // version skew. If unset, AdvertiseAddress/BindAddress will be used. - PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress - // Values to build the IP addresses used by discovery // The range of IPs to be assigned to services with type=ClusterIP or greater ServiceIPRange net.IPNet @@ -290,6 +275,12 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler { // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. func (c *Config) Complete() CompletedConfig { + if c.ControlPlane.PeerEndpointReconcileInterval == 0 && c.EndpointReconcilerConfig.Interval != 0 { + // default this to the endpoint reconciler value before the generic + // controlplane completion can kick in + c.ControlPlane.PeerEndpointReconcileInterval = c.EndpointReconcilerConfig.Interval + } + cfg := completedConfig{ c.ControlPlane.Complete(), &c.Extra, @@ -508,11 +499,11 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) } if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { - peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort) + peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort) peerEndpointCtrl := peerreconcilers.New( c.ControlPlane.Generic.APIServerID, peeraddress, - c.Extra.PeerEndpointLeaseReconciler, + c.ControlPlane.Extra.PeerEndpointLeaseReconciler, c.Extra.EndpointReconcilerConfig.Interval, client) if err != nil { @@ -529,9 +520,9 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil }) // Add PostStartHooks for Unknown Version Proxy filter. - if c.Extra.PeerProxy != nil { + if c.ControlPlane.Extra.PeerProxy != nil { m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error { - err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh) + err := c.ControlPlane.Extra.PeerProxy.WaitForCacheSync(context.StopCh) return err }) } @@ -579,7 +570,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) leaseName := m.GenericAPIServer.APIServerID holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID()) - peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort) + peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort) // must replace ':,[]' in [ip:port] to be able to store this as a valid label value controller := lease.NewController( clock.RealClock{}, @@ -782,53 +773,6 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig { return ret } -// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop -// The peer endpoint leases are used to find network locations of apiservers for peer proxy -func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (peerreconcilers.PeerEndpointLeaseReconciler, error) { - ttl := DefaultEndpointReconcilerTTL - config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo")) - if err != nil { - return nil, fmt.Errorf("error creating storage factory config: %w", err) - } - reconciler, err := peerreconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl) - return reconciler, err -} - -func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager, - proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, - apiServerID string, reconciler peerreconcilers.PeerEndpointLeaseReconciler, serializer kruntime.NegotiatedSerializer) (utilpeerproxy.Interface, error) { - if proxyClientCertFile == "" { - return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified") - } - if proxyClientKeyFile == "" { - return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified") - } - // create proxy client config - clientConfig := &transport.Config{ - TLS: transport.TLSConfig{ - Insecure: false, - CertFile: proxyClientCertFile, - KeyFile: proxyClientKeyFile, - CAFile: peerCAFile, - ServerName: "kubernetes.default.svc", - }} - - // build proxy transport - proxyRoundTripper, transportBuildingError := transport.New(clientConfig) - if transportBuildingError != nil { - klog.Error(transportBuildingError.Error()) - return nil, transportBuildingError - } - return utilpeerproxy.NewPeerProxyHandler( - versionedInformer, - svm, - proxyRoundTripper, - apiServerID, - reconciler, - serializer, - ), nil -} - // utility function to get the apiserver address that is used by peer apiservers to proxy // requests to this apiserver in case the peer is incapable of serving the request func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {