mainloop.go
package cmd

import (
	"context"
	"time"

	"github.com/pkg/errors"

	"github.com/rebuy-de/node-drainer/v2/pkg/collectors"
	"github.com/rebuy-de/node-drainer/v2/pkg/collectors/kube/node"
	"github.com/rebuy-de/rebuy-go-sdk/v3/pkg/logutil"
	"github.com/rebuy-de/rebuy-go-sdk/v3/pkg/syncutil"
)

// MainLoop does the actual node-drainer actions. When any client cache
// changes, it starts a new update loop and checks whether an action is
// required.
type MainLoop struct {
	triggerLoop  *syncutil.SignalEmitter
	signaler     syncutil.Signaler
	failureCount int
	collectors   collectors.Collectors
}

// NewMainLoop initializes a MainLoop.
func NewMainLoop(collectors collectors.Collectors) *MainLoop {
	ml := new(MainLoop)
	ml.collectors = collectors
	ml.triggerLoop = new(syncutil.SignalEmitter)

	ml.signaler = syncutil.SignalerFromEmitters(
		ml.triggerLoop,
		ml.collectors.EC2.SignalEmitter(),
		ml.collectors.ASG.SignalEmitter(),
		ml.collectors.Spot.SignalEmitter(),
		//ml.collectors.Node.SignalEmitter(),
		//ml.collectors.Pod.SignalEmitter(),
	)

	return ml
}

// Healthy indicates whether the background job is running correctly.
func (l *MainLoop) Healthy() bool {
	return l.failureCount == 0
}

// Run starts the mainloop.
func (l *MainLoop) Run(ctx context.Context) error {
	ctx = logutil.Start(ctx, "mainloop")

	logutil.Get(ctx).Debug("waiting for EC2 cache to warm up")
	for ctx.Err() == nil {
		if len(l.collectors.EC2.List()) > 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	logutil.Get(ctx).Debug("waiting for EC2 cache done")
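
	// Each iteration runs a single update pass and then waits for a signal
	// from one of the collector caches (see NewMainLoop) before running
	// again.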
	for ctx.Err() == nil {
		err := l.runOnce(ctx)
		if err != nil {
			l.failureCount++
			logutil.Get(ctx).
				WithError(errors.WithStack(err)).
				Errorf("main loop run failed %d times in a row", l.failureCount)
		} else {
			l.failureCount = 0
		}

		time.Sleep(1 * time.Second)

		<-l.signaler.C(ctx, time.Minute)
	}

	return nil
}

func (l *MainLoop) runOnce(ctx context.Context) error {
	// Implementation detail: This function uses a lot of for loops that
	// iterate over the filtered instances. While we mostly return on the
	// first iteration, the loops still make sense, because:
	//   * there is no need to check explicitly whether the filtered list is
	//     empty, and
	//   * an item can be skipped when some additional condition is not met.
	// Using loops consistently also makes the code more readable (ie visual
	// separation of the steps by loop blocks).
	//
	// Another implementation detail: A call of this function should only
	// perform a single action (eg evict a pod or complete an instance
	// lifecycle). This is because an action can change the underlying data,
	// and that change is not reflected in the local variables. Additionally,
	// it is easier to restart the whole loop after any condition is met than
	// to keep track of the possible side effects of every action. One
	// example:
	//   * Evicting a pod makes all other pods with the same owner (eg a
	//     Deployment) unevictable, but that data is not updated in the local
	//     variables. We could implement something to update it, but it is
	//     easier to just restart the loop every time.

	ctx = logutil.Start(ctx, "loop")
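
	// Take a snapshot of the current collector caches. Any change that
	// happens after this point is only picked up by a later iteration.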
	instances, pods := collectors.Combine(l.collectors.List(ctx))
	instances = instances.
		Sort(collectors.InstancesByLaunchTime).
		SortReverse(collectors.InstancesByTriggeredAt)

	InstMainLoopStarted(ctx, instances, pods)
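
	// Cordon instances that need it by applying a soft NoSchedule taint to
	// their nodes, so the scheduler stops placing new pods on them.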
	for _, instance := range instances.Select(InstancesThatNeedCordon()) {
		InstMainLoopCordoningInstance(ctx, instance)

		err := l.collectors.Node.Taint(ctx, instance.Node, TaintSoft, node.TaintEffectNoSchedule)
		if err != nil {
			return errors.Wrap(err, "failed to apply soft taint")
		}

		l.triggerLoop.Emit()
		return nil
	}
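
	// Complete the ASG lifecycle action of instances that are ready for it,
	// which lets the Auto Scaling group proceed with the termination.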
	for _, instance := range instances.Select(InstancesThatNeedLifecycleCompletion()) {
		InstMainLoopCompletingInstance(ctx, instance)

		err := l.collectors.ASG.Complete(ctx, instance.InstanceID)
		if err != nil {
			return errors.Wrap(err, "failed to mark node as complete")
		}

		l.triggerLoop.Emit()
		return nil
	}
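
	// Remove lifecycle messages that are no longer needed. As a sanity
	// check, messages younger than 30 minutes are skipped and retried later.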
	for _, instance := range instances.Select(InstancesThanNeedLifecycleDeletion()) {
		InstMainLoopDeletingLifecycleMessage(ctx, instance)

		age := time.Since(instance.ASG.TriggeredAt)
		if age < 30*time.Minute {
			InstMainLoopDeletingLifecycleMessageAgeSanityCheckFailed(ctx, instance, age)
			l.triggerLoop.Emit() // we need to retry
			continue
		}

		err := l.collectors.ASG.Delete(ctx, instance.InstanceID)
		if err != nil {
			return errors.Wrap(err, "failed to delete message")
		}

		l.triggerLoop.Emit()
		return nil
	}
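
	// Evict the next pod that is ready for eviction. Only one pod is evicted
	// per pass; the loop is restarted afterwards (see the note above).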
	for _, pod := range pods.Select(PodsReadyForEviction()) {
		InstMainLoopEvictPod(ctx, pod)

		err := l.collectors.Pod.Evict(ctx, &pod.Pod)
		if err != nil {
			return errors.Wrap(err, "failed to evict pod")
		}

		l.triggerLoop.Emit()
		return nil
	}
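
	// Nothing to do in this iteration.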
	InstMainLoopNoop(ctx)

	return nil
}