Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kube-aggregator: correctly use client-go TLS cache with custom dialer #117258

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 19 additions & 12 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Expand Up @@ -35,6 +35,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/transport"
openapicommon "k8s.io/kube-openapi/pkg/common"

"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand Down Expand Up @@ -129,7 +130,7 @@ type APIAggregator struct {

// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
proxyCurrentCertKeyContent certKeyFunc
proxyTransport *http.Transport
proxyTransportDial *transport.DialHolder

// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
proxyHandlers map[string]*proxyHandler
Expand Down Expand Up @@ -160,10 +161,6 @@ type APIAggregator struct {
// when discovery with resources are requested
discoveryAggregationController DiscoveryAggregationController

// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector

// rejectForwardingRedirects is whether to allow to forward redirect response
rejectForwardingRedirects bool
}
Expand Down Expand Up @@ -210,18 +207,30 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil, err
}

var proxyTransportDial *transport.DialHolder
if c.GenericConfig.EgressSelector != nil {
egressDialer, err := c.GenericConfig.EgressSelector.Lookup(egressselector.Cluster.AsNetworkContext())
if err != nil {
return nil, err
}
if egressDialer != nil {
proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
}
} else if c.ExtraConfig.ProxyTransport != nil && c.ExtraConfig.ProxyTransport.DialContext != nil {
proxyTransportDial = &transport.DialHolder{Dial: c.ExtraConfig.ProxyTransport.DialContext}
}

s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyTransportDial: proxyTransportDial,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
}
Expand Down Expand Up @@ -295,10 +304,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
apiregistrationClient.ApiregistrationV1(),
c.ExtraConfig.ProxyTransport,
proxyTransportDial,
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
s.serviceResolver,
c.GenericConfig.EgressSelector,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -463,7 +471,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
// Forward calls to discovery manager to update discovery document
if s.discoveryAggregationController != nil {
handlerCopy := *proxyHandler
handlerCopy.setServiceAvailable(true)
handlerCopy.setServiceAvailable()
s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
}
return nil
Expand All @@ -479,9 +487,8 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
proxyHandler := &proxyHandler{
localDelegate: s.delegateHandler,
proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
proxyTransport: s.proxyTransport,
proxyTransportDial: s.proxyTransportDial,
serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector,
rejectForwardingRedirects: s.rejectForwardingRedirects,
}
proxyHandler.updateAPIService(apiService)
Expand Down
37 changes: 10 additions & 27 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go
Expand Up @@ -33,10 +33,8 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/egressselector"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/x509metrics"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
Expand All @@ -59,17 +57,13 @@ type proxyHandler struct {

// proxyCurrentCertKeyContent holds the client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
proxyCurrentCertKeyContent certKeyFunc
proxyTransport *http.Transport
proxyTransportDial *transport.DialHolder

// Endpoints based routing to map from cluster IP to routable IP
serviceResolver ServiceResolver

handlingInfo atomic.Value

// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector

// reject to forward redirect response
rejectForwardingRedirects bool
}
Expand All @@ -80,8 +74,8 @@ type proxyHandlingInfo struct {

// name is the name of the APIService
name string
// restConfig holds the information for building a roundtripper
restConfig *restclient.Config
// transportConfig holds the information for building a roundtripper
transportConfig *transport.Config
// transportBuildingError is an error produced while building the transport. If this
// is non-nil, it will be reported to clients.
transportBuildingError error
Expand Down Expand Up @@ -233,7 +227,7 @@ func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {

// Sets serviceAvailable value on proxyHandler
// not thread safe
func (r *proxyHandler) setServiceAvailable(value bool) {
func (r *proxyHandler) setServiceAvailable() {
info := r.handlingInfo.Load().(proxyHandlingInfo)
info.serviceAvailable = true
r.handlingInfo.Store(info)
Expand All @@ -247,41 +241,30 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIServ

proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()

clientConfig := &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
transportConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify,
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
CertData: proxyClientCert,
KeyData: proxyClientKey,
CAData: apiService.Spec.CABundle,
},
DialHolder: r.proxyTransportDial,
}
clientConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
transportConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
x509MissingSANCounter,
x509InsecureSHA1Counter,
))

