Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #63492: Always track kubelet -> API connections #63832

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 8 additions & 5 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
var kubeClient clientset.Interface
var eventClient v1core.EventsGetter
var heartbeatClient v1core.CoreV1Interface
var closeAllConns func()
var externalKubeClient clientset.Interface

clientConfig, err := CreateAPIServerClientConfig(s)
Expand All @@ -342,11 +343,12 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
if err != nil {
return err
}
// we set exitIfExpired to true because we use this client configuration to request new certs - if we are unable
// to request new certs, we will be unable to continue normal operation
if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, true); err != nil {
return err
}
}
// we set exitIfExpired to true because we use this client configuration to request new certs - if we are unable
// to request new certs, we will be unable to continue normal operation
closeAllConns, err = kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, true)
if err != nil {
return err
}

kubeClient, err = clientset.NewForConfig(clientConfig)
Expand Down Expand Up @@ -392,6 +394,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
kubeDeps.ExternalKubeClient = externalKubeClient
if heartbeatClient != nil {
kubeDeps.HeartbeatClient = heartbeatClient
kubeDeps.OnHeartbeatFailure = closeAllConns
}
if eventClient != nil {
kubeDeps.EventClient = eventClient
Expand Down
87 changes: 47 additions & 40 deletions pkg/kubelet/certificate/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,24 @@ import (
//
// The config must not already provide an explicit transport.
//
// The returned function allows forcefully closing all active connections.
//
// The returned transport periodically checks the manager to determine if the
// certificate has changed. If it has, the transport shuts down all existing client
// connections, forcing the client to re-handshake with the server and use the
// new certificate.
//
// stopCh should be used to indicate when the transport is unused and doesn't need
// to continue checking the manager.
func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error {
func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) (func(), error) {
return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitIfExpired)
}

// updateTransport is an internal method that exposes how often this method checks that the
// client cert has changed. Intended for testing.
func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) error {
if clientConfig.Transport != nil {
return fmt.Errorf("there is already a transport configured")
}
tlsConfig, err := restclient.TLSConfigFor(clientConfig)
if err != nil {
return fmt.Errorf("unable to configure TLS for the rest client: %v", err)
}
if tlsConfig == nil {
tlsConfig = &tls.Config{}
}
tlsConfig.Certificates = nil
tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
cert := clientCertificateManager.Current()
if cert == nil {
return &tls.Certificate{Certificate: nil}, nil
}
return cert, nil
func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitIfExpired bool) (func(), error) {
if clientConfig.Transport != nil || clientConfig.Dial != nil {
return nil, fmt.Errorf("there is already a transport or dialer configured")
}

// Custom dialer that will track all connections it creates.
Expand All @@ -77,29 +64,48 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
conns: make(map[*closableConn]struct{}),
}

lastCert := clientCertificateManager.Current()
go wait.Until(func() {
curr := clientCertificateManager.Current()
if exitIfExpired && curr != nil && time.Now().After(curr.Leaf.NotAfter) {
if clientCertificateManager.ServerHealthy() {
glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
} else {
glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
tlsConfig, err := restclient.TLSConfigFor(clientConfig)
if err != nil {
return nil, fmt.Errorf("unable to configure TLS for the rest client: %v", err)
}
if tlsConfig == nil {
tlsConfig = &tls.Config{}
}

if clientCertificateManager != nil {
tlsConfig.Certificates = nil
tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
cert := clientCertificateManager.Current()
if cert == nil {
return &tls.Certificate{Certificate: nil}, nil
}
return cert, nil
}
if curr == nil || lastCert == curr {
// Cert hasn't been rotated.
return
}
lastCert = curr

glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
// The cert has been rotated. Close all existing connections to force the client
// to reperform its TLS handshake with new cert.
//
// See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
t.closeAllConns()
}, period, stopCh)
lastCert := clientCertificateManager.Current()
go wait.Until(func() {
curr := clientCertificateManager.Current()
if exitIfExpired && curr != nil && time.Now().After(curr.Leaf.NotAfter) {
if clientCertificateManager.ServerHealthy() {
glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
} else {
glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
}
}
if curr == nil || lastCert == curr {
// Cert hasn't been rotated.
return
}
lastCert = curr

glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
// The cert has been rotated. Close all existing connections to force the client
// to reperform its TLS handshake with new cert.
//
// See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
t.closeAllConns()
}, period, stopCh)
}

clientConfig.Transport = utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
Expand All @@ -117,7 +123,8 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
clientConfig.CAData = nil
clientConfig.CAFile = ""
clientConfig.Insecure = false
return nil

return t.closeAllConns, nil
}

// connTracker is a dialer that tracks all open connections it creates.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/certificate/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
}

// Check for a new cert every 10 milliseconds
if err := updateTransport(stop, 10*time.Millisecond, c, m, false); err != nil {
if _, err := updateTransport(stop, 10*time.Millisecond, c, m, false); err != nil {
t.Fatal(err)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ type Dependencies struct {
DockerClientConfig *dockershim.ClientConfig
EventClient v1core.EventsGetter
HeartbeatClient v1core.CoreV1Interface
OnHeartbeatFailure func()
KubeClient clientset.Interface
ExternalKubeClient clientset.Interface
Mounter mount.Interface
Expand Down Expand Up @@ -514,6 +515,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeName: nodeName,
kubeClient: kubeDeps.KubeClient,
heartbeatClient: kubeDeps.HeartbeatClient,
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
rootDirectory: rootDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
Expand Down Expand Up @@ -939,6 +941,9 @@ type Kubelet struct {
iptClient utilipt.Interface
rootDirectory string

// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
onRepeatedHeartbeatFailure func()

// podWorkers handle syncing Pods in response to events.
podWorkers PodWorkers

Expand Down
3 changes: 3 additions & 0 deletions pkg/kubelet/kubelet_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ func (kl *Kubelet) syncNodeStatus() {
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(i); err != nil {
if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
kl.onRepeatedHeartbeatFailure()
}
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/kubelet_node_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {

func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
attempts := int64(0)
failureCallbacks := int64(0)

// set up a listener that hangs connections
ln, err := net.Listen("tcp", "127.0.0.1:0")
Expand Down Expand Up @@ -563,6 +564,9 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.heartbeatClient, err = v1core.NewForConfig(config)
kubelet.onRepeatedHeartbeatFailure = func() {
atomic.AddInt64(&failureCallbacks, 1)
}
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{
Expand All @@ -582,6 +586,10 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts != nodeStatusUpdateRetry {
t.Errorf("Expected %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
}
// should have gotten multiple failure callbacks
if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) {
t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks)
}
}

func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
Expand Down