forked from rancher/rancher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
external_ip_service.go
137 lines (121 loc) · 3.44 KB
/
external_ip_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package externalservice
import (
"context"
"reflect"
"encoding/json"
"github.com/rancher/types/apis/core/v1"
"github.com/rancher/types/config"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// this controller monitors services with "field.cattle.io/ipAddresses" annotation
// and creates an endpoint for it
const (
ExternalIPsAnnotation = "field.cattle.io/ipAddresses"
)
type Controller struct {
endpointsLister v1.EndpointsLister
endpoints v1.EndpointsInterface
}
func Register(ctx context.Context, workload *config.UserOnlyContext) {
c := &Controller{
endpoints: workload.Core.Endpoints(""),
endpointsLister: workload.Core.Endpoints("").Controller().Lister(),
}
workload.Core.Services("").AddHandler(ctx, "externalIpServiceController", c.sync)
}
func (c *Controller) sync(key string, obj *corev1.Service) (runtime.Object, error) {
if obj == nil || obj.DeletionTimestamp != nil {
return nil, nil
}
if obj.Annotations == nil {
return nil, nil
}
var newSubsets []corev1.EndpointSubset
if val, ok := obj.Annotations[ExternalIPsAnnotation]; !ok || val == "" {
return nil, nil
}
var ipsStr []string
err := json.Unmarshal([]byte(obj.Annotations[ExternalIPsAnnotation]), &ipsStr)
if err != nil {
logrus.Debugf("Failed to unmarshal ipAddresses, error: %v", err)
return nil, nil
}
if ipsStr == nil {
return nil, nil
}
var addresses []corev1.EndpointAddress
for _, ipStr := range ipsStr {
addresses = append(addresses, corev1.EndpointAddress{IP: ipStr})
}
var ports []corev1.EndpointPort
if len(addresses) > 0 {
for _, p := range obj.Spec.Ports {
epPort := corev1.EndpointPort{Name: p.Name, Protocol: p.Protocol, Port: p.TargetPort.IntVal}
ports = append(ports, epPort)
}
if len(ports) == 0 {
epPort := corev1.EndpointPort{Name: "default", Protocol: corev1.ProtocolTCP, Port: 42}
ports = append(ports, epPort)
}
newSubsets = append(newSubsets, corev1.EndpointSubset{
Addresses: addresses,
Ports: ports,
})
}
existing, err := c.endpointsLister.Get(obj.Namespace, obj.Name)
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}
if existing == nil {
controller := true
ownerRef := metav1.OwnerReference{
Name: obj.Name,
APIVersion: "v1",
UID: obj.UID,
Kind: "Service",
Controller: &controller,
}
ep := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
OwnerReferences: []metav1.OwnerReference{ownerRef},
Namespace: obj.Namespace,
},
Subsets: newSubsets,
}
logrus.Infof("Creating endpoints for external ip service [%s]: %v", key, ep.Subsets)
if _, err := c.endpoints.Create(ep); err != nil {
return nil, err
}
} else if subsetsChanged(newSubsets, existing.Subsets) {
toUpdate := existing.DeepCopy()
toUpdate.Subsets = newSubsets
logrus.Infof("Updating endpoints for external ip service [%s]: %v", key, toUpdate.Subsets)
if _, err := c.endpoints.Update(toUpdate); err != nil {
return nil, err
}
}
return nil, nil
}
func subsetsChanged(new []corev1.EndpointSubset, old []corev1.EndpointSubset) bool {
if len(new) != len(old) {
return true
}
for _, newEP := range new {
found := false
for _, oldEP := range old {
if reflect.DeepEqual(newEP, oldEP) {
found = true
break
}
}
if !found {
return true
}
}
return false
}