-
Notifications
You must be signed in to change notification settings - Fork 590
/
endpoints.go
213 lines (182 loc) · 5.7 KB
/
endpoints.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package adminapi
import (
"context"
"fmt"
"strings"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
cfgtypes "github.com/kong/kubernetes-ingress-controller/v3/internal/manager/config/types"
)
// DiscoveredAdminAPI represents an Admin API discovered from a Kubernetes Service.
// It is a comparable value type, which allows it to be stored in a sets.Set.
type DiscoveredAdminAPI struct {
// Address is the URL of the Admin API. Within this file it is always built
// in the form "https://<host>:<port>".
Address string
// PodRef is the namespace and name taken from the TargetRef of the endpoint
// that backs this Admin API.
PodRef k8stypes.NamespacedName
}
// Discoverer discovers Admin APIs from the EndpointSlices of a Kubernetes
// Service. It selects EndpointSlice ports by name and builds Admin API
// addresses according to the configured DNS strategy.
// Use NewDiscoverer to create a validated instance.
type Discoverer struct {
// portNames is the set of port names that Admin API Service ports will be
// matched against.
portNames sets.Set[string]
// dnsStrategy is the DNS strategy to use when resolving Admin API Service
// addresses.
dnsStrategy cfgtypes.DNSStrategy
}
// NewDiscoverer creates a Discoverer that matches EndpointSlice ports against
// adminAPIPortNames and builds addresses using dnsStrategy.
//
// It returns an error when adminAPIPortNames is empty or when dnsStrategy
// fails its own validation; the validation error is wrapped.
func NewDiscoverer(
	adminAPIPortNames sets.Set[string],
	dnsStrategy cfgtypes.DNSStrategy,
) (*Discoverer, error) {
	// Without at least one port name no EndpointSlice port could ever match.
	if len(adminAPIPortNames) == 0 {
		return nil, fmt.Errorf("no admin API port names provided")
	}

	validationErr := dnsStrategy.Validate()
	if validationErr != nil {
		return nil, fmt.Errorf("invalid dns strategy: %w", validationErr)
	}

	discoverer := new(Discoverer)
	discoverer.portNames = adminAPIPortNames
	discoverer.dnsStrategy = dnsStrategy
	return discoverer, nil
}
// GetAdminAPIsForService lists, page by page, the EndpointSlices that belong
// to the given Admin API Service using kubeClient, and collects the Admin APIs
// found in each of them (see AdminAPIsFromEndpointSlice).
//
// EndpointSlices are selected in the Service's namespace by the
// "kubernetes.io/service-name" label. The first error encountered, either
// from listing or from processing an EndpointSlice, is returned as is
// together with a nil set.
func (d *Discoverer) GetAdminAPIsForService(
	ctx context.Context,
	kubeClient client.Client,
	service k8stypes.NamespacedName,
) (sets.Set[DiscoveredAdminAPI], error) {
	// Maximum number of EndpointSlices requested per List call.
	const pageLimit = 100

	serviceNameReq, err := labels.NewRequirement(
		"kubernetes.io/service-name", selection.Equals, []string{service.Name},
	)
	if err != nil {
		return nil, err
	}

	// The same options are reused for every page; only the continue token
	// changes between calls. It starts out empty, which requests the first page.
	listOpts := client.ListOptions{
		LabelSelector: labels.NewSelector().Add(*serviceNameReq),
		Namespace:     service.Namespace,
		Limit:         pageLimit,
	}

	discovered := sets.New[DiscoveredAdminAPI]()
	for {
		var page discoveryv1.EndpointSliceList
		if listErr := kubeClient.List(ctx, &page, &listOpts); listErr != nil {
			return nil, listErr
		}

		for _, slice := range page.Items {
			fromSlice, sliceErr := d.AdminAPIsFromEndpointSlice(slice)
			if sliceErr != nil {
				return nil, sliceErr
			}
			discovered = discovered.Union(fromSlice)
		}

		// An empty continue token means the last page has been consumed.
		if page.Continue == "" {
			return discovered, nil
		}
		listOpts.Continue = page.Continue
	}
}
// AdminAPIsFromEndpointSlice returns the set of Admin APIs found in the given
// EndpointSlice.
//
// Only ports whose name is in the Discoverer's port name set and whose port
// number is set are considered. For each such port, every endpoint that is
// not terminating, is backed by a Pod and has at least one address yields one
// DiscoveredAdminAPI. The Service name passed down for address building is
// taken from the first owner reference of kind "Service" and API version
// "v1"; it is left empty when there is none.
//
// An error from building any single Admin API aborts the whole call and is
// returned together with a nil set.
func (d *Discoverer) AdminAPIsFromEndpointSlice(
	endpoints discoveryv1.EndpointSlice,
) (sets.Set[DiscoveredAdminAPI], error) {
	discoveredAdminAPIs := sets.New[DiscoveredAdminAPI]()

	// The owning Service is a property of the EndpointSlice itself, so resolve
	// it once instead of once per port and endpoint.
	svc := k8stypes.NamespacedName{Namespace: endpoints.Namespace}
	for _, ownerRef := range endpoints.OwnerReferences {
		if ownerRef.Kind == "Service" && ownerRef.APIVersion == "v1" {
			svc.Name = ownerRef.Name
			break
		}
	}

	for _, p := range endpoints.Ports {
		if p.Name == nil || !d.portNames.Has(*p.Name) {
			continue
		}
		// The port number is optional in the EndpointSlice API. No address can
		// be built without it, so skip such ports instead of dereferencing nil.
		if p.Port == nil {
			continue
		}

		for _, e := range endpoints.Endpoints {
			if e.Conditions.Terminating != nil && *e.Conditions.Terminating {
				continue
			}
			// We do not take into account endpoints that are not backed by a Pod.
			if e.TargetRef == nil || e.TargetRef.Kind != "Pod" {
				continue
			}
			if len(e.Addresses) < 1 {
				continue
			}

			adminAPI, err := adminAPIFromEndpoint(e, p, svc, d.dnsStrategy, endpoints.AddressType)
			if err != nil {
				return nil, err
			}
			discoveredAdminAPIs.Insert(adminAPI)
		}
	}
	return discoveredAdminAPIs, nil
}
// adminAPIFromEndpoint builds a DiscoveredAdminAPI for a single endpoint and
// port of an EndpointSlice.
//
// The host part of the resulting "https://<host>:<port>" address depends on
// dnsStrategy:
//   - ServiceScopedPodDNSStrategy: "<ip-with-dashes>.<service name>.<service namespace>.svc"
//   - NamespaceScopedPodDNSStrategy: "<ip-with-dashes>.<service namespace>.pod"
//   - IPDNSStrategy: the raw address, wrapped in square brackets when
//     addressFamily is IPv6.
//
// PodRef is taken from the endpoint's TargetRef.
//
// An error is returned when the endpoint has no TargetRef or no addresses,
// when the port number is not set, when the service name is empty while using
// ServiceScopedPodDNSStrategy, or when dnsStrategy is unknown.
func adminAPIFromEndpoint(
	endpoint discoveryv1.Endpoint,
	port discoveryv1.EndpointPort,
	service k8stypes.NamespacedName,
	dnsStrategy cfgtypes.DNSStrategy,
	addressFamily discoveryv1.AddressType,
) (DiscoveredAdminAPI, error) {
	// TargetRef, Addresses and Port are all optional in the API types; fail
	// with an error rather than panicking on malformed input.
	if endpoint.TargetRef == nil {
		return DiscoveredAdminAPI{}, fmt.Errorf("endpoint has no TargetRef")
	}
	if len(endpoint.Addresses) == 0 {
		return DiscoveredAdminAPI{}, fmt.Errorf(
			"no addresses for an endpoint with TargetRef %s/%s",
			endpoint.TargetRef.Namespace, endpoint.TargetRef.Name,
		)
	}
	if port.Port == nil {
		return DiscoveredAdminAPI{}, fmt.Errorf(
			"port number is not set for an endpoint with TargetRef %s/%s",
			endpoint.TargetRef.Namespace, endpoint.TargetRef.Name,
		)
	}

	podNN := k8stypes.NamespacedName{
		Name:      endpoint.TargetRef.Name,
		Namespace: endpoint.TargetRef.Namespace,
	}

	// NOTE: Endpoint's addresses are assumed to be fungible, therefore we pick
	// only the first one.
	// For the context please see the `Endpoint.Addresses` godoc.
	eAddress := endpoint.Addresses[0]

	// NOTE(review): the DNS based strategies only replace dots with dashes, so
	// an IPv6 address is passed through with its colons intact — confirm
	// whether these strategies are expected to support IPv6.
	var host string
	switch dnsStrategy {
	case cfgtypes.ServiceScopedPodDNSStrategy:
		if service.Name == "" {
			return DiscoveredAdminAPI{}, fmt.Errorf(
				"service name is empty for an endpoint with TargetRef %s/%s",
				endpoint.TargetRef.Namespace, endpoint.TargetRef.Name,
			)
		}
		ipAddr := strings.ReplaceAll(eAddress, ".", "-")
		host = fmt.Sprintf("%s.%s.%s.svc", ipAddr, service.Name, service.Namespace)

	case cfgtypes.NamespaceScopedPodDNSStrategy:
		ipAddr := strings.ReplaceAll(eAddress, ".", "-")
		host = fmt.Sprintf("%s.%s.pod", ipAddr, service.Namespace)

	case cfgtypes.IPDNSStrategy:
		host = eAddress
		// A literal IPv6 address must be bracketed to be used with a port.
		if addressFamily == discoveryv1.AddressTypeIPv6 {
			host = fmt.Sprintf("[%s]", eAddress)
		}

	default:
		return DiscoveredAdminAPI{}, fmt.Errorf("unknown dns strategy: %s", dnsStrategy)
	}

	// NOTE: We assume https below because the referenced Admin API
	// server will live in another Pod/elsewhere so allowing http would
	// not be considered best practice.
	return DiscoveredAdminAPI{
		Address: fmt.Sprintf("https://%s:%d", host, *port.Port),
		PodRef:  podNN,
	}, nil
}