// cordondrainvm.go (forked from Azure/acs-engine)
package operations

import (
	"fmt"
	"strings"
	"time"

	"github.com/Azure/acs-engine/pkg/armhelpers"
	log "github.com/sirupsen/logrus"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/pkg/api/v1"
)

const (
	interval            = time.Second * 1
	mirrorPodAnnotation = "kubernetes.io/config.mirror"
	// This message is defined in the Kubernetes source; it is copied here to avoid vendoring issues.
	kubernetesOptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
	cordonMaxRetries                 = 5
)

// drainOperation holds the state needed to evict or delete all pods from a single node.
type drainOperation struct {
	client  armhelpers.KubernetesClient
	node    *v1.Node
	logger  *log.Entry
	timeout time.Duration
}

// podFilter returns true if the pod should be deleted/evicted as part of a drain.
type podFilter func(v1.Pod) bool

// SafelyDrainNode safely drains a node so that it can be deleted from the cluster
func SafelyDrainNode(az armhelpers.ACSEngineClient, logger *log.Entry, masterURL, kubeConfig, nodeName string, timeout time.Duration) error {
	// get client using kubeconfig
	client, err := az.GetKubernetesClient(masterURL, kubeConfig, interval, timeout)
	if err != nil {
		return err
	}
	return SafelyDrainNodeWithClient(client, logger, nodeName, timeout)
}

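// Illustrative usage sketch (not part of this file): the caller context, variable names,
// and node name below are assumptions, not code that exists in this repository.
//
//	// az is an armhelpers.ACSEngineClient and logger is a *logrus.Entry obtained by the caller.
//	if err := SafelyDrainNode(az, logger, masterURL, kubeConfig, "k8s-agentpool1-12345678-0", 5*time.Minute); err != nil {
//		logger.Errorf("failed to drain node: %v", err)
//	}
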
// SafelyDrainNodeWithClient safely drains a node so that it can be deleted from the cluster
func SafelyDrainNodeWithClient(client armhelpers.KubernetesClient, logger *log.Entry, nodeName string, timeout time.Duration) error {
	// Mark the node unschedulable
	var node *v1.Node
	var err error
	for i := 0; i < cordonMaxRetries; i++ {
		node, err = client.GetNode(nodeName)
		if err != nil {
			return err
		}
		node.Spec.Unschedulable = true
		node, err = client.UpdateNode(node)
		if err != nil {
			// If this error is due to a concurrent modification, fetch the latest version
			// of the node and reapply the change
			if strings.Contains(err.Error(), kubernetesOptimisticLockErrorMsg) {
				logger.Infof("Node %s got an error suggesting a concurrent modification. Will retry to cordon", nodeName)
				continue
			}
			return err
		}
		break
	}
	logger.Infof("Node %s has been marked unschedulable.", nodeName)

	// Evict pods on the node
	drainOp := &drainOperation{client: client, node: node, logger: logger, timeout: timeout}
	return drainOp.deleteOrEvictPodsSimple()
}

// deleteOrEvictPodsSimple collects the pods that should be removed from the node and
// evicts or deletes them, logging any pods still pending if an error occurs.
func (o *drainOperation) deleteOrEvictPodsSimple() error {
	pods, err := o.getPodsForDeletion()
	if err != nil {
		return err
	}
	o.logger.Infof("%d pods need to be removed/deleted", len(pods))

	err = o.deleteOrEvictPods(pods)
	if err != nil {
		pendingPods, newErr := o.getPodsForDeletion()
		if newErr != nil {
			return newErr
		}
		o.logger.Errorf("There are pending pods when an error occurred: %v\n", err)
		for _, pendingPod := range pendingPods {
			o.logger.Errorf("%s/%s\n", "pod", pendingPod.Name)
		}
	}
	return err
}

// mirrorPodFilter returns false for mirror pods (static pods managed directly by the kubelet);
// deleting their API objects does not stop the underlying pods, so they are skipped.
func mirrorPodFilter(pod v1.Pod) bool {
	if _, found := pod.ObjectMeta.Annotations[mirrorPodAnnotation]; found {
		return false
	}
	return true
}

// getControllerRef returns the pod's controlling OwnerReference, if any.
func getControllerRef(pod *v1.Pod) *metav1.OwnerReference {
	for _, ref := range pod.ObjectMeta.OwnerReferences {
		if ref.Controller != nil && *ref.Controller {
			return &ref
		}
	}
	return nil
}

// daemonSetPodFilter returns false for pods managed by a DaemonSet, since the DaemonSet
// controller would immediately recreate them on the node.
func daemonSetPodFilter(pod v1.Pod) bool {
	controllerRef := getControllerRef(&pod)

	// kubectl also verifies that this controller exists in the API server to make sure the pod
	// isn't orphaned; we delete orphaned pods anyway, so delete any pod that is not managed by
	// a DaemonSet.
	if controllerRef == nil || controllerRef.Kind != "DaemonSet" {
		return true
	}

	// Don't delete/evict DaemonSet pods: they will just come back, and deleting/evicting them
	// can cause service disruptions.
	return false
}

// getPodsForDeletion returns all the pods we're going to delete. If there are
// any pods preventing us from deleting, we return that list in an error.
func (o *drainOperation) getPodsForDeletion() (pods []v1.Pod, err error) {
	podList, err := o.client.ListPods(o.node)
	if err != nil {
		return pods, err
	}

	for _, pod := range podList.Items {
		podOk := true
		for _, filt := range []podFilter{
			mirrorPodFilter,
			daemonSetPodFilter,
		} {
			podOk = podOk && filt(pod)
		}
		if podOk {
			pods = append(pods, pod)
		}
	}
	return pods, nil
}

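// A possible extension point (illustrative sketch only; this filter does not exist in this
// file): further podFilter functions could be appended to the slice above, for example to
// keep pods that use local emptyDir storage from being drained:
//
//	func localStoragePodFilter(pod v1.Pod) bool {
//		for _, volume := range pod.Spec.Volumes {
//			if volume.EmptyDir != nil {
//				return false // skip this pod; draining it would discard its local data
//			}
//		}
//		return true
//	}
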
// deleteOrEvictPods deletes or evicts the pods on the api server
func (o *drainOperation) deleteOrEvictPods(pods []v1.Pod) error {
	if len(pods) == 0 {
		return nil
	}

	// SupportEviction returns the policy group version when the eviction subresource is
	// available; an empty string means eviction is unsupported and pods must be deleted.
	policyGroupVersion, err := o.client.SupportEviction()
	if err != nil {
		return err
	}

	if len(policyGroupVersion) > 0 {
		return o.evictPods(pods, policyGroupVersion)
	}
	return o.deletePods(pods)
}

// evictPods evicts each pod in its own goroutine, retrying while the eviction is temporarily
// rejected, and waits until every pod has terminated or the drain timeout elapses.
func (o *drainOperation) evictPods(pods []v1.Pod, policyGroupVersion string) error {
	doneCh := make(chan bool, len(pods))
	errCh := make(chan error, 1)

	for _, pod := range pods {
		go func(pod v1.Pod, doneCh chan bool, errCh chan error) {
			var err error
			for {
				err = o.client.EvictPod(&pod, policyGroupVersion)
				if err == nil {
					break
				} else if apierrors.IsNotFound(err) {
					// The pod is already gone; treat it as evicted.
					doneCh <- true
					return
				} else if apierrors.IsTooManyRequests(err) {
					// The eviction was rejected (for example by a PodDisruptionBudget); back off and retry.
					time.Sleep(5 * time.Second)
				} else {
					errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
					return
				}
			}

			// Wait for the evicted pod to actually terminate before reporting success.
			podArray := []v1.Pod{pod}
			_, err = o.client.WaitForDelete(o.logger, podArray, true)
			if err == nil {
				doneCh <- true
			} else {
				errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
			}
		}(pod, doneCh, errCh)
	}

	doneCount := 0
	for {
		select {
		case err := <-errCh:
			return err
		case <-doneCh:
			doneCount++
			if doneCount == len(pods) {
				return nil
			}
		case <-time.After(o.timeout):
			return fmt.Errorf("Drain did not complete within %v", o.timeout)
		}
	}
}

// deletePods deletes the pods directly (used when the eviction API is not supported) and
// waits for them to disappear from the API server.
func (o *drainOperation) deletePods(pods []v1.Pod) error {
	for _, pod := range pods {
		err := o.client.DeletePod(&pod)
		if err != nil && !apierrors.IsNotFound(err) {
			return err
		}
	}
	_, err := o.client.WaitForDelete(o.logger, pods, false)
	return err
}