From 49e158a9740142d9a270b4b1015577cf50185630 Mon Sep 17 00:00:00 2001 From: Ryan Sanna Date: Mon, 14 Dec 2020 11:51:46 -0700 Subject: [PATCH] Revert "Revert "Encryption Key Rotation Changes"" --- cluster/encryption.go | 229 +++++++++++++++++---- cmd/encryption.go | 49 +++-- cmd/up.go | 9 +- log/log.go | 1 - types/rke_types.go | 2 + vendor/k8s.io/client-go/util/retry/OWNERS | 4 + vendor/k8s.io/client-go/util/retry/util.go | 105 ++++++++++ vendor/modules.txt | 1 + 8 files changed, 337 insertions(+), 63 deletions(-) create mode 100644 vendor/k8s.io/client-go/util/retry/OWNERS create mode 100644 vendor/k8s.io/client-go/util/retry/util.go diff --git a/cluster/encryption.go b/cluster/encryption.go index cdc06e75f..0e95394c6 100644 --- a/cluster/encryption.go +++ b/cluster/encryption.go @@ -6,26 +6,30 @@ import ( "encoding/base64" "encoding/json" "fmt" + "strings" + "sync" ghodssyaml "github.com/ghodss/yaml" + "github.com/pkg/errors" normantypes "github.com/rancher/norman/types" - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" - sigsyaml "sigs.k8s.io/yaml" - "github.com/rancher/rke/k8s" "github.com/rancher/rke/log" "github.com/rancher/rke/services" "github.com/rancher/rke/templates" v3 "github.com/rancher/rke/types" "github.com/rancher/rke/util" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" apiserverconfig "k8s.io/apiserver/pkg/apis/config" apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" + sigsyaml "sigs.k8s.io/yaml" ) const ( @@ -113,87 +117,206 @@ func (c *Cluster) DisableSecretsEncryption(ctx context.Context, currentCluster * return nil } +const ( + rewriteSecretsOperation = "rewrite-secrets" + secretBatchSize = 250 +) + +// RewriteSecrets does the following: +// - retrieves all cluster secrets in batches with size of +// - triggers rewrites with new encryption key by sending each secret over a channel consumed by workers that perform the rewrite +// - logs progress of rewrite operation func (c *Cluster) RewriteSecrets(ctx context.Context) error { log.Infof(ctx, "Rewriting cluster secrets") - var errgrp errgroup.Group - k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) - if err != nil { - return fmt.Errorf("failed to initialize new kubernetes client: %v", err) + + k8sClient, cliErr := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport) + if cliErr != nil { + return fmt.Errorf("failed to initialize new kubernetes client: %v", cliErr) } - secretsList, err := k8s.GetSecretsList(k8sClient, "") - if err != nil { - return err + + rewrites := make(chan interface{}, secretBatchSize) + go func() { + defer close(rewrites) // exiting this go routine triggers workers to exit + + retryErr := func(err error) bool { // all returned errors can be retried + return true + } + + var continueToken string + var secrets []v1.Secret + var restart bool + for { + err := retry.OnError(retry.DefaultRetry, retryErr, func() error { + l, err := k8sClient.CoreV1().Secrets("").List(ctx, metav1.ListOptions{ + Limit: secretBatchSize, // keep the per request secrets batch size small to avoid client timeouts + Continue: continueToken, + }) + if err != nil { + if isExpiredTokenErr(err) { // restart list operation due to token expiration + logrus.Debugf("[%v] continue token expired, restarting list operation", rewriteSecretsOperation) + continueToken = "" + restart = true + return nil + } + return err + } + + secrets = append(secrets, l.Items...) + continueToken = l.Continue + + return nil + }) + if err != nil { + cliErr = err + break + } + + // send this batch to workers for rewrite + // duplicates are ok because we cache the names of secrets that have been rewritten, thus workers will only rewrite each secret once + for _, s := range secrets { + rewrites <- s + } + secrets = nil // reset secrets since they've been sent to workers + + // if there's no continue token and the list operation doesn't need to be restarted, we've retrieved all secrets + if continueToken == "" && !restart { + break + } + + restart = false + } + + logrus.Debugf("[%v] All secrets retrieved and sent for rewrite", rewriteSecretsOperation) + }() + + // NOTE: since we retrieve secrets in batches, we don't know total number of secrets up front. + // Telling the user how many we've rewritten so far is the best we can do + done := make(chan struct{}, SyncWorkers) + defer close(done) + go func() { + var rewritten int + for range done { + rewritten++ + if rewritten%50 == 0 { // log a message every 50 secrets + log.Infof(ctx, "[%s] %v secrets rewritten", rewriteSecretsOperation, rewritten) + } + } + }() + + getSecretID := func(s v1.Secret) string { + return strings.Join([]string{s.Namespace, s.Name}, "/") } - secretsQueue := util.GetObjectQueue(secretsList.Items) + // track secrets that have been rewritten + // this is needed in case the continue token expires and the list secrets operation needs to be restarted + rewritten := make(map[string]struct{}) + var rmtx sync.RWMutex + + // spawn workers to perform secret rewrites + var errgrp errgroup.Group for w := 0; w < SyncWorkers; w++ { errgrp.Go(func() error { var errList []error - for secret := range secretsQueue { + for secret := range rewrites { s := secret.(v1.Secret) - err := rewriteSecret(k8sClient, &s) - if err != nil { - errList = append(errList, err) + id := getSecretID(s) + + rmtx.RLock() + _, ok := rewritten[id] + rmtx.RUnlock() + + if !ok { + err := rewriteSecret(k8sClient, &s) + if err != nil { + errList = append(errList, err) + } + + rmtx.Lock() + rewritten[id] = struct{}{} + rmtx.Unlock() + + done <- struct{}{} } } + return util.ErrList(errList) }) } if err := errgrp.Wait(); err != nil { - return err + logrus.Errorf("[%v] error: %v", rewriteSecretsOperation, err) + return err // worker error from rewrites } - log.Infof(ctx, "Cluster secrets rewritten successfully") - return nil + + if cliErr != nil { + log.Infof(ctx, "[%s] Operation encountered error: %v", rewriteSecretsOperation, cliErr) + } else { + log.Infof(ctx, "[%s] Operation completed", rewriteSecretsOperation) + } + + return cliErr } func (c *Cluster) RotateEncryptionKey(ctx context.Context, fullState *FullState) error { - //generate new key + // generate new key newKey, err := generateEncryptionKey() if err != nil { return err } + oldKey, err := c.extractActiveKey(c.EncryptionConfig.EncryptionProviderFile) if err != nil { return err } - // reverse the keys order in the file, making newKey the Active Key - initialKeyList := []*encryptionKey{ // order is critical here! - newKey, - oldKey, - } - initialProviderConfig, err := providerFileFromKeyList(keyList{KeyList: initialKeyList}) + + logrus.Debug("adding new encryption key, provider config: [newKey, oldKey]") + + // Ensure encryption is done with newKey + err = c.updateEncryptionProvider(ctx, []*encryptionKey{newKey, oldKey}, fullState) if err != nil { return err } - c.EncryptionConfig.EncryptionProviderFile = initialProviderConfig - if err := c.DeployEncryptionProviderFile(ctx); err != nil { - return err - } - // commit to state as soon as possible - logrus.Debugf("[%s] Updating cluster state", services.ControlRole) - if err := c.UpdateClusterCurrentState(ctx, fullState); err != nil { - return err - } - if err := services.RestartKubeAPIWithHealthcheck(ctx, c.ControlPlaneHosts, c.LocalConnDialerFactory, c.Certificates); err != nil { - return err - } - // rewrite secrets + + // rewrite secrets via updates to secrets if err := c.RewriteSecrets(ctx); err != nil { + // if there's a rewrite error, the cluster will need to be restored, so redeploy the initial encryption provider config + var updateErr error + for i := 0; i < 3; i++ { // up to 3 retries + updateErr = c.updateEncryptionProvider(ctx, []*encryptionKey{oldKey}, fullState) + if updateErr == nil { + break + } + } + + if updateErr != nil { + err = errors.Wrap(err, updateErr.Error()) + } + return err } + // At this point, all secrets have been rewritten using the newKey, so we remove the old one. - finalKeyList := []*encryptionKey{ - newKey, + logrus.Debug("removing old encryption key, provider config: [newKey]") + + err = c.updateEncryptionProvider(ctx, []*encryptionKey{newKey}, fullState) + if err != nil { + return err } - finalProviderConfig, err := providerFileFromKeyList(keyList{KeyList: finalKeyList}) + + return nil +} + +func (c *Cluster) updateEncryptionProvider(ctx context.Context, keys []*encryptionKey, fullState *FullState) error { + providerConfig, err := providerFileFromKeyList(keyList{KeyList: keys}) if err != nil { return err } - c.EncryptionConfig.EncryptionProviderFile = finalProviderConfig + + c.EncryptionConfig.EncryptionProviderFile = providerConfig if err := c.DeployEncryptionProviderFile(ctx); err != nil { return err } - // commit to state + + // commit to state as soon as possible logrus.Debugf("[%s] Updating cluster state", services.ControlRole) if err := c.UpdateClusterCurrentState(ctx, fullState); err != nil { return err @@ -201,6 +324,7 @@ func (c *Cluster) RotateEncryptionKey(ctx context.Context, fullState *FullState) if err := services.RestartKubeAPIWithHealthcheck(ctx, c.ControlPlaneHosts, c.LocalConnDialerFactory, c.Certificates); err != nil { return err } + return nil } @@ -301,6 +425,18 @@ func (c *Cluster) generateDisabledEncryptionProviderFile() (string, error) { return disabledProviderFileFromKey(key) } +const ( + errExpiredToken = "The provided continue parameter is too old" +) + +// isExpiredTokenErr returns true if the error passed in is due to a continue token expiring +func isExpiredTokenErr(err error) bool { + if strings.Contains(err.Error(), errExpiredToken) { + return true + } + return false +} + func rewriteSecret(k8sClient *kubernetes.Clientset, secret *v1.Secret) error { var err error if err = k8s.UpdateSecret(k8sClient, secret); err == nil { @@ -309,6 +445,10 @@ func rewriteSecret(k8sClient *kubernetes.Clientset, secret *v1.Secret) error { if apierrors.IsConflict(err) { secret, err = k8s.GetSecret(k8sClient, secret.Name, secret.Namespace) if err != nil { + // if the secret no longer exists, we can skip it since it does not need to be rewritten + if apierrors.IsNotFound(err) { + return nil + } return err } err = k8s.UpdateSecret(k8sClient, secret) @@ -335,6 +475,7 @@ func isEncryptionEnabled(rkeConfig *v3.RancherKubernetesEngineConfig) bool { } return false } + func isEncryptionCustomConfig(rkeConfig *v3.RancherKubernetesEngineConfig) bool { if isEncryptionEnabled(rkeConfig) && rkeConfig.Services.KubeAPI.SecretsEncryptionConfig.CustomConfig != nil { diff --git a/cmd/encryption.go b/cmd/encryption.go index 070d63684..61ed2cc35 100644 --- a/cmd/encryption.go +++ b/cmd/encryption.go @@ -8,6 +8,7 @@ import ( "github.com/rancher/rke/hosts" "github.com/rancher/rke/log" "github.com/rancher/rke/pki" + "github.com/rancher/rke/pki/cert" v3 "github.com/rancher/rke/types" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -57,18 +58,25 @@ func rotateEncryptionKeyFromCli(ctx *cli.Context) error { // setting up the flags flags := cluster.GetExternalFlags(false, false, false, false, "", filePath) - return RotateEncryptionKey(context.Background(), rkeConfig, hosts.DialersOptions{}, flags) + _, _, _, _, _, err = RotateEncryptionKey(context.Background(), rkeConfig, hosts.DialersOptions{}, flags) + return err } -func RotateEncryptionKey(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, - dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags) error { - log.Infof(ctx, "Rotating cluster secrets encryption key..") +func RotateEncryptionKey( + ctx context.Context, + rkeConfig *v3.RancherKubernetesEngineConfig, + dialersOptions hosts.DialersOptions, + flags cluster.ExternalFlags, +) (string, string, string, string, map[string]pki.CertificatePKI, error) { + log.Infof(ctx, "Rotating cluster secrets encryption key") + + var APIURL, caCrt, clientCert, clientKey string + stateFilePath := cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir) rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath) - // We generate the first encryption config in ClusterInit, to store it ASAP. It's written - // to the DesiredState - stateEncryptionConfig := rkeFullState.DesiredState.EncryptionConfig + // We generate the first encryption config in ClusterInit, to store it ASAP. It's written to the DesiredState + stateEncryptionConfig := rkeFullState.DesiredState.EncryptionConfig // if CurrentState has EncryptionConfig, it means this is NOT the first time we enable encryption, we should use the _latest_ applied value from the current cluster if rkeFullState.CurrentState.EncryptionConfig != "" { stateEncryptionConfig = rkeFullState.CurrentState.EncryptionConfig @@ -76,32 +84,43 @@ func RotateEncryptionKey(ctx context.Context, rkeConfig *v3.RancherKubernetesEng kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, flags, stateEncryptionConfig) if err != nil { - return err + return APIURL, caCrt, clientCert, clientKey, nil, err } + if kubeCluster.IsEncryptionCustomConfig() { - return fmt.Errorf("can't rotate encryption keys: Key Rotation is not supported with custom configuration") + return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf("can't rotate encryption keys: Key Rotation is not supported with custom configuration") } if !kubeCluster.IsEncryptionEnabled() { - return fmt.Errorf("can't rotate encryption keys: Encryption Configuration is disabled") + return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf("can't rotate encryption keys: Encryption Configuration is disabled") } + kubeCluster.Certificates = rkeFullState.DesiredState.CertificatesBundle if err := kubeCluster.SetupDialers(ctx, dialersOptions); err != nil { - return err + return APIURL, caCrt, clientCert, clientKey, nil, err } if err := kubeCluster.TunnelHosts(ctx, flags); err != nil { - return err + return APIURL, caCrt, clientCert, clientKey, nil, err + } + if len(kubeCluster.ControlPlaneHosts) > 0 { + APIURL = fmt.Sprintf("https://%s:6443", kubeCluster.ControlPlaneHosts[0].Address) } + clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Certificate)) + clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCertName].Key)) + caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate)) err = kubeCluster.RotateEncryptionKey(ctx, rkeFullState) if err != nil { - return err + return APIURL, caCrt, clientCert, clientKey, nil, err } + // make sure we have the latest state rkeFullState, _ = cluster.ReadStateFile(ctx, stateFilePath) + log.Infof(ctx, "Reconciling cluster state") if err := kubeCluster.ReconcileDesiredStateEncryptionConfig(ctx, rkeFullState); err != nil { - return err + return APIURL, caCrt, clientCert, clientKey, nil, err } + log.Infof(ctx, "Cluster secrets encryption key rotated successfully") - return nil + return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil } diff --git a/cmd/up.go b/cmd/up.go index 91d76d019..ed4527d8d 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -85,10 +85,9 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c if err != nil { return APIURL, caCrt, clientCert, clientKey, nil, err } - // We generate the first encryption config in ClusterInit, to store it ASAP. It's written - // to the DesiredState - stateEncryptionConfig := clusterState.DesiredState.EncryptionConfig + // We generate the first encryption config in ClusterInit, to store it ASAP. It's written to the DesiredState + stateEncryptionConfig := clusterState.DesiredState.EncryptionConfig // if CurrentState has EncryptionConfig, it means this is NOT the first time we enable encryption, we should use the _latest_ applied value from the current cluster if clusterState.CurrentState.EncryptionConfig != "" { stateEncryptionConfig = clusterState.CurrentState.EncryptionConfig @@ -103,6 +102,10 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c if kubeCluster.RancherKubernetesEngineConfig.RotateCertificates != nil { return rebuildClusterWithRotatedCertificates(ctx, dialersOptions, flags, svcOptionsData) } + // if we need to rotate the encryption key, do so and then return + if kubeCluster.RancherKubernetesEngineConfig.RotateEncryptionKey { + return RotateEncryptionKey(ctx, clusterState.CurrentState.RancherKubernetesEngineConfig.DeepCopy(), dialersOptions, flags) + } log.Infof(ctx, "Building Kubernetes cluster") err = kubeCluster.SetupDialers(ctx, dialersOptions) diff --git a/log/log.go b/log/log.go index d2639f18e..396cd1063 100644 --- a/log/log.go +++ b/log/log.go @@ -32,7 +32,6 @@ func getLogger(ctx context.Context) logger { func Infof(ctx context.Context, msg string, args ...interface{}) { getLogger(ctx).Infof(msg, args...) - } func Warnf(ctx context.Context, msg string, args ...interface{}) { diff --git a/types/rke_types.go b/types/rke_types.go index 85e3a3d36..27ebb6b9c 100644 --- a/types/rke_types.go +++ b/types/rke_types.go @@ -57,6 +57,8 @@ type RancherKubernetesEngineConfig struct { Restore RestoreConfig `yaml:"restore" json:"restore,omitempty"` // Rotating Certificates Option RotateCertificates *RotateCertificates `yaml:"rotate_certificates,omitempty" json:"rotateCertificates,omitempty"` + // Rotate Encryption Key Option + RotateEncryptionKey bool `yaml:"rotate_encryption_key" json:"rotateEncryptionKey"` // DNS Config DNS *DNSConfig `yaml:"dns" json:"dns,omitempty"` // Upgrade Strategy for the cluster diff --git a/vendor/k8s.io/client-go/util/retry/OWNERS b/vendor/k8s.io/client-go/util/retry/OWNERS new file mode 100644 index 000000000..dec3e88d6 --- /dev/null +++ b/vendor/k8s.io/client-go/util/retry/OWNERS @@ -0,0 +1,4 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +reviewers: +- caesarxuchao diff --git a/vendor/k8s.io/client-go/util/retry/util.go b/vendor/k8s.io/client-go/util/retry/util.go new file mode 100644 index 000000000..15e2722f3 --- /dev/null +++ b/vendor/k8s.io/client-go/util/retry/util.go @@ -0,0 +1,105 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package retry + +import ( + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" +) + +// DefaultRetry is the recommended retry for a conflict where multiple clients +// are making changes to the same resource. +var DefaultRetry = wait.Backoff{ + Steps: 5, + Duration: 10 * time.Millisecond, + Factor: 1.0, + Jitter: 0.1, +} + +// DefaultBackoff is the recommended backoff for a conflict where a client +// may be attempting to make an unrelated modification to a resource under +// active management by one or more controllers. +var DefaultBackoff = wait.Backoff{ + Steps: 4, + Duration: 10 * time.Millisecond, + Factor: 5.0, + Jitter: 0.1, +} + +// OnError allows the caller to retry fn in case the error returned by fn is retriable +// according to the provided function. backoff defines the maximum retries and the wait +// interval between two retries. +func OnError(backoff wait.Backoff, retriable func(error) bool, fn func() error) error { + var lastErr error + err := wait.ExponentialBackoff(backoff, func() (bool, error) { + err := fn() + switch { + case err == nil: + return true, nil + case retriable(err): + lastErr = err + return false, nil + default: + return false, err + } + }) + if err == wait.ErrWaitTimeout { + err = lastErr + } + return err +} + +// RetryOnConflict is used to make an update to a resource when you have to worry about +// conflicts caused by other code making unrelated updates to the resource at the same +// time. fn should fetch the resource to be modified, make appropriate changes to it, try +// to update it, and return (unmodified) the error from the update function. On a +// successful update, RetryOnConflict will return nil. If the update function returns a +// "Conflict" error, RetryOnConflict will wait some amount of time as described by +// backoff, and then try again. On a non-"Conflict" error, or if it retries too many times +// and gives up, RetryOnConflict will return an error to the caller. +// +// err := retry.RetryOnConflict(retry.DefaultRetry, func() error { +// // Fetch the resource here; you need to refetch it on every try, since +// // if you got a conflict on the last update attempt then you need to get +// // the current version before making your own changes. +// pod, err := c.Pods("mynamespace").Get(name, metav1.GetOptions{}) +// if err ! nil { +// return err +// } +// +// // Make whatever updates to the resource are needed +// pod.Status.Phase = v1.PodFailed +// +// // Try to update +// _, err = c.Pods("mynamespace").UpdateStatus(pod) +// // You have to return err itself here (not wrapped inside another error) +// // so that RetryOnConflict can identify it correctly. +// return err +// }) +// if err != nil { +// // May be conflict if max retries were hit, or may be something unrelated +// // like permissions or a network error +// return err +// } +// ... +// +// TODO: Make Backoff an interface? +func RetryOnConflict(backoff wait.Backoff, fn func() error) error { + return OnError(backoff, errors.IsConflict, fn) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e10b3b56d..61f4e7da9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -510,6 +510,7 @@ k8s.io/client-go/util/flowcontrol k8s.io/client-go/util/homedir k8s.io/client-go/util/jsonpath k8s.io/client-go/util/keyutil +k8s.io/client-go/util/retry k8s.io/client-go/util/workqueue # k8s.io/component-base v0.18.0 k8s.io/component-base/version