Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to reload client certificates from disk #79083

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion staging/src/k8s.io/client-go/rest/request.go
Expand Up @@ -835,7 +835,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return err
}
// For connection errors and apiserver shutdown errors retry.
if net.IsConnectionReset(err) {
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
Expand Down
5 changes: 5 additions & 0 deletions staging/src/k8s.io/client-go/transport/BUILD
Expand Up @@ -22,6 +22,7 @@ go_library(
name = "go_default_library",
srcs = [
"cache.go",
"cert_rotation.go",
"config.go",
"round_trippers.go",
"token_source.go",
Expand All @@ -31,6 +32,10 @@ go_library(
importpath = "k8s.io/client-go/transport",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/util/connrotation:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/golang.org/x/oauth2:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
Expand Down
29 changes: 25 additions & 4 deletions staging/src/k8s.io/client-go/transport/cache.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
)

// TlsTransportCache caches TLS http.RoundTrippers different configurations. The
Expand All @@ -44,6 +45,8 @@ type tlsCacheKey struct {
caData string
certData string
keyData string
certFile string
keyFile string
getCert string
serverName string
nextProtos string
Expand Down Expand Up @@ -91,6 +94,16 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
KeepAlive: 30 * time.Second,
}).DialContext
}

// If we use are reloading files, we need to handle certificate rotation properly
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
if config.TLS.ReloadTLSFiles {
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
dial = dynamicCertDialer.connDialer.DialContext
go dynamicCertDialer.Run(wait.NeverStop)
}

// Cache a single transport for these options
c.transports[key] = utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
Expand All @@ -109,15 +122,23 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) {
if err := loadTLSFiles(c); err != nil {
return tlsCacheKey{}, err
}
return tlsCacheKey{
k := tlsCacheKey{
insecure: c.TLS.Insecure,
caData: string(c.TLS.CAData),
certData: string(c.TLS.CertData),
keyData: string(c.TLS.KeyData),
getCert: fmt.Sprintf("%p", c.TLS.GetCert),
serverName: c.TLS.ServerName,
nextProtos: strings.Join(c.TLS.NextProtos, ","),
dial: fmt.Sprintf("%p", c.Dial),
disableCompression: c.DisableCompression,
}, nil
}

if c.TLS.ReloadTLSFiles {
k.certFile = c.TLS.CertFile
k.keyFile = c.TLS.KeyFile
} else {
k.certData = string(c.TLS.CertData)
k.keyData = string(c.TLS.KeyData)
}

return k, nil
}
176 changes: 176 additions & 0 deletions staging/src/k8s.io/client-go/transport/cert_rotation.go
@@ -0,0 +1,176 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package transport

import (
"bytes"
"crypto/tls"
"fmt"
"reflect"
"sync"
"time"

utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/connrotation"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

const workItemKey = "key"

// CertCallbackRefreshDuration is exposed so that integration tests can crank up the reload speed.
var CertCallbackRefreshDuration = 5 * time.Minute
mikedanese marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the default chosen so high? The IO and CPU load would surely withstand a more responsive default like 15s / 30s. If I change a cert that is live reloaded I'd expect it to manifest way sooner then 5 minutes.


type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error)

type dynamicClientCert struct {
clientCert *tls.Certificate
certMtx sync.RWMutex

reload reloadFunc
connDialer *connrotation.Dialer

// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface
}

func certRotatingDialer(reload reloadFunc, dial utilnet.DialFunc) *dynamicClientCert {
d := &dynamicClientCert{
reload: reload,
connDialer: connrotation.NewDialer(connrotation.DialFunc(dial)),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DynamicClientCertificate"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought colliding names produced errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, looking at the code the metrics will get shared if there are multiple queues with the same name
I could be wrong!
We could try to add the cert file names here, but technically we don't know them and there may be a GetCert function in which case we don't really have any kind of identifier beyond a pointer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name is only used for metrics AFAIK. Multiple queue's with same name just get aggreted in queue metrics which isn't terrible. Nit: traditionally the names have been lower snake case, but I see we've diverged on that convention for some of the other cert stuff.

}

return d
}

// loadClientCert calls the callback and rotates connections if needed
func (c *dynamicClientCert) loadClientCert() (*tls.Certificate, error) {
cert, err := c.reload(nil)
if err != nil {
return nil, err
}

// check to see if we have a change. If the values are the same, do nothing.
c.certMtx.RLock()
haveCert := c.clientCert != nil
if certsEqual(c.clientCert, cert) {
c.certMtx.RUnlock()
return c.clientCert, nil
}
c.certMtx.RUnlock()

c.certMtx.Lock()
c.clientCert = cert
c.certMtx.Unlock()

// The first certificate requested is not a rotation that is worth closing connections for
if !haveCert {
return cert, nil
}

klog.V(1).Infof("certificate rotation detected, shutting down client connections to start using new credentials")
c.connDialer.CloseAll()

return cert, nil
}

// certsEqual compares tls Certificates, ignoring the Leaf which may get filled in dynamically
func certsEqual(left, right *tls.Certificate) bool {
if left == nil || right == nil {
return left == right
}

if !byteMatrixEqual(left.Certificate, right.Certificate) {
return false
}

if !reflect.DeepEqual(left.PrivateKey, right.PrivateKey) {
return false
}

if !byteMatrixEqual(left.SignedCertificateTimestamps, right.SignedCertificateTimestamps) {
return false
}

if !bytes.Equal(left.OCSPStaple, right.OCSPStaple) {
return false
}

return true
}

func byteMatrixEqual(left, right [][]byte) bool {
if len(left) != len(right) {
return false
}

for i := range left {
if !bytes.Equal(left[i], right[i]) {
return false
}
}
return true
}

