-
Notifications
You must be signed in to change notification settings - Fork 18
/
lease.go
182 lines (159 loc) · 6.92 KB
/
lease.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package resources
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/medik8s/common/pkg/lease"
coordv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Package-level settings used by the lease manager in this file.
var (
	// maxTimesToExtendLease is the maximum number of times a lease would be
	// extended. It is a conceptual value that is only used to calculate the
	// maximum time a lease can be held.
	maxTimesToExtendLease = 2

	// RequeueIfLeaseTaken is the requeue interval reported when the node lease
	// is already held by another owner.
	RequeueIfLeaseTaken = time.Minute

	// LeaseBuffer is added on top of the requested lease duration so there is
	// a bit of buffer before the lease has to be extended, so it won't be taken
	// by another component in the meantime.
	LeaseBuffer = time.Minute

	// NOTE(review): templateSuffix is not referenced by the code visible in
	// this file section — presumably used elsewhere in the package; confirm.
	templateSuffix = "Template"
)
// LeaseOverDueError is the error type returned when a lease has been held
// longer than allowed and therefore is not extended.
type LeaseOverDueError struct {
	// msg is the text reported by Error.
	msg string
}

// Error implements the error interface and returns the stored message.
func (o LeaseOverDueError) Error() string {
	text := o.msg
	return text
}
// LeaseManager obtains, extends/releases and invalidates the lease of a node,
// addressed by the node's name.
type LeaseManager interface {
	// ObtainNodeLease will attempt to get a node lease with the correct duration; the duration is affected by whether
	// escalation is used and by the remediation timeout.
	// The first return value (*time.Duration) is an indicator of when a new reconcile should be scheduled
	// (mainly in order to extend the lease).
	ObtainNodeLease(ctx context.Context, nodeName string, currentRemediationDuration time.Duration) (*time.Duration, error)
	// ManageLease extends or releases a lease based on the CR status, the type of remediation and how long the
	// lease has already been held.
	ManageLease(ctx context.Context, nodeName string, currentRemediationDuration, previousRemediationsDuration time.Duration) (time.Duration, error)
	// InvalidateLease invalidates the lease for the node with the given name.
	InvalidateLease(ctx context.Context, nodeName string) error
}
// nhcLeaseManager is the LeaseManager implementation returned by
// NewLeaseManager.
type nhcLeaseManager struct {
	// client is used to fetch Node objects by name.
	client client.Client
	// commonLeaseManager performs the actual request/get/invalidate lease calls.
	commonLeaseManager lease.Manager
	// holderIdentity is compared against a lease's Spec.HolderIdentity to
	// decide whether this manager owns the lease.
	holderIdentity string
	// log is the logger used by all methods of the manager.
	log logr.Logger
}
// NewLeaseManager builds a LeaseManager for the given holder identity.
//
// It creates the underlying common lease manager from client and holderIdent;
// if that fails the error is logged and returned together with a nil manager.
// The returned manager logs under the name "nhc lease manager".
func NewLeaseManager(client client.Client, holderIdent string, log logr.Logger) (LeaseManager, error) {
	commonManager, err := lease.NewManager(client, holderIdent)
	if err == nil {
		return &nhcLeaseManager{
			client:             client,
			commonLeaseManager: commonManager,
			holderIdentity:     holderIdent,
			log:                log.WithName("nhc lease manager"),
		}, nil
	}

	log.Error(err, "couldn't initialize lease manager")
	return nil, err
}
func (m *nhcLeaseManager) ObtainNodeLease(ctx context.Context, nodeName string, currentRemediationDuration time.Duration) (*time.Duration, error) {
leaseDurationWithBuffer := currentRemediationDuration + LeaseBuffer
node := &corev1.Node{}
if err := m.client.Get(context.Background(), types.NamespacedName{Name: nodeName}, node); err != nil {
m.log.Error(err, "couldn't obtain node lease node error getting node", "node name", nodeName)
return nil, err
}
if err := m.commonLeaseManager.RequestLease(ctx, node, leaseDurationWithBuffer); err != nil {
if _, ok := err.(lease.AlreadyHeldError); ok {
m.log.Info("can't acquire node lease, it is already owned by another owner", "already held error", err)
return &RequeueIfLeaseTaken, err
}
m.log.Error(err, "couldn't obtain lease for node", "node name", nodeName)
return nil, err
}
//all good lease created with wanted duration
return ¤tRemediationDuration, nil
}
// ManageLease extends or releases the lease of the node named nodeName.
//
// Behavior, in order:
//   - the node is fetched; a failure is logged and returned.
//   - if the lease does not exist, or is not held by this manager, nothing is
//     done and (0, nil) is returned.
//   - if the lease is overdue (see isLeaseOverdue) it is invalidated and a
//     LeaseOverDueError is returned.
//   - otherwise, if the lease would expire before now+currentRemediationDuration
//     (or its renew time / duration are missing), it is requested again for
//     currentRemediationDuration plus LeaseBuffer.
//
// On success the returned duration is currentRemediationDuration.
func (m *nhcLeaseManager) ManageLease(ctx context.Context, nodeName string, currentRemediationDuration, previousRemediationsDuration time.Duration) (time.Duration, error) {
	node := &corev1.Node{}
	// Use the caller's context so cancellation and deadlines are honored.
	if err := m.client.Get(ctx, types.NamespacedName{Name: nodeName}, node); err != nil {
		m.log.Error(err, "couldn't obtain node lease node error getting node", "node name", nodeName)
		return 0, err
	}

	nodeLease, err := m.commonLeaseManager.GetLease(ctx, node)
	if err != nil {
		if errors.IsNotFound(err) {
			return 0, nil
		}
		m.log.Error(err, "managing lease - couldn't fetch lease", "node name", nodeName)
		return 0, err
	}

	// nothing to do with this lease
	if !m.isLeaseOwner(nodeLease) {
		return 0, nil
	}

	overdue, err := m.isLeaseOverdue(nodeLease, currentRemediationDuration, previousRemediationsDuration)
	if err != nil {
		return 0, err
	}
	if overdue {
		// release the lease - lease is overdue
		m.log.Info("managing lease - lease is overdue about to be removed", "lease name", nodeLease.Name)
		if err = m.commonLeaseManager.InvalidateLease(ctx, node); err != nil {
			m.log.Error(err, "failed to invalidate overdue lease", "node name", nodeName)
			return 0, err
		}
		return 0, LeaseOverDueError{msg: fmt.Sprintf("failed to extend lease, it is overdue. node name: %s", nodeName)}
	}

	m.log.Info("managing lease - about to try to acquire/extended the lease", "lease name", nodeLease.Name, "NHC is lease owner", m.isLeaseOwner(nodeLease), "lease expiration time", currentRemediationDuration)

	// RenewTime and LeaseDurationSeconds are pointer fields; without both the
	// current expiry can't be computed, so the lease is renewed instead of
	// dereferencing a nil pointer.
	needsRenewal := true
	if renewTime, durationSeconds := nodeLease.Spec.RenewTime, nodeLease.Spec.LeaseDurationSeconds; renewTime != nil && durationSeconds != nil {
		expectedExpiry := time.Now().Add(currentRemediationDuration)
		actualExpiry := renewTime.Add(time.Duration(*durationSeconds) * time.Second)
		needsRenewal = actualExpiry.Before(expectedExpiry)
	}

	if needsRenewal {
		if err := m.commonLeaseManager.RequestLease(ctx, node, currentRemediationDuration+LeaseBuffer); err != nil {
			m.log.Error(err, "couldn't renew lease", "lease name", nodeLease.Name)
			return 0, err
		}
	}
	return currentRemediationDuration, nil
}
// InvalidateLease invalidates the lease of the node named nodeName.
//
// The Node object handed to the common lease manager is assembled locally from
// the name only, because the real node might already be deleted. A
// lease.AlreadyHeldError (lease exists but isn't owned by us) is ignored and
// reported as success; any other error is logged and returned.
func (m *nhcLeaseManager) InvalidateLease(ctx context.Context, nodeName string) error {
	// node might be deleted already, so build it manually
	stub := &corev1.Node{}
	stub.Kind = "Node"
	stub.APIVersion = corev1.SchemeGroupVersion.String()
	stub.Name = nodeName

	err := m.commonLeaseManager.InvalidateLease(ctx, stub)
	switch err.(type) {
	case nil, lease.AlreadyHeldError:
		// either invalidated, or the lease exists but isn't owned by us,
		// which can be ignored
		return nil
	default:
		m.log.Error(err, "failed to invalidate lease", "node name", nodeName)
		return err
	}
}
// isLeaseOverdue reports whether the current time is after the expiration
// computed by calcLeaseExpiration for the given lease and durations.
// It returns an error (after logging it) when the lease has no
// Spec.AcquireTime.
func (m *nhcLeaseManager) isLeaseOverdue(l *coordv1.Lease, currentRemediationDuration, previousRemediationsDuration time.Duration) (bool, error) {
	if l.Spec.AcquireTime != nil {
		now := time.Now()
		deadline := m.calcLeaseExpiration(l, currentRemediationDuration, previousRemediationsDuration)
		return now.After(deadline), nil
	}

	missingAcquireTime := fmt.Errorf("lease Spec.AcquireTime is nil")
	m.log.Error(missingAcquireTime, "lease Spec.AcquireTime is nil", "lease name", l.Name)
	return false, missingAcquireTime
}
// calcLeaseExpiration returns the point in time after which the lease counts
// as overdue: the lease's Spec.AcquireTime plus
// (maxTimesToExtendLease+1)*currentRemediationDuration plus
// previousRemediationsDuration. The caller must ensure Spec.AcquireTime is set.
func (m *nhcLeaseManager) calcLeaseExpiration(l *coordv1.Lease, currentRemediationDuration, previousRemediationsDuration time.Duration) time.Time {
	// the extra 1 is offsetting the lease creation
	periods := time.Duration(maxTimesToExtendLease + 1)
	maxHoldTime := periods*currentRemediationDuration + previousRemediationsDuration
	return l.Spec.AcquireTime.Add(maxHoldTime)
}
// isRemediationsExist reports whether remediationCrs holds at least one item.
func (m *nhcLeaseManager) isRemediationsExist(remediationCrs []unstructured.Unstructured) bool {
	count := len(remediationCrs)
	return count != 0
}
// isLeaseOwner reports whether the lease's Spec.HolderIdentity is set and
// equal to this manager's holderIdentity.
func (m *nhcLeaseManager) isLeaseOwner(l *coordv1.Lease) bool {
	holder := l.Spec.HolderIdentity
	return holder != nil && *holder == m.holderIdentity
}