/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rest

import (
"crypto/tls"
goerrors "errors"
"fmt"
"net"
"net/http"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/features"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
endpointsstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
limitrangestore "k8s.io/kubernetes/pkg/registry/core/limitrange/storage"
nodestore "k8s.io/kubernetes/pkg/registry/core/node/storage"
pvstore "k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage"
pvcstore "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage"
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
podtemplatestore "k8s.io/kubernetes/pkg/registry/core/podtemplate/storage"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
serviceipallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
servicestore "k8s.io/kubernetes/pkg/registry/core/service/storage"
serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/util/async"
netutils "k8s.io/utils/net"
)

// Config provides information needed to build RESTStorage for core.
type Config struct {
GenericConfig
Proxy ProxyConfig
Services ServicesConfig
}

type ProxyConfig struct {
Transport http.RoundTripper
KubeletClientConfig kubeletclient.KubeletClientConfig
}

type ServicesConfig struct {
// Service IP ranges
ClusterIPRange net.IPNet
SecondaryClusterIPRange net.IPNet
NodePortRange utilnet.PortRange
IPRepairInterval time.Duration
}
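
// Illustrative only (not part of the upstream file): a ServicesConfig for a
// dual-stack cluster might be populated like this, assuming the CIDRs and
// port range below are the values handed to the apiserver flags:
//
//	_, primary, _ := net.ParseCIDR("10.96.0.0/16")
//	_, secondary, _ := net.ParseCIDR("fd00::/108")
//	services := ServicesConfig{
//		ClusterIPRange:          *primary,
//		SecondaryClusterIPRange: *secondary,
//		NodePortRange:           utilnet.PortRange{Base: 30000, Size: 2768}, // 30000-32767
//		IPRepairInterval:        3 * time.Minute,
//	}
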
type rangeRegistries struct {
clusterIP rangeallocation.RangeRegistry
secondaryClusterIP rangeallocation.RangeRegistry
nodePort rangeallocation.RangeRegistry
}

type legacyProvider struct {
Config
primaryServiceClusterIPAllocator ipallocator.Interface
serviceClusterIPAllocators map[api.IPFamily]ipallocator.Interface
serviceNodePortAllocator *portallocator.PortAllocator
startServiceNodePortsRepair, startServiceClusterIPRepair func(onFirstSuccess func(), stopCh chan struct{})
}
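
// New constructs the storage provider for the core ("legacy") API group,
// wiring up the service ClusterIP and NodePort allocators together with
// their repair controllers.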
func New(c Config) (*legacyProvider, error) {
rangeRegistries, serviceClusterIPAllocator, serviceIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
if err != nil {
return nil, err
}
p := &legacyProvider{
Config: c,
primaryServiceClusterIPAllocator: serviceClusterIPAllocator,
serviceClusterIPAllocators: serviceIPAllocators,
serviceNodePortAllocator: serviceNodePortAllocator,
}
// create service node port repair controller
client, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return nil, err
}
p.startServiceNodePortsRepair = portallocatorcontroller.NewRepair(c.Services.IPRepairInterval, client.CoreV1(), client.EventsV1(), c.Services.NodePortRange, rangeRegistries.nodePort).RunUntil
// create service cluster ip repair controller
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepair(
c.Services.IPRepairInterval,
client.CoreV1(),
client.EventsV1(),
&c.Services.ClusterIPRange,
rangeRegistries.clusterIP,
&c.Services.SecondaryClusterIPRange,
rangeRegistries.secondaryClusterIP,
).RunUntil
} else {
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress(
c.Services.IPRepairInterval,
client,
c.Informers.Core().V1().Services(),
c.Informers.Networking().V1alpha1().ServiceCIDRs(),
c.Informers.Networking().V1alpha1().IPAddresses(),
).RunUntil
}
return p, nil
}
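
// Sketch (illustrative, not part of the upstream file): a caller holding a
// fully populated Config would typically construct the provider and register
// its post-start hook, e.g.:
//
//	provider, err := New(cfg) // cfg is assumed to be a complete Config
//	if err != nil {
//		return err
//	}
//	name, hook, _ := provider.PostStartHook()
//	apiServer.AddPostStartHookOrDie(name, hook)

// NewRESTStorage builds the API group info for core/v1, creating the REST
// storage for each enabled v1 resource.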
func (p *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
apiGroupInfo, err := p.GenericConfig.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
podDisruptionClient, err := policyclient.NewForConfig(p.LoopbackClientConfig)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, p.Proxy.KubeletClientConfig, p.Proxy.Transport)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
p.Proxy.Transport,
podDisruptionClient,
)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
restOptionsGetter,
p.primaryServiceClusterIPAllocator.IPFamily(),
p.serviceClusterIPAllocators,
p.serviceNodePortAllocator,
endpointsStorage,
podStorage.Pod,
p.Proxy.Transport)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
storage := apiGroupInfo.VersionedResourcesStorageMap["v1"]
if storage == nil {
storage = map[string]rest.Storage{}
}
// potentially override the generic serviceaccount storage with one that supports pods
var serviceAccountStorage *serviceaccountstore.REST
if p.ServiceAccountIssuer != nil {
var nodeGetter rest.Getter
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountTokenNodeBinding) ||
utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountTokenPodNodeInfo) {
nodeGetter = nodeStorage.Node.Store
}
serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, p.ServiceAccountIssuer, p.APIAudiences, p.ServiceAccountMaxExpiration, podStorage.Pod.Store, storage["secrets"].(rest.Getter), nodeGetter, p.ExtendExpiration)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
}
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
}
if resource := "bindings"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.LegacyBinding
}
if resource := "podtemplates"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podTemplateStorage
}
if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
storage[resource] = controllerStorage.Controller
storage[resource+"/status"] = controllerStorage.Status
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
storage[resource+"/scale"] = controllerStorage.Scale
}
}
// potentially override generic storage for service account (with pod support)
if resource := "serviceaccounts"; serviceAccountStorage != nil && apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
// don't leak goroutines
storage[resource].Destroy()
if storage[resource+"/token"] != nil {
storage[resource+"/token"].Destroy()
}
storage[resource] = serviceAccountStorage
if serviceAccountStorage.Token != nil {
storage[resource+"/token"] = serviceAccountStorage.Token
}
}
if resource := "services"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = serviceRESTStorage
storage[resource+"/proxy"] = serviceRESTProxy
storage[resource+"/status"] = serviceStatusStorage
}
if resource := "endpoints"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = endpointsStorage
}
if resource := "nodes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = nodeStorage.Node
storage[resource+"/proxy"] = nodeStorage.Proxy
storage[resource+"/status"] = nodeStorage.Status
}
if resource := "limitranges"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = limitRangeStorage
}
if resource := "persistentvolumes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = persistentVolumeStorage
storage[resource+"/status"] = persistentVolumeStatusStorage
}
if resource := "persistentvolumeclaims"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = persistentVolumeClaimStorage
storage[resource+"/status"] = persistentVolumeClaimStatusStorage
}
if resource := "componentstatuses"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = componentstatus.NewStorage(componentStatusStorage{p.StorageFactory}.serversToValidate)
}
if len(storage) > 0 {
apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
}
return apiGroupInfo, nil
}
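
// For reference (illustrative subset): the "v1" storage map assembled above
// is keyed by resource path, so with all resources enabled it contains
// entries such as:
//
//	storage["pods"]           = podStorage.Pod
//	storage["pods/status"]    = podStorage.Status
//	storage["services/proxy"] = serviceRESTProxy
//	storage["nodes/status"]   = nodeStorage.Status

// newServiceIPAllocators creates the primary (and, if configured, secondary)
// service ClusterIP allocators and the service NodePort allocator, along
// with the range registries consumed by the legacy repair loops.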
func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
clusterIPAllocators = map[api.IPFamily]ipallocator.Interface{}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
return rangeRegistries{}, nil, nil, nil, err
}
serviceClusterIPRange := c.Services.ClusterIPRange
if serviceClusterIPRange.IP == nil {
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing")
}
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
primaryClusterIPAllocator, err = ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
var mem allocator.Snapshottable = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO: have the etcdallocator package return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
if err != nil {
return nil, err
}
registries.clusterIP = etcd
return etcd, nil
})
if err != nil {
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
} else {
networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return rangeRegistries{}, nil, nil, nil, err
}
// TODO(aojea): revisit the initialization of the allocators,
// since right now it depends on the service-cidr flags and
// sets the default IPFamily, which may not be coherent with the
// existing default ServiceCIDR.
primaryClusterIPAllocator, err = ipallocator.NewMetaAllocator(
networkingv1alphaClient,
c.Informers.Networking().V1alpha1().ServiceCIDRs(),
c.Informers.Networking().V1alpha1().IPAddresses(),
netutils.IsIPv6CIDR(&serviceClusterIPRange),
)
if err != nil {
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
}
primaryClusterIPAllocator.EnableMetrics()
clusterIPAllocators[primaryClusterIPAllocator.IPFamily()] = primaryClusterIPAllocator
var secondaryClusterIPAllocator ipallocator.Interface
if c.Services.SecondaryClusterIPRange.IP != nil {
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
var err error
secondaryClusterIPAllocator, err = ipallocator.New(&c.Services.SecondaryClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
var mem allocator.Snapshottable = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO: have the etcdallocator package return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
if err != nil {
return nil, err
}
registries.secondaryClusterIP = etcd
return etcd, nil
})
if err != nil {
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
}
} else {
networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return rangeRegistries{}, nil, nil, nil, err
}
// TODO(aojea): revisit the initialization of the allocators,
// since right now it depends on the service-cidr flags and
// sets the default IPFamily, which may not be coherent with the
// existing default ServiceCIDR.
secondaryClusterIPAllocator, err = ipallocator.NewMetaAllocator(
networkingv1alphaClient,
c.Informers.Networking().V1alpha1().ServiceCIDRs(),
c.Informers.Networking().V1alpha1().IPAddresses(),
netutils.IsIPv6CIDR(&c.Services.SecondaryClusterIPRange),
)
if err != nil {
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
}
}
secondaryClusterIPAllocator.EnableMetrics()
clusterIPAllocators[secondaryClusterIPAllocator.IPFamily()] = secondaryClusterIPAllocator
}
nodePortAllocator, err = portallocator.New(c.Services.NodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO: have the etcdallocator package return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
if err != nil {
return nil, err
}
registries.nodePort = etcd
return etcd, nil
})
if err != nil {
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err)
}
nodePortAllocator.EnableMetrics()
return
}
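
// Sketch of the legacy (bitmap) allocation path wired up above, with the
// etcd persistence layer left out; ipallocator.New calls the factory with
// the size of the range (illustrative only, assuming a /24 service CIDR):
//
//	_, cidr, _ := net.ParseCIDR("10.96.0.0/24")
//	alloc, err := ipallocator.New(cidr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
//		// a purely in-memory bitmap, usable in tests
//		return allocator.NewAllocationMapWithOffset(max, rangeSpec, offset), nil
//	})
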
var _ genericapiserver.PostStartHookProvider = &legacyProvider{}
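
// PostStartHook returns a hook that starts the ClusterIP and NodePort repair
// loops and fails the apiserver if they cannot both complete an initial
// successful run within one minute.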
func (p *legacyProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return "start-service-ip-repair-controllers", func(context genericapiserver.PostStartHookContext) error {
// We start both the ClusterIP and NodePort repair loops here to keep
// Service ClusterIPs and NodePorts consistent, running each via its
// public RunUntil interface.
// We want liveness/readiness to fail until the first successful repair
// run of each loop, so we pass appropriate onFirstSuccess callbacks to
// RunUntil.
// For backward compatibility we also wait for at most one minute: if
// the initial repair has not succeeded by then, we fail the whole
// apiserver.
wg := sync.WaitGroup{}
wg.Add(2)
runner := async.NewRunner(
func(stopCh chan struct{}) { p.startServiceClusterIPRepair(wg.Done, stopCh) },
func(stopCh chan struct{}) { p.startServiceNodePortsRepair(wg.Done, stopCh) },
)
runner.Start()
go func() {
defer runner.Stop()
<-context.StopCh
}()
// For backward compatibility: if we are never able to repair
// clusterIPs and/or nodePorts, we not only fail liveness and/or
// readiness, we also fail explicitly here.
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-done:
case <-time.After(time.Minute):
return goerrors.New("unable to perform initial IP and Port allocation check")
}
return nil
}, nil
}
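
// Illustrative usage of the repair start functions stored on the provider
// (the stop channel and callback below are assumptions for the example):
//
//	stopCh := make(chan struct{})
//	p.startServiceClusterIPRepair(func() {
//		// invoked once, after the first successful repair run
//	}, stopCh)

// GroupName returns the name of the core ("legacy") API group.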
func (p *legacyProvider) GroupName() string {
return api.GroupName
}

type componentStatusStorage struct {
storageFactory serverstorage.StorageFactory
}

func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server {
// This is fragile: it assumes these components are serving on their
// default ports.
// TODO: switch to the secure port once these components remove the
// ability to serve insecurely.
serversToValidate := map[string]componentstatus.Server{
"controller-manager": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
"scheduler": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
}
for ix, cfg := range s.storageFactory.Configs() {
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.EtcdServer{Config: cfg}
}
return serversToValidate
}
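
// For reference (illustrative, assuming the default ports): on a typical
// cluster the map returned above contains entries like
//
//	"controller-manager" -> HTTPS probe of https://127.0.0.1:10257/healthz
//	"scheduler"          -> HTTPS probe of https://127.0.0.1:10259/healthz
//	"etcd-0"             -> health check of the first etcd storage backend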