// run starts the controller and blocks until stopCh is closed.
func (c *dynamicClientCert) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting client certificate rotation controller")
defer klog.Infof("Shutting down client certificate rotation controller")
Copy link
Member

@neolit123 neolit123 Mar 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these lines introduced non-verbose output to kubectl auth:

kubectl auth can-i get --kubeconfig=/etc/kubernetes/kubelet.conf --namespace=kube-system configmaps/kube-proxy
I0303 23:14:14.609921    7176 cert_rotation.go:137] Starting client certificate rotation controller
yes

potentially breaking consumers of CombinedOutput looking for just "yes".

unless there are immediate objections i'm going to send a PR to .V(1) the log messages.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely. I'd probably call it v3


go wait.Until(c.runWorker, time.Second, stopCh)

go wait.PollImmediateUntil(CertCallbackRefreshDuration, func() (bool, error) {
c.queue.Add(workItemKey)
return false, nil
}, stopCh)

<-stopCh
}

func (c *dynamicClientCert) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *dynamicClientCert) processNextWorkItem() bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)

_, err := c.loadClientCert()
if err == nil {
c.queue.Forget(dsKey)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
c.queue.AddRateLimited(dsKey)

return true
}

func (c *dynamicClientCert) GetClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
return c.loadClientCert()
}
7 changes: 4 additions & 3 deletions staging/src/k8s.io/client-go/transport/config.go
Expand Up @@ -115,9 +115,10 @@ func (c *Config) Wrap(fn WrapperFunc) {

// TLSConfig holds the information needed to set up a TLS transport.
type TLSConfig struct {
CAFile string // Path of the PEM-encoded server trusted root certificates.
CertFile string // Path of the PEM-encoded client certificate.
KeyFile string // Path of the PEM-encoded client key.
CAFile string // Path of the PEM-encoded server trusted root certificates.
CertFile string // Path of the PEM-encoded client certificate.
KeyFile string // Path of the PEM-encoded client key.
ReloadTLSFiles bool // Set to indicate that the original config provided files, and that they should be reloaded

Insecure bool // Server should be accessed without verifying the certificate. For testing only.
ServerName string // Override for the server name passed to the server for SNI and used to verify certificates.
Expand Down
60 changes: 59 additions & 1 deletion staging/src/k8s.io/client-go/transport/transport.go
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/klog"
Expand Down Expand Up @@ -81,7 +83,8 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
}

var staticCert *tls.Certificate
if c.HasCertAuth() {
// Treat cert as static if either key or cert was data, not a file
if c.HasCertAuth() && !c.TLS.ReloadTLSFiles {
// If key/cert were provided, verify them before setting up
// tlsConfig.GetClientCertificate.
cert, err := tls.X509KeyPair(c.TLS.CertData, c.TLS.KeyData)
Expand All @@ -91,13 +94,22 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
staticCert = &cert
}

var dynamicCertLoader func() (*tls.Certificate, error)
if c.TLS.ReloadTLSFiles {
dynamicCertLoader = cachingCertificateLoader(c.TLS.CertFile, c.TLS.KeyFile)
}

if c.HasCertAuth() || c.HasCertCallback() {
tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
// Note: static key/cert data always take precedence over cert
// callback.
if staticCert != nil {
return staticCert, nil
}
// key/cert files lead to ReloadTLSFiles being set - takes precedence over cert callback
if dynamicCertLoader != nil {
return dynamicCertLoader()
}
if c.HasCertCallback() {
cert, err := c.TLS.GetCert()
if err != nil {
Expand Down Expand Up @@ -129,6 +141,11 @@ func loadTLSFiles(c *Config) error {
return err
}

// Check that we are purely loading from files
if len(c.TLS.CertFile) > 0 && len(c.TLS.CertData) == 0 && len(c.TLS.KeyFile) > 0 && len(c.TLS.KeyData) == 0 {
c.TLS.ReloadTLSFiles = true
}

c.TLS.CertData, err = dataFromSliceOrFile(c.TLS.CertData, c.TLS.CertFile)
if err != nil {
return err
Expand Down Expand Up @@ -243,3 +260,44 @@ func tryCancelRequest(rt http.RoundTripper, req *http.Request) {
klog.Warningf("Unable to cancel request for %T", rt)
}
}

type certificateCacheEntry struct {
cert *tls.Certificate
err error
birth time.Time
}

// isStale returns true when this cache entry is too old to be usable
func (c *certificateCacheEntry) isStale() bool {
return time.Now().Sub(c.birth) > time.Second
}

func newCertificateCacheEntry(certFile, keyFile string) certificateCacheEntry {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
return certificateCacheEntry{cert: &cert, err: err, birth: time.Now()}
}

// cachingCertificateLoader ensures that we don't hammer the filesystem when opening many connections
// the underlying cert files are read at most once every second
func cachingCertificateLoader(certFile, keyFile string) func() (*tls.Certificate, error) {
current := newCertificateCacheEntry(certFile, keyFile)
var currentMtx sync.RWMutex

return func() (*tls.Certificate, error) {
currentMtx.RLock()
if current.isStale() {
currentMtx.RUnlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a regular Mutex would make this a lot simpler.:

mu.Lock()
defer mu.Unlock()
if current.isStale() {
  current = refresh()
}
return current

Given this is only required on connection initiation, I doubt there'll be significant benifit from using a RWMutex here. Unlocks that aren't guarded to the scope by defers will deadlock if anything panics in the function you are calling (which has happened before in Go cert parsing).


currentMtx.Lock()
defer currentMtx.Unlock()

if current.isStale() {
current = newCertificateCacheEntry(certFile, keyFile)
}
} else {
defer currentMtx.RUnlock()
}

return current.cert, current.err
}
}