forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfiggetter.go
140 lines (115 loc) · 4.69 KB
/
configgetter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package restoptions
import (
"fmt"
"strconv"
"strings"
"sync"
apiserveroptions "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
genericrest "k8s.io/kubernetes/pkg/registry/generic"
genericetcd "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
kerrors "k8s.io/kubernetes/pkg/util/errors"
"github.com/golang/glog"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
"github.com/openshift/origin/pkg/cmd/server/etcd"
cmdflags "github.com/openshift/origin/pkg/cmd/util/flags"
)
// UseConfiguredCacheSize is a sentinel cache size. When a storage decorator
// built by the config-based getter in this file is asked for this size, it
// substitutes the size configured for the resource, or the getter's default
// size when none (or a negative one) was configured.
const UseConfiguredCacheSize = -1
// configRESTOptionsGetter hands out RESTOptions derived from a master config,
// memoizing the result per resource.
type configRESTOptionsGetter struct {
	// masterOptions is the master config the getter was created from. It
	// supplies the etcd client info, the storage version and prefix, and the
	// API server arguments the watch cache settings are read from.
	masterOptions configapi.MasterConfig

	// cacheEnabled reports whether storage should be wrapped with a watch
	// cache; when false the decorator always returns undecorated storage.
	cacheEnabled bool
	// defaultCacheSize is the capacity used for resources that have no
	// non-negative entry in cacheSizes.
	defaultCacheSize int
	// cacheSizes holds the per-resource capacities parsed from the API
	// server's watch cache size arguments.
	cacheSizes map[unversioned.GroupResource]int

	// restOptionsLock is held for the whole of GetRESTOptions, which is where
	// restOptionsMap and etcdHelper are read and written.
	restOptionsLock sync.Mutex
	// restOptionsMap memoizes the options already built for each resource.
	restOptionsMap map[unversioned.GroupResource]genericrest.RESTOptions
	// etcdHelper is the storage interface shared by every resource; it is
	// created on the first GetRESTOptions call that needs it.
	etcdHelper storage.Interface
}
// NewConfigGetter builds a restoptions.Getter backed by the given master
// config. The getter starts with the watch cache enabled and a default
// capacity of 1000 per resource; those defaults are then overridden by
// whatever loadWatchCacheSettings reads from the config. A failure to load the
// settings is logged and the getter is still returned.
func NewConfigGetter(masterOptions configapi.MasterConfig) Getter {
	g := &configRESTOptionsGetter{
		restOptionsMap:   make(map[unversioned.GroupResource]genericrest.RESTOptions),
		cacheSizes:       make(map[unversioned.GroupResource]int),
		defaultCacheSize: 1000,
		cacheEnabled:     true,
		masterOptions:    masterOptions,
	}

	// The constructor has no error return, so a bad cache configuration is
	// reported through the log and the getter is returned regardless.
	err := g.loadWatchCacheSettings()
	if err != nil {
		glog.Error(err)
	}
	return g
}
// loadWatchCacheSettings reads the watch cache flags from the Kubernetes API
// server arguments in the master config and records them on the getter.
//
// It does nothing when no Kubernetes master config is present. If the
// arguments cannot be resolved, the aggregated resolution errors are returned
// and the getter's cache settings are left untouched. Otherwise cacheEnabled
// is set from the resolved flags and each watch cache size entry, expected as
// <resource>#<size>, is parsed into cacheSizes. Malformed entries are skipped
// and reported together in the returned aggregate; well-formed entries are
// still applied.
func (g *configRESTOptionsGetter) loadWatchCacheSettings() error {
	kubeConfig := g.masterOptions.KubernetesMasterConfig
	if kubeConfig == nil {
		return nil
	}

	apiServer := apiserveroptions.NewAPIServer()
	resolveErrs := cmdflags.Resolve(kubeConfig.APIServerArguments, apiServer.AddFlags)
	if len(resolveErrs) > 0 {
		return kerrors.NewAggregate(resolveErrs)
	}

	g.cacheEnabled = apiServer.EnableWatchCache

	parseErrs := []error{}
	for _, entry := range apiServer.WatchCacheSizes {
		parts := strings.Split(entry, "#")
		if len(parts) != 2 {
			parseErrs = append(parseErrs, fmt.Errorf("invalid watch cache size value '%s', expecting <resource>#<size> format (e.g. builds#100)", entry))
			continue
		}

		groupResource := unversioned.ParseGroupResource(parts[0])
		capacity, atoiErr := strconv.Atoi(parts[1])
		if atoiErr != nil {
			parseErrs = append(parseErrs, fmt.Errorf("invalid watch cache size value '%s': %v", entry, atoiErr))
			continue
		}

		g.cacheSizes[groupResource] = capacity
	}

	return kerrors.NewAggregate(parseErrs)
}
// GetRESTOptions returns the RESTOptions for the given resource, building and
// memoizing them on first request. The whole call runs under restOptionsLock.
//
// The shared etcd storage is created the first time it is needed; if the etcd
// client cannot be created, that error is returned along with empty options
// and nothing is memoized. The returned options carry a decorator that decides,
// each time it is invoked, whether to wrap the storage with a watch cache.
func (g *configRESTOptionsGetter) GetRESTOptions(resource unversioned.GroupResource) (genericrest.RESTOptions, error) {
	g.restOptionsLock.Lock()
	defer g.restOptionsLock.Unlock()

	if cached, found := g.restOptionsMap[resource]; found {
		return cached, nil
	}

	if g.etcdHelper == nil {
		// TODO: pick the destination etcd according to the requested resource
		client, err := etcd.MakeNewEtcdClient(g.masterOptions.EtcdClientInfo)
		if err != nil {
			return genericrest.RESTOptions{}, err
		}

		// TODO: pick the destination group/version according to the requested group/resource
		storageVersion := unversioned.GroupVersion{Group: "", Version: g.masterOptions.EtcdStorageConfig.OpenShiftStorageVersion}
		codec := kapi.Codecs.LegacyCodec(storageVersion)
		g.etcdHelper = etcdstorage.NewEtcdStorage(client, codec, g.masterOptions.EtcdStorageConfig.OpenShiftStoragePrefix, false)
	}

	// Start from the default and override it only with an explicitly
	// configured, non-negative size for this resource.
	resourceCacheSize := g.defaultCacheSize
	if size, ok := g.cacheSizes[resource]; ok && size >= 0 {
		resourceCacheSize = size
	}

	decorate := func(
		s storage.Interface,
		requestedSize int,
		objectType runtime.Object,
		resourcePrefix string,
		scopeStrategy rest.NamespaceScopedStrategy,
		newListFunc func() runtime.Object,
	) storage.Interface {
		// Callers pass UseConfiguredCacheSize to defer to the configuration;
		// any other value is taken as the capacity they want.
		size := requestedSize
		if requestedSize == UseConfiguredCacheSize {
			size = resourceCacheSize
		}

		if g.cacheEnabled && size != 0 {
			glog.V(5).Infof("using watch cache storage (capacity=%d) for %s", size, resource.String())
			return genericetcd.StorageWithCacher(s, size, objectType, resourcePrefix, scopeStrategy, newListFunc)
		}

		// Caching is globally disabled or the capacity is zero.
		glog.V(5).Infof("using uncached watch storage for %s", resource.String())
		return genericrest.UndecoratedStorage(s, size, objectType, resourcePrefix, scopeStrategy, newListFunc)
	}

	options := genericrest.RESTOptions{
		Storage:                 g.etcdHelper,
		Decorator:               decorate,
		DeleteCollectionWorkers: 1,
	}
	g.restOptionsMap[resource] = options
	return options, nil
}