From 8f385c563f8b1d73bc9d5be71fd85dd611171fb2 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 30 Oct 2015 10:17:09 +0100 Subject: [PATCH] Refactor code for creating Cacher. --- pkg/master/master.go | 12 ++++-- pkg/registry/endpoint/etcd/etcd.go | 21 +++------- pkg/registry/endpoint/etcd/etcd_test.go | 3 +- pkg/registry/node/etcd/etcd.go | 21 +++------- pkg/registry/node/etcd/etcd_test.go | 3 +- pkg/registry/pod/etcd/etcd.go | 21 +++------- pkg/registry/pod/etcd/etcd_test.go | 3 +- pkg/storage/cacher.go | 55 ++++++++++++++++++++++++- pkg/storage/cacher_test.go | 2 +- 9 files changed, 85 insertions(+), 56 deletions(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index a04ca42750d9..597614b62d04 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -535,8 +535,14 @@ func (m *Master) init(c *Config) { healthzChecks := []healthz.HealthzChecker{} + var storageFactory storage.StorageFactory + if c.EnableWatchCache { + storageFactory = storage.NewCacher + } else { + storageFactory = storage.NoDecoration + } dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) } - podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport) + podStorage := podetcd.NewStorage(dbClient("pods"), storageFactory, c.KubeletClient, m.proxyTransport) podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates")) @@ -552,10 +558,10 @@ func (m *Master) init(c *Config) { namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces")) m.namespaceRegistry = namespace.NewRegistry(namespaceStorage) - endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache) + endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageFactory) m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) - nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport) + nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageFactory, c.KubeletClient, m.proxyTransport) m.nodeRegistry = node.NewRegistry(nodeStorage) serviceStorage := serviceetcd.NewREST(dbClient("services")) diff --git a/pkg/registry/endpoint/etcd/etcd.go b/pkg/registry/endpoint/etcd/etcd.go index 123889686265..5b63d21e3607 100644 --- a/pkg/registry/endpoint/etcd/etcd.go +++ b/pkg/registry/endpoint/etcd/etcd.go @@ -32,27 +32,16 @@ type REST struct { } // NewREST returns a RESTStorage object that will work against endpoints. -func NewREST(s storage.Interface, useCacher bool) *REST { +func NewREST(s storage.Interface, storageFactory storage.StorageFactory) *REST { prefix := "/services/endpoints" - storageInterface := s - if useCacher { - config := storage.CacherConfig{ - CacheCapacity: 1000, - Storage: s, - Type: &api.Endpoints{}, - ResourcePrefix: prefix, - KeyFunc: func(obj runtime.Object) (string, error) { - return storage.NamespaceKeyFunc(prefix, obj) - }, - NewListFunc: func() runtime.Object { return &api.EndpointsList{} }, - } - storageInterface = storage.NewCacher(config) - } + newListFunc := func() runtime.Object { return &api.EndpointsList{} } + storageInterface := storageFactory( + s, 1000, nil, &api.Endpoints{}, prefix, true, newListFunc) store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Endpoints{} }, - NewListFunc: func() runtime.Object { return &api.EndpointsList{} }, + NewListFunc: newListFunc, KeyRootFunc: func(ctx api.Context) string { return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix) }, diff --git a/pkg/registry/endpoint/etcd/etcd_test.go b/pkg/registry/endpoint/etcd/etcd_test.go index 7ec7e3e106a0..d10f0bc34813 100644 --- a/pkg/registry/endpoint/etcd/etcd_test.go +++ b/pkg/registry/endpoint/etcd/etcd_test.go @@ -24,12 +24,13 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/tools" ) func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "") - return NewREST(etcdStorage, false), fakeClient + return NewREST(etcdStorage, storage.NoDecoration), fakeClient } func validNewEndpoints() *api.Endpoints { diff --git a/pkg/registry/node/etcd/etcd.go b/pkg/registry/node/etcd/etcd.go index 82e912e2f851..92fd2b2def81 100644 --- a/pkg/registry/node/etcd/etcd.go +++ b/pkg/registry/node/etcd/etcd.go @@ -50,27 +50,16 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object } // NewREST returns a RESTStorage object that will work against nodes. -func NewREST(s storage.Interface, useCacher bool, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) { +func NewREST(s storage.Interface, storageFactory storage.StorageFactory, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) { prefix := "/minions" - storageInterface := s - if useCacher { - config := storage.CacherConfig{ - CacheCapacity: 1000, - Storage: s, - Type: &api.Node{}, - ResourcePrefix: prefix, - KeyFunc: func(obj runtime.Object) (string, error) { - return storage.NoNamespaceKeyFunc(prefix, obj) - }, - NewListFunc: func() runtime.Object { return &api.NodeList{} }, - } - storageInterface = storage.NewCacher(config) - } + newListFunc := func() runtime.Object { return &api.NodeList{} } + storageInterface := storageFactory( + s, 1000, nil, &api.Node{}, prefix, false, newListFunc) store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Node{} }, - NewListFunc: func() runtime.Object { return &api.NodeList{} }, + NewListFunc: newListFunc, KeyRootFunc: func(ctx api.Context) string { return prefix }, diff --git a/pkg/registry/node/etcd/etcd_test.go b/pkg/registry/node/etcd/etcd_test.go index 120735e1f09d..3c1550275255 100644 --- a/pkg/registry/node/etcd/etcd_test.go +++ b/pkg/registry/node/etcd/etcd_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/tools" ) @@ -38,7 +39,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, ht func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "") - storage, _ := NewREST(etcdStorage, false, fakeConnectionInfoGetter{}, nil) + storage, _ := NewREST(etcdStorage, storage.NoDecoration, fakeConnectionInfoGetter{}, nil) return storage, fakeClient } diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index c7f1aa6edf24..0df42f1c1cda 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -60,27 +60,16 @@ type REST struct { } // NewStorage returns a RESTStorage object that will work against pods. -func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage { +func NewStorage(s storage.Interface, storageFactory storage.StorageFactory, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage { prefix := "/pods" - storageInterface := s - if useCacher { - config := storage.CacherConfig{ - CacheCapacity: 1000, - Storage: s, - Type: &api.Pod{}, - ResourcePrefix: prefix, - KeyFunc: func(obj runtime.Object) (string, error) { - return storage.NamespaceKeyFunc(prefix, obj) - }, - NewListFunc: func() runtime.Object { return &api.PodList{} }, - } - storageInterface = storage.NewCacher(config) - } + newListFunc := func() runtime.Object { return &api.PodList{} } + storageInterface := storageFactory( + s, 1000, nil, &api.Pod{}, prefix, true, newListFunc) store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, - NewListFunc: func() runtime.Object { return &api.PodList{} }, + NewListFunc: newListFunc, KeyRootFunc: func(ctx api.Context) string { return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix) }, diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index ed7b5947ed70..9544e4085d6f 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/util" @@ -38,7 +39,7 @@ import ( func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) { etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "") - storage := NewStorage(etcdStorage, false, nil, nil) + storage := NewStorage(etcdStorage, storage.NoDecoration, nil, nil) return storage.Pod, storage.Binding, storage.Status, fakeClient } diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 0316d7a2c207..b2ab986980d3 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -111,10 +111,63 @@ type Cacher struct { ListFromCache bool } +// StorageFactory is a function signature for producing +// a storage.Interface from given parameters. +type StorageFactory func( + storage Interface, + capacity int, + versioner Versioner, + objectType runtime.Object, + resourcePrefix string, + namespaceScoped bool, + newListFunc func() runtime.Object) Interface + +func NoDecoration( + storage Interface, + capacity int, + versioner Versioner, + objectType runtime.Object, + resourcePrefix string, + namespaceScoped bool, + newListFunc func() runtime.Object) Interface { + return storage +} + +// Create a new Cacher responsible from service WATCH and LIST requests from its +// internal cache and updating its cache in the background based on the given +// configuration. +func NewCacher( + storage Interface, + capacity int, + versioner Versioner, + objectType runtime.Object, + resourcePrefix string, + namespaceScoped bool, + newListFunc func() runtime.Object) Interface { + config := CacherConfig{ + CacheCapacity: capacity, + Storage: storage, + Versioner: versioner, + Type: objectType, + ResourcePrefix: resourcePrefix, + NewListFunc: newListFunc, + } + if namespaceScoped { + config.KeyFunc = func(obj runtime.Object) (string, error) { + return NamespaceKeyFunc(resourcePrefix, obj) + } + } else { + config.KeyFunc = func(obj runtime.Object) (string, error) { + return NoNamespaceKeyFunc(resourcePrefix, obj) + } + } + return NewCacherFromConfig(config) +} + // Create a new Cacher responsible from service WATCH and LIST requests from its // internal cache and updating its cache in the background based on the given // configuration. -func NewCacher(config CacherConfig) *Cacher { +func NewCacherFromConfig(config CacherConfig) *Cacher { watchCache := newWatchCache(config.CacheCapacity) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 1bbbb7dd0a39..abfbd3d82d8b 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -55,7 +55,7 @@ func newTestCacher(client tools.EtcdClient) *storage.Cacher { NewListFunc: func() runtime.Object { return &api.PodList{} }, StopChannel: util.NeverStop, } - return storage.NewCacher(config) + return storage.NewCacherFromConfig(config) } func makeTestPod(name string) *api.Pod {