-
Notifications
You must be signed in to change notification settings - Fork 0
/
mastercount.go
245 lines (225 loc) · 8.48 KB
/
mastercount.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package reconcilers master count based reconciler
package reconcilers
import (
"net"
"sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
)
// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
// masters. masterCountEndpointReconciler implements EndpointReconciler.
type masterCountEndpointReconciler struct {
masterCount int
endpointClient corev1client.EndpointsGetter
stopReconcilingCalled bool
reconcilingLock sync.Mutex
}
// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
// specified expected number of masters.
func NewMasterCountEndpointReconciler(masterCount int, endpointClient corev1client.EndpointsGetter) EndpointReconciler {
return &masterCountEndpointReconciler{
masterCount: masterCount,
endpointClient: endpointClient,
}
}
// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
// ReconcileEndpoints expects that the endpoints objects it manages will all be
// managed only by ReconcileEndpoints; therefore, to understand this, you need only
// understand the requirements and the body of this function.
//
// Requirements:
// * All apiservers MUST use the same ports for their {rw, ro} services.
// * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
// endpoints for their {rw, ro} services.
// * All apiservers MUST know and agree on the number of apiservers expected
// to be running (c.masterCount).
// * ReconcileEndpoints is called periodically from all apiservers.
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled {
return nil
}
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
if err != nil {
e = &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: metav1.NamespaceDefault,
},
}
}
if errors.IsNotFound(err) {
// Simply create non-existing endpoints for the service.
e.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
Ports: endpointPorts,
}}
_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e)
return err
}
// First, determine if the endpoint is in the format we expect (one
// subset, ports matching endpointPorts, N IP addresses).
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
if !formatCorrect {
// Something is egregiously wrong, just re-make the endpoints record.
e.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
Ports: endpointPorts,
}}
klog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
return err
}
if ipCorrect && portsCorrect {
return nil
}
if !ipCorrect {
// We *always* add our own IP address.
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, corev1.EndpointAddress{IP: ip.String()})
// Lexicographic order is retained by this step.
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
// If too many IP addresses, remove the ones lexicographically after our
// own IP address. Given the requirements stated at the top of
// this function, this should cause the list of IP addresses to
// become eventually correct.
if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount {
// addrs is a pointer because we're going to mutate it.
for i, addr := range *addrs {
if addr.IP == ip.String() {
for len(*addrs) > r.masterCount {
// wrap around if necessary.
remove := (i + 1) % len(*addrs)
*addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
}
break
}
}
}
}
if !portsCorrect {
// Reset ports.
e.Subsets[0].Ports = endpointPorts
}
klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
return err
}
func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// Endpoint doesn't exist
return nil
}
return err
}
// Remove our IP from the list of addresses
new := []corev1.EndpointAddress{}
for _, addr := range e.Subsets[0].Addresses {
if addr.IP != ip.String() {
new = append(new, addr)
}
}
e.Subsets[0].Addresses = new
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
return err
})
return err
}
// Determine if the endpoint is in the format ReconcileEndpoints expects.
//
// Return values:
// * formatCorrect is true if exactly one subset is found.
// * ipCorrect is true when current master's IP is found and the number
// of addresses is less than or equal to the master count.
// * portsCorrect is true when endpoint ports exactly match provided ports.
// portsCorrect is only evaluated when reconcilePorts is set to true.
func checkEndpointSubsetFormat(e *corev1.Endpoints, ip string, ports []corev1.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
if len(e.Subsets) != 1 {
return false, false, false
}
sub := &e.Subsets[0]
portsCorrect = true
if reconcilePorts {
if len(sub.Ports) != len(ports) {
portsCorrect = false
}
for i, port := range ports {
if len(sub.Ports) <= i || port != sub.Ports[i] {
portsCorrect = false
break
}
}
}
for _, addr := range sub.Addresses {
if addr.IP == ip {
ipCorrect = len(sub.Addresses) <= count
break
}
}
return true, ipCorrect, portsCorrect
}
// GetMasterServiceUpdateIfNeeded sets service attributes for the
// given apiserver service.
// * GetMasterServiceUpdateIfNeeded expects that the service object it
// manages will be managed only by GetMasterServiceUpdateIfNeeded;
// therefore, to understand this, you need only understand the
// requirements and the body of this function.
// * GetMasterServiceUpdateIfNeeded ensures that the correct ports are
// are set.
//
// Requirements:
// * All apiservers MUST use GetMasterServiceUpdateIfNeeded and only
// GetMasterServiceUpdateIfNeeded to manage service attributes
// * updateMasterService is called periodically from all apiservers.
func GetMasterServiceUpdateIfNeeded(svc *corev1.Service, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType) (s *corev1.Service, updated bool) {
// Determine if the service is in the format we expect
// (servicePorts are present and service type matches)
formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
if formatCorrect {
return svc, false
}
svc.Spec.Ports = servicePorts
svc.Spec.Type = serviceType
return svc, true
}
// Determine if the service is in the correct format
// GetMasterServiceUpdateIfNeeded expects (servicePorts are correct
// and service type matches).
func checkServiceFormat(s *corev1.Service, ports []corev1.ServicePort, serviceType corev1.ServiceType) (formatCorrect bool) {
if s.Spec.Type != serviceType {
return false
}
if len(ports) != len(s.Spec.Ports) {
return false
}
for i, port := range ports {
if port != s.Spec.Ports[i] {
return false
}
}
return true
}