-
Notifications
You must be signed in to change notification settings - Fork 124
/
eviction.go
219 lines (189 loc) · 6.65 KB
/
eviction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/*
Copyright 2019 The Machine Controller Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import (
"context"
"fmt"
"sync"
"time"
evictiontypes "github.com/kubermatic/machine-controller/pkg/node/eviction/types"
corev1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// NodeEviction drains all evictable pods from a single node (cordon +
// evict) so the node can subsequently be deleted safely.
type NodeEviction struct {
// NOTE(review): storing a context in a struct is discouraged by Go
// convention; kept here to preserve the existing interface.
ctx context.Context
// nodeName is the name of the node to drain.
nodeName string
// client is the (possibly cache/lister-backed) controller-runtime
// client, used to read and update the Node object.
client ctrlruntimeclient.Client
// kubeClient is the direct API client, used for listing pods and
// creating evictions (deliberately bypassing the lister, see
// getFilteredPods).
kubeClient kubernetes.Interface
}
// New returns a NodeEviction for the given node, wired up with the
// supplied controller-runtime client and core Kubernetes client.
func New(ctx context.Context, nodeName string, client ctrlruntimeclient.Client, kubeClient kubernetes.Interface) *NodeEviction {
	ne := &NodeEviction{}
	ne.ctx = ctx
	ne.nodeName = nodeName
	ne.client = client
	ne.kubeClient = kubeClient
	return ne
}
// Run executes the eviction. The returned bool tells the caller whether
// it must retry later (true whenever evictable pods were found on the
// node); the error reports any failure along the way.
func (ne *NodeEviction) Run() (bool, error) {
	node := &corev1.Node{}
	if err := ne.client.Get(ne.ctx, types.NamespacedName{Name: ne.nodeName}, node); err != nil {
		return false, fmt.Errorf("failed to get node from lister: %v", err)
	}
	// Nodes carrying the skip-eviction annotation are drained by nobody.
	_, skip := node.Annotations[evictiontypes.SkipEvictionAnnotationKey]
	if skip {
		klog.V(3).Infof("Skipping eviction for node %s as it has a %s annotation", ne.nodeName, evictiontypes.SkipEvictionAnnotationKey)
		return false, nil
	}
	klog.V(3).Infof("Starting to evict node %s", ne.nodeName)
	if err := ne.cordonNode(node); err != nil {
		return false, fmt.Errorf("failed to cordon node %s: %v", ne.nodeName, err)
	}
	klog.V(6).Infof("Successfully cordoned node %s", ne.nodeName)
	pods, err := ne.getFilteredPods()
	if err != nil {
		return false, fmt.Errorf("failed to get Pods to evict for node %s: %v", ne.nodeName, err)
	}
	klog.V(6).Infof("Found %v pods to evict for node %s", len(pods), ne.nodeName)
	if len(pods) == 0 {
		return false, nil
	}
	// There are pods to evict, so the controller has to retry later no
	// matter whether the evictions below succeed.
	errs := ne.evictPods(pods)
	if len(errs) > 0 {
		return true, fmt.Errorf("failed to evict pods, errors encountered: %v", errs)
	}
	klog.V(6).Infof("Successfully created evictions for all pods on node %s!", ne.nodeName)
	return true, nil
}
// cordonNode marks the node unschedulable (if it is not already) and
// then waits until the change becomes visible through our client.
func (ne *NodeEviction) cordonNode(node *corev1.Node) error {
	if !node.Spec.Unschedulable {
		if _, err := ne.updateNode(func(n *corev1.Node) {
			n.Spec.Unschedulable = true
		}); err != nil {
			return err
		}
	}
	// Be paranoid and wait until the change got propagated to the lister
	// This assumes that the delay between our lister and the APIserver
	// is smaller or equal to the delay the schedulers lister has - If
	// that is not the case, there is a small chance the scheduler schedules
	// pods in between, those will then get deleted upon node deletion and
	// not evicted
	return wait.Poll(1*time.Second, 10*time.Second, func() (bool, error) {
		current := &corev1.Node{}
		if err := ne.client.Get(ne.ctx, types.NamespacedName{Name: ne.nodeName}, current); err != nil {
			return false, err
		}
		return current.Spec.Unschedulable, nil
	})
}
// getFilteredPods lists all pods scheduled onto the node and returns
// only those that actually need an eviction.
func (ne *NodeEviction) getFilteredPods() ([]corev1.Pod, error) {
	// The lister-backed client from the mgr automatically creates a lister for all objects requested through it.
	// We explicitly do not want that for pods, hence we have to use the kubernetes core client
	// TODO @alvaroaleman: Add source code ref for this
	podList, err := ne.kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": ne.nodeName}).String(),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list pods: %v", err)
	}
	var evictable []corev1.Pod
	for i := range podList.Items {
		pod := podList.Items[i]
		// Terminal pods do not need to be evicted.
		if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
			continue
		}
		// DaemonSet pods would be recreated on the node right away.
		if ref := metav1.GetControllerOf(&pod); ref != nil && ref.Kind == "DaemonSet" {
			continue
		}
		// Mirror (static) pods are owned by the kubelet, not evictable.
		if _, isMirror := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; isMirror {
			continue
		}
		evictable = append(evictable, pod)
	}
	return evictable, nil
}
// evictPods issues an eviction for every given pod in parallel and
// returns all errors encountered. An empty result means every pod was
// either evicted, already gone (NotFound), or currently blocked by a
// PodDisruptionBudget — the latter is deliberately not an error, the
// controller will simply retry later.
//
// Fixes over the previous implementation:
//   - removed the unsynchronized `isDone` bool that was written in a
//     defer and read by all worker goroutines (a data race), wrapped in
//     a `for` loop whose every branch returned anyway (dead loop);
//   - we now wait for ALL workers before returning instead of bailing
//     out on the first error, which also removes the leaked
//     `wg.Wait(); finished <- struct{}{}` goroutine that blocked forever
//     on its unbuffered send whenever the error path won the select;
//   - all errors are collected, not just the first one.
func (ne *NodeEviction) evictPods(pods []corev1.Pod) []error {
	// Buffered to len(pods) so workers can always send without blocking.
	errCh := make(chan error, len(pods))
	var wg sync.WaitGroup
	wg.Add(len(pods))
	for _, pod := range pods {
		go func(p corev1.Pod) {
			defer wg.Done()
			err := ne.evictPod(&p)
			switch {
			case err == nil || kerrors.IsNotFound(err):
				klog.V(6).Infof("Successfully evicted pod %s/%s on node %s", p.Namespace, p.Name, ne.nodeName)
			case kerrors.IsTooManyRequests(err):
				// A PDB prevents the eviction right now; not an error,
				// the controller retries later.
			default:
				errCh <- fmt.Errorf("error evicting pod %s/%s on node %s: %v", p.Namespace, p.Name, ne.nodeName, err)
			}
		}(pod)
	}
	wg.Wait()
	close(errCh)
	var retErrs []error
	for err := range errCh {
		klog.V(6).Infof("Got an error from eviction goroutine for node %s: %v", ne.nodeName, err)
		retErrs = append(retErrs, err)
	}
	if len(retErrs) == 0 {
		klog.V(6).Infof("All goroutines for eviction pods on node %s finished", ne.nodeName)
	}
	return retErrs
}
// evictPod posts a policy/v1beta1 Eviction for the given pod via the
// core client's eviction subresource.
func (ne *NodeEviction) evictPod(pod *corev1.Pod) error {
	eviction := &policy.Eviction{}
	eviction.Name = pod.Name
	eviction.Namespace = pod.Namespace
	return ne.kubeClient.PolicyV1beta1().Evictions(pod.Namespace).Evict(eviction)
}
// updateNode re-reads the node, applies modify to it and writes it
// back, retrying the whole read-modify-write cycle on optimistic
// concurrency conflicts. It returns the node object from the last
// attempt together with the final error.
func (ne *NodeEviction) updateNode(modify func(*corev1.Node)) (*corev1.Node, error) {
	updated := &corev1.Node{}
	retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Fetch a fresh copy so each retry starts from the latest state.
		if err := ne.client.Get(ne.ctx, types.NamespacedName{Name: ne.nodeName}, updated); err != nil {
			return err
		}
		modify(updated)
		return ne.client.Update(ne.ctx, updated)
	})
	return updated, retryErr
}