-
Notifications
You must be signed in to change notification settings - Fork 0
/
pod_watcher.go
129 lines (117 loc) · 3.66 KB
/
pod_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package kube
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/utilitywarehouse/semaphore-policy/log"
"github.com/utilitywarehouse/semaphore-policy/metrics"
)
// PodEventHandler is the callback invoked for every pod event observed by the
// watcher. For Added events old is nil; for Deleted events new is nil; for
// Modified events both are set (see the handlers wired up in Init).
type PodEventHandler = func(eventType watch.EventType, old *v1.Pod, new *v1.Pod)
// PodWatcher maintains a watch on the client's pods and keeps a local cache
// of them, forwarding add/update/delete events to the configured handler.
type PodWatcher struct {
	ctx           context.Context      // context passed to list/watch API calls
	client        kubernetes.Interface // Kubernetes API client
	resyncPeriod  time.Duration        // full resync interval for the informer
	stopChannel   chan struct{}        // closed by Stop to terminate Run
	store         cache.Store          // local cache of watched pods
	controller    cache.Controller     // informer controller driving the watch
	eventHandler  PodEventHandler      // callback for pod events
	labelSelector string               // restricts which pods are listed/watched
	// ListHealthy and WatchHealthy are set by the list/watch closures built in
	// Init and read by Healthy.
	// NOTE(review): they are written from the informer's goroutine without any
	// synchronization — confirm callers tolerate this benign-looking race.
	ListHealthy  bool
	WatchHealthy bool
}
// NewPodWatcher returns a new pod watcher.
// handler is invoked for every pod event and labelSelector restricts which
// pods are listed and watched. Init must be called before Run to set up the
// informer machinery.
func NewPodWatcher(client kubernetes.Interface, resyncPeriod time.Duration, handler PodEventHandler, labelSelector string) *PodWatcher {
	return &PodWatcher{
		ctx:           context.Background(),
		client:        client,
		resyncPeriod:  resyncPeriod,
		stopChannel:   make(chan struct{}),
		eventHandler:  handler,
		labelSelector: labelSelector,
	}
}
// Init sets up the list, watch functions and the cache.
// It builds a ListWatch over pods in all namespaces (filtered by the
// configured label selector), wires the watcher's event handler into the
// informer, and stores the resulting cache store and controller on the
// receiver. It must be called before Run.
func (pw *PodWatcher) Init() {
	listWatch := &cache.ListWatch{
		// ListFunc lists all matching pods, recording health status and a
		// failure metric so Healthy reflects list errors.
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.LabelSelector = pw.labelSelector
			l, err := pw.client.CoreV1().Pods(metav1.NamespaceAll).List(pw.ctx, options)
			if err != nil {
				log.Logger.Error("pw: list error", "err", err)
				pw.ListHealthy = false
				metrics.IncPodWatcherFailures("list")
			} else {
				pw.ListHealthy = true
			}
			return l, err
		},
		// WatchFunc opens a watch on matching pods, recording health status
		// and a failure metric so Healthy reflects watch errors.
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.LabelSelector = pw.labelSelector
			w, err := pw.client.CoreV1().Pods(metav1.NamespaceAll).Watch(pw.ctx, options)
			if err != nil {
				log.Logger.Error("pw: watch error", "err", err)
				pw.WatchHealthy = false
				metrics.IncPodWatcherFailures("watch")
			} else {
				pw.WatchHealthy = true
			}
			return w, err
		},
	}
	// Translate informer callbacks into the watcher's single-handler shape:
	// Added carries nil old, Deleted carries nil new.
	eventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pw.eventHandler(watch.Added, nil, obj.(*v1.Pod))
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			pw.eventHandler(watch.Modified, oldObj.(*v1.Pod), newObj.(*v1.Pod))
		},
		DeleteFunc: func(obj interface{}) {
			pw.eventHandler(watch.Deleted, obj.(*v1.Pod), nil)
		},
	}
	pw.store, pw.controller = cache.NewInformer(listWatch, &v1.Pod{}, pw.resyncPeriod, eventHandler)
}
// Run starts the pod watcher and blocks until the stop channel is closed
// (via Stop). Init must have been called first.
func (pw *PodWatcher) Run() {
	log.Logger.Info("starting pod watcher")
	// Running controller will block until the stop channel is closed.
	pw.controller.Run(pw.stopChannel)
	log.Logger.Info("stopped pod watcher")
}
// Stop stops the watcher by closing the stop channel, unblocking Run.
// It must be called at most once; a second call would panic on the
// already-closed channel.
func (pw *PodWatcher) Stop() {
	log.Logger.Info("stopping pod watcher")
	close(pw.stopChannel)
}
// HasSynced reports whether the watcher cache has completed its initial
// sync, delegating to the underlying controller's HasSynced method.
func (pw *PodWatcher) HasSynced() bool {
	return pw.controller.HasSynced()
}
// List lists all pods from the store.
// It returns a nil slice when the store is empty, and an error if the store
// contains an object that is not a *v1.Pod.
func (pw *PodWatcher) List() ([]*v1.Pod, error) {
	// Locals renamed from svcs/svc: this watcher holds pods, and the old
	// names were copy-paste residue from a service watcher.
	var pods []*v1.Pod
	for _, obj := range pw.store.List() {
		pod, ok := obj.(*v1.Pod)
		if !ok {
			return nil, fmt.Errorf("unexpected object in store: %+v", obj)
		}
		pods = append(pods, pod)
	}
	return pods, nil
}
// Healthy is true when both list and watch handlers are running without
// errors, as recorded by the closures installed in Init.
// NOTE(review): the flags are written from the informer's goroutine without
// synchronization — confirm callers tolerate the race.
func (pw *PodWatcher) Healthy() bool {
	// Return the conjunction directly instead of the redundant
	// if-true-else-false form (staticcheck S1008).
	return pw.ListHealthy && pw.WatchHealthy
}