/
config_handler.go
113 lines (98 loc) · 2.79 KB
/
config_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package multicluster
import (
"context"
"sync"
"github.com/rotisserie/eris"
"github.com/solo-io/go-utils/errutils"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/cache"
"github.com/solo-io/solo-kit/pkg/multicluster/handler"
v1 "github.com/solo-io/solo-kit/pkg/multicluster/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// RestConfigs maps a cluster name to the *rest.Config used to reach that cluster.
type RestConfigs map[string]*rest.Config
// RestConfigHandler watches kubeconfigs via a KubeConfigWatcher and fans out
// cluster added/removed events to a set of handler.ClusterHandler callbacks.
// It must not be copied after first use (contains sync.Mutex fields).
type RestConfigHandler struct {
	kcWatcher KubeConfigWatcher        // source of kubeconfig snapshots
	handlers  []handler.ClusterHandler // callbacks notified on cluster add/remove
	cache     RestConfigs              // last-seen set of cluster configs; guarded by cacheAccess
	cacheAccess   sync.Mutex // serializes diff-and-swap of cache in handleNewRestConfigs
	handlerAccess sync.Mutex // serializes invocation of the handler callbacks
}
// NewRestConfigHandler builds a RestConfigHandler that feeds cluster
// add/remove events from kcWatcher to the supplied handlers.
func NewRestConfigHandler(kcWatcher KubeConfigWatcher, handlers ...handler.ClusterHandler) *RestConfigHandler {
	rch := &RestConfigHandler{
		kcWatcher: kcWatcher,
		handlers:  handlers,
	}
	return rch
}
// Run starts watching kubeconfigs and dispatches cluster add/remove events to
// the registered handlers until ctx is cancelled. When local is non-nil it is
// included in every snapshot under the LocalCluster key (see parseRestConfigs).
// It returns a channel carrying both watcher errors and kubeconfig-parse
// errors; the caller must drain it.
func (h *RestConfigHandler) Run(ctx context.Context, local *rest.Config, kubeClient kubernetes.Interface, kubeCache cache.KubeCoreCache) (<-chan error, error) {
	kubeConfigs, errs, err := h.kcWatcher.WatchKubeConfigs(ctx, kubeClient, kubeCache)
	if err != nil {
		return nil, err
	}
	ourErrs := make(chan error)
	// Forward watcher errors onto our own error channel with added context.
	go errutils.AggregateErrs(ctx, ourErrs, errs, "watching kubernetes *rest.Configs")
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case kcs := <-kubeConfigs:
				restConfigs, err := parseRestConfigs(local, kcs)
				if err != nil {
					// Report the parse failure without stopping the watch loop;
					// respect cancellation so this goroutine cannot block forever
					// on an abandoned channel.
					select {
					case ourErrs <- err:
					case <-ctx.Done():
						return
					}
					continue
				}
				h.handleNewRestConfigs(restConfigs)
			}
		}
	}()
	// BUG FIX: return ourErrs, not errs. errs is already consumed by
	// AggregateErrs above; returning it meant callers never received the
	// aggregated errors, and parse errors sent to the unbuffered ourErrs
	// channel had no receiver, deadlocking the watch goroutine.
	return ourErrs, nil
}
// handleNewRestConfigs diffs the incoming snapshot against the cached one,
// firing ClusterRemoved for clusters that disappeared and ClusterAdded for
// clusters seen for the first time, then replaces the cache with the snapshot.
func (h *RestConfigHandler) handleNewRestConfigs(cfgs RestConfigs) {
	h.cacheAccess.Lock()
	defer h.cacheAccess.Unlock()
	// A cached cluster missing from the new snapshot has been removed.
	for cluster, cached := range h.cache {
		if _, stillPresent := cfgs[cluster]; !stillPresent {
			h.clusterRemoved(cluster, cached)
		}
	}
	// A snapshot cluster absent from the cache is newly added.
	for cluster, incoming := range cfgs {
		if _, known := h.cache[cluster]; !known {
			h.clusterAdded(cluster, incoming)
		}
	}
	h.cache = cfgs
}
// clusterAdded notifies every registered handler that cfg for the named
// cluster has become available. Calls are serialized by handlerAccess.
func (h *RestConfigHandler) clusterAdded(cluster string, cfg *rest.Config) {
	h.handlerAccess.Lock()
	defer h.handlerAccess.Unlock()
	for i := range h.handlers {
		h.handlers[i].ClusterAdded(cluster, cfg)
	}
}
// clusterRemoved notifies every registered handler that the named cluster's
// config is gone. Calls are serialized by handlerAccess.
func (h *RestConfigHandler) clusterRemoved(cluster string, cfg *rest.Config) {
	h.handlerAccess.Lock()
	defer h.handlerAccess.Unlock()
	for i := range h.handlers {
		h.handlers[i].ClusterRemoved(cluster, cfg)
	}
}
// parseRestConfigs converts a list of kubeconfig resources into a map of
// cluster name -> *rest.Config. A non-nil local config is stored under the
// LocalCluster key. Returns an error if any kubeconfig fails to serialize or
// to parse into a *rest.Config.
func parseRestConfigs(local *rest.Config, kcs v1.KubeConfigList) (RestConfigs, error) {
	out := make(RestConfigs, len(kcs)+1)
	if local != nil {
		out[LocalCluster] = local
	}
	for _, kubeCfg := range kcs {
		serialized, err := clientcmd.Write(kubeCfg.Config)
		if err != nil {
			return nil, err
		}
		parsed, err := clientcmd.RESTConfigFromKubeConfig(serialized)
		if err != nil {
			return nil, eris.Wrapf(err, "failed to construct *rest.Config from kubeconfig %v", kubeCfg.Metadata.Ref())
		}
		out[kubeCfg.Cluster] = parsed
	}
	return out, nil
}