Skip to content

Commit

Permalink
Switch default settings to leverage etcd3.v3client
Browse files Browse the repository at this point in the history
  • Loading branch information
Timothy St. Clair committed Aug 19, 2016
1 parent c5e3b79 commit 828755a
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pkg/genericapiserver/genericapiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (

// setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (GenericAPIServer, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdServer := etcdtesting.NewEtcdTestClientServer(t)
etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)

genericapiserver := GenericAPIServer{}
config := Config{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/genericapiserver/options/server_run_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
fs.MarkDeprecated("service-node-ports", "see --service-node-port-range instead")

fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type,
"The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'.")
"The storage backend for persistence. Options: 'etcd2', 'etcd3' (default).")

fs.IntVar(&s.StorageConfig.DeserializationCacheSize, "deserialization-cache-size", s.StorageConfig.DeserializationCacheSize,
"Number of deserialized json objects to cache in memory.")
Expand Down
15 changes: 2 additions & 13 deletions pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
Expand All @@ -70,7 +69,7 @@ import (

// setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
server := etcdtesting.NewUnsecuredEtcdTestClientServer(t)
server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)

master := &Master{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
Expand All @@ -79,16 +78,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
Config: &genericapiserver.Config{},
}

storageConfig := storagebackend.Config{
Prefix: etcdtest.PathPrefix(),
CAFile: server.CAFile,
KeyFile: server.KeyFile,
CertFile: server.CertFile,
}
for _, url := range server.ClientURLs {
storageConfig.ServerList = append(storageConfig.ServerList, url.String())
}

resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
resourceEncoding.SetVersionEncoding(api.GroupName, *testapi.Default.GroupVersion(), unversioned.GroupVersion{Group: api.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(autoscaling.GroupName, *testapi.Autoscaling.GroupVersion(), unversioned.GroupVersion{Group: autoscaling.GroupName, Version: runtime.APIVersionInternal})
Expand All @@ -97,7 +86,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(rbac.GroupName, *testapi.Rbac.GroupVersion(), unversioned.GroupVersion{Group: rbac.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(certificates.GroupName, *testapi.Certificates.GroupVersion(), unversioned.GroupVersion{Group: certificates.GroupName, Version: runtime.APIVersionInternal})
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(*storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())

config.StorageFactory = storageFactory
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()
Expand Down
13 changes: 8 additions & 5 deletions pkg/registry/generic/registry/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/storage/storagebackend/factory"
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/validation/field"
Expand Down Expand Up @@ -1013,10 +1013,13 @@ func TestStoreWatch(t *testing.T) {

func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, *Store) {
podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t)
server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
codec := testapi.Default.StorageCodec()
s := etcdstorage.NewEtcdStorage(server.Client, codec, etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
sc.Codec = testapi.Default.StorageCodec()
s, err := factory.Create(*sc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
if hasCacheEnabled {
config := storage.CacherConfig{
CacheCapacity: 10,
Expand All @@ -1026,7 +1029,7 @@ func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesti
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
NewListFunc: func() runtime.Object { return &api.PodList{} },
Codec: codec,
Codec: sc.Codec,
}
s = storage.NewCacherFromConfig(config)
}
Expand Down
11 changes: 2 additions & 9 deletions pkg/registry/registrytest/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,14 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/storage/storagebackend"
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
)

func NewEtcdStorage(t *testing.T, group string) (*storagebackend.Config, *etcdtesting.EtcdTestServer) {
server := etcdtesting.NewUnsecuredEtcdTestClientServer(t)
config := &storagebackend.Config{
Type: "etcd2",
Prefix: etcdtest.PathPrefix(),
ServerList: server.Client.Endpoints(),
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
Codec: testapi.Groups[group].StorageCodec(),
}
server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
config.Codec = testapi.Groups[group].StorageCodec()
return config, server
}

Expand Down
69 changes: 49 additions & 20 deletions pkg/storage/etcd/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ import (
"testing"
"time"

"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/wait"

etcd "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
Expand All @@ -41,6 +45,8 @@ import (

// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
type EtcdTestServer struct {
// The following are lumped etcd2 test server params
// TODO: Deprecate in a post 1.5 release
etcdserver.ServerConfig
PeerListeners, ClientListeners []net.Listener
Client etcd.Client
Expand All @@ -53,6 +59,10 @@ type EtcdTestServer struct {
raftHandler http.Handler
s *etcdserver.EtcdServer
hss []*httptest.Server

// The following are lumped etcd3 test server params
v3Cluster *integration.ClusterV3
v3Client *clientv3.Client
}

// newLocalListener opens a port localhost using any port
Expand Down Expand Up @@ -218,30 +228,34 @@ func (m *EtcdTestServer) waitUntilUp() error {

// Terminate will shutdown the running etcd server
func (m *EtcdTestServer) Terminate(t *testing.T) {
m.Client = nil
m.s.Stop()
// TODO: This is a pretty ugly hack to workaround races during closing
// in-memory etcd server in unit tests - see #18928 for more details.
// We should get rid of it as soon as we have a proper fix - etcd clients
// have overwritten transport counting opened connections (probably by
// overwriting Dial function) and termination function waiting for all
// connections to be closed and stopping accepting new ones.
time.Sleep(250 * time.Millisecond)
for _, hs := range m.hss {
hs.CloseClientConnections()
hs.Close()
}
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
if len(m.CertificatesDir) > 0 {
if err := os.RemoveAll(m.CertificatesDir); err != nil {
if m.v3Cluster != nil {
m.v3Cluster.Terminate(t)
} else {
m.Client = nil
m.s.Stop()
// TODO: This is a pretty ugly hack to workaround races during closing
// in-memory etcd server in unit tests - see #18928 for more details.
// We should get rid of it as soon as we have a proper fix - etcd clients
// have overwritten transport counting opened connections (probably by
// overwriting Dial function) and termination function waiting for all
// connections to be closed and stopping accepting new ones.
time.Sleep(250 * time.Millisecond)
for _, hs := range m.hss {
hs.CloseClientConnections()
hs.Close()
}
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
if len(m.CertificatesDir) > 0 {
if err := os.RemoveAll(m.CertificatesDir); err != nil {
t.Fatal(err)
}
}
}
}

// NewEtcdTestClientServer creates a new client and server for testing
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
server := configureTestCluster(t, "foo", true)
err := server.launch(t)
Expand All @@ -268,7 +282,7 @@ func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
return server
}

// NewUnsecuredEtcdTestClientServer creates a new client and server for testing
// NewUnsecuredEtcdTestClientServer DEPRECATED creates a new client and server for testing
func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer {
server := configureTestCluster(t, "foo", false)
err := server.launch(t)
Expand All @@ -293,3 +307,18 @@ func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer {
}
return server
}

// NewEtcd3TestClientServer creates a new client and server for testing
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
server := &EtcdTestServer{
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
}
server.v3Client = server.v3Cluster.RandClient()
config := &storagebackend.Config{
Type: "etcd3",
Prefix: etcdtest.PathPrefix(),
ServerList: server.v3Client.Endpoints(),
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
}
return server, config
}
2 changes: 1 addition & 1 deletion pkg/storage/storagebackend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (

// Config is configuration for creating a storage backend.
type Config struct {
// Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd2".
// Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd3".
Type string
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/storagebackend/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, error) {
switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
case storagebackend.StorageTypeETCD2:
return newETCD2Storage(c)
case storagebackend.StorageTypeETCD3:
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
Expand Down

0 comments on commit 828755a

Please sign in to comment.