/
loadBalancer.go
420 lines (373 loc) · 14.6 KB
/
loadBalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
package provider
import (
"context"
"fmt"
"net/netip"
"strings"
"github.com/kube-vip/kube-vip-cloud-provider/pkg/ipam"
"go4.org/netipx"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
)
const (
	// this annotation is for specifying IPs for a loadbalancer
	// use plural for dual stack support in the future
	// Example: kube-vip.io/loadbalancerIPs: 10.1.2.3,fd00::100
	loadbalancerIPsAnnotations = "kube-vip.io/loadbalancerIPs"
	// implementationLabelKey/implementationLabelValue mark a Service as
	// managed by kube-vip; together they form the label selector used when
	// listing services to discover in-use addresses.
	implementationLabelKey   = "implementation"
	implementationLabelValue = "kube-vip"
	// legacyIpamAddressLabelKey is the old label that carried the allocated
	// address; it is deleted when a legacy service is migrated to the
	// loadbalancerIPs annotation (see syncLoadBalancer).
	legacyIpamAddressLabelKey = "ipam-address"
)
// kubevipLoadBalancerManager implements cloudprovider.LoadBalancer, assigning
// addresses to LoadBalancer services from pools defined in a ConfigMap.
type kubevipLoadBalancerManager struct {
	kubeClient     kubernetes.Interface // client for reading/updating Services and ConfigMaps
	namespace      string               // namespace that holds the cloud config ConfigMap
	cloudConfigMap string               // name of the ConfigMap carrying the IPAM configuration
}
// newLoadBalancer returns a cloudprovider.LoadBalancer whose address
// allocation is driven by the ConfigMap named cm in namespace ns.
func newLoadBalancer(kubeClient kubernetes.Interface, ns, cm string) cloudprovider.LoadBalancer {
	return &kubevipLoadBalancerManager{
		kubeClient:     kubeClient,
		namespace:      ns,
		cloudConfigMap: cm,
	}
}
// EnsureLoadBalancer creates or refreshes the load balancer for service by
// delegating to syncLoadBalancer. The nodes argument is ignored: address
// assignment here is purely IPAM-driven.
func (k *kubevipLoadBalancerManager) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (lbs *v1.LoadBalancerStatus, err error) {
	return k.syncLoadBalancer(ctx, service)
}
// UpdateLoadBalancer re-runs the same reconciliation as EnsureLoadBalancer,
// discarding the returned status. The nodes argument is ignored.
func (k *kubevipLoadBalancerManager) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (err error) {
	_, err = k.syncLoadBalancer(ctx, service)
	return err
}
// EnsureLoadBalancerDeleted releases the load balancer for service.
// Currently deleteLoadBalancer only logs; no resources are reclaimed here.
func (k *kubevipLoadBalancerManager) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
	return k.deleteLoadBalancer(ctx, service)
}
// GetLoadBalancer reports whether a kube-vip-managed load balancer exists for
// service, identified by the implementation label, and returns its current
// status when it does.
func (k *kubevipLoadBalancerManager) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) {
	// Services not labelled as kube-vip implementations are not ours.
	if service.Labels[implementationLabelKey] != implementationLabelValue {
		return nil, false, nil
	}
	return &service.Status.LoadBalancer, true, nil
}
// GetLoadBalancerName returns the name of the load balancer. Implementations must treat the
// *v1.Service parameter as read-only and not modify it.
func (k *kubevipLoadBalancerManager) GetLoadBalancerName(_ context.Context, clusterName string, service *v1.Service) string {
	// Delegates to the cloud-provider default naming scheme.
	return getDefaultLoadBalancerName(service)
}
// getDefaultLoadBalancerName wraps cloudprovider.DefaultLoadBalancerName so
// callers in this package share a single naming entry point.
func getDefaultLoadBalancerName(service *v1.Service) string {
	return cloudprovider.DefaultLoadBalancerName(service)
}
// nolint
// deleteLoadBalancer is intentionally a no-op apart from logging: the
// allocated address is tracked via the service's own annotations/labels,
// which disappear with the Service object itself.
func (k *kubevipLoadBalancerManager) deleteLoadBalancer(ctx context.Context, service *v1.Service) error {
	klog.Infof("deleting service '%s' (%s)", service.Name, service.UID)
	return nil
}
// syncLoadBalancer
// 1. Is this loadBalancer already created, and does it have an address? return status
// 2. Is this a new loadBalancer (with no IP address)
// 2a. Get all existing kube-vip services
// 2b. Get the network configuration for this service (namespace) / (CIDR/Range)
// 2c. Between the two find a free address
func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
	// This function reconciles the load balancer state
	klog.Infof("syncing service '%s' (%s)", service.Name, service.UID)
	// The loadBalancer address has already been populated
	if service.Spec.LoadBalancerIP != "" {
		if v, ok := service.Annotations[loadbalancerIPsAnnotations]; !ok || len(v) == 0 {
			klog.Warningf("service.Spec.LoadBalancerIP is defined but annotations '%s' is not, assume it's a legacy service, updates its annotations", loadbalancerIPsAnnotations)
			// assume it's legacy service, need to update the annotation.
			// RetryOnConflict re-fetches the service on each attempt so a
			// concurrent update does not permanently fail the migration.
			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
				recentService, getErr := k.kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
				if getErr != nil {
					return getErr
				}
				if recentService.Annotations == nil {
					recentService.Annotations = make(map[string]string)
				}
				// Copy the legacy Spec field into the annotation.
				recentService.Annotations[loadbalancerIPsAnnotations] = service.Spec.LoadBalancerIP
				// remove ipam-address label
				delete(recentService.Labels, legacyIpamAddressLabelKey)
				// Update the actual service with the annotations
				_, updateErr := k.kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
				return updateErr
			})
			if err != nil {
				return nil, fmt.Errorf("error updating Service Spec [%s] : %v", service.Name, err)
			}
		}
		// Address already assigned — nothing further to allocate.
		return &service.Status.LoadBalancer, nil
	} else {
		if v, ok := service.Annotations[loadbalancerIPsAnnotations]; ok && len(v) != 0 {
			klog.Infof("service '%s/%s' annotations '%s' is defined but service.Spec.LoadBalancerIP is not. Assume it's not legacy service", service.Namespace, service.Name, loadbalancerIPsAnnotations)
			// Set Label for service lookups
			if service.Labels == nil || service.Labels[implementationLabelKey] != implementationLabelValue {
				klog.Infof("service '%s/%s' created with pre-defined ip '%s'", service.Namespace, service.Name, v)
				err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
					recentService, getErr := k.kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
					if getErr != nil {
						return getErr
					}
					if recentService.Labels == nil {
						// Just because ..
						recentService.Labels = make(map[string]string)
					}
					// Mark the service as kube-vip managed so it is counted
					// by future in-use address lookups.
					recentService.Labels[implementationLabelKey] = implementationLabelValue
					// Update the actual service with the annotations
					_, updateErr := k.kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
					return updateErr
				})
				if err != nil {
					return nil, fmt.Errorf("error updating Service Spec [%s] : %v", service.Name, err)
				}
			}
			return &service.Status.LoadBalancer, nil
		}
	}

	// Get the cloud controller configuration map
	controllerCM, err := k.GetConfigMap(ctx, k.cloudConfigMap, k.namespace)
	if err != nil {
		klog.Errorf("Unable to retrieve kube-vip ipam config from configMap [%s] in %s", k.cloudConfigMap, k.namespace)
		// TODO - determine best course of action, create one if it doesn't exist
		controllerCM, err = k.CreateConfigMap(ctx, k.cloudConfigMap, k.namespace)
		if err != nil {
			return nil, err
		}
	}

	// Get ip pool from configmap and determine if it is namespace specific or global
	pool, global, err := discoverPool(controllerCM, service.Namespace, k.cloudConfigMap)
	if err != nil {
		return nil, err
	}

	// Get all services in this namespace or globally, that have the correct label
	var svcs *v1.ServiceList
	if global {
		svcs, err = k.kubeClient.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: getKubevipImplementationLabel()})
		if err != nil {
			return &service.Status.LoadBalancer, err
		}
	} else {
		svcs, err = k.kubeClient.CoreV1().Services(service.Namespace).List(ctx, metav1.ListOptions{LabelSelector: getKubevipImplementationLabel()})
		if err != nil {
			return &service.Status.LoadBalancer, err
		}
	}

	// Build the set of addresses already handed out, read from each managed
	// service's loadbalancerIPs annotation.
	builder := &netipx.IPSetBuilder{}
	for x := range svcs.Items {
		if ip, ok := svcs.Items[x].Annotations[loadbalancerIPsAnnotations]; ok {
			addr, err := netip.ParseAddr(ip)
			if err != nil {
				return nil, err
			}
			builder.Add(addr)
		}
	}
	inUseSet, err := builder.IPSet()
	if err != nil {
		return nil, err
	}

	descOrder := getSearchOrder(controllerCM)

	// If the LoadBalancer address is empty, then do a local IPAM lookup
	loadBalancerIPs, err := discoverVIPs(service.Namespace, pool, inUseSet, descOrder, service.Spec.IPFamilyPolicy, service.Spec.IPFamilies)
	if err != nil {
		return nil, err
	}

	// Update the services with this new address
	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		recentService, getErr := k.kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}
		klog.Infof("Updating service [%s], with load balancer IPAM address(es) [%s]", service.Name, loadBalancerIPs)
		if recentService.Labels == nil {
			// Just because ..
			recentService.Labels = make(map[string]string)
		}
		// Set Label for service lookups
		recentService.Labels[implementationLabelKey] = implementationLabelValue
		if recentService.Annotations == nil {
			recentService.Annotations = make(map[string]string)
		}
		// use annotation instead of label to support ipv6
		recentService.Annotations[loadbalancerIPsAnnotations] = loadBalancerIPs
		// this line will be removed once kube-vip can recognize annotations
		// Set IPAM address to Load Balancer Service
		recentService.Spec.LoadBalancerIP = strings.Split(loadBalancerIPs, ",")[0]
		// Update the actual service with the address and the labels
		_, updateErr := k.kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
		return updateErr
	})
	if retryErr != nil {
		return nil, fmt.Errorf("error updating Service Spec [%s] : %v", service.Name, retryErr)
	}
	return &service.Status.LoadBalancer, nil
}
// discoverPool returns the address pool (a CIDR or an explicit range) to
// allocate from for the given namespace. Lookup order in the ConfigMap data:
// "cidr-<namespace>", then "cidr-global", then "range-<namespace>", then
// "range-global". The returned global flag is true when the pool came from a
// *-global key, meaning in-use addresses must be collected across all
// namespaces. An error is returned only when no key matches at all.
//
// Note: the original implementation built throwaway errors with fmt.Errorf
// purely to log them via klog.Info; klog.Infof produces the same output
// without the allocation.
func discoverPool(cm *v1.ConfigMap, namespace, configMapName string) (pool string, global bool, err error) {
	// 1. Namespace-specific CIDR, e.g. "cidr-default".
	cidrKey := fmt.Sprintf("cidr-%s", namespace)
	if cidr, ok := cm.Data[cidrKey]; ok {
		klog.Infof("Taking address from [%s] pool", cidrKey)
		return cidr, false, nil
	}
	klog.Infof("no cidr config for namespace [%s] exists in key [%s] configmap [%s]", namespace, cidrKey, configMapName)

	// 2. Global CIDR.
	if cidr, ok := cm.Data["cidr-global"]; ok {
		klog.Infof("Taking address from [cidr-global] pool")
		return cidr, true, nil
	}
	klog.Infof("no global cidr config exists [cidr-global]")

	// 3. Namespace-specific range, e.g. "range-default".
	rangeKey := fmt.Sprintf("range-%s", namespace)
	if ipRange, ok := cm.Data[rangeKey]; ok {
		klog.Infof("Taking address from [%s] pool", rangeKey)
		return ipRange, false, nil
	}
	klog.Infof("no range config for namespace [%s] exists in key [%s] configmap [%s]", namespace, rangeKey, configMapName)

	// 4. Global range.
	if ipRange, ok := cm.Data["range-global"]; ok {
		klog.Infof("Taking address from [range-global] pool")
		return ipRange, true, nil
	}
	klog.Infof("no global range config exists [range-global]")

	return "", false, fmt.Errorf("no address pools could be found")
}
// discoverVIPs selects one or two VIP addresses for a service from pool,
// honouring the service's IPFamilyPolicy and IPFamilies ordering. The result
// is a comma-separated list of addresses (one per allocated IP family, the
// service's primary family first). pool may contain both IPv4 and IPv6
// CIDRs/ranges; it is split per family before allocation.
func discoverVIPs(
	namespace, pool string, inUseIPSet *netipx.IPSet, descOrder bool,
	ipFamilyPolicy *v1.IPFamilyPolicy, ipFamilies []v1.IPFamily,
) (vips string, err error) {
	var ipv4Pool, ipv6Pool string
	// Check if DHCP is required
	if pool == "0.0.0.0/32" {
		return "0.0.0.0", nil
		// Check if ip pool contains a cidr, if not assume it is a range
	} else if len(pool) == 0 {
		return "", fmt.Errorf("could not discover address: pool is not specified")
	} else if strings.Contains(pool, "/") {
		ipv4Pool, ipv6Pool, err = ipam.SplitCIDRsByIPFamily(pool)
	} else {
		ipv4Pool, ipv6Pool, err = ipam.SplitRangesByIPFamily(pool)
	}
	if err != nil {
		return "", err
	}
	vipBuilder := strings.Builder{}

	// Handle single stack case
	// (a nil policy is treated as SingleStack, so all code after this block
	// may safely dereference ipFamilyPolicy)
	if ipFamilyPolicy == nil || *ipFamilyPolicy == v1.IPFamilyPolicySingleStack {
		// Default to IPv4 unless the service's first family is IPv6, or only
		// an IPv6 pool is configured.
		ipPool := ipv4Pool
		if len(ipFamilies) == 0 {
			if len(ipv4Pool) == 0 {
				ipPool = ipv6Pool
			}
		} else if ipFamilies[0] == v1.IPv6Protocol {
			ipPool = ipv6Pool
		}
		if len(ipPool) == 0 {
			return "", fmt.Errorf("could not find suitable pool for the IP family of the service")
		}
		return discoverAddress(namespace, ipPool, inUseIPSet, descOrder)
	}

	// Handle dual stack case
	if *ipFamilyPolicy == v1.IPFamilyPolicyRequireDualStack {
		// With RequireDualStack, we want to make sure both pools with both IP
		// families exist
		if len(ipv4Pool) == 0 || len(ipv6Pool) == 0 {
			return "", fmt.Errorf("service requires dual-stack, but the configuration does not have both IPv4 and IPv6 pools listed for the namespace")
		}
	}

	// The primary pool follows the service's first listed IP family
	// (defaulting to IPv4 when none is listed).
	primaryPool := ipv4Pool
	secondaryPool := ipv6Pool
	if len(ipFamilies) > 0 && ipFamilies[0] == v1.IPv6Protocol {
		primaryPool = ipv6Pool
		secondaryPool = ipv4Pool
	}

	// Provide VIPs from both IP families if possible (guaranteed if RequireDualStack)
	// Pool-exhaustion errors are remembered rather than returned immediately,
	// so PreferDualStack can still succeed with a single family below.
	var primaryPoolErr, secondaryPoolErr error
	if len(primaryPool) > 0 {
		primaryVip, err := discoverAddress(namespace, primaryPool, inUseIPSet, descOrder)
		if err == nil {
			_, _ = vipBuilder.WriteString(primaryVip)
		} else if _, outOfIPs := err.(*ipam.OutOfIPsError); outOfIPs {
			primaryPoolErr = err
		} else {
			return "", err
		}
	}
	if len(secondaryPool) > 0 {
		secondaryVip, err := discoverAddress(namespace, secondaryPool, inUseIPSet, descOrder)
		if err == nil {
			if vipBuilder.Len() > 0 {
				vipBuilder.WriteByte(',')
			}
			_, _ = vipBuilder.WriteString(secondaryVip)
		} else if _, outOfIPs := err.(*ipam.OutOfIPsError); outOfIPs {
			secondaryPoolErr = err
		} else {
			return "", err
		}
	}

	if *ipFamilyPolicy == v1.IPFamilyPolicyPreferDualStack {
		// PreferDualStack tolerates one exhausted family but not both.
		if primaryPoolErr != nil && secondaryPoolErr != nil {
			return "", fmt.Errorf("could not allocate any IP address for PreferDualStack service: %s", renderErrors(primaryPoolErr, secondaryPoolErr))
		}
		singleError := primaryPoolErr
		if secondaryPoolErr != nil {
			singleError = secondaryPoolErr
		}
		if singleError != nil {
			klog.Warningf("PreferDualStack service will be single-stack because of error: %s", singleError)
		}
	} else if *ipFamilyPolicy == v1.IPFamilyPolicyRequireDualStack {
		// RequireDualStack fails if either family could not be satisfied.
		if primaryPoolErr != nil || secondaryPoolErr != nil {
			return "", fmt.Errorf("could not allocate required IP addresses for RequireDualStack service: %s", renderErrors(primaryPoolErr, secondaryPoolErr))
		}
	}
	return vipBuilder.String(), nil
}
// discoverAddress picks one free address from pool that is not in inUseIPSet,
// scanning in descending order when descOrder is set. A pool of exactly
// "0.0.0.0/32" is the DHCP sentinel and yields "0.0.0.0" unconditionally.
func discoverAddress(namespace, pool string, inUseIPSet *netipx.IPSet, descOrder bool) (vip string, err error) {
	switch {
	case pool == "0.0.0.0/32":
		// Check if DHCP is required
		return "0.0.0.0", nil
	case strings.Contains(pool, "/"):
		// A "/" marks the pool as a CIDR.
		vip, err = ipam.FindAvailableHostFromCidr(namespace, pool, inUseIPSet, descOrder)
		if err != nil {
			return "", err
		}
		return vip, nil
	default:
		// Anything else is treated as an explicit address range.
		vip, err = ipam.FindAvailableHostFromRange(namespace, pool, inUseIPSet, descOrder)
		if err != nil {
			return "", err
		}
		return vip, nil
	}
}
func getKubevipImplementationLabel() string {
return fmt.Sprintf("%s=%s", implementationLabelKey, implementationLabelValue)
}
// getSearchOrder reports whether pools should be scanned in descending order,
// as requested by the optional "search-order" key ("desc") in the ConfigMap.
// Any other value, or a missing key, means ascending order.
func getSearchOrder(cm *v1.ConfigMap) (descOrder bool) {
	// A missing key reads as "", which compares unequal to "desc".
	return cm.Data["search-order"] == "desc"
}
// renderErrors formats the non-nil errors as an indented bullet list, one per
// line, suitable for embedding in a wrapping error message. Nil entries are
// skipped; with no non-nil errors the result is the empty string.
func renderErrors(errs ...error) string {
	var b strings.Builder
	for _, e := range errs {
		if e == nil {
			continue
		}
		fmt.Fprintf(&b, "\n\t- %s", e)
	}
	return b.String()
}