diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index 9457c3fc4e13..d3fd14a07fe2 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -345,7 +345,11 @@ func NewNodeLifecycleController( nc.podInformerSynced = podInformer.Informer().HasSynced if nc.runTaintManager { - nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient) + podLister := podInformer.Lister() + podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) } + nodeLister := nodeInformer.Lister() + nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) } + nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter) nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(nil, node) diff --git a/pkg/controller/nodelifecycle/scheduler/BUILD b/pkg/controller/nodelifecycle/scheduler/BUILD index 5bd9454b4f57..d8f2f98790a4 100644 --- a/pkg/controller/nodelifecycle/scheduler/BUILD +++ b/pkg/controller/nodelifecycle/scheduler/BUILD @@ -13,10 +13,12 @@ go_library( "//pkg/apis/core/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go index b46450a7568b..90e43757c4d9 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -24,10 +24,12 @@ import ( "time" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -51,39 +53,14 @@ const ( retries = 5 ) -// Needed to make workqueue work -type updateItemInterface interface{} - type nodeUpdateItem struct { - oldNode *v1.Node - newNode *v1.Node - newTaints []v1.Taint + nodeName string } type podUpdateItem struct { - oldPod *v1.Pod - newPod *v1.Pod - newTolerations []v1.Toleration -} - -func (n *nodeUpdateItem) name() string { - if n.newNode != nil { - return n.newNode.ObjectMeta.Name - } - if n.oldNode != nil { - return n.oldNode.ObjectMeta.Name - } - return "" -} - -func (p *podUpdateItem) nodeName() string { - if p.newPod != nil { - return p.newPod.Spec.NodeName - } - if p.oldPod != nil { - return p.oldPod.Spec.NodeName - } - return "" + podName string + podNamespace string + nodeName string } func hash(val string, max int) int { @@ -92,19 +69,27 @@ func hash(val string, max int) int { return int(hasher.Sum32() % uint32(max)) } +// GetPodFunc returns the pod for the specified name/namespace, or a NotFound error if missing. +type GetPodFunc func(name, namespace string) (*v1.Pod, error) + +// GetNodeFunc returns the node for the specified name, or a NotFound error if missing. +type GetNodeFunc func(name string) (*v1.Node, error) + // NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods // from Nodes tainted with NoExecute Taints. type NoExecuteTaintManager struct { client clientset.Interface recorder record.EventRecorder + getPod GetPodFunc + getNode GetNodeFunc taintEvictionQueue *TimedWorkerQueue // keeps a map from nodeName to all noExecute taints on that Node taintedNodesLock sync.Mutex taintedNodes map[string][]v1.Taint - nodeUpdateChannels []chan *nodeUpdateItem - podUpdateChannels []chan *podUpdateItem + nodeUpdateChannels []chan nodeUpdateItem + podUpdateChannels []chan podUpdateItem nodeUpdateQueue workqueue.Interface podUpdateQueue workqueue.Interface @@ -182,7 +167,7 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration { // NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to // communicate with the API server. -func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager { +func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc) *NoExecuteTaintManager { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"}) eventBroadcaster.StartLogging(glog.Infof) @@ -196,6 +181,8 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager { tm := &NoExecuteTaintManager{ client: c, recorder: recorder, + getPod: getPod, + getNode: getNode, taintedNodes: make(map[string][]v1.Taint), nodeUpdateQueue: workqueue.New(), @@ -211,8 +198,8 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { glog.V(0).Infof("Starting NoExecuteTaintManager") for i := 0; i < UpdateWorkerSize; i++ { - tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, NodeUpdateChannelSize)) - tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize)) + tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize)) + tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize)) } // Functions that are responsible for taking work items out of the workqueues and putting them @@ -223,15 +210,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { if shutdown { break } - nodeUpdate := item.(*nodeUpdateItem) - hash := hash(nodeUpdate.name(), UpdateWorkerSize) + nodeUpdate := item.(nodeUpdateItem) + hash := hash(nodeUpdate.nodeName, UpdateWorkerSize) select { case <-stopCh: tc.nodeUpdateQueue.Done(item) return case tc.nodeUpdateChannels[hash] <- nodeUpdate: + // tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker } - tc.nodeUpdateQueue.Done(item) } }(stopCh) @@ -241,15 +228,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { if shutdown { break } - podUpdate := item.(*podUpdateItem) - hash := hash(podUpdate.nodeName(), UpdateWorkerSize) + podUpdate := item.(podUpdateItem) + hash := hash(podUpdate.nodeName, UpdateWorkerSize) select { case <-stopCh: tc.podUpdateQueue.Done(item) return case tc.podUpdateChannels[hash] <- podUpdate: + // tc.podUpdateQueue.Done is called by the podUpdateChannels worker } - tc.podUpdateQueue.Done(item) } }(stopCh) @@ -274,6 +261,7 @@ func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan s return case nodeUpdate := <-tc.nodeUpdateChannels[worker]: tc.handleNodeUpdate(nodeUpdate) + tc.nodeUpdateQueue.Done(nodeUpdate) case podUpdate := <-tc.podUpdateChannels[worker]: // If we found a Pod update we need to empty Node queue first. priority: @@ -281,63 +269,73 @@ func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan s select { case nodeUpdate := <-tc.nodeUpdateChannels[worker]: tc.handleNodeUpdate(nodeUpdate) + tc.nodeUpdateQueue.Done(nodeUpdate) default: break priority } } // After Node queue is emptied we process podUpdate. tc.handlePodUpdate(podUpdate) + tc.podUpdateQueue.Done(podUpdate) } } } // PodUpdated is used to notify NoExecuteTaintManager about Pod changes. func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) { + podName := "" + podNamespace := "" + nodeName := "" oldTolerations := []v1.Toleration{} if oldPod != nil { + podName = oldPod.Name + podNamespace = oldPod.Namespace + nodeName = oldPod.Spec.NodeName oldTolerations = oldPod.Spec.Tolerations } newTolerations := []v1.Toleration{} if newPod != nil { + podName = newPod.Name + podNamespace = newPod.Namespace + nodeName = newPod.Spec.NodeName newTolerations = newPod.Spec.Tolerations } if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName { return } - updateItem := &podUpdateItem{ - oldPod: oldPod, - newPod: newPod, - newTolerations: newTolerations, + updateItem := podUpdateItem{ + podName: podName, + podNamespace: podNamespace, + nodeName: nodeName, } - tc.podUpdateQueue.Add(updateItemInterface(updateItem)) + tc.podUpdateQueue.Add(updateItem) } // NodeUpdated is used to notify NoExecuteTaintManager about Node changes. func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) { + nodeName := "" oldTaints := []v1.Taint{} if oldNode != nil { - oldTaints = oldNode.Spec.Taints + nodeName = oldNode.Name + oldTaints = getNoExecuteTaints(oldNode.Spec.Taints) } - oldTaints = getNoExecuteTaints(oldTaints) newTaints := []v1.Taint{} if newNode != nil { - newTaints = newNode.Spec.Taints + nodeName = newNode.Name + newTaints = getNoExecuteTaints(newNode.Spec.Taints) } - newTaints = getNoExecuteTaints(newTaints) if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) { return } - updateItem := &nodeUpdateItem{ - oldNode: oldNode, - newNode: newNode, - newTaints: newTaints, + updateItem := nodeUpdateItem{ + nodeName: nodeName, } - tc.nodeUpdateQueue.Add(updateItemInterface(updateItem)) + tc.nodeUpdateQueue.Add(updateItem) } func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) { @@ -384,17 +382,26 @@ func (tc *NoExecuteTaintManager) processPodOnNode( tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime) } -func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) { - // Delete - if podUpdate.newPod == nil { - pod := podUpdate.oldPod - podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} - glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName) - tc.cancelWorkWithEvent(podNamespacedName) +func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) { + pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace) + if err != nil { + if apierrors.IsNotFound(err) { + // Delete + podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName} + glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName) + tc.cancelWorkWithEvent(podNamespacedName) + return + } + utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err)) return } + + // We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object. + if pod.Spec.NodeName != podUpdate.nodeName { + return + } + // Create or Update - pod := podUpdate.newPod podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName) nodeName := pod.Spec.NodeName @@ -412,23 +419,27 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) { if !ok { return } - tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now()) + tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now()) } -func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) { - // Delete - if nodeUpdate.newNode == nil { - node := nodeUpdate.oldNode - glog.V(4).Infof("Noticed node deletion: %#v", node.Name) - tc.taintedNodesLock.Lock() - defer tc.taintedNodesLock.Unlock() - delete(tc.taintedNodes, node.Name) +func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) { + node, err := tc.getNode(nodeUpdate.nodeName) + if err != nil { + if apierrors.IsNotFound(err) { + // Delete + glog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName) + tc.taintedNodesLock.Lock() + defer tc.taintedNodesLock.Unlock() + delete(tc.taintedNodes, nodeUpdate.nodeName) + return + } + utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err)) return } + // Create or Update glog.V(4).Infof("Noticed node update: %#v", nodeUpdate) - node := nodeUpdate.newNode - taints := nodeUpdate.newTaints + taints := getNoExecuteTaints(node.Spec.Taints) func() { tc.taintedNodesLock.Lock() defer tc.taintedNodesLock.Unlock() diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go index 926761b4fa72..7a8504f258a0 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go @@ -19,6 +19,7 @@ package scheduler import ( "fmt" "sort" + "sync" "testing" "time" @@ -32,6 +33,42 @@ import ( var timeForControllerToProgress = 500 * time.Millisecond +func getPodFromClientset(clientset *fake.Clientset) GetPodFunc { + return func(name, namespace string) (*v1.Pod, error) { + return clientset.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + } +} + +func getNodeFromClientset(clientset *fake.Clientset) GetNodeFunc { + return func(name string) (*v1.Node, error) { + return clientset.CoreV1().Nodes().Get(name, metav1.GetOptions{}) + } +} + +type podHolder struct { + pod *v1.Pod + sync.Mutex +} + +func (p *podHolder) getPod(name, namespace string) (*v1.Pod, error) { + p.Lock() + defer p.Unlock() + return p.pod, nil +} +func (p *podHolder) setPod(pod *v1.Pod) { + p.Lock() + defer p.Unlock() + p.pod = pod +} + +type nodeHolder struct { + node *v1.Node +} + +func (n *nodeHolder) getNode(name string) (*v1.Node, error) { + return n.node, nil +} + func createNoExecuteTaint(index int) v1.Taint { now := metav1.Now() return v1.Taint{ @@ -150,7 +187,7 @@ func TestCreatePod(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(fakeClientset) + controller := NewNoExecuteTaintManager(fakeClientset, (&podHolder{pod: item.pod}).getPod, getNodeFromClientset(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.taintedNodes = item.taintedNodes @@ -174,7 +211,7 @@ func TestCreatePod(t *testing.T) { func TestDeletePod(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(fakeClientset) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.taintedNodes = map[string][]v1.Taint{ @@ -237,14 +274,17 @@ func TestUpdatePod(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(fakeClientset) + holder := &podHolder{} + controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, getNodeFromClientset(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.taintedNodes = item.taintedNodes + holder.setPod(item.prevPod) controller.PodUpdated(nil, item.prevPod) fakeClientset.ClearActions() time.Sleep(timeForControllerToProgress) + holder.setPod(item.newPod) controller.PodUpdated(item.prevPod, item.newPod) // wait a bit time.Sleep(timeForControllerToProgress) @@ -301,7 +341,7 @@ func TestCreateNode(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - controller := NewNoExecuteTaintManager(fakeClientset) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.node}).getNode) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.NodeUpdated(nil, item.node) @@ -324,7 +364,7 @@ func TestCreateNode(t *testing.T) { func TestDeleteNode(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(fakeClientset) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() controller.taintedNodes = map[string][]v1.Taint{ "node1": {createNoExecuteTaint(1)}, @@ -422,7 +462,7 @@ func TestUpdateNode(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - controller := NewNoExecuteTaintManager(fakeClientset) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.NodeUpdated(item.oldNode, item.newNode) @@ -488,7 +528,7 @@ func TestUpdateNodeWithMultiplePods(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) sort.Sort(item.expectedDeleteTimes) - controller := NewNoExecuteTaintManager(fakeClientset) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.NodeUpdated(item.oldNode, item.newNode)