Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RESTOptions to configure per-resource storage #21666

Merged
merged 1 commit into from Feb 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controller/serviceaccount/tokengetter.go
Expand Up @@ -71,7 +71,7 @@ func (r *registryGetter) GetSecret(namespace, name string) (*api.Secret, error)
// uses the specified storage to retrieve service accounts and secrets.
func NewGetterFromStorageInterface(s storage.Interface) serviceaccount.ServiceAccountTokenGetter {
return NewGetterFromRegistries(
serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(s, generic.UndecoratedStorage)),
secret.NewRegistry(secretetcd.NewREST(s, generic.UndecoratedStorage)),
serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(generic.RESTOptions{s, generic.UndecoratedStorage})),
secret.NewRegistry(secretetcd.NewREST(generic.RESTOptions{s, generic.UndecoratedStorage})),
)
}
83 changes: 47 additions & 36 deletions pkg/master/master.go
Expand Up @@ -324,38 +324,42 @@ func (m *Master) InstallAPIs(c *Config) {
}

func (m *Master) initV1ResourcesStorage(c *Config) {
storageDecorator := m.StorageDecorator()
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.Get("", resource) }
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: dbClient(resource),
Decorator: m.StorageDecorator(),
}
}

podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"), storageDecorator)
podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates"))

eventStorage := eventetcd.NewREST(dbClient("events"), storageDecorator, uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"), storageDecorator)
eventStorage := eventetcd.NewREST(restOptions("events"), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(restOptions("limitRanges"))

resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"), storageDecorator)
secretStorage := secretetcd.NewREST(dbClient("secrets"), storageDecorator)
serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"), storageDecorator)
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"), storageDecorator)
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"), storageDecorator)
configMapStorage := configmapetcd.NewREST(dbClient("configMaps"), storageDecorator)
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptions("resourceQuotas"))
secretStorage := secretetcd.NewREST(restOptions("secrets"))
serviceAccountStorage := serviceaccountetcd.NewREST(restOptions("serviceAccounts"))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptions("persistentVolumes"))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptions("persistentVolumeClaims"))
configMapStorage := configmapetcd.NewREST(restOptions("configMaps"))

namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"), storageDecorator)
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptions("namespaces"))
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)

endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageDecorator)
endpointsStorage := endpointsetcd.NewREST(restOptions("endpoints"))
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)

nodeStorage := nodeetcd.NewStorage(dbClient("nodes"), storageDecorator, c.KubeletClient, m.ProxyTransport)
nodeStorage := nodeetcd.NewStorage(restOptions("nodes"), c.KubeletClient, m.ProxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage.Node)

podStorage := podetcd.NewStorage(
dbClient("pods"),
storageDecorator,
restOptions("pods"),
kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
m.ProxyTransport,
)

serviceStorage, serviceStatusStorage := serviceetcd.NewREST(dbClient("services"), storageDecorator)
serviceStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
m.serviceRegistry = service.NewRegistry(serviceStorage)

var serviceClusterIPRegistry service.RangeRegistry
Expand All @@ -381,7 +385,7 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
})
m.serviceNodePortAllocator = serviceNodePortRegistry

controllerStorage, controllerStatusStorage := controlleretcd.NewREST(dbClient("replicationControllers"), storageDecorator)
controllerStorage, controllerStatusStorage := controlleretcd.NewREST(restOptions("replicationControllers"))

serviceRest := service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.ProxyTransport)

Expand Down Expand Up @@ -608,7 +612,7 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource)
}

func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(m.thirdPartyStorage, generic.UndecoratedStorage, group, kind)
resourceStorage := thirdpartyresourcedataetcd.NewREST(generic.RESTOptions{m.thirdPartyStorage, generic.UndecoratedStorage}, group, kind)

apiRoot := makeThirdPartyPath("")

Expand Down Expand Up @@ -656,20 +660,23 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
}
return enabled
}
storageDecorator := m.StorageDecorator()
dbClient := func(resource string) storage.Interface {
return c.StorageDestinations.Get(extensions.GroupName, resource)
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: c.StorageDestinations.Get(extensions.GroupName, resource),
Decorator: m.StorageDecorator(),
}
}

