-
Notifications
You must be signed in to change notification settings - Fork 75
/
watcher.go
183 lines (171 loc) · 6.39 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Package watcher talks to the Kubernetes API, watches the ConfigMaps that hold
// sidecar injection configurations, and signals whenever one of them changes.
package watcher
import (
"context"
"errors"
"fmt"
"io/ioutil"
"strings"
"github.com/golang/glog"
"github.com/tumblr/k8s-sidecar-injector/internal/pkg/config"
"k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
const (
// serviceAccountNamespaceFilePath is the file New reads to infer the
// ConfigMap namespace when the configured Namespace is empty.
serviceAccountNamespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
)
// ErrWatchChannelClosed is returned by Watch when the watch result channel is
// closed; callers should treat it as a signal to restart the watcher.
var ErrWatchChannelClosed = errors.New("watcher channel has closed")
// K8sConfigMapWatcher is a struct that connects to the API and collects, parses, and emits sidecar configurations
type K8sConfigMapWatcher struct {
// Config is embedded, so its fields (Namespace, ConfigMapLabels, Kubeconfig,
// MasterURL, ...) are accessed directly on the watcher.
Config
// client is the CoreV1 API client used to list and watch ConfigMaps; New sets it.
client k8sv1.CoreV1Interface
}
// New creates a new K8sConfigMapWatcher from cfg and builds the Kubernetes
// client it uses.
//
// Namespace resolution: when cfg.Namespace is empty, the namespace is read from
// the serviceaccount namespace file on disk. Surrounding whitespace (such as a
// trailing newline) is trimmed so that a padded or whitespace-only file can
// never yield an invalid namespace that slips past validation.
//
// Client construction: when either Kubeconfig or MasterURL is set the client is
// built from those flags; otherwise in-cluster configuration is used.
//
// An error is returned if the namespace file cannot be read, the client
// configuration or clientset cannot be created, or the resulting watcher fails
// validation. Underlying errors are wrapped and remain reachable through
// errors.Is / errors.As.
func New(cfg Config) (*K8sConfigMapWatcher, error) {
	c := K8sConfigMapWatcher{Config: cfg}
	if c.Namespace == "" {
		// ENHANCEMENT: support downward API/env vars instead? https://github.com/kubernetes/kubernetes/blob/release-1.0/docs/user-guide/downward-api.md
		// load from file on disk for serviceaccount: /var/run/secrets/kubernetes.io/serviceaccount/namespace
		raw, err := ioutil.ReadFile(serviceAccountNamespaceFilePath)
		if err != nil {
			return nil, fmt.Errorf("%w: maybe you should specify --configmap-namespace if you are running outside of kubernetes", err)
		}
		// Namespace names never contain whitespace, so trimming is always safe.
		if ns := strings.TrimSpace(string(raw)); ns != "" {
			c.Namespace = ns
			glog.V(2).Infof("Inferred ConfigMap search namespace=%s from %s", c.Namespace, serviceAccountNamespaceFilePath)
		}
	}

	var (
		err       error
		k8sConfig *rest.Config
	)
	if c.Kubeconfig != "" || c.MasterURL != "" {
		glog.V(2).Infof("Creating Kubernetes client from kubeconfig=%s with masterurl=%s", c.Kubeconfig, c.MasterURL)
		k8sConfig, err = clientcmd.BuildConfigFromFlags(c.MasterURL, c.Kubeconfig)
	} else {
		glog.V(2).Infof("Creating Kubernetes client from in-cluster discovery")
		k8sConfig, err = rest.InClusterConfig()
	}
	if err != nil {
		return nil, err
	}

	clientset, err := kubernetes.NewForConfig(k8sConfig)
	if err != nil {
		return nil, err
	}
	c.client = clientset.CoreV1()

	// Validate last: validation also checks that the client was set up.
	if err := validate(&c); err != nil {
		return nil, fmt.Errorf("validation failed for K8sConfigMapWatcher: %w", err)
	}
	glog.V(2).Infof("Created ConfigMap watcher: apiserver=%s namespace=%s watchlabels=%v", k8sConfig.Host, c.Namespace, c.ConfigMapLabels)
	return &c, nil
}
// validate reports the first problem found with c, checking in order: a nil
// watcher, an empty namespace, a nil label map, and a missing client. It
// returns nil when all checks pass.
func validate(c *K8sConfigMapWatcher) error {
	switch {
	case c == nil:
		return fmt.Errorf("configmap watcher was nil")
	case c.Namespace == "":
		return fmt.Errorf("namespace is empty")
	case c.ConfigMapLabels == nil:
		return fmt.Errorf("configmap labels was an uninitialized map")
	case c.client == nil:
		return fmt.Errorf("k8s client was not setup properly")
	}
	return nil
}
// Watch opens a watch on the ConfigMaps in c.Namespace that match
// c.ConfigMapLabels and sends an empty struct on notifyMe for every Added,
// Modified or Deleted event, so the receiver can reconcile all
// InjectionConfigs.
//
// Watch blocks until one of the following happens:
//   - ctx is done: it returns nil. This also applies while a notification is
//     waiting to be delivered, so a receiver that has stopped reading notifyMe
//     cannot keep Watch (and the underlying watch) alive after cancellation.
//   - the watch result channel is closed: it returns ErrWatchChannelClosed, and
//     the caller should restart the watcher.
//   - an Error event is received: it returns the error built from the event
//     object by apierrs.FromObject.
//
// If the watch cannot be created, the wrapped error is returned immediately.
// The watch is always stopped before Watch returns.
func (c *K8sConfigMapWatcher) Watch(ctx context.Context, notifyMe chan<- interface{}) error {
	glog.V(3).Infof("Watching for ConfigMaps for changes on namespace=%s with labels=%v", c.Namespace, c.ConfigMapLabels)
	watcher, err := c.client.ConfigMaps(c.Namespace).Watch(ctx, metav1.ListOptions{
		LabelSelector: mapStringStringToLabelSelector(c.ConfigMapLabels),
	})
	if err != nil {
		return fmt.Errorf("unable to create watcher (possible serviceaccount RBAC/ACL failure?): %w", err)
	}
	defer watcher.Stop()

	for {
		select {
		case e, ok := <-watcher.ResultChan():
			// The channel may be closed because of an HTTP timeout; the caller
			// should restart the watcher.
			// Details at https://github.com/kubernetes/client-go/issues/334
			if !ok {
				glog.Errorf("channel has closed, should restart watcher")
				return ErrWatchChannelClosed
			}
			if e.Type == watch.Error {
				return apierrs.FromObject(e.Object)
			}
			glog.V(3).Infof("event: %s %s", e.Type, e.Object.GetObjectKind())
			switch e.Type {
			case watch.Added, watch.Modified, watch.Deleted:
				// signal reconciliation of all InjectionConfigs
				glog.V(3).Infof("signalling event received from watch channel: %s %s", e.Type, e.Object.GetObjectKind())
				// Do not block forever on a receiver that has gone away:
				// give up on the send as soon as the context is done.
				select {
				case notifyMe <- struct{}{}:
				case <-ctx.Done():
					glog.V(2).Infof("stopping configmap watcher, context indicated we are done")
					return nil
				}
			default:
				glog.Errorf("got unsupported event %s for %s! skipping", e.Type, e.Object.GetObjectKind())
			}
		case <-ctx.Done():
			// The context was cancelled; the deferred Stop cleans up the watch.
			glog.V(2).Infof("stopping configmap watcher, context indicated we are done")
			return nil
		}
	}
}
// mapStringStringToLabelSelector renders m as a label selector string by
// converting it to a labels.Set and returning that set's String form.
// See https://github.com/kubernetes/apimachinery/issues/47
func mapStringStringToLabelSelector(m map[string]string) string {
	set := labels.Set(m)
	return set.String()
}
// Get lists the ConfigMaps in c.Namespace that match c.ConfigMapLabels and
// returns every InjectionConfig parsed from them, in list order.
//
// If listing fails, the list error is returned as-is. If parsing any ConfigMap
// fails, the configs accumulated so far are returned together with an error
// describing the failure.
func (c *K8sConfigMapWatcher) Get(ctx context.Context) (cfgs []*config.InjectionConfig, err error) {
	glog.V(1).Infof("Fetching ConfigMaps...")
	opts := metav1.ListOptions{
		LabelSelector: mapStringStringToLabelSelector(c.ConfigMapLabels),
	}
	list, listErr := c.client.ConfigMaps(c.Namespace).List(ctx, opts)
	if listErr != nil {
		return cfgs, listErr
	}
	glog.V(1).Infof("Fetched %d ConfigMaps", len(list.Items))

	for i := range list.Items {
		cm := list.Items[i]
		parsed, parseErr := InjectionConfigsFromConfigMap(cm)
		if parseErr != nil {
			return cfgs, fmt.Errorf("error getting ConfigMaps from API: %s", parseErr.Error())
		}
		glog.V(1).Infof("Found %d InjectionConfigs in %s", len(parsed), cm.ObjectMeta.Name)
		cfgs = append(cfgs, parsed...)
	}
	return cfgs, nil
}
// InjectionConfigsFromConfigMap parses every data item of cm into an
// InjectionConfig and returns them as a (possibly empty, never nil) slice.
// Items are visited in map iteration order. The first item that fails to load
// aborts parsing and yields a nil slice and an error naming the ConfigMap and
// the item.
func InjectionConfigsFromConfigMap(cm v1.ConfigMap) ([]*config.InjectionConfig, error) {
	cmName := cm.ObjectMeta.Name
	loaded := make([]*config.InjectionConfig, 0, len(cm.Data))
	for key, body := range cm.Data {
		glog.V(3).Infof("Parsing %s/%s:%s into InjectionConfig", cm.ObjectMeta.Namespace, cmName, key)
		ic, err := config.LoadInjectionConfig(strings.NewReader(body))
		if err != nil {
			return nil, fmt.Errorf("error parsing ConfigMap %s item %s into injection config: %s", cmName, key, err.Error())
		}
		glog.V(2).Infof("Loaded InjectionConfig %s from ConfigMap %s:%s", ic.Name, cmName, key)
		loaded = append(loaded, ic)
	}
	return loaded, nil
}