Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiserver add --lease-reuse-duration-seconds to config lease reuse duration #97009

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 7 additions & 6 deletions cmd/kube-apiserver/app/options/options_test.go
Expand Up @@ -155,12 +155,13 @@ func TestAddFlags(t *testing.T) {
TrustedCAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
},
Paging: true,
Prefix: "/registry",
CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute,
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
Paging: true,
Prefix: "/registry",
CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute,
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
LeaseReuseDurationSeconds: storagebackend.DefaultLeaseReuseDurationSeconds,
},
DefaultStorageMediaType: "application/vnd.kubernetes.protobuf",
DeleteCollectionWorkers: 1,
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/options/etcd.go
Expand Up @@ -183,6 +183,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {

fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout,
"The timeout to use when checking etcd health.")

fs.Int64Var(&s.StorageConfig.LeaseReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseReuseDurationSeconds,
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
}

func (s *EtcdOptions) ApplyTo(c *server.Config) error {
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD
Expand Up @@ -35,6 +35,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
Expand Down
12 changes: 2 additions & 10 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go
Expand Up @@ -42,8 +42,8 @@ type leaseManager struct {
}

// newDefaultLeaseManager creates a new lease manager using default setting.
func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
return newLeaseManager(client, 60, 0.05)
func newDefaultLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64) *leaseManager {
return newLeaseManager(client, leaseReuseDurationSeconds, 0.05)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could change this to an option struct if we can config a max size of a lease further.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do that when we will be adding the next argument.

}

// newLeaseManager creates a new lease manager with the number of buffered
Expand All @@ -57,14 +57,6 @@ func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, l
}
}

// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
// reduce the extra lease duration to avoid unnecessary timeout in testing.
func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
l.leaseMu.Lock()
defer l.leaseMu.Unlock()
l.leaseReuseDurationSeconds = duration
}

// GetLease returns a lease based on requested ttl: if the cached previous
// lease can be reused, reuse it; otherwise request a new one from etcd.
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
Expand Down
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package etcd3

import (
"k8s.io/apiserver/pkg/storage/storagebackend"
"testing"
)

Expand All @@ -34,7 +35,7 @@ func TestGetReuseDurationSeconds(t *testing.T) {
duration: 50,
},
}
lm := newDefaultLeaseManager(nil)
lm := newDefaultLeaseManager(nil, storagebackend.DefaultLeaseReuseDurationSeconds)
for i := 0; i < len(testCases); i++ {
dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl)
if dur != testCases[i].duration {
Expand Down
8 changes: 4 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Expand Up @@ -83,11 +83,11 @@ type objState struct {
}

// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseReuseDurationSeconds int64) storage.Interface {
return newStore(c, newFunc, pagingEnabled, leaseReuseDurationSeconds, codec, prefix, transformer)
}

func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, leaseReuseDurationSeconds int64, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
Expand All @@ -100,7 +100,7 @@ func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled b
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c),
leaseManager: newDefaultLeaseManager(c, leaseReuseDurationSeconds),
}
return result
}
Expand Down
22 changes: 11 additions & 11 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
Expand Up @@ -47,6 +47,7 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -823,7 +824,7 @@ func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

preset := []struct {
Expand Down Expand Up @@ -900,8 +901,8 @@ func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

// Setup storage with the following structure:
Expand Down Expand Up @@ -1399,7 +1400,7 @@ func TestListContinuation(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, newPod, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
ctx := context.Background()

// Setup storage with the following structure:
Expand Down Expand Up @@ -1561,7 +1562,7 @@ func TestListContinuationWithFilter(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, newPod, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
ctx := context.Background()

preset := []struct {
Expand Down Expand Up @@ -1664,7 +1665,7 @@ func TestListInconsistentContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

// Setup storage with the following structure:
Expand Down Expand Up @@ -1809,12 +1810,11 @@ func TestListInconsistentContinuation(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
store.leaseManager.setLeaseReuseDurationSeconds(1)
store := newStore(cluster.RandClient(), newPod, true, 1, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
return ctx, store, cluster
}

Expand Down Expand Up @@ -1855,7 +1855,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer)
store := newStore(cluster.RandClient(), nil, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, configuredPrefix, transformer)
if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
}
Expand Down Expand Up @@ -2022,7 +2022,7 @@ func TestConsistentList(t *testing.T) {
transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer)
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
transformer.store = store

for i := 0; i < 5; i++ {
Expand Down
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
)

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -225,13 +226,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
invalidStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
Expand Down Expand Up @@ -322,7 +323,7 @@ func TestProgressNotify(t *testing.T) {
}
cluster := integration.NewClusterV3(t, clusterConfig)
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()

key := "/somekey"
Expand Down
22 changes: 13 additions & 9 deletions staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go
Expand Up @@ -28,9 +28,10 @@ const (
StorageTypeUnset = ""
StorageTypeETCD3 = "etcd3"

DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
DefaultHealthcheckTimeout = 2 * time.Second
DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
DefaultHealthcheckTimeout = 2 * time.Second
DefaultLeaseReuseDurationSeconds = 60
)

// TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
Expand Down Expand Up @@ -77,15 +78,18 @@ type Config struct {
DBMetricPollInterval time.Duration
// HealthcheckTimeout specifies the timeout used when checking health
HealthcheckTimeout time.Duration
// LeaseReuseDurationSeconds specifies time in seconds that each lease is reused. See pkg/storage/etcd3/lease_manager.go
LeaseReuseDurationSeconds int64
}

func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
return &Config{
Paging: true,
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
HealthcheckTimeout: DefaultHealthcheckTimeout,
Paging: true,
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
HealthcheckTimeout: DefaultHealthcheckTimeout,
LeaseReuseDurationSeconds: DefaultLeaseReuseDurationSeconds,
}
}
Expand Up @@ -254,7 +254,7 @@ func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (st
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseReuseDurationSeconds), destroyFunc, nil
}

// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD
Expand Up @@ -27,6 +27,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
Expand Down
Expand Up @@ -47,6 +47,7 @@ import (
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -105,7 +106,7 @@ func newPodList() runtime.Object { return &example.PodList{} }

func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, storagebackend.DefaultLeaseReuseDurationSeconds)
return server, storage
}

Expand Down