storage := map[string]rest.Storage{}
if isEnabled("horizontalpodautoscalers") {
m.constructHPAResources(c, storage)
controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.Get("", "replicationControllers"), storageDecorator)
controllerStorage := expcontrolleretcd.NewStorage(
generic.RESTOptions{c.StorageDestinations.Get("", "replicationControllers"), m.StorageDecorator()})
storage["replicationcontrollers"] = controllerStorage.ReplicationController
storage["replicationcontrollers/scale"] = controllerStorage.Scale
}
if isEnabled("thirdpartyresources") {
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources"), storageDecorator)
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(restOptions("thirdpartyresources"))
thirdPartyControl := ThirdPartyController{
master: m,
thirdPartyResourceRegistry: thirdPartyResourceStorage,
Expand All @@ -686,12 +693,12 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
}

if isEnabled("daemonsets") {
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets"), storageDecorator)
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(restOptions("daemonsets"))
storage["daemonsets"] = daemonSetStorage
storage["daemonsets/status"] = daemonSetStatusStorage
}
if isEnabled("deployments") {
deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments"), storageDecorator)
deploymentStorage := deploymentetcd.NewStorage(restOptions("deployments"))
storage["deployments"] = deploymentStorage.Deployment
storage["deployments/status"] = deploymentStorage.Status
// TODO(madhusudancs): Install scale when Scale group issues are fixed (see issue #18528).
Expand All @@ -702,16 +709,16 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
m.constructJobResources(c, storage)
}
if isEnabled("ingresses") {
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(dbClient("ingresses"), storageDecorator)
ingressStorage, ingressStatusStorage := ingressetcd.NewREST(restOptions("ingresses"))
storage["ingresses"] = ingressStorage
storage["ingresses/status"] = ingressStatusStorage
}
if isEnabled("podsecuritypolicy") {
podSecurityPolicyStorage := pspetcd.NewREST(dbClient("podsecuritypolicy"), storageDecorator)
podSecurityPolicyStorage := pspetcd.NewREST(restOptions("podsecuritypolicy"))
storage["podSecurityPolicies"] = podSecurityPolicyStorage
}
if isEnabled("replicasets") {
replicaSetStorage := replicasetetcd.NewStorage(dbClient("replicasets"), storageDecorator)
replicaSetStorage := replicasetetcd.NewStorage(restOptions("replicasets"))
storage["replicasets"] = replicaSetStorage.ReplicaSet
storage["replicasets/status"] = replicaSetStorage.Status
}
Expand All @@ -726,11 +733,13 @@ func (m *Master) constructHPAResources(c *Config, restStorage map[string]rest.St
// Note that hpa's storage settings are changed by changing the autoscaling
// group. Clearly we want all hpas to be stored in the same place no
// matter where they're accessed from.
storageDecorator := m.StorageDecorator()
dbClient := func(resource string) storage.Interface {
return c.StorageDestinations.Search([]string{autoscaling.GroupName, extensions.GroupName}, resource)
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: c.StorageDestinations.Search([]string{autoscaling.GroupName, extensions.GroupName}, resource),
Decorator: m.StorageDecorator(),
}
}
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizontalpodautoscalers"), storageDecorator)
autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers"))
restStorage["horizontalpodautoscalers"] = autoscalerStorage
restStorage["horizontalpodautoscalers/status"] = autoscalerStatusStorage
}
Expand Down Expand Up @@ -760,11 +769,13 @@ func (m *Master) constructJobResources(c *Config, restStorage map[string]rest.St
// Note that job's storage settings are changed by changing the batch
// group. Clearly we want all jobs to be stored in the same place no
// matter where they're accessed from.
storageDecorator := m.StorageDecorator()
dbClient := func(resource string) storage.Interface {
return c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource)
restOptions := func(resource string) generic.RESTOptions {
return generic.RESTOptions{
Storage: c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource),
Decorator: m.StorageDecorator(),
}
}
jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"), storageDecorator)
jobStorage, jobStatusStorage := jobetcd.NewREST(restOptions("jobs"))
restStorage["jobs"] = jobStorage
restStorage["jobs/status"] = jobStatusStorage
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/registry/configmap/etcd/etcd.go
Expand Up @@ -21,7 +21,6 @@ import (
"k8s.io/kubernetes/pkg/registry/configmap"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"

etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
)
Expand All @@ -32,12 +31,12 @@ type REST struct {
}

// NewREST returns a RESTStorage object that will work with ConfigMap objects.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) *REST {
func NewREST(opts generic.RESTOptions) *REST {
prefix := "/configmaps"

newListFunc := func() runtime.Object { return &api.ConfigMapList{} }
storageInterface := storageDecorator(
s, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc)
storageInterface := opts.Decorator(
opts.Storage, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object {
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/configmap/etcd/etcd_test.go
Expand Up @@ -30,7 +30,8 @@ import (

func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
return NewREST(etcdStorage, generic.UndecoratedStorage), server
restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage}
return NewREST(restOptions), server
}

func validNewConfigMap() *api.ConfigMap {
Expand Down
7 changes: 3 additions & 4 deletions pkg/registry/controller/etcd/etcd.go
Expand Up @@ -27,20 +27,19 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

type REST struct {
*etcdgeneric.Etcd
}

// NewREST returns a RESTStorage object that will work against replication controllers.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) (*REST, *StatusREST) {
func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/controllers"

newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} }
storageInterface := storageDecorator(
s, cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), &api.ReplicationController{}, prefix, controller.Strategy, newListFunc)
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), &api.ReplicationController{}, prefix, controller.Strategy, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.ReplicationController{} },
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/controller/etcd/etcd_test.go
Expand Up @@ -30,7 +30,8 @@ import (

func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
controllerStorage, statusStorage := NewREST(etcdStorage, generic.UndecoratedStorage)
restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage}
controllerStorage, statusStorage := NewREST(restOptions)
return controllerStorage, statusStorage, server
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/registry/daemonset/etcd/etcd.go
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

