Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor code for creating Cacher. #16584

Merged
merged 1 commit into from Nov 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/master/master.go
Expand Up @@ -535,8 +535,14 @@ func (m *Master) init(c *Config) {

healthzChecks := []healthz.HealthzChecker{}

var storageFactory storage.StorageFactory
if c.EnableWatchCache {
storageFactory = storage.NewCacher
} else {
storageFactory = storage.NoDecoration
}
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
podStorage := podetcd.NewStorage(dbClient("pods"), storageFactory, c.KubeletClient, m.proxyTransport)

podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))

Expand All @@ -552,10 +558,10 @@ func (m *Master) init(c *Config) {
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"))
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)

endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache)
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageFactory)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)

nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageFactory, c.KubeletClient, m.proxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage)

serviceStorage := serviceetcd.NewREST(dbClient("services"))
Expand Down
21 changes: 5 additions & 16 deletions pkg/registry/endpoint/etcd/etcd.go
Expand Up @@ -32,27 +32,16 @@ type REST struct {
}

// NewREST returns a RESTStorage object that will work against endpoints.
func NewREST(s storage.Interface, useCacher bool) *REST {
func NewREST(s storage.Interface, storageFactory storage.StorageFactory) *REST {
prefix := "/services/endpoints"

storageInterface := s
if useCacher {
config := storage.CacherConfig{
CacheCapacity: 1000,
Storage: s,
Type: &api.Endpoints{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc(prefix, obj)
},
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
}
storageInterface = storage.NewCacher(config)
}
newListFunc := func() runtime.Object { return &api.EndpointsList{} }
storageInterface := storageFactory(
s, 1000, nil, &api.Endpoints{}, prefix, true, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
NewListFunc: newListFunc,
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/endpoint/etcd/etcd_test.go
Expand Up @@ -24,12 +24,13 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/tools"
)

func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
return NewREST(etcdStorage, false), fakeClient
return NewREST(etcdStorage, storage.NoDecoration), fakeClient
}

func validNewEndpoints() *api.Endpoints {
Expand Down
21 changes: 5 additions & 16 deletions pkg/registry/node/etcd/etcd.go
Expand Up @@ -50,27 +50,16 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object
}

// NewREST returns a RESTStorage object that will work against nodes.
func NewREST(s storage.Interface, useCacher bool, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) {
func NewREST(s storage.Interface, storageFactory storage.StorageFactory, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) {
prefix := "/minions"

storageInterface := s
if useCacher {
config := storage.CacherConfig{
CacheCapacity: 1000,
Storage: s,
Type: &api.Node{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) {
return storage.NoNamespaceKeyFunc(prefix, obj)
},
NewListFunc: func() runtime.Object { return &api.NodeList{} },
}
storageInterface = storage.NewCacher(config)
}
newListFunc := func() runtime.Object { return &api.NodeList{} }
storageInterface := storageFactory(
s, 1000, nil, &api.Node{}, prefix, false, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Node{} },
NewListFunc: func() runtime.Object { return &api.NodeList{} },
NewListFunc: newListFunc,
KeyRootFunc: func(ctx api.Context) string {
return prefix
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/node/etcd/etcd_test.go
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/tools"
)

Expand All @@ -38,7 +39,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, ht

func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
storage, _ := NewREST(etcdStorage, false, fakeConnectionInfoGetter{}, nil)
storage, _ := NewREST(etcdStorage, storage.NoDecoration, fakeConnectionInfoGetter{}, nil)
return storage, fakeClient
}

Expand Down
21 changes: 5 additions & 16 deletions pkg/registry/pod/etcd/etcd.go
Expand Up @@ -60,27 +60,16 @@ type REST struct {
}

// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
func NewStorage(s storage.Interface, storageFactory storage.StorageFactory, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
prefix := "/pods"

storageInterface := s
if useCacher {
config := storage.CacherConfig{
CacheCapacity: 1000,
Storage: s,
Type: &api.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc(prefix, obj)
},
NewListFunc: func() runtime.Object { return &api.PodList{} },
}
storageInterface = storage.NewCacher(config)
}
newListFunc := func() runtime.Object { return &api.PodList{} }
storageInterface := storageFactory(
s, 1000, nil, &api.Pod{}, prefix, true, newListFunc)

store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
NewListFunc: newListFunc,
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/pod/etcd/etcd_test.go
Expand Up @@ -31,14 +31,15 @@ import (
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/util"
)

func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
storage := NewStorage(etcdStorage, false, nil, nil)
storage := NewStorage(etcdStorage, storage.NoDecoration, nil, nil)
return storage.Pod, storage.Binding, storage.Status, fakeClient
}

Expand Down
55 changes: 54 additions & 1 deletion pkg/storage/cacher.go
Expand Up @@ -111,10 +111,63 @@ type Cacher struct {
ListFromCache bool
}

// StorageFactory is a function signature for producing
// a storage.Interface from given parameters.
type StorageFactory func(
storage Interface,
capacity int,
versioner Versioner,
objectType runtime.Object,
resourcePrefix string,
namespaceScoped bool,
newListFunc func() runtime.Object) Interface

func NoDecoration(
storage Interface,
capacity int,
versioner Versioner,
objectType runtime.Object,
resourcePrefix string,
namespaceScoped bool,
newListFunc func() runtime.Object) Interface {
return storage
}

// Create a new Cacher responsible from service WATCH and LIST requests from its
// internal cache and updating its cache in the background based on the given
// configuration.
func NewCacher(
storage Interface,
capacity int,
versioner Versioner,
objectType runtime.Object,
resourcePrefix string,
namespaceScoped bool,
newListFunc func() runtime.Object) Interface {
config := CacherConfig{
CacheCapacity: capacity,
Storage: storage,
Versioner: versioner,
Type: objectType,
ResourcePrefix: resourcePrefix,
NewListFunc: newListFunc,
}
if namespaceScoped {
config.KeyFunc = func(obj runtime.Object) (string, error) {
return NamespaceKeyFunc(resourcePrefix, obj)
}
} else {
config.KeyFunc = func(obj runtime.Object) (string, error) {
return NoNamespaceKeyFunc(resourcePrefix, obj)
}
}
return NewCacherFromConfig(config)
}

// Create a new Cacher responsible from service WATCH and LIST requests from its
// internal cache and updating its cache in the background based on the given
// configuration.
func NewCacher(config CacherConfig) *Cacher {
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/cacher_test.go
Expand Up @@ -55,7 +55,7 @@ func newTestCacher(client tools.EtcdClient) *storage.Cacher {
NewListFunc: func() runtime.Object { return &api.PodList{} },
StopChannel: util.NeverStop,
}
return storage.NewCacher(config)
return storage.NewCacherFromConfig(config)
}

func makeTestPod(name string) *api.Pod {
Expand Down