client.go
package client
import (
"encoding/json"
"errors"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
clientcmdapiv1 "k8s.io/client-go/tools/clientcmd/api/v1"
"github.com/Qihoo360/wayne/src/backend/models"
"github.com/Qihoo360/wayne/src/backend/util/logs"
)
const (
// High enough QPS to fit all expected use cases.
defaultQPS = 1e6
// High enough Burst to fit all expected use cases.
defaultBurst = 1e6
// full resync period for cached resources
defaultResyncPeriod = 30 * time.Second
)
var (
ErrNotExist = errors.New("cluster does not exist")
ErrMaintaining = errors.New("cluster is under maintenance, please try again later")
)
var (
clusterManagerSets = make(map[string]*ClusterManager)
)
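// ClusterManager holds everything needed to work with one cluster: its database
// record, a typed clientset, the rest config used to build it, and the informer
// cache indexers.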
type ClusterManager struct {
Cluster *models.Cluster
Client *kubernetes.Clientset
Config *rest.Config
Indexer *CacheIndexer
}
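// CacheIndexer holds the pod and event informer indexers for a cluster, together
// with the stop channel that shuts those informers down.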
type CacheIndexer struct {
stopChans chan struct{}
Pod kcache.Indexer
Event kcache.Indexer
}
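// Close stops the cluster's cache informers.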
func (c ClusterManager) Close() {
// closing the channel stops both the pod and event informers started in
// buildCacheController; a single send would only stop one of them
close(c.Indexer.stopChans)
}
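// BuildApiserverClient reloads all normal clusters from the database and, when the
// set of clusters has changed, builds a clientset and cache controller for each one,
// stops the cache controllers of the previous generation, and swaps in the new set.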
func BuildApiserverClient() {
newClusters, err := models.ClusterModel.GetAllNormal()
if err != nil {
logs.Error("build apiserver client get all cluster error.", err)
return
}
changed := clusterChanged(newClusters)
if changed {
logs.Info("cluster changed, so resync info...")
// build new clientManager
newClusterManagerSets := make(map[string]*ClusterManager)
for i := 0; i < len(newClusters); i++ {
cluster := newClusters[i]
if cluster.Master == "" {
logs.Warning("cluster's master is null:%s", cluster.Name)
continue
}
clientSet, config, err := buildClient(cluster.Master, cluster.KubeConfig)
if err != nil {
logs.Warning("build cluster (%s) client error :%v", cluster.Name, err)
continue
}
cacheIndexer := buildCacheController(clientSet)
clusterManager := &ClusterManager{
Client: clientSet,
Config: config,
Cluster: &cluster,
Indexer: cacheIndexer,
}
newClusterManagerSets[cluster.Name] = clusterManager
}
// stop all old cacheController
stopAllCacheController()
clusterManagerSets = newClusterManagerSets
}
}
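// stopAllCacheController closes every currently registered cluster manager so the
// old informers stop before a new set of managers takes over.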
func stopAllCacheController() {
// TODO: even after stopping, the controller still periodically lists resources of clusters under maintenance; this needs to be fixed
for _, manager := range clusterManagerSets {
manager.Close()
}
}
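// clusterChanged reports whether the clusters read from the database differ from the
// cached managers: clusters added or removed, or a changed master, status, or kubeconfig.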
func clusterChanged(clusters []models.Cluster) bool {
if len(clusterManagerSets) == 0 {
return true
}
if len(clusterManagerSets) != len(clusters) {
return true
}
for _, cluster := range clusters {
manager, ok := clusterManagerSets[cluster.Name]
if !ok {
// a new cluster may have been added
return true
}
// if the master changed, the cluster has changed; no need to check the remaining fields
if manager.Cluster.Master != cluster.Master {
logs.Info("cluster master (%s) changed to (%s).", manager.Cluster.Master, cluster.Master)
return true
}
if manager.Cluster.Status != cluster.Status {
logs.Info("cluster status (%d) changed to (%d).", manager.Cluster.Status, cluster.Status)
return true
}
if manager.Cluster.KubeConfig != cluster.KubeConfig {
logs.Info("cluster kubeConfig (%s) changed to (%s).", manager.Cluster.KubeConfig, cluster.KubeConfig)
return true
}
}
return false
}
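// buildCacheController starts pod and event informers for all namespaces and returns
// their indexers together with the channel used to stop them.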
func buildCacheController(client *kubernetes.Clientset) *CacheIndexer {
stopCh := make(chan struct{})
// create the pod watcher
podListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
podIndexer, podInformer := kcache.NewIndexerInformer(podListWatcher, &v1.Pod{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go podInformer.Run(stopCh)
// create the event watcher
eventListWatcher := kcache.NewListWatchFromClient(client.CoreV1().RESTClient(), "events", v1.NamespaceAll, fields.Everything())
eventIndexer, eventInformer := kcache.NewIndexerInformer(eventListWatcher, &v1.Event{}, defaultResyncPeriod, kcache.ResourceEventHandlerFuncs{}, kcache.Indexers{})
go eventInformer.Run(stopCh)
return &CacheIndexer{
Pod: podIndexer,
Event: eventIndexer,
stopChans: stopCh,
}
}
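// Cluster returns the stored cluster record for the given cluster name, rebuilding the
// manager cache once if the cluster is not yet known.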
func Cluster(cluster string) (*models.Cluster, error) {
manager, exist := clusterManagerSets[cluster]
// if the cluster is not cached, rebuild the client managers once and look it up again
if !exist {
BuildApiserverClient()
manager, exist = clusterManagerSets[cluster]
if !exist {
return nil, ErrNotExist
}
}
if manager.Cluster.Status == models.ClusterStatusMaintaining {
return nil, ErrMaintaining
}
return manager.Cluster, nil
}
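// Client returns the *kubernetes.Clientset for the given cluster name, rebuilding the
// manager cache once if the cluster is not yet known.
//
// A minimal usage sketch (the cluster name "demo" is only illustrative):
//
//	kubeClient, err := client.Client("demo")
//	if err != nil {
//		return err // cluster unknown or under maintenance
//	}
//	_ = kubeClient // use with the regular client-go APIs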
func Client(cluster string) (*kubernetes.Clientset, error) {
manager, exist := clusterManagerSets[cluster]
// if the cluster is not cached, rebuild the client managers once and look it up again
if !exist {
BuildApiserverClient()
manager, exist = clusterManagerSets[cluster]
if !exist {
return nil, ErrNotExist
}
}
if manager.Cluster.Status == models.ClusterStatusMaintaining {
return nil, ErrMaintaining
}
return manager.Client, nil
}
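// Manager returns the ClusterManager for the given cluster name, rebuilding the
// manager cache once if the cluster is not yet known.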
func Manager(cluster string) (*ClusterManager, error) {
manager, exist := clusterManagerSets[cluster]
// if the cluster is not cached, rebuild the client managers once and look it up again
if !exist {
BuildApiserverClient()
manager, exist = clusterManagerSets[cluster]
if !exist {
return nil, ErrNotExist
}
}
if manager.Cluster.Status == models.ClusterStatusMaintaining {
return nil, ErrMaintaining
}
return manager, nil
}
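// Clients returns the clientset of every registered cluster, keyed by cluster name.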
func Clients() map[string]*kubernetes.Clientset {
clientSets := map[string]*kubernetes.Clientset{}
for cluster, cManager := range clusterManagerSets {
clientSets[cluster] = cManager.Client
}
return clientSets
}
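// Managers returns the full map of cluster managers, keyed by cluster name.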
func Managers() map[string]*ClusterManager {
return clusterManagerSets
}
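// buildClient unmarshals the stored kubeconfig JSON, converts it to the internal
// clientcmd config, overrides the API server address with the given master, and
// builds a clientset with the high default QPS and burst limits.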
func buildClient(master string, kubeconfig string) (*kubernetes.Clientset, *rest.Config, error) {
configV1 := clientcmdapiv1.Config{}
err := json.Unmarshal([]byte(kubeconfig), &configV1)
if err != nil {
logs.Error("json unmarshal kubeconfig error. %v ", err)
return nil, nil, err
}
configObject, err := clientcmdlatest.Scheme.ConvertToVersion(&configV1, clientcmdapi.SchemeGroupVersion)
if err != nil {
logs.Error("convert kubeconfig to internal version error. %v", err)
return nil, nil, err
}
configInternal := configObject.(*clientcmdapi.Config)
clientConfig, err := clientcmd.NewDefaultClientConfig(*configInternal, &clientcmd.ConfigOverrides{
ClusterDefaults: clientcmdapi.Cluster{Server: master},
}).ClientConfig()
if err != nil {
logs.Error("build client config error. %v ", err)
return nil, nil, err
}
clientConfig.QPS = defaultQPS
clientConfig.Burst = defaultBurst
clientSet, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
logs.Error("(%s) kubernetes.NewForConfig(%v) error.%v", master, err, clientConfig)
return nil, nil, err
}
return clientSet, clientConfig, nil
}