-
Notifications
You must be signed in to change notification settings - Fork 405
/
lease_manager.go
131 lines (114 loc) · 4.84 KB
/
lease_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
)
const (
// defaultLeaseReuseDurationSeconds is the value that
// NewDefaultLeaseManagerConfig assigns to
// LeaseManagerConfig.ReuseDurationSeconds.
defaultLeaseReuseDurationSeconds = 60
// defaultLeaseMaxObjectCount is the value that NewDefaultLeaseManagerConfig
// assigns to LeaseManagerConfig.MaxObjectCount. newDefaultLeaseManager also
// falls back to it when the configured MaxObjectCount is not positive.
defaultLeaseMaxObjectCount = 1000
)
// LeaseManagerConfig is configuration for creating a lease manager.
type LeaseManagerConfig struct {
// ReuseDurationSeconds is the upper bound, in seconds, on the extra TTL
// that is added to a newly granted lease so that later requests can reuse
// the same lease.
ReuseDurationSeconds int64
// MaxObjectCount limits how many objects (counted as GetLease calls) may
// share a single lease before a new lease is requested. Values <= 0 are
// replaced with the default by newDefaultLeaseManager.
MaxObjectCount int64
}
// NewDefaultLeaseManagerConfig returns a LeaseManagerConfig whose fields are
// populated from the package defaults: defaultLeaseReuseDurationSeconds for
// ReuseDurationSeconds and defaultLeaseMaxObjectCount for MaxObjectCount.
func NewDefaultLeaseManagerConfig() LeaseManagerConfig {
	var cfg LeaseManagerConfig
	cfg.ReuseDurationSeconds = defaultLeaseReuseDurationSeconds
	cfg.MaxObjectCount = defaultLeaseMaxObjectCount
	return cfg
}
// leaseManager is used to manage leases requested from etcd. If a new write
// needs a lease that has a similar expiration time to the previous one, the
// old lease will be reused to reduce the overhead of etcd, since lease
// operations are expensive. In the implementation, we only store one previous
// lease, since all the events have the same ttl.
type leaseManager struct {
client *clientv3.Client // etcd client used to grant leases
// leaseMu is held by GetLease for the whole call, covering its reads and
// writes of prevLeaseID, prevLeaseExpirationTime and
// leaseAttachedObjectCount.
leaseMu sync.Mutex
// prevLeaseID is the ID of the most recently granted lease; GetLease
// returns it again when that lease can be reused.
prevLeaseID clientv3.LeaseID
// prevLeaseExpirationTime is the locally computed expiration of
// prevLeaseID: the time GetLease sampled at the start of the granting call
// plus the TTL it requested from etcd.
prevLeaseExpirationTime time.Time
// The period of time in seconds and percent of TTL that each lease is
// reused. The minimum of them is used to avoid unreasonably large
// numbers.
leaseReuseDurationSeconds int64
leaseReuseDurationPercent float64
// leaseMaxAttachedObjectCount caps how many GetLease calls may be counted
// against one lease before a new lease is requested.
leaseMaxAttachedObjectCount int64
// leaseAttachedObjectCount is the number of GetLease calls counted against
// the current lease; GetLease resets it to 1 when a new lease is granted.
leaseAttachedObjectCount int64
}
// newDefaultLeaseManager builds a leaseManager from config. The reuse
// percentage is fixed at 0.05 (5% of the requested TTL), and a non-positive
// config.MaxObjectCount is replaced by defaultLeaseMaxObjectCount.
// config.ReuseDurationSeconds is passed through unchanged.
func newDefaultLeaseManager(client *clientv3.Client, config LeaseManagerConfig) *leaseManager {
	// Fraction of the TTL that may be added as reuse slack.
	const reusePercent = 0.05

	maxObjects := config.MaxObjectCount
	if maxObjects <= 0 {
		maxObjects = defaultLeaseMaxObjectCount
	}
	return newLeaseManager(client, config.ReuseDurationSeconds, reusePercent, maxObjects)
}
// newLeaseManager creates a lease manager that grants leases through client.
// leaseReuseDurationSeconds and leaseReuseDurationPercent together bound the
// extra TTL added to each granted lease; the percentage is expressed as a
// fraction, so a value x means x*100%. maxObjectCount is the number of
// objects that may share one lease before a new one is requested.
func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64, maxObjectCount int64) *leaseManager {
	// The remaining fields (mutex, cached lease, attached-object counter)
	// start at their zero values.
	m := new(leaseManager)
	m.client = client
	m.leaseReuseDurationSeconds = leaseReuseDurationSeconds
	m.leaseReuseDurationPercent = leaseReuseDurationPercent
	m.leaseMaxAttachedObjectCount = maxObjectCount
	return m
}
// GetLease returns a lease ID suitable for an object that should live for ttl
// seconds. The cached lease is returned when it expires after now+ttl, no
// later than now+ttl+reuse-duration, and has not yet reached the attached
// object limit. Otherwise a new lease with ttl plus the reuse duration is
// granted by etcd, cached, and returned. If the grant fails, the zero LeaseID
// and the error are returned and the cached lease is left unchanged.
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
	// The clock is sampled before the mutex is acquired.
	now := time.Now()

	l.leaseMu.Lock()
	defer l.leaseMu.Unlock()

	extraSeconds := l.getReuseDurationSecondsLocked(ttl)
	earliestExpiry := now.Add(time.Duration(ttl) * time.Second)
	latestExpiry := now.Add(time.Duration(ttl+extraSeconds) * time.Second)

	// Every call is counted against the current lease, regardless of success
	// or failure. Each GetLease call currently attaches exactly one object.
	l.leaseAttachedObjectCount++

	// The cached lease is reusable only if it outlives the requested ttl, does
	// not outlive it by more than the reuse duration, and still has room.
	reusable := earliestExpiry.Before(l.prevLeaseExpirationTime) &&
		latestExpiry.After(l.prevLeaseExpirationTime) &&
		l.leaseAttachedObjectCount <= l.leaseMaxAttachedObjectCount
	if reusable {
		return l.prevLeaseID, nil
	}

	// Ask etcd for a lease with a little extra ttl so that it can be reused.
	grantTTL := ttl + extraSeconds
	resp, err := l.client.Lease.Grant(ctx, grantTTL)
	if err != nil {
		return clientv3.LeaseID(0), err
	}

	// Remember the new lease and its expiration, computed from the time
	// sampled at the start of this call.
	l.prevLeaseID = resp.ID
	l.prevLeaseExpirationTime = now.Add(time.Duration(grantTTL) * time.Second)

	// Report the count accumulated so far, then restart it at 1 for the
	// object attached by this call.
	metrics.UpdateLeaseObjectCount(l.leaseAttachedObjectCount)
	l.leaseAttachedObjectCount = 1
	return resp.ID, nil
}
// getReuseDurationSecondsLocked returns the number of extra seconds a lease
// for the given ttl may be reused: the smaller of leaseReuseDurationPercent
// applied to ttl (truncated to whole seconds) and leaseReuseDurationSeconds.
// The caller must hold leaseMu.
func (l *leaseManager) getReuseDurationSecondsLocked(ttl int64) int64 {
	byPercent := int64(l.leaseReuseDurationPercent * float64(ttl))
	if byPercent <= l.leaseReuseDurationSeconds {
		return byPercent
	}
	return l.leaseReuseDurationSeconds
}