newInfo := proxyHandlingInfo{
name: apiService.Name,
restConfig: clientConfig,
transportConfig: transportConfig,
serviceName: apiService.Spec.Service.Name,
serviceNamespace: apiService.Spec.Service.Namespace,
servicePort: *apiService.Spec.Service.Port,
serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
}
if r.egressSelector != nil {
networkContext := egressselector.Cluster.AsNetworkContext()
var egressDialer utilnet.DialFunc
egressDialer, err := r.egressSelector.Lookup(networkContext)
if err != nil {
klog.Warning(err.Error())
} else {
newInfo.restConfig.Dial = egressDialer
}
} else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
newInfo.restConfig.Dial = r.proxyTransport.DialContext
}
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
newInfo.proxyRoundTripper, newInfo.transportBuildingError = transport.New(newInfo.transportConfig)
if newInfo.transportBuildingError != nil {
klog.Warning(newInfo.transportBuildingError.Error())
}
Expand Down
Expand Up @@ -36,6 +36,7 @@ import (

"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/client-go/transport"

"golang.org/x/net/websocket"

Expand Down Expand Up @@ -325,7 +326,6 @@ func TestProxyHandler(t *testing.T) {
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
serviceResolver: serviceResolver,
proxyTransport: &http.Transport{},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
}
server := httptest.NewServer(contextHandler(handler, tc.user))
Expand Down Expand Up @@ -551,15 +551,21 @@ func TestProxyUpgrade(t *testing.T) {
serverURL, _ := url.Parse(backendServer.URL)
proxyHandler := &proxyHandler{
serviceResolver: &mockedRouter{destinationHost: serverURL.Host},
proxyTransport: &http.Transport{},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
}

var dialer *mockEgressDialer
var selector *egressselector.EgressSelector
if tc.NewEgressSelector != nil {
dialer, selector = tc.NewEgressSelector()
proxyHandler.egressSelector = selector

egressDialer, err := selector.Lookup(egressselector.Cluster.AsNetworkContext())
if err != nil {
t.Fatal(err)
}
if egressDialer != nil {
proxyHandler.proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
}
}

proxyHandler.updateAPIService(tc.APIService)
Expand Down
Expand Up @@ -19,7 +19,6 @@ package apiserver
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"reflect"
Expand All @@ -33,13 +32,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector"
v1informers "k8s.io/client-go/informers/core/v1"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -77,8 +73,8 @@ type AvailableConditionController struct {
endpointsLister v1listers.EndpointsLister
endpointsSynced cache.InformerSynced

// dialContext specifies the dial function for creating unencrypted TCP connections.
dialContext func(ctx context.Context, network, address string) (net.Conn, error)
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
proxyTransportDial *transport.DialHolder
proxyCurrentCertKeyContent certKeyFunc
serviceResolver ServiceResolver

Expand All @@ -91,67 +87,19 @@ type AvailableConditionController struct {
// this lock protects operations on the above cache
cacheLock sync.RWMutex

// TLS config with customized dialer cannot be cached by the client-go
// tlsTransportCache. Use a local cache here to reduce the chance of
// the controller spamming idle connections with short-lived transports.
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
tlsCache *tlsTransportCache

// metrics registered into legacy registry
metrics *availabilityMetrics
}

type tlsTransportCache struct {
mu sync.Mutex
transports map[tlsCacheKey]http.RoundTripper
}

func (c *tlsTransportCache) get(config *rest.Config) (http.RoundTripper, error) {
// If the available controller doesn't customzie the dialer (and we know from
// the code that the controller doesn't customzie other functions i.e. Proxy
// and GetCert (ExecProvider)), the config is cacheable by the client-go TLS
// transport cache. Let's skip the local cache and depend on the client-go cache.
if config.Dial == nil {
return rest.TransportFor(config)
}
c.mu.Lock()
defer c.mu.Unlock()
// See if we already have a custom transport for this config
key := tlsConfigKey(config)
if t, ok := c.transports[key]; ok {
return t, nil
}
restTransport, err := rest.TransportFor(config)
if err != nil {
return nil, err
}
c.transports[key] = restTransport
return restTransport, nil
}

type tlsCacheKey struct {
certData string
keyData string `datapolicy:"secret-key"`
}

func tlsConfigKey(c *rest.Config) tlsCacheKey {
return tlsCacheKey{
certData: string(c.TLSClientConfig.CertData),
keyData: string(c.TLSClientConfig.KeyData),
}
}

// NewAvailableConditionController returns a new AvailableConditionController.
func NewAvailableConditionController(
apiServiceInformer informers.APIServiceInformer,
serviceInformer v1informers.ServiceInformer,
endpointsInformer v1informers.EndpointsInformer,
apiServiceClient apiregistrationclient.APIServicesGetter,
proxyTransport *http.Transport,
proxyTransportDial *transport.DialHolder,
proxyCurrentCertKeyContent certKeyFunc,
serviceResolver ServiceResolver,
egressSelector *egressselector.EgressSelector,
) (*AvailableConditionController, error) {
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
Expand All @@ -165,23 +113,11 @@ func NewAvailableConditionController(
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
proxyTransportDial: proxyTransportDial,
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}

if egressSelector != nil {
networkContext := egressselector.Cluster.AsNetworkContext()
var egressDialer utilnet.DialFunc
egressDialer, err := egressSelector.Lookup(networkContext)
if err != nil {
return nil, err
}
c.dialContext = egressDialer
} else if proxyTransport != nil && proxyTransport.DialContext != nil {
c.dialContext = proxyTransport.DialContext
}

// resync on this one because it is low cardinality and rechecking the actual discovery
// allows us to detect health in a more timely fashion when network connectivity to
// nodes is snipped, but the network still attempts to route there. See
Expand Down Expand Up @@ -236,27 +172,20 @@ func (c *AvailableConditionController) sync(key string) error {
// if a particular transport was specified, use that otherwise build one
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
restConfig := &rest.Config{
TLSClientConfig: rest.TLSClientConfig{
transportConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: true,
},
DialHolder: c.proxyTransportDial,
}

if c.proxyCurrentCertKeyContent != nil {
proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()

restConfig.TLSClientConfig.CertData = proxyClientCert
restConfig.TLSClientConfig.KeyData = proxyClientKey
}
if c.dialContext != nil {
restConfig.Dial = c.dialContext
transportConfig.TLS.CertData = proxyClientCert
transportConfig.TLS.KeyData = proxyClientKey
}
// TLS config with customized dialer cannot be cached by the client-go
// tlsTransportCache. Use a local cache here to reduce the chance of
// the controller spamming idle connections with short-lived transports.
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
restTransport, err := c.tlsCache.get(restConfig)
restTransport, err := transport.New(transportConfig)
if err != nil {
return err
}
Expand Down