
handle upgrade cases
backup state to kubernetes
moelsayed authored and Alena Prokharchyk committed Nov 9, 2018
1 parent 8b88703 commit 6da3525
Showing 5 changed files with 122 additions and 9 deletions.
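In short: `rke up` can now detect clusters created before the full-state file existed. When a local kubeconfig is present but the state file has no current state, RKE rebuilds the admin kubeconfig, recovers the cluster configuration and certificates from the legacy state stored in the cluster itself, writes them into the new state file, and removes the legacy `cluster-state` ConfigMap. Independently, every successful `rke up` now backs the full cluster state up to a `full-cluster-state` ConfigMap in `kube-system`.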
15 changes: 13 additions & 2 deletions cluster/certificates.go
@@ -41,8 +41,13 @@ func regenerateAPICertificate(c *Cluster, certificates map[string]pki.Certificat
return certificates, nil
}

func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
func GetClusterCertsFromKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, etcdHosts []*hosts.Host) (map[string]pki.CertificatePKI, error) {
log.Infof(ctx, "[certificates] Getting Cluster certificates from Kubernetes")

k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
if err != nil {
return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
}
certificatesNames := []string{
pki.CACertName,
pki.KubeAPICertName,
@@ -63,7 +68,7 @@ func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcd

certMap := make(map[string]pki.CertificatePKI)
for _, certName := range certificatesNames {
secret, err := k8s.GetSecret(kubeClient, certName)
secret, err := k8s.GetSecret(k8sClient, certName)
if err != nil && !strings.HasPrefix(certName, "kube-etcd") &&
!strings.Contains(certName, pki.RequestHeaderCACertName) &&
!strings.Contains(certName, pki.APIProxyClientCertName) &&
@@ -104,6 +109,12 @@ func getClusterCerts(ctx context.Context, kubeClient *kubernetes.Clientset, etcd
ConfigPath: string(secret.Data["ConfigPath"]),
}
}
// Legacy clusters may lack a dedicated service account token key; fall back to the kube-apiserver certificate and key
kubeAPICert := certMap[pki.KubeAPICertName]
if certMap[pki.ServiceAccountTokenKeyName].Key == nil {
log.Infof(ctx, "[certificates] Creating service account token key")
certMap[pki.ServiceAccountTokenKeyName] = pki.ToCertObject(pki.ServiceAccountTokenKeyName, pki.ServiceAccountTokenKeyName, "", kubeAPICert.Certificate, kubeAPICert.Key)
}
log.Infof(ctx, "[certificates] Successfully fetched Cluster certificates from Kubernetes")
return certMap, nil
}
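`getClusterCerts` is now exported as `GetClusterCertsFromKubernetes` and builds its own client from a kubeconfig path and an optional transport wrapper, so callers no longer need a pre-built `*kubernetes.Clientset`. The legacy-upgrade path in `cmd/up.go` (later in this diff) calls it roughly like this:

```go
certs, err := cluster.GetClusterCertsFromKubernetes(ctx,
	kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, kubeCluster.EtcdHosts)
if err != nil {
	return err
}
```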
5 changes: 5 additions & 0 deletions cluster/cluster.go
@@ -55,6 +55,7 @@ type Cluster struct {
const (
X509AuthenticationProvider = "x509"
StateConfigMapName = "cluster-state"
FullStateConfigMapName = "full-cluster-state"
UpdateStateTimeout = 30
GetStateTimeout = 30
KubernetesClientTimeOut = 30
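// Two state ConfigMaps now coexist: the legacy cluster-state map, which the
// upgrade path reads and then deletes, and full-cluster-state, which holds
// the serialized full state.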
@@ -204,6 +205,10 @@ func (c *Cluster) SetupDialers(ctx context.Context, dockerDialerFactory,
return nil
}

func RebuildKubeconfig(ctx context.Context, kubeCluster *Cluster) error {
return rebuildLocalAdminConfig(ctx, kubeCluster)
}

func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
if len(kubeCluster.ControlPlaneHosts) == 0 {
return nil
47 changes: 44 additions & 3 deletions cluster/state.go
@@ -161,6 +161,36 @@ func (c *Cluster) GetClusterState(ctx context.Context, fullState *RKEFullState,

return currentCluster, nil
}

func SaveFullStateToKubernetes(ctx context.Context, localConfigPath string, k8sWrapTransport k8s.WrapTransport, fullState *RKEFullState) error {
k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
if err != nil {
return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
}
log.Infof(ctx, "[state] Saving full cluster state to Kubernetes")
stateFile, err := json.Marshal(*fullState)
if err != nil {
return err
}
timeout := make(chan bool, 1)
go func() {
for {
_, err := k8s.UpdateConfigMap(k8sClient, stateFile, FullStateConfigMapName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
log.Infof(ctx, "[state] Successfully Saved full cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
timeout <- true
break
}
}()
select {
case <-timeout:
return nil
case <-time.After(time.Second * UpdateStateTimeout):
return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready")
}
}

func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
log.Infof(ctx, "[state] Saving cluster state to Kubernetes")
@@ -216,15 +246,18 @@ func saveStateToNodes(ctx context.Context, uniqueHosts []*hosts.Host, clusterSta
return nil
}

func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string) (*Cluster, error) {
func GetStateFromKubernetes(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) (*Cluster, error) {
log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
if err != nil {
return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
}
var cfgMap *v1.ConfigMap
var currentCluster Cluster
var err error
timeout := make(chan bool, 1)
go func() {
for {
cfgMap, err = k8s.GetConfigMap(kubeClient, StateConfigMapName)
cfgMap, err = k8s.GetConfigMap(k8sClient, StateConfigMapName)
if err != nil {
time.Sleep(time.Second * 5)
continue
@@ -382,3 +415,11 @@ func RemoveStateFile(ctx context.Context, statePath string) {
}
log.Infof(ctx, "State file removed successfully")
}

func RemoveLegacyStateFromKubernetes(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
if err != nil {
return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
}
return k8s.DeleteConfigMap(k8sClient, StateConfigMapName)
}
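`SaveFullStateToKubernetes` and `GetStateFromKubernetes` above share a retry-until-timeout idiom: a goroutine retries the API call every five seconds while a `select` races it against `time.After`. A minimal standalone sketch of the pattern — `retryUntil` is a hypothetical name, not part of this commit:

```go
package main

import (
	"fmt"
	"time"
)

// retryUntil runs op every five seconds until it succeeds or timeout elapses.
func retryUntil(timeout time.Duration, op func() error) error {
	done := make(chan bool, 1) // buffered: a late success must not block the goroutine
	go func() {
		for {
			if err := op(); err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
			done <- true
			break
		}
	}()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("timed out after %s", timeout)
	}
}

func main() {
	attempts := 0
	err := retryUntil(30*time.Second, func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("not ready")
		}
		return nil
	})
	fmt.Println(err, attempts) // <nil> 3
}
```

The one-slot buffer matters: if the timeout wins the race, a later successful send into the channel does not block, so the goroutine can still exit. If the operation never succeeds, the goroutine retries forever, which is tolerable in a short-lived CLI process.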
60 changes: 56 additions & 4 deletions cmd/up.go
@@ -3,6 +3,7 @@ package cmd
import (
"context"
"fmt"
"os"
"strings"
"time"

@@ -13,6 +14,7 @@
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"k8s.io/client-go/util/cert"
)
@@ -64,21 +66,66 @@ func UpCommand() cli.Command {
Flags: upFlags,
}
}
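
// doUpgradeLegacyCluster distinguishes three cases: no local kubeconfig
// (fresh cluster, nothing to do); a populated current state (already on the
// new state format); and a kubeconfig without current state, treated as a
// possible legacy cluster whose state is recovered from Kubernetes.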
func doUpgradeLegacyCluster(ctx context.Context, kubeCluster *cluster.Cluster, fullState *cluster.RKEFullState, k8sWrapTransport k8s.WrapTransport) error {
if _, err := os.Stat(kubeCluster.LocalKubeConfigPath); os.IsNotExist(err) {
// there is no kubeconfig. This is a new cluster
logrus.Debug("[state] local kubeconfig not found, this is a new cluster")
return nil
}
if fullState.CurrentState.RancherKubernetesEngineConfig != nil {
// this cluster has a previous state; no need to upgrade
logrus.Debug("[state] previous state found, this is not a legacy cluster")
return nil
}
// We have a kubeconfig but no current state: either a legacy cluster or a
// new cluster with an old kubeconfig. Try to upgrade.
log.Infof(ctx, "[state] Possible legacy cluster detected, trying to upgrade")
if err := cluster.RebuildKubeconfig(ctx, kubeCluster); err != nil {
return err
}
recoveredCluster, err := cluster.GetStateFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
if err != nil {
return err
}
// if we found a recovered cluster, we need to override the current state
if recoveredCluster != nil {
recoveredCerts, err := cluster.GetClusterCertsFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, kubeCluster.EtcdHosts)
if err != nil {
return err
}
fullState.CurrentState.RancherKubernetesEngineConfig = recoveredCluster.RancherKubernetesEngineConfig.DeepCopy()
fullState.CurrentState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)

// we don't want to regenerate certificates
fullState.DesiredState.CertificatesBundle = cluster.TransformCertsToV3Certs(recoveredCerts)
if err = fullState.WriteStateFile(ctx, kubeCluster.StateFilePath); err != nil {
return err
}
return cluster.RemoveLegacyStateFromKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
}
return nil
}

func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, configDir string) error {
func ClusterInit(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig, configDir string, k8sWrapTransport k8s.WrapTransport) error {
log.Infof(ctx, "Initiating Kubernetes cluster")
stateFilePath := cluster.GetStateFilePath(clusterFilePath, configDir)
rkeFullState, _ := cluster.ReadStateFile(ctx, stateFilePath)

kubeCluster, err := cluster.InitClusterObject(ctx, rkeConfig, clusterFilePath, configDir)
if err != nil {
return err
}

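// Best effort: a failure here only logs a warning and provisioning
// continues with whatever state already exists on disk.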
err = doUpgradeLegacyCluster(ctx, kubeCluster, rkeFullState, k8sWrapTransport)
if err != nil {
log.Warnf(ctx, "[state] can't fetch legacy cluster state from Kubernetes: %v", err)
}

fullState, err := cluster.RebuildState(ctx, &kubeCluster.RancherKubernetesEngineConfig, rkeFullState, clusterFilePath, configDir)
if err != nil {
return err
}

rkeState := cluster.RKEFullState{
DesiredState: fullState.DesiredState,
CurrentState: fullState.CurrentState,
@@ -176,6 +223,11 @@ func ClusterUp(
return APIURL, caCrt, clientCert, clientKey, nil, err
}

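// Persist the reconciled full state to the full-cluster-state ConfigMap
// so later runs can recover it from the cluster itself.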
err = cluster.SaveFullStateToKubernetes(ctx, kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport, clusterState)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}

err = kubeCluster.DeployWorkerPlane(ctx)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -241,9 +293,9 @@ func clusterUpFromCli(ctx *cli.Context) error {
updateOnly := ctx.Bool("update-only")
disablePortCheck := ctx.Bool("disable-port-check")
if ctx.Bool("init") {
return ClusterInit(context.Background(), rkeConfig, "")
return ClusterInit(context.Background(), rkeConfig, "", nil)
}
if err := ClusterInit(context.Background(), rkeConfig, ""); err != nil {
if err := ClusterInit(context.Background(), rkeConfig, "", nil); err != nil {
return err
}
_, _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, nil, false, "", updateOnly, disablePortCheck)
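
Note that the CLI path passes `nil` for `k8sWrapTransport`, so the Kubernetes client is built with its default transport; callers embedding RKE can supply their own wrapper.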
4 changes: 4 additions & 0 deletions k8s/configmap.go
@@ -44,3 +44,7 @@ func UpdateConfigMap(k8sClient *kubernetes.Clientset, configYaml []byte, configM
func GetConfigMap(k8sClient *kubernetes.Clientset, configMapName string) (*v1.ConfigMap, error) {
return k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{})
}

func DeleteConfigMap(k8sClient *kubernetes.Clientset, configMapName string) error {
return k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Delete(configMapName, &metav1.DeleteOptions{})
}
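
`DeleteConfigMap` backs `RemoveLegacyStateFromKubernetes` above. A caller that may run more than once could treat an already-deleted map as success — a hedged sketch, where the `apierrors` alias and the `removeLegacyState` helper are assumptions, not part of this commit:

```go
import (
	"github.com/rancher/rke/cluster"
	"github.com/rancher/rke/k8s"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/kubernetes"
)

// removeLegacyState deletes the legacy cluster-state ConfigMap, treating
// "already gone" as success so repeated runs stay idempotent.
func removeLegacyState(k8sClient *kubernetes.Clientset) error {
	err := k8s.DeleteConfigMap(k8sClient, cluster.StateConfigMapName)
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}
```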
