state.go (forked from rancher/rke)
package cluster

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/rancher/rke/k8s"
	"github.com/rancher/rke/log"
	"github.com/rancher/types/apis/management.cattle.io/v3"
	"github.com/sirupsen/logrus"
	"gopkg.in/yaml.v2"
	"k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)
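
// SaveClusterState re-initializes the Kubernetes client from the local
// kubeconfig and persists the cluster certificates and the RKE config back
// into the cluster, so a later run can recover the previous state. It is a
// no-op when no control plane hosts are configured.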
func (c *Cluster) SaveClusterState(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) error {
	if len(c.ControlPlaneHosts) > 0 {
		// Reinitialize kubernetes Client
		var err error
		c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
		if err != nil {
			return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err)
		}
		err = saveClusterCerts(ctx, c.KubeClient, c.Certificates)
		if err != nil {
			return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err)
		}
		err = saveStateToKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath, rkeConfig)
		if err != nil {
			return fmt.Errorf("[state] Failed to save configuration state: %v", err)
		}
	}
	return nil
}
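
// GetClusterState rebuilds the previously deployed cluster from the local
// kubeconfig: it fetches the saved state ConfigMap, classifies its hosts,
// retrieves the stored certificates, applies cluster defaults, and
// regenerates the kube-apiserver certificate. It returns (nil, nil) when no
// local kubeconfig exists or a Kubernetes client cannot be created.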
func (c *Cluster) GetClusterState(ctx context.Context) (*Cluster, error) {
	var err error
	var currentCluster *Cluster
	// check if local kubeconfig file exists
	if _, err = os.Stat(c.LocalKubeConfigPath); !os.IsNotExist(err) {
		log.Infof(ctx, "[state] Found local kube config file, trying to get state from cluster")
		// to handle if current local admin is down and we need to use new cp from the list
		if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport) {
			if err := rebuildLocalAdminConfig(ctx, c); err != nil {
				return nil, err
			}
		}
		// initiate kubernetes client
		c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
		if err != nil {
			log.Warnf(ctx, "Failed to initiate new Kubernetes Client: %v", err)
			return nil, nil
		}
		// Get previous kubernetes state
		currentCluster = getStateFromKubernetes(ctx, c.KubeClient, c.LocalKubeConfigPath)
		// Get previous kubernetes certificates
		if currentCluster != nil {
			if err := currentCluster.InvertIndexHosts(); err != nil {
				return nil, fmt.Errorf("Failed to classify hosts from fetched cluster: %v", err)
			}
			currentCluster.Certificates, err = getClusterCerts(ctx, c.KubeClient, currentCluster.EtcdHosts)
			currentCluster.DockerDialerFactory = c.DockerDialerFactory
			currentCluster.LocalConnDialerFactory = c.LocalConnDialerFactory
			if err != nil {
				return nil, fmt.Errorf("Failed to Get Kubernetes certificates: %v", err)
			}
			// setting cluster defaults for the fetched cluster as well
			currentCluster.setClusterDefaults(ctx)
			currentCluster.Certificates, err = regenerateAPICertificate(c, currentCluster.Certificates)
			if err != nil {
				return nil, fmt.Errorf("Failed to regenerate KubeAPI certificate %v", err)
			}
		}
	}
	return currentCluster, nil
}
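
// saveStateToKubernetes marshals the RKE config to YAML and retries writing
// it to the state ConfigMap every 5 seconds until the update succeeds or
// UpdateStateTimeout (in seconds) elapses.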
func saveStateToKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
	log.Infof(ctx, "[state] Saving cluster state to Kubernetes")
	clusterFile, err := yaml.Marshal(*rkeConfig)
	if err != nil {
		return err
	}
	timeout := make(chan bool, 1)
	go func() {
		for {
			err := k8s.UpdateConfigMap(kubeClient, clusterFile, StateConfigMapName)
			if err != nil {
				time.Sleep(time.Second * 5)
				continue
			}
			log.Infof(ctx, "[state] Successfully Saved cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
			timeout <- true
			break
		}
	}()
	select {
	case <-timeout:
		return nil
	case <-time.After(time.Second * UpdateStateTimeout):
		return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready")
	}
}
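
// getStateFromKubernetes polls the state ConfigMap every 5 seconds until it
// is readable or GetStateTimeout (in seconds) elapses, then unmarshals the
// stored YAML into a Cluster. It returns nil on timeout or unmarshal failure.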
func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientset, kubeConfigPath string) *Cluster {
	log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
	var cfgMap *v1.ConfigMap
	var currentCluster Cluster
	var err error
	timeout := make(chan bool, 1)
	go func() {
		for {
			cfgMap, err = k8s.GetConfigMap(kubeClient, StateConfigMapName)
			if err != nil {
				time.Sleep(time.Second * 5)
				continue
			}
			log.Infof(ctx, "[state] Successfully Fetched cluster state from Kubernetes ConfigMap: %s", StateConfigMapName)
			timeout <- true
			break
		}
	}()
	select {
	case <-timeout:
		clusterData := cfgMap.Data[StateConfigMapName]
		err := yaml.Unmarshal([]byte(clusterData), &currentCluster)
		if err != nil {
			return nil
		}
		return &currentCluster
	case <-time.After(time.Second * GetStateTimeout):
		log.Infof(ctx, "Timed out waiting for kubernetes cluster to get state")
		return nil
	}
}
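
// GetK8sVersion connects to the cluster through the given kubeconfig and
// returns the server version reported by the discovery API.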
func GetK8sVersion(localConfigPath string, k8sWrapTransport k8s.WrapTransport) (string, error) {
	logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath)
	k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
	if err != nil {
		return "", fmt.Errorf("Failed to create Kubernetes Client: %v", err)
	}
	discoveryClient := k8sClient.DiscoveryClient
	logrus.Debugf("[version] Getting Kubernetes server version..")
	serverVersion, err := discoveryClient.ServerVersion()
	if err != nil {
		return "", fmt.Errorf("Failed to get Kubernetes server version: %v", err)
	}
	return fmt.Sprintf("%#v", *serverVersion), nil
}