-
Notifications
You must be signed in to change notification settings - Fork 298
/
etcddefrag_controller.go
275 lines (230 loc) · 9.35 KB
/
etcddefrag_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package etcddefrag
import (
"context"
"errors"
"fmt"
"math"
"time"
"github.com/go-logr/logr"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"github.com/openshift/hypershift/pkg/etcdcli"
"github.com/openshift/hypershift/support/upsert"
"github.com/openshift/library-go/pkg/operator/events"
)
const (
	// pollWaitDuration is the interval between cluster-health checks while
	// waiting for the cluster to stabilize after defragmenting a member.
	pollWaitDuration = 2 * time.Second
	// pollTimeoutDuration bounds the post-defrag stabilization wait.
	pollTimeoutDuration = 60 * time.Second
	// maxDefragFailuresBeforeDegrade is used when logging how many
	// partial-failure cycles remain ("tries remaining") in runDefrag.
	maxDefragFailuresBeforeDegrade = 3
	// minDefragBytes is the minimum on-disk DB size below which a member
	// is never defragmented, regardless of fragmentation percentage.
	minDefragBytes int64 = 100 * 1024 * 1024 // 100MB
	// minDefragWaitDuration is the sleep between defragging successive
	// members, giving the cluster time to recover.
	minDefragWaitDuration = 36 * time.Second
	// maxFragmentedPercentage is the fragmentation threshold (percent of
	// on-disk size not in use) above which a member is defragmented.
	maxFragmentedPercentage float64 = 45
	// controllerRequeueDuration is how often the ticker triggers a defrag cycle.
	controllerRequeueDuration = 10 * time.Minute
)
// DefragController periodically inspects each etcd cluster member and
// defragments its backend store once fragmentation crosses the configured
// thresholds (maxFragmentedPercentage and minDefragBytes).
type DefragController struct {
	client.Client
	log logr.Logger
	// ControllerName is used as the component name for event recording.
	ControllerName string
	upsert.CreateOrUpdateProvider

	// etcdClient talks to the local etcd endpoint; built in SetupWithManager.
	etcdClient etcdcli.EtcdClient

	// numDefragFailures counts defrag cycles that did not defragment every
	// member; compared against maxDefragFailuresBeforeDegrade when logging.
	numDefragFailures int
	// defragWaitDuration is how long to sleep between defragging successive
	// members; initialized to minDefragWaitDuration.
	defragWaitDuration time.Duration
}
// defragTicker is a manager.Runnable that drives the DefragController on a
// fixed schedule (controllerRequeueDuration) via its Start method.
type defragTicker struct {
	defrag *DefragController
}
// setupTicker registers a defragTicker runnable with the manager so that
// defragmentation cycles run for the lifetime of the manager.
func (r *DefragController) setupTicker(mgr manager.Manager) error {
	t := &defragTicker{defrag: r}
	if err := mgr.Add(t); err != nil {
		return fmt.Errorf("failed to add defrag ticker runnable to manager: %w", err)
	}
	return nil
}
// Start implements manager.Runnable. It runs a defragmentation cycle every
// controllerRequeueDuration until the context is cancelled; errors from a
// cycle are logged and the loop continues.
func (m *defragTicker) Start(ctx context.Context) error {
	ticker := time.NewTicker(controllerRequeueDuration)
	// Release the ticker's resources when the runnable exits.
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			m.defrag.log.Info("Running defrag.")
			if err := m.defrag.runDefrag(ctx); err != nil {
				m.defrag.log.Error(err, "failed to run defragmentation cycle")
			}
		}
	}
}
// SetupWithManager wires the controller into mgr: it constructs the etcd
// client used for status and defrag calls and registers the periodic ticker.
func (r *DefragController) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
	r.etcdClient = etcdcli.NewEtcdClient(
		func() ([]string, error) { return r.etcdEndpoints(ctx) },
		events.NewLoggingEventRecorder(r.ControllerName),
	)
	// Set this so that it will immediately requeue itself.
	r.defragWaitDuration = minDefragWaitDuration
	if err := r.setupTicker(mgr); err != nil {
		return fmt.Errorf("failed to set up ticker: %w", err)
	}
	return nil
}
// etcdEndpoints returns the endpoint list used to build the etcd client.
// Because we are part of the etcd pod, localhost is sufficient; the client
// itself will discover the other endpoints.
func (r *DefragController) etcdEndpoints(ctx context.Context) ([]string, error) {
	return []string{"https://localhost:2379"}, nil
}
/*
Everything from here down is from the cluster-etcd-controller code.
It's been modified mostly to replace 'c' with 'r' as the object name.
Also the logging has been changed.
https://github.com/openshift/cluster-etcd-operator/blob/master/pkg/operator/defragcontroller/defragcontroller.go
https://github.com/openshift/cluster-etcd-operator/tree/master/pkg/etcdcli
*/
// runDefrag performs one defragmentation cycle: it verifies cluster health,
// gathers per-member status (leader last, learners skipped), defragments
// every member whose store is fragmented past the thresholds, and waits for
// the cluster to stabilize after each defrag. Returns a joined error when
// not every member was successfully handled.
func (r *DefragController) runDefrag(ctx context.Context) error {
	// Do not defrag if any of the cluster members are unhealthy.
	members, err := r.etcdClient.MemberList(ctx)
	if err != nil {
		return err
	}

	r.log.Info("Checking status for Defrag", "members", members)
	for _, m := range members {
		status, err := r.etcdClient.Status(ctx, m.ClientURLs[0])
		if err != nil {
			r.log.Error(err, "Member returned error", "member", m)
		} else {
			fragmentedPercentage := checkFragmentationPercentage(status.DbSize, status.DbSizeInUse)
			r.log.Info("Member", "name", m.Name, "URL", m.ClientURLs[0], "fragmentation percentage", fragmentedPercentage, "DBSize on disk", status.DbSize, "DBSize in use", status.DbSizeInUse, "leader", status.Leader)
		}
	}

	memberHealth, err := r.etcdClient.MemberHealth(ctx)
	if err != nil {
		return err
	}
	if !etcdcli.IsClusterHealthy(memberHealth) {
		// err is guaranteed nil here (checked above), so build the real
		// error first instead of logging a nil error value.
		healthErr := fmt.Errorf("cluster is unhealthy, status: %s", memberHealth.Status())
		r.log.Error(healthErr, "Cluster is unhealthy", "status", memberHealth.Status())
		return healthErr
	}

	// Filter out learner members since they don't support the defragment API call.
	var etcdMembers []*etcdserverpb.Member
	for _, m := range members {
		if !m.IsLearner {
			etcdMembers = append(etcdMembers, m)
		}
	}

	var endpointStatus []*clientv3.StatusResponse
	var leader *clientv3.StatusResponse
	for _, member := range etcdMembers {
		if len(member.ClientURLs) == 0 {
			// skip unstarted member
			continue
		}
		status, err := r.etcdClient.Status(ctx, member.ClientURLs[0])
		if err != nil {
			return err
		}
		if leader == nil && status.Leader == member.ID {
			leader = status
			continue
		}
		endpointStatus = append(endpointStatus, status)
	}

	// Leader last if possible, so leadership transfers at most once per cycle.
	if leader != nil {
		r.log.Info("Appending leader last", "ID", leader.Header.MemberId)
		endpointStatus = append(endpointStatus, leader)
	}

	successfulDefrags := 0
	var errs []error
	for _, status := range endpointStatus {
		member, err := getMemberFromStatus(etcdMembers, status)
		if err != nil {
			errs = append(errs, err)
			continue
		}

		// Check each member's status which includes the db size on disk "DbSize" and the db size in use "DbSizeInUse"
		// compare the % difference and if that difference is over the max diff threshold and also above the minimum
		// db size we defrag the members state file. In the case where this command only partially completed controller
		// can clean that up on the next sync. Having the db sizes slightly different is not a problem in itself.
		if r.isEndpointBackendFragmented(member, status) {
			fragmentedPercentage := checkFragmentationPercentage(status.DbSize, status.DbSizeInUse)
			r.log.Info("Member is over defrag threshold", "name", member.Name, "URL", member.ClientURLs[0], "fragmentation percentage", fragmentedPercentage, "DBSize on disk", status.DbSize, "DBSize in use", status.DbSizeInUse, "leader", status.Leader)
			if _, err := r.etcdClient.Defragment(ctx, member); err != nil {
				// Defrag can timeout if defragmentation takes longer than etcdcli.DefragDialTimeout.
				r.log.Error(err, "DefragController Defragment Failed", "member", member.Name, "ID", member.ID)
				// Wrap with %w (not fmt.Errorf(errMsg)) so the cause stays
				// inspectable and % verbs in member names are not misparsed.
				errs = append(errs, fmt.Errorf("failed defrag on member: %s, memberID: %x: %w", member.Name, member.ID, err))
				continue
			}

			r.log.Info("DefragController Defragment Success", "member", member.Name, "ID", member.ID)
			successfulDefrags++

			// Give cluster time to recover before we move to the next member.
			if err := wait.Poll(
				pollWaitDuration,
				pollTimeoutDuration,
				func() (bool, error) {
					// Ensure defragmentation attempts have clear observable signal.
					r.log.Info("Sleeping to allow cluster to recover before defragging next member", "waiting", r.defragWaitDuration)
					time.Sleep(r.defragWaitDuration)

					memberHealth, err := r.etcdClient.MemberHealth(ctx)
					if err != nil {
						r.log.Error(err, "Failed checking member health")
						return false, nil
					}
					if !etcdcli.IsClusterHealthy(memberHealth) {
						r.log.Info("Cluster member is unhealthy", "member status", memberHealth.Status())
						return false, nil
					}
					return true, nil
				}); err != nil {
				errs = append(errs, fmt.Errorf("timeout waiting for cluster to stabilize after defrag: %w", err))
			}
		} else {
			// no fragmentation needed is also a success
			successfulDefrags++
		}
	}

	if successfulDefrags != len(endpointStatus) {
		r.numDefragFailures++
		r.log.Info("DefragController Defragment Partial Failure", "successfully defragged", successfulDefrags, "of members", len(endpointStatus), "tries remaining", maxDefragFailuresBeforeDegrade-r.numDefragFailures)
		// TODO: This should bubble up to HCP condition errors.
		return errors.Join(errs...)
	}

	if len(errs) > 0 {
		r.log.Info("found errors even though all members have been successfully defragmented", "error", errors.Join(errs...))
	}
	return nil
}
// isEndpointBackendFragmented reports whether a member's backend store should
// be defragmented: its fragmentation percentage must reach
// maxFragmentedPercentage AND its on-disk size must reach minDefragBytes.
// This can happen if the operator starts defrag of the cluster but then loses
// leader status and is rescheduled before the operator can defrag all members.
func (r *DefragController) isEndpointBackendFragmented(member *etcdserverpb.Member, endpointStatus *clientv3.StatusResponse) bool {
	if endpointStatus == nil {
		r.log.Error(nil, "endpoint status validation failed", "status", endpointStatus)
		return false
	}
	pct := checkFragmentationPercentage(endpointStatus.DbSize, endpointStatus.DbSizeInUse)
	r.log.Info("Etcd member backend store fragmentation status", "name", member.Name, "URL", member.ClientURLs[0], "fragmentation percentage", pct, "DBSize on disk", endpointStatus.DbSize, "DBSize in use", endpointStatus.DbSizeInUse)

	overThreshold := pct >= maxFragmentedPercentage
	largeEnough := endpointStatus.DbSize >= minDefragBytes
	return overThreshold && largeEnough
}
// checkFragmentationPercentage returns the percentage of the on-disk DB size
// that is not in use, rounded to two decimal places.
// A non-positive on-disk size returns 0: the original expression would
// divide by zero and yield NaN/±Inf, which poisons threshold comparisons.
func checkFragmentationPercentage(ondisk, inuse int64) float64 {
	if ondisk <= 0 {
		return 0
	}
	diff := float64(ondisk - inuse)
	fragmentedPercentage := (diff / float64(ondisk)) * 100
	return math.Round(fragmentedPercentage*100) / 100
}
// getMemberFromStatus resolves which cluster member produced the given
// endpoint status by matching the member ID in the status header.
func getMemberFromStatus(members []*etcdserverpb.Member, endpointStatus *clientv3.StatusResponse) (*etcdserverpb.Member, error) {
	if endpointStatus == nil {
		return nil, fmt.Errorf("endpoint status validation failed: %v", endpointStatus)
	}
	wantID := endpointStatus.Header.MemberId
	for _, m := range members {
		if m.ID == wantID {
			return m, nil
		}
	}
	return nil, fmt.Errorf("no member found in MemberList matching ID: %v", wantID)
}