// rest implements a RESTStorage for DaemonSets against etcd
Expand All @@ -35,12 +34,12 @@ type REST struct {
}

// NewREST returns a RESTStorage object that will work against DaemonSets.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) (*REST, *StatusREST) {
func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/daemonsets"

newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} }
storageInterface := storageDecorator(
s, cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), &extensions.DaemonSet{}, prefix, daemonset.Strategy, newListFunc)
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), &extensions.DaemonSet{}, prefix, daemonset.Strategy, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &extensions.DaemonSet{} },
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/daemonset/etcd/etcd_test.go
Expand Up @@ -32,7 +32,8 @@ import (

func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
daemonSetStorage, statusStorage := NewREST(etcdStorage, generic.UndecoratedStorage)
restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage}
daemonSetStorage, statusStorage := NewREST(restOptions)
return daemonSetStorage, statusStorage, server
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/registry/deployment/etcd/etcd.go
Expand Up @@ -43,8 +43,8 @@ type DeploymentStorage struct {
Rollback *RollbackREST
}

func NewStorage(s storage.Interface, storageDecorator generic.StorageDecorator) DeploymentStorage {
deploymentRest, deploymentStatusRest, deploymentRollbackRest := NewREST(s, storageDecorator)
func NewStorage(opts generic.RESTOptions) DeploymentStorage {
deploymentRest, deploymentStatusRest, deploymentRollbackRest := NewREST(opts)
deploymentRegistry := deployment.NewRegistry(deploymentRest)

return DeploymentStorage{
Expand All @@ -60,12 +60,12 @@ type REST struct {
}

// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) (*REST, *StatusREST, *RollbackREST) {
func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) {
prefix := "/deployments"

newListFunc := func() runtime.Object { return &extensions.DeploymentList{} }
storageInterface := storageDecorator(
s, cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), &extensions.Deployment{}, prefix, deployment.Strategy, newListFunc)
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), &extensions.Deployment{}, prefix, deployment.Strategy, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &extensions.Deployment{} },
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/deployment/etcd/etcd_test.go
Expand Up @@ -36,7 +36,8 @@ import (

func newStorage(t *testing.T) (*DeploymentStorage, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
deploymentStorage := NewStorage(etcdStorage, generic.UndecoratedStorage)
restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage}
deploymentStorage := NewStorage(restOptions)
return &deploymentStorage, server
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/registry/endpoint/etcd/etcd.go
Expand Up @@ -25,20 +25,19 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

type REST struct {
*etcdgeneric.Etcd
}

// NewREST returns a RESTStorage object that will work against endpoints.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) *REST {
func NewREST(opts generic.RESTOptions) *REST {
prefix := "/services/endpoints"

newListFunc := func() runtime.Object { return &api.EndpointsList{} }
storageInterface := storageDecorator(
s, cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), &api.Endpoints{}, prefix, endpoint.Strategy, newListFunc)
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), &api.Endpoints{}, prefix, endpoint.Strategy, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/endpoint/etcd/etcd_test.go
Expand Up @@ -30,7 +30,8 @@ import (

func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
return NewREST(etcdStorage, generic.UndecoratedStorage), server
restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage}
return NewREST(restOptions), server
}

func validNewEndpoints() *api.Endpoints {
Expand Down
5 changes: 2 additions & 3 deletions pkg/registry/event/etcd/etcd.go
Expand Up @@ -24,20 +24,19 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)

type REST struct {
*etcdgeneric.Etcd
}

// NewREST returns a RESTStorage object that will work against events.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator, ttl uint64) *REST {
func NewREST(opts generic.RESTOptions, ttl uint64) *REST {
prefix := "/events"

// We explicitly do NOT do any decoration here - switching on Cacher
// for events will lead to too high memory consumption.
storageInterface := s
storageInterface := opts.Storage

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Event{} },
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/event/etcd/etcd_test.go
Expand Up @@ -30,7 +30,8 @@ var testTTL uint64 = 60

func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
return NewREST(etcdStorage, generic.UndecoratedStorage, testTTL), server
restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage}
return NewREST(restOptions, testTTL), server
}

func validNewEvent(namespace string) *api.Event {
Expand Down