-
-
Notifications
You must be signed in to change notification settings - Fork 217
/
watch_services.go
339 lines (303 loc) · 11.7 KB
/
watch_services.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
package manager
import (
"context"
"fmt"
"os"
"sync"
"github.com/davecgh/go-spew/spew"
"github.com/kube-vip/kube-vip/pkg/vip"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
)
// TODO: Fix the naming of these contexts

// activeServiceLoadBalancer maps a service UID to the context that governs the
// lifetime of that service's leaderElection / endpoint watcher.
var activeServiceLoadBalancer map[string]context.Context

// activeServiceLoadBalancerCancel maps a service UID to the cancel function of
// the corresponding context stored in activeServiceLoadBalancer.
var activeServiceLoadBalancerCancel map[string]func()

// activeService keeps track of services that already have a leaderElection in place
var activeService map[string]bool

// watchedService keeps track of services that are already being watched
var watchedService map[string]bool

// configuredLocalRoutes keeps track of routes that have been configured on this node,
// keyed by service UID (string) with a bool value.
var configuredLocalRoutes sync.Map
func init() {
// Set up the caches for monitoring existing active or watched services
activeServiceLoadBalancerCancel = make(map[string]func())
activeServiceLoadBalancer = make(map[string]context.Context)
activeService = make(map[string]bool)
watchedService = make(map[string]bool)
}
// servicesWatcher watches Kubernetes Service objects (in all namespaces, or in
// sm.config.ServiceNamespace when set) and drives kube-vip's per-service
// load-balancer lifecycle. On Add/Modified it starts elections and/or endpoint
// watchers by invoking serviceFunc; on Deleted it tears down addresses, routes
// and BGP state. It returns when the watch channel closes or on a fatal error.
//
// Parameters:
//   - ctx: parent context for the API watch.
//   - serviceFunc: callback that configures the load balancer for a service;
//     it is responsible for calling wg.Done (see comments at the call sites).
func (sm *Manager) servicesWatcher(ctx context.Context, serviceFunc func(context.Context, *v1.Service, *sync.WaitGroup) error) error {
	// Watch function
	var wg sync.WaitGroup

	// The hostname is used as this node's identity for per-service elections.
	id, err := os.Hostname()
	if err != nil {
		return err
	}

	if sm.config.ServiceNamespace == "" {
		// v1.NamespaceAll is actually "", but we'll stay with the const in case things change upstream
		sm.config.ServiceNamespace = v1.NamespaceAll
		log.Infof("(svcs) starting services watcher for all namespaces")
	} else {
		log.Infof("(svcs) starting services watcher for services in namespace [%s]", sm.config.ServiceNamespace)
	}

	// Use a restartable watcher, as this should help in the event of etcd or timeout issues
	rw, err := watchtools.NewRetryWatcher("1", &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return sm.clientSet.CoreV1().Services(sm.config.ServiceNamespace).Watch(ctx, metav1.ListOptions{})
		},
	})
	if err != nil {
		return fmt.Errorf("error creating services watcher: %s", err.Error())
	}

	// Stop the retry watcher either on manager shutdown or when this function
	// finishes draining the result channel.
	exitFunction := make(chan struct{})
	go func() {
		select {
		case <-sm.shutdownChan:
			log.Debug("(svcs) shutdown called")
			// Stop the retry watcher
			rw.Stop()
			return
		case <-exitFunction:
			log.Debug("(svcs) function ending")
			// Stop the retry watcher
			rw.Stop()
			return
		}
	}()

	ch := rw.ResultChan()

	// Used for tracking an active endpoint / pod
	for event := range ch {
		sm.countServiceWatchEvent.With(prometheus.Labels{"type": string(event.Type)}).Add(1)

		// We need to inspect the event and get ResourceVersion out of it
		switch event.Type {
		case watch.Added, watch.Modified:
			svc, ok := event.Object.(*v1.Service)
			if !ok {
				return fmt.Errorf("unable to parse Kubernetes services from API watcher")
			}

			// We only care about LoadBalancer services
			if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
				break
			}

			svcAddresses := fetchServiceAddresses(svc)
			// We only care about LoadBalancer services that have been allocated an address
			if len(svcAddresses) <= 0 {
				break
			}

			// Check the loadBalancer class
			if svc.Spec.LoadBalancerClass != nil {
				// if this isn't nil then it has been configured, check if it is the kube-vip loadBalancer class
				if *svc.Spec.LoadBalancerClass != sm.config.LoadBalancerClassName {
					log.Infof("(svcs) [%s] specified the loadBalancer class [%s], ignoring", svc.Name, *svc.Spec.LoadBalancerClass)
					break
				}
			} else if sm.config.LoadBalancerClassOnly {
				// if kube-vip is configured to only recognize services with kube-vip's lb class, then ignore the services without any lb class
				log.Infof("(svcs) kube-vip configured to only recognize services with kube-vip's lb class but the service [%s] didn't specify any loadBalancer class, ignoring", svc.Name)
				break
			}

			// Check if we ignore this service
			if svc.Annotations["kube-vip.io/ignore"] == "true" {
				log.Infof("(svcs) [%s] has an ignore annotation for kube-vip", svc.Name)
				break
			}

			// The modified event should only be triggered if the service has been modified (i.e. moved somewhere else)
			if event.Type == watch.Modified {
				for _, addr := range svcAddresses {
					// Ensure a previously-configured address isn't lingering on the interface.
					found, gcErr := vip.GarbageCollect(sm.config.Interface, addr)
					if gcErr != nil {
						log.Errorf("(svcs) cleaning existing address error: [%s]", gcErr.Error())
					}
					if found {
						log.Warnf("(svcs) already found existing address [%s] on adapter [%s]", addr, sm.config.Interface)
					}
				}
			}

			if !activeService[string(svc.UID)] {
				log.Debugf("(svcs) [%s] has been added/modified with addresses [%s]", svc.Name, fetchServiceAddresses(svc))

				wg.Add(1)
				activeServiceLoadBalancer[string(svc.UID)], activeServiceLoadBalancerCancel[string(svc.UID)] = context.WithCancel(context.TODO())

				// Background the services election
				// EnableServicesElection enabled
				// watchEndpoint will do a ServicesElection by Service and understands local endpoints
				//
				// EnableRoutingTable enabled and EnableLeaderElection disabled
				// watchEndpoint will also not do a leaderElection by service.
				if sm.config.EnableServicesElection ||
					((sm.config.EnableRoutingTable || sm.config.EnableBGP) && (!sm.config.EnableLeaderElection && !sm.config.EnableServicesElection)) {
					if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
						// Start an endpoint watcher if we're not watching it already
						if !watchedService[string(svc.UID)] {
							// background the endpoint watcher
							go func() {
								if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
									// Add Endpoint or EndpointSlices watcher
									wg.Add(1)
									var provider epProvider
									if !sm.config.EnableEndpointSlices {
										provider = &endpointsProvider{label: "endpoints"}
									} else {
										provider = &endpointslicesProvider{label: "endpointslices"}
									}
									// Use a goroutine-local error: the original wrote to the
									// function-scoped err, racing with other goroutines.
									if err := sm.watchEndpoint(activeServiceLoadBalancer[string(svc.UID)], id, svc, &wg, provider); err != nil {
										log.Error(err)
									}
									wg.Done()
								}
							}()
							if (sm.config.EnableRoutingTable || sm.config.EnableBGP) && (!sm.config.EnableLeaderElection && !sm.config.EnableServicesElection) {
								wg.Add(1)
								go func() {
									if err := serviceFunc(activeServiceLoadBalancer[string(svc.UID)], svc, &wg); err != nil {
										log.Error(err)
									}
									wg.Done()
								}()
							}
							// We're now watching this service
							watchedService[string(svc.UID)] = true
						}
					} else if (sm.config.EnableBGP || sm.config.EnableRoutingTable) && (!sm.config.EnableLeaderElection && !sm.config.EnableServicesElection) {
						go func() {
							if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeCluster {
								// Add Endpoint watcher
								wg.Add(1)
								var provider epProvider
								if !sm.config.EnableEndpointSlices {
									provider = &endpointsProvider{label: "endpoints"}
								} else {
									provider = &endpointslicesProvider{label: "endpointslices"}
								}
								if err := sm.watchEndpoint(activeServiceLoadBalancer[string(svc.UID)], id, svc, &wg, provider); err != nil {
									log.Error(err)
								}
								wg.Done()
							}
						}()

						wg.Add(1)
						go func() {
							if err := serviceFunc(activeServiceLoadBalancer[string(svc.UID)], svc, &wg); err != nil {
								log.Error(err)
							}
							wg.Done()
						}()
					} else {
						// Increment the waitGroup before the service Func is called (Done is completed in there)
						wg.Add(1)
						go func() {
							if err := serviceFunc(activeServiceLoadBalancer[string(svc.UID)], svc, &wg); err != nil {
								log.Error(err)
							}
							wg.Done()
						}()
					}
					activeService[string(svc.UID)] = true
				} else {
					// Increment the waitGroup before the service Func is called (Done is completed in there)
					wg.Add(1)
					if err := serviceFunc(activeServiceLoadBalancer[string(svc.UID)], svc, &wg); err != nil {
						log.Error(err)
					}
					wg.Done()
				}
			}
		case watch.Deleted:
			svc, ok := event.Object.(*v1.Service)
			if !ok {
				return fmt.Errorf("unable to parse Kubernetes services from API watcher")
			}
			if activeService[string(svc.UID)] {
				// We only care about LoadBalancer services
				if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
					break
				}

				// We can ignore this service
				if svc.Annotations["kube-vip.io/ignore"] == "true" {
					log.Infof("(svcs) [%s] has an ignore annotation for kube-vip", svc.Name)
					break
				}

				// Renamed from isRouteConfigured to avoid shadowing the function of the same name.
				routeConfigured, err := isRouteConfigured(svc.UID)
				if err != nil {
					return fmt.Errorf("error while checking if route is configured: %w", err)
				}
				// If no leader election is enabled, delete routes here
				if !sm.config.EnableLeaderElection && !sm.config.EnableServicesElection &&
					sm.config.EnableRoutingTable && routeConfigured {
					if errs := sm.clearRoutes(svc); len(errs) == 0 {
						configuredLocalRoutes.Store(string(svc.UID), false)
					}
				}

				// If this is an active service then and additional leaderElection will handle stopping
				if err := sm.deleteService(string(svc.UID)); err != nil {
					log.Error(err)
				}

				// Calls the cancel function of the context
				if cancel := activeServiceLoadBalancerCancel[string(svc.UID)]; cancel != nil {
					cancel()
				}
				activeService[string(svc.UID)] = false
				watchedService[string(svc.UID)] = false
			}

			if (sm.config.EnableBGP || sm.config.EnableRoutingTable) && sm.config.EnableLeaderElection && !sm.config.EnableServicesElection {
				if sm.config.EnableBGP {
					instance := sm.findServiceInstance(svc)
					// Renamed loop variable from "vip" — it shadowed the imported vip package.
					for _, vipCfg := range instance.vipConfigs {
						vipCidr := fmt.Sprintf("%s/%s", vipCfg.VIP, vipCfg.VIPCIDR)
						if err := sm.bgpServer.DelHost(vipCidr); err != nil {
							log.Errorf("error deleting host %s: %s", vipCidr, err.Error())
						}
					}
				} else {
					sm.clearRoutes(svc)
				}
			}

			log.Infof("(svcs) [%s/%s] has been deleted", svc.Namespace, svc.Name)
		case watch.Bookmark:
			// Un-used
		case watch.Error:
			log.Error("Error attempting to watch Kubernetes services")

			// This round trip allows us to handle unstructured status
			errObject := apierrors.FromObject(event.Object)
			statusErr, ok := errObject.(*apierrors.StatusError)
			if !ok {
				log.Errorf(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
				// BUGFIX: the original fell through and dereferenced a nil
				// statusErr below, panicking on any non-Status error object.
				break
			}

			log.Errorf("services -> %v", statusErr.ErrStatus)
		default:
		}
	}
	close(exitFunction)
	log.Warnln("Stopping watching services for type: LoadBalancer in all namespaces")
	return nil
}
// isRouteConfigured reports whether a local route has already been configured
// on this node for the service identified by serviceUID. It returns an error
// if the stored tracking value is not a boolean.
func isRouteConfigured(serviceUID types.UID) (bool, error) {
	value, found := configuredLocalRoutes.Load(string(serviceUID))
	if !found {
		// Nothing recorded for this service yet.
		return false, nil
	}
	configured, valid := value.(bool)
	if !valid {
		return false, fmt.Errorf("error converting configuredLocalRoute item to boolean value")
	}
	return configured, nil
}