-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
node_lifecycle_controller.go
1221 lines (1114 loc) · 46.5 KB
/
node_lifecycle_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// The Controller sets tainted annotations on nodes.
// Tainted nodes should not be used for new work loads and
// some effort should be given to getting existing work
// loads off of tainted nodes.
package nodelifecycle
import (
"fmt"
"sync"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/system"
taintutils "k8s.io/kubernetes/pkg/util/taints"
)
func init() {
// Register prometheus metrics
Register()
}
var (
// UnreachableTaintTemplate is the taint for when a node becomes unreachable.
UnreachableTaintTemplate = &v1.Taint{
Key: schedulerapi.TaintNodeUnreachable,
Effect: v1.TaintEffectNoExecute,
}
// NotReadyTaintTemplate is the taint for when a node is not ready for
// executing pods
NotReadyTaintTemplate = &v1.Taint{
Key: schedulerapi.TaintNodeNotReady,
Effect: v1.TaintEffectNoExecute,
}
// map {NodeConditionType: {ConditionStatus: TaintKey}}
// represents which NodeConditionType under which ConditionStatus should be
// tainted with which TaintKey
// for certain NodeConditionType, there are multiple {ConditionStatus,TaintKey} pairs
nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
v1.NodeReady: {
v1.ConditionFalse: schedulerapi.TaintNodeNotReady,
v1.ConditionUnknown: schedulerapi.TaintNodeUnreachable,
},
v1.NodeMemoryPressure: {
v1.ConditionTrue: schedulerapi.TaintNodeMemoryPressure,
},
v1.NodeOutOfDisk: {
v1.ConditionTrue: schedulerapi.TaintNodeOutOfDisk,
},
v1.NodeDiskPressure: {
v1.ConditionTrue: schedulerapi.TaintNodeDiskPressure,
},
v1.NodeNetworkUnavailable: {
v1.ConditionTrue: schedulerapi.TaintNodeNetworkUnavailable,
},
v1.NodePIDPressure: {
v1.ConditionTrue: schedulerapi.TaintNodePIDPressure,
},
}
taintKeyToNodeConditionMap = map[string]v1.NodeConditionType{
schedulerapi.TaintNodeNotReady: v1.NodeReady,
schedulerapi.TaintNodeUnreachable: v1.NodeReady,
schedulerapi.TaintNodeNetworkUnavailable: v1.NodeNetworkUnavailable,
schedulerapi.TaintNodeMemoryPressure: v1.NodeMemoryPressure,
schedulerapi.TaintNodeOutOfDisk: v1.NodeOutOfDisk,
schedulerapi.TaintNodeDiskPressure: v1.NodeDiskPressure,
schedulerapi.TaintNodePIDPressure: v1.NodePIDPressure,
}
)
// ZoneState is the state of a given zone.
type ZoneState string
const (
stateInitial = ZoneState("Initial")
stateNormal = ZoneState("Normal")
stateFullDisruption = ZoneState("FullDisruption")
statePartialDisruption = ZoneState("PartialDisruption")
)
const (
// The amount of time the nodecontroller should sleep between retrying node health updates
retrySleepTime = 20 * time.Millisecond
)
type nodeHealthData struct {
probeTimestamp metav1.Time
readyTransitionTimestamp metav1.Time
status *v1.NodeStatus
}
// Controller is the controller that manages node's life cycle.
type Controller struct {
taintManager *scheduler.NoExecuteTaintManager
podInformerSynced cache.InformerSynced
kubeClient clientset.Interface
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
// to aviod the problem with time skew across the cluster.
now func() metav1.Time
enterPartialDisruptionFunc func(nodeNum int) float32
enterFullDisruptionFunc func(nodeNum int) float32
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
knownNodeSet map[string]*v1.Node
// per Node map storing last observed health together with a local time when it was observed.
nodeHealthMap map[string]*nodeHealthData
// Lock to access evictor workers
evictorLock sync.Mutex
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
zoneStates map[string]ZoneState
daemonSetStore extensionslisters.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
recorder record.EventRecorder
// Value controlling Controller monitoring period, i.e. how often does Controller
// check node health signal posted from kubelet. This value should be lower than
// nodeMonitorGracePeriod.
// TODO: Change node health monitor to watch based.
nodeMonitorPeriod time.Duration
// When node is just created, e.g. cluster bootstrap or node creation, we give
// a longer grace period.
nodeStartupGracePeriod time.Duration
// Controller will not proactively sync node health, but will monitor node
// health signal updated from kubelet. There are 2 kinds of node healthiness
// signals: NodeStatus and NodeLease. NodeLease signal is generated only when
// NodeLease feature is enabled. If it doesn't receive update for this amount
// of time, it will start posting "NodeReady==ConditionUnknown". The amount of
// time before which Controller start evicting pods is controlled via flag
// 'pod-eviction-timeout'.
// Note: be cautious when changing the constant, it must work with
// nodeStatusUpdateFrequency in kubelet and renewInterval in NodeLease
// controller. The node health signal update frequency is the minimal of the
// two.
// There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than the node health signal
// update frequency, where N means number of retries allowed for kubelet to
// post node status/lease. It is pointless to make nodeMonitorGracePeriod
// be less than the node health signal update frequency, since there will
// only be fresh values from Kubelet at an interval of node health signal
// update frequency. The constant must be less than podEvictionTimeout.
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger
// value takes longer for user to see up-to-date node health.
nodeMonitorGracePeriod time.Duration
podEvictionTimeout time.Duration
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// if set to true Controller will start TaintManager that will evict Pods from
// tainted nodes, if they're not tolerated.
runTaintManager bool
// if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// taints instead of evicting Pods itself.
useTaintBasedEvictions bool
// if set to true, NodeController will taint Nodes based on its condition for 'NetworkUnavailable',
// 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'.
taintNodeByCondition bool
nodeUpdateQueue workqueue.Interface
}
// NewNodeLifecycleController returns a new taint controller.
func NewNodeLifecycleController(
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
daemonSetInformer extensionsinformers.DaemonSetInformer,
kubeClient clientset.Interface,
nodeMonitorPeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorGracePeriod time.Duration,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
secondaryEvictionLimiterQPS float32,
largeClusterThreshold int32,
unhealthyZoneThreshold float32,
runTaintManager bool,
useTaintBasedEvictions bool,
taintNodeByCondition bool) (*Controller, error) {
if kubeClient == nil {
klog.Fatalf("kubeClient is nil when starting Controller")
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
eventBroadcaster.StartLogging(klog.Infof)
klog.Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""),
})
if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
nc := &Controller{
kubeClient: kubeClient,
now: metav1.Now,
knownNodeSet: make(map[string]*v1.Node),
nodeHealthMap: make(map[string]*nodeHealthData),
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneStates: make(map[string]ZoneState),
podEvictionTimeout: podEvictionTimeout,
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
taintNodeByCondition: taintNodeByCondition,
nodeUpdateQueue: workqueue.New(),
}
if useTaintBasedEvictions {
klog.Infof("Controller is using taint based evictions.")
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
nc.computeZoneStateFunc = nc.ComputeZoneState
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(nil, pod)
}
},
UpdateFunc: func(prev, obj interface{}) {
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
if nc.taintManager != nil {
nc.taintManager.PodUpdated(prevPod, newPod)
}
},
DeleteFunc: func(obj interface{}) {
pod, isPod := obj.(*v1.Pod)
// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
if !isPod {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Received unexpected object: %v", obj)
return
}
pod, ok = deletedState.Obj.(*v1.Pod)
if !ok {
klog.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
return
}
}
if nc.taintManager != nil {
nc.taintManager.PodUpdated(pod, nil)
}
},
})
nc.podInformerSynced = podInformer.Informer().HasSynced
if nc.runTaintManager {
podLister := podInformer.Lister()
podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
})
}
if nc.taintNodeByCondition {
klog.Infof("Controller will taint node by condition.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
nc.nodeUpdateQueue.Add(newNode.Name)
return nil
}),
})
}
// NOTE(resouer): nodeInformer to substitute deprecated taint key (notReady -> not-ready).
// Remove this logic when we don't need this backwards compatibility
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
return nc.doFixDeprecatedTaintKeyPass(node)
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
return nc.doFixDeprecatedTaintKeyPass(newNode)
}),
})
nc.nodeLister = nodeInformer.Lister()
nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
nc.daemonSetStore = daemonSetInformer.Lister()
nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
return nc, nil
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
if !controller.WaitForCacheSync("taint", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
if nc.taintNodeByCondition {
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
// Start workers to update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
}
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// Incorporate the results of node health signal pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
// doFixDeprecatedTaintKeyPass checks and replaces deprecated taint key with proper key name if needed.
func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error {
taintsToAdd := []*v1.Taint{}
taintsToDel := []*v1.Taint{}
for _, taint := range node.Spec.Taints {
if taint.Key == schedulerapi.DeprecatedTaintNodeNotReady {
tDel := taint
taintsToDel = append(taintsToDel, &tDel)
tAdd := taint
tAdd.Key = schedulerapi.TaintNodeNotReady
taintsToAdd = append(taintsToAdd, &tAdd)
}
if taint.Key == schedulerapi.DeprecatedTaintNodeUnreachable {
tDel := taint
taintsToDel = append(taintsToDel, &tDel)
tAdd := taint
tAdd.Key = schedulerapi.TaintNodeUnreachable
taintsToAdd = append(taintsToAdd, &tAdd)
}
}
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
klog.Warningf("Detected deprecated taint keys: %v on node: %v, will substitute them with %v",
taintsToDel, node.GetName(), taintsToAdd)
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
func (nc *Controller) doNoScheduleTaintingPassWorker() {
for {
obj, shutdown := nc.nodeUpdateQueue.Get()
// "nodeUpdateQueue" will be shutdown when "stopCh" closed;
// we do not need to re-check "stopCh" again.
if shutdown {
return
}
nodeName := obj.(string)
if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
// TODO (k82cn): Add nodeName back to the queue.
klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
}
nc.nodeUpdateQueue.Done(nodeName)
}
}
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// Map node's condition to Taints.
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
if node.Spec.Unschedulable {
// If unschedulable, append related taint.
taints = append(taints, v1.Taint{
Key: schedulerapi.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
// Get exist taints of node.
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
// only NoSchedule taints are candidates to be compared with "taints" later
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
// Find unschedulable taint of node.
if t.Key == schedulerapi.TaintNodeUnschedulable {
return true
}
// Find node condition taints of node.
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// If nothing to add not delete, return true directly.
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
func (nc *Controller) doNoExecuteTaintingPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainter {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
return true, 0
} else if err != nil {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
// retry in 50 millisecond
return false, 50 * time.Millisecond
} else {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
_, condition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
if condition.Status == v1.ConditionFalse {
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
} else if condition.Status == v1.ConditionUnknown {
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
} else {
// It seems that the Node is ready again, so there's no need to taint it.
klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
return true, 0
}
return nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0
})
}
}
func (nc *Controller) doEvictionPass() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
nodeUID, _ := value.UID.(string)
remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
klog.Infof("Pods awaiting deletion due to Controller eviction")
}
return true, 0
})
}
}
// monitorNodeHealth verifies node health are constantly updated by kubelet, and
// if not, post "NodeReady==ConditionUnknown". It also evicts all pods if node
// is not ready or not reachable for a long period of time.
func (nc *Controller) monitorNodeHealth() error {
// We are listing nodes from local cache as we can tolerate some small delays
// comparing to state from etcd and there is eventual consistency anyway.
nodes, err := nc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
for i := range newZoneRepresentatives {
nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
}
for i := range added {
klog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
nc.markNodeAsReachable(added[i])
} else {
nc.cancelPodEviction(added[i])
}
}
for i := range deleted {
klog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
for i := range nodes {
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
var currentReadyCondition *v1.NodeCondition
node := nodes[i].DeepCopy()
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
if err == nil {
return true, nil
}
name := node.Name
node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed while getting a Node to retry updating node health. Probably Node %s was deleted.", name)
return false, err
}
return false, nil
}); err != nil {
klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
"Skipping - no pods will be evicted.", node.Name, err)
continue
}
// We do not treat a master node as a part of the cluster for network disruption checking.
if !system.IsMasterNode(node.Name) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}
decisionTimestamp := nc.now()
if currentReadyCondition != nil {
// Check eviction timeout against decisionTimestamp
if observedReadyCondition.Status == v1.ConditionFalse {
if nc.useTaintBasedEvictions {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
klog.V(2).Infof("Node %v is NotReady as of %v. Adding it to the Taint queue.",
node.Name,
decisionTimestamp,
)
}
} else {
if decisionTimestamp.After(nc.nodeHealthMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nc.nodeHealthMap[node.Name].readyTransitionTimestamp,
nc.podEvictionTimeout,
)
}
}
}
}
if observedReadyCondition.Status == v1.ConditionUnknown {
if nc.useTaintBasedEvictions {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
klog.V(2).Infof("Node %v is unresponsive as of %v. Adding it to the Taint queue.",
node.Name,
decisionTimestamp,
)
}
} else {
if decisionTimestamp.After(nc.nodeHealthMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nc.nodeHealthMap[node.Name].readyTransitionTimestamp,
nc.podEvictionTimeout-gracePeriod,
)
}
}
}
}
if observedReadyCondition.Status == v1.ConditionTrue {
if nc.useTaintBasedEvictions {
removed, err := nc.markNodeAsReachable(node)
if err != nil {
klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
}
if removed {
klog.V(2).Infof("Node %s is healthy again, removing all taints", node.Name)
}
} else {
if nc.cancelPodEviction(node) {
klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
// remove shutdown taint this is needed always depending do we use taintbased or not
err := nc.markNodeAsNotShutdown(node)
if err != nil {
klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
}
}
// Report node event.
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
}
}
nc.handleDisruption(zoneToNodeConditions, nodes)
return nil
}
// tryUpdateNodeHealth checks a given node's conditions and tries to update it. Returns grace period to
// which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
var err error
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
_, currentReadyCondition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition == nil {
// If ready condition is nil, then kubelet (or nodecontroller) never posted node status.
// A fake ready condition is created, where LastHeartbeatTime and LastTransitionTime is set
// to node.CreationTimestamp to avoid handle the corner case.
observedReadyCondition = v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: node.CreationTimestamp,
}
gracePeriod = nc.nodeStartupGracePeriod
if _, found := nc.nodeHealthMap[node.Name]; found {
nc.nodeHealthMap[node.Name].status = &node.Status
} else {
nc.nodeHealthMap[node.Name] = &nodeHealthData{
status: &node.Status,
probeTimestamp: node.CreationTimestamp,
readyTransitionTimestamp: node.CreationTimestamp,
}
}
} else {
// If ready condition is not nil, make a copy of it, since we may modify it in place later.
observedReadyCondition = *currentReadyCondition
gracePeriod = nc.nodeMonitorGracePeriod
}
savedNodeHealth, found := nc.nodeHealthMap[node.Name]
// There are following cases to check:
// - both saved and new status have no Ready Condition set - we leave everything as it is,
// - saved status have no Ready Condition, but current one does - Controller was restarted with Node data already present in etcd,
// - saved status have some Ready Condition, but current one does not - it's an error, but we fill it up because that's probably a good thing to do,
// - both saved and current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be
// unresponsive, so we leave it as it is,
// - both saved and current statuses have Ready Conditions, they have different LastProbeTimes, but the same Ready Condition State -
// everything's in order, no transition occurred, we update only probeTimestamp,
// - both saved and current statuses have Ready Conditions, different LastProbeTimes and different Ready Condition State -
// Ready Condition changed it state since we last seen it, so we update both probeTimestamp and readyTransitionTimestamp.
// TODO: things to consider:
// - if 'LastProbeTime' have gone back in time its probably an error, currently we ignore it,
// - currently only correct Ready State transition outside of Node Controller is marking it ready by Kubelet, we don't check
// if that's the case, but it does not seem necessary.
var savedCondition *v1.NodeCondition
if found {
_, savedCondition = v1node.GetNodeCondition(savedNodeHealth.status, v1.NodeReady)
}
_, observedCondition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
if !found {
klog.Warningf("Missing timestamp for Node %s. Assuming now as a timestamp.", node.Name)
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
} else if savedCondition == nil && observedCondition != nil {
klog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
} else if savedCondition != nil && observedCondition == nil {
klog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
// TODO: figure out what to do in this case. For now we do the same thing as above.
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: nc.now(),
}
} else if savedCondition != nil && observedCondition != nil && savedCondition.LastHeartbeatTime != observedCondition.LastHeartbeatTime {
var transitionTime metav1.Time
// If ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
// otherwise we leave it as it is.
if savedCondition.LastTransitionTime != observedCondition.LastTransitionTime {
klog.V(3).Infof("ReadyCondition for Node %s transitioned from %v to %v", node.Name, savedCondition, observedCondition)
transitionTime = nc.now()
} else {
transitionTime = savedNodeHealth.readyTransitionTimestamp
}
if klog.V(5) {
klog.V(5).Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeHealth.status, node.Status)
} else {
klog.V(3).Infof("Node %s ReadyCondition updated. Updating timestamp.", node.Name)
}
savedNodeHealth = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.now(),
readyTransitionTimestamp: transitionTime,
}
}
nc.nodeHealthMap[node.Name] = savedNodeHealth
if nc.now().After(savedNodeHealth.probeTimestamp.Add(gracePeriod)) {
// NodeReady condition or lease was last set longer ago than gracePeriod, so
// update it to Unknown (regardless of its current value) in the master.
if currentReadyCondition == nil {
klog.V(2).Infof("node %v is never updated by kubelet", node.Name)
node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: fmt.Sprintf("Kubelet never posted node status."),
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: nc.now(),
})
} else {
klog.V(4).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v",
node.Name, nc.now().Time.Sub(savedNodeHealth.probeTimestamp.Time), observedReadyCondition)
if observedReadyCondition.Status != v1.ConditionUnknown {
currentReadyCondition.Status = v1.ConditionUnknown
currentReadyCondition.Reason = "NodeStatusUnknown"
currentReadyCondition.Message = "Kubelet stopped posting node status."
// LastProbeTime is the last time we heard from kubelet.
currentReadyCondition.LastHeartbeatTime = observedReadyCondition.LastHeartbeatTime
currentReadyCondition.LastTransitionTime = nc.now()
}
}
// remaining node conditions should also be set to Unknown
remainingNodeConditionTypes := []v1.NodeConditionType{
v1.NodeOutOfDisk,
v1.NodeMemoryPressure,
v1.NodeDiskPressure,
v1.NodePIDPressure,
// We don't change 'NodeNetworkUnavailable' condition, as it's managed on a control plane level.
// v1.NodeNetworkUnavailable,
}
nowTimestamp := nc.now()
for _, nodeConditionType := range remainingNodeConditionTypes {
_, currentCondition := v1node.GetNodeCondition(&node.Status, nodeConditionType)
if currentCondition == nil {
klog.V(2).Infof("Condition %v of node %v was never updated by kubelet", nodeConditionType, node.Name)
node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
Type: nodeConditionType,
Status: v1.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: "Kubelet never posted node status.",
LastHeartbeatTime: node.CreationTimestamp,
LastTransitionTime: nowTimestamp,
})
} else {
klog.V(4).Infof("node %v hasn't been updated for %+v. Last %v is: %+v",
node.Name, nc.now().Time.Sub(savedNodeHealth.probeTimestamp.Time), nodeConditionType, currentCondition)
if currentCondition.Status != v1.ConditionUnknown {
currentCondition.Status = v1.ConditionUnknown
currentCondition.Reason = "NodeStatusUnknown"
currentCondition.Message = "Kubelet stopped posting node status."
currentCondition.LastTransitionTime = nowTimestamp
}
}
}
_, currentCondition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
if !apiequality.Semantic.DeepEqual(currentCondition, &observedReadyCondition) {
if _, err = nc.kubeClient.CoreV1().Nodes().UpdateStatus(node); err != nil {
klog.Errorf("Error updating node %s: %v", node.Name, err)
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
nc.nodeHealthMap[node.Name] = &nodeHealthData{
status: &node.Status,
probeTimestamp: nc.nodeHealthMap[node.Name].probeTimestamp,
readyTransitionTimestamp: nc.now(),
}
return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
}
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]ZoneState{}
allAreFullyDisrupted := true
for k, v := range zoneToNodeConditions {
zoneSize.WithLabelValues(k).Set(float64(len(v)))
unhealthy, newState := nc.computeZoneStateFunc(v)
zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
if newState != stateFullDisruption {
allAreFullyDisrupted = false
}
newZoneStates[k] = newState
if _, had := nc.zoneStates[k]; !had {
klog.Errorf("Setting initial state for unseen zone: %v", k)
nc.zoneStates[k] = stateInitial
}
}
allWasFullyDisrupted := true
for k, v := range nc.zoneStates {
if _, have := zoneToNodeConditions[k]; !have {
zoneSize.WithLabelValues(k).Set(0)
zoneHealth.WithLabelValues(k).Set(100)
unhealthyNodes.WithLabelValues(k).Set(0)
delete(nc.zoneStates, k)
continue
}
if v != stateFullDisruption {
allWasFullyDisrupted = false
break
}
}
// At least one node was responding in previous pass or in the current pass. Semantics is as follows:
// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
// - if new state is "fullDisruption" we restore normal eviction rate,
// - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
if !allAreFullyDisrupted || !allWasFullyDisrupted {
// We're switching to full disruption mode
if allAreFullyDisrupted {
klog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsReachable(nodes[i])
if err != nil {
klog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
} else {
nc.cancelPodEviction(nodes[i])
}
}
// We stop all evictions.
for k := range nc.zoneStates {