Skip to content

Commit

Permalink
apiserver add --lease-reuse-duration-seconds to config lease reuse du…
Browse files Browse the repository at this point in the history
…ration

Signed-off-by: Ling Samuel <lingsamuelgrace@gmail.com>
  • Loading branch information
lingsamuel authored and mborsz committed Mar 10, 2021
1 parent f2fe40c commit 67a75c2
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 46 deletions.
13 changes: 7 additions & 6 deletions cmd/kube-apiserver/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,13 @@ func TestAddFlags(t *testing.T) {
TrustedCAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
},
Paging: true,
Prefix: "/registry",
CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute,
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
Paging: true,
Prefix: "/registry",
CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute,
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
LeaseReuseDurationSeconds: storagebackend.DefaultLeaseReuseDurationSeconds,
},
DefaultStorageMediaType: "application/vnd.kubernetes.protobuf",
DeleteCollectionWorkers: 1,
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/options/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {

fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout,
"The timeout to use when checking etcd health.")

fs.Int64Var(&s.StorageConfig.LeaseReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseReuseDurationSeconds,
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
}

func (s *EtcdOptions) ApplyTo(c *server.Config) error {
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
Expand Down
12 changes: 2 additions & 10 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type leaseManager struct {
}

// newDefaultLeaseManager creates a new lease manager using default setting.
func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
return newLeaseManager(client, 60, 0.05)
func newDefaultLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64) *leaseManager {
return newLeaseManager(client, leaseReuseDurationSeconds, 0.05)
}

// newLeaseManager creates a new lease manager with the number of buffered
Expand All @@ -57,14 +57,6 @@ func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, l
}
}

// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
// reduce the extra lease duration to avoid unnecessary timeout in testing.
func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
l.leaseMu.Lock()
defer l.leaseMu.Unlock()
l.leaseReuseDurationSeconds = duration
}

// GetLease returns a lease based on requested ttl: if the cached previous
// lease can be reused, reuse it; otherwise request a new one from etcd.
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package etcd3

import (
"k8s.io/apiserver/pkg/storage/storagebackend"
"testing"
)

Expand All @@ -34,7 +35,7 @@ func TestGetReuseDurationSeconds(t *testing.T) {
duration: 50,
},
}
lm := newDefaultLeaseManager(nil)
lm := newDefaultLeaseManager(nil, storagebackend.DefaultLeaseReuseDurationSeconds)
for i := 0; i < len(testCases); i++ {
dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl)
if dur != testCases[i].duration {
Expand Down
8 changes: 4 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ type objState struct {
}

// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseReuseDurationSeconds int64) storage.Interface {
return newStore(c, newFunc, pagingEnabled, leaseReuseDurationSeconds, codec, prefix, transformer)
}

func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, leaseReuseDurationSeconds int64, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
Expand All @@ -100,7 +100,7 @@ func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled b
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c),
leaseManager: newDefaultLeaseManager(c, leaseReuseDurationSeconds),
}
return result
}
Expand Down
22 changes: 11 additions & 11 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -823,7 +824,7 @@ func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

preset := []struct {
Expand Down Expand Up @@ -900,8 +901,8 @@ func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

// Setup storage with the following structure:
Expand Down Expand Up @@ -1399,7 +1400,7 @@ func TestListContinuation(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, newPod, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
ctx := context.Background()

// Setup storage with the following structure:
Expand Down Expand Up @@ -1561,7 +1562,7 @@ func TestListContinuationWithFilter(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, newPod, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
ctx := context.Background()

preset := []struct {
Expand Down Expand Up @@ -1664,7 +1665,7 @@ func TestListInconsistentContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

// Setup storage with the following structure:
Expand Down Expand Up @@ -1809,12 +1810,11 @@ func TestListInconsistentContinuation(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
store.leaseManager.setLeaseReuseDurationSeconds(1)
store := newStore(cluster.RandClient(), newPod, true, 1, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
return ctx, store, cluster
}

Expand Down Expand Up @@ -1855,7 +1855,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer)
store := newStore(cluster.RandClient(), nil, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, configuredPrefix, transformer)
if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
}
Expand Down Expand Up @@ -2022,7 +2022,7 @@ func TestConsistentList(t *testing.T) {
transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer)
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
transformer.store = store

for i := 0; i < 5; i++ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
)

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -225,13 +226,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
invalidStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
Expand Down Expand Up @@ -322,7 +323,7 @@ func TestProgressNotify(t *testing.T) {
}
cluster := integration.NewClusterV3(t, clusterConfig)
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

key := "/somekey"
Expand Down
22 changes: 13 additions & 9 deletions staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ const (
StorageTypeUnset = ""
StorageTypeETCD3 = "etcd3"

DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
DefaultHealthcheckTimeout = 2 * time.Second
DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
DefaultHealthcheckTimeout = 2 * time.Second
DefaultLeaseReuseDurationSeconds = 60
)

// TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
Expand Down Expand Up @@ -77,15 +78,18 @@ type Config struct {
DBMetricPollInterval time.Duration
// HealthcheckTimeout specifies the timeout used when checking health
HealthcheckTimeout time.Duration
// LeaseReuseDurationSeconds specifies time in seconds that each lease is reused. See pkg/storage/etcd3/lease_manager.go
LeaseReuseDurationSeconds int64
}

func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
return &Config{
Paging: true,
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
HealthcheckTimeout: DefaultHealthcheckTimeout,
Paging: true,
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
HealthcheckTimeout: DefaultHealthcheckTimeout,
LeaseReuseDurationSeconds: DefaultLeaseReuseDurationSeconds,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (st
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseReuseDurationSeconds), destroyFunc, nil
}

// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -105,7 +106,7 @@ func newPodList() runtime.Object { return &example.PodList{} }

func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, storagebackend.DefaultLeaseReuseDurationSeconds)
return server, storage
}

Expand Down

0 comments on commit 67a75c2

Please sign in to comment.