Simplify taint manager workqueue keys #65350

Merged
merged 1 commit on Oct 18, 2018
6 changes: 5 additions & 1 deletion pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -345,7 +345,11 @@ func NewNodeLifecycleController(
nc.podInformerSynced = podInformer.Informer().HasSynced

if nc.runTaintManager {
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient)
podLister := podInformer.Lister()
podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
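For context on the wiring just above: the controller now passes the taint manager two lookup closures built from the informer listers, and the new handlePodUpdate/handleNodeUpdate further down in this diff treat a NotFound error from those closures as a deletion. Below is a minimal, illustrative sketch of that lookup contract, assuming a map-backed store and a local errNotFound sentinel in place of the real listers and apierrors.IsNotFound; names such as newGetPodFunc are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for the apierrors.NewNotFound error a real informer
// lister returns; the controller checks apierrors.IsNotFound instead.
var errNotFound = errors.New("not found")

// pod is a stripped-down stand-in for *v1.Pod.
type pod struct {
	name, namespace, nodeName string
}

// newGetPodFunc mirrors the GetPodFunc shape from this PR, but is backed by a
// plain map instead of podInformer.Lister() so the sketch runs standalone.
func newGetPodFunc(store map[string]*pod) func(name, namespace string) (*pod, error) {
	return func(name, namespace string) (*pod, error) {
		if p, ok := store[namespace+"/"+name]; ok {
			return p, nil
		}
		return nil, errNotFound
	}
}

func main() {
	getPod := newGetPodFunc(map[string]*pod{
		"default/web-1": {name: "web-1", namespace: "default", nodeName: "node-a"},
	})

	for _, name := range []string{"web-1", "web-2"} {
		p, err := getPod(name, "default")
		switch {
		case err == nil:
			// Present in the cache: process it against the node's current taints.
			fmt.Printf("process pod %s/%s on node %s\n", p.namespace, p.name, p.nodeName)
		case errors.Is(err, errNotFound):
			// Missing from the cache: treat as a deletion and cancel pending work,
			// the way the new handlePodUpdate treats apierrors.IsNotFound.
			fmt.Printf("pod default/%s deleted, cancel any pending eviction\n", name)
		default:
			// Anything else is a transient error and is surfaced for retry.
			fmt.Printf("error getting pod default/%s: %v\n", name, err)
		}
	}
}
```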
2 changes: 2 additions & 0 deletions pkg/controller/nodelifecycle/scheduler/BUILD
@@ -13,10 +13,12 @@ go_library(
"//pkg/apis/core/helper:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
157 changes: 84 additions & 73 deletions pkg/controller/nodelifecycle/scheduler/taint_manager.go
@@ -24,10 +24,12 @@ import (
"time"

"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -51,39 +53,14 @@ const (
retries = 5
)

// Needed to make workqueue work
type updateItemInterface interface{}

type nodeUpdateItem struct {
oldNode *v1.Node
newNode *v1.Node
newTaints []v1.Taint
nodeName string
}

type podUpdateItem struct {
oldPod *v1.Pod
newPod *v1.Pod
newTolerations []v1.Toleration
}

func (n *nodeUpdateItem) name() string {
if n.newNode != nil {
return n.newNode.ObjectMeta.Name
}
if n.oldNode != nil {
return n.oldNode.ObjectMeta.Name
}
return ""
}

func (p *podUpdateItem) nodeName() string {
if p.newPod != nil {
return p.newPod.Spec.NodeName
}
if p.oldPod != nil {
return p.oldPod.Spec.NodeName
}
return ""
podName string
podNamespace string
nodeName string
}

func hash(val string, max int) int {
@@ -92,19 +69,27 @@ func hash(val string, max int) int {
return int(hasher.Sum32() % uint32(max))
}

// GetPodFunc returns the pod for the specified name/namespace, or a NotFound error if missing.
type GetPodFunc func(name, namespace string) (*v1.Pod, error)

// GetNodeFunc returns the node for the specified name, or a NotFound error if missing.
type GetNodeFunc func(name string) (*v1.Node, error)

// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
client clientset.Interface
recorder record.EventRecorder
getPod GetPodFunc
getNode GetNodeFunc

taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node
taintedNodesLock sync.Mutex
taintedNodes map[string][]v1.Taint

nodeUpdateChannels []chan *nodeUpdateItem
podUpdateChannels []chan *podUpdateItem
nodeUpdateChannels []chan nodeUpdateItem
podUpdateChannels []chan podUpdateItem

nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.Interface
@@ -182,7 +167,7 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {

// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc) *NoExecuteTaintManager {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
eventBroadcaster.StartLogging(glog.Infof)
@@ -196,6 +181,8 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
tm := &NoExecuteTaintManager{
client: c,
recorder: recorder,
getPod: getPod,
getNode: getNode,
taintedNodes: make(map[string][]v1.Taint),

nodeUpdateQueue: workqueue.New(),
@@ -211,8 +198,8 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
glog.V(0).Infof("Starting NoExecuteTaintManager")

for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize))
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
Review thread:

Member: Do we still need the channel group? The performance of TaintNodeByCondition is acceptable with one channel.

Member: I would say "now". I think that once we are able to bump the default qps limits in large clusters, it may turn out to be too slow. But yeah, it's mostly guessing...

Member (Author): TaintNodeByCondition is distributing a single update queue among multiple workers.

This has two update queues to coordinate, so the exact solution will look different. I think we can do something simpler than this two-layer async, but we need to do it in a way that still lets us fan out workers and give node update handling priority when handling pod update events.
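To make the fan-out discussed in this thread concrete, here is a small standalone sketch (not the controller's code) of the pattern the diff keeps: updates are hashed by node name onto per-worker channels, and each worker drains pending node updates before handling a pod update. The names updateWorkerSize, hashKey, and the string payloads are placeholders for the real constants and update items.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"time"
)

// updateWorkerSize is an assumed stand-in for the UpdateWorkerSize constant.
const updateWorkerSize = 4

// hashKey routes a key to a worker index, so every update for the same node
// lands on the same worker, like hash(nodeUpdate.nodeName, UpdateWorkerSize).
func hashKey(val string, max int) int {
	hasher := fnv.New32a()
	hasher.Write([]byte(val))
	return int(hasher.Sum32() % uint32(max))
}

// worker drains pending node updates before servicing a pod update, mirroring
// the "priority" loop in NoExecuteTaintManager.worker.
func worker(id int, nodeCh, podCh <-chan string, stopCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-stopCh:
			return
		case n := <-nodeCh:
			fmt.Printf("worker %d: node update %q\n", id, n)
		case p := <-podCh:
		priority:
			for {
				select {
				case n := <-nodeCh:
					fmt.Printf("worker %d: node update %q (drained first)\n", id, n)
				default:
					break priority
				}
			}
			fmt.Printf("worker %d: pod update %q\n", id, p)
		}
	}
}

func main() {
	nodeChans := make([]chan string, updateWorkerSize)
	podChans := make([]chan string, updateWorkerSize)
	stopCh := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < updateWorkerSize; i++ {
		nodeChans[i] = make(chan string, 10)
		podChans[i] = make(chan string, 1)
		wg.Add(1)
		go worker(i, nodeChans[i], podChans[i], stopCh, &wg)
	}

	// Fan updates out by node name; "node-a" always reaches the same worker.
	for _, node := range []string{"node-a", "node-b", "node-a"} {
		nodeChans[hashKey(node, updateWorkerSize)] <- node
	}
	podChans[hashKey("node-a", updateWorkerSize)] <- "default/pod-1 on node-a"

	time.Sleep(100 * time.Millisecond) // crude settle time, fine for a demo
	close(stopCh)
	wg.Wait()
}
```

Because both pod and node updates are sharded by node name, a pod update and the taint updates for its node always land on the same worker, which is what makes the in-worker priority drain sufficient.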

tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}

// Functions that are responsible for taking work items out of the workqueues and putting them
@@ -223,15 +210,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
if shutdown {
break
}
nodeUpdate := item.(*nodeUpdateItem)
hash := hash(nodeUpdate.name(), UpdateWorkerSize)
nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
tc.nodeUpdateQueue.Done(item)
}
}(stopCh)

@@ -241,15 +228,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
if shutdown {
break
}
podUpdate := item.(*podUpdateItem)
hash := hash(podUpdate.nodeName(), UpdateWorkerSize)
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
tc.podUpdateQueue.Done(item)
}
}(stopCh)

@@ -274,70 +261,81 @@ func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan s
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}

// PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
podName := ""
podNamespace := ""
nodeName := ""
oldTolerations := []v1.Toleration{}
if oldPod != nil {
podName = oldPod.Name
podNamespace = oldPod.Namespace
nodeName = oldPod.Spec.NodeName
oldTolerations = oldPod.Spec.Tolerations
}
newTolerations := []v1.Toleration{}
if newPod != nil {
podName = newPod.Name
podNamespace = newPod.Namespace
nodeName = newPod.Spec.NodeName
newTolerations = newPod.Spec.Tolerations
}

if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
return
}
updateItem := &podUpdateItem{
oldPod: oldPod,
newPod: newPod,
newTolerations: newTolerations,
updateItem := podUpdateItem{
podName: podName,
podNamespace: podNamespace,
nodeName: nodeName,
}

tc.podUpdateQueue.Add(updateItemInterface(updateItem))
tc.podUpdateQueue.Add(updateItem)
}

// NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
nodeName := ""
oldTaints := []v1.Taint{}
if oldNode != nil {
oldTaints = oldNode.Spec.Taints
nodeName = oldNode.Name
oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
}
oldTaints = getNoExecuteTaints(oldTaints)

newTaints := []v1.Taint{}
if newNode != nil {
newTaints = newNode.Spec.Taints
nodeName = newNode.Name
newTaints = getNoExecuteTaints(newNode.Spec.Taints)
}
newTaints = getNoExecuteTaints(newTaints)

if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
return
}
updateItem := &nodeUpdateItem{
oldNode: oldNode,
newNode: newNode,
newTaints: newTaints,
updateItem := nodeUpdateItem{
nodeName: nodeName,
}

tc.nodeUpdateQueue.Add(updateItemInterface(updateItem))
tc.nodeUpdateQueue.Add(updateItem)
}

func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) {
@@ -384,17 +382,26 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
// Delete
if podUpdate.newPod == nil {
pod := podUpdate.oldPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}

// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}

// Create or Update
pod := podUpdate.newPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
nodeName := pod.Spec.NodeName
@@ -412,23 +419,27 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
if !ok {
return
}
tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now())
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}

func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
// Delete
if nodeUpdate.newNode == nil {
node := nodeUpdate.oldNode
glog.V(4).Infof("Noticed node deletion: %#v", node.Name)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, node.Name)
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
node, err := tc.getNode(nodeUpdate.nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
glog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}

// Create or Update
glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
node := nodeUpdate.newNode
taints := nodeUpdate.newTaints
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
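One more detail worth calling out from the Run/worker changes above: the dispatching goroutines no longer call queue.Done right after forwarding an item to a channel; the worker that actually handles the item calls Done when it finishes, so the workqueue keeps tracking the item while it is in flight. Below is a minimal sketch of that handoff, using the same client-go workqueue package but collapsing the dispatcher and worker into one synchronous flow for illustration.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.New()
	updates := make(chan interface{}, 1)

	queue.Add("node-a")

	// Dispatcher: take the item off the queue and hand it to a worker channel.
	item, shutdown := queue.Get()
	if shutdown {
		return
	}
	updates <- item // note: no queue.Done(item) here anymore

	// Worker: process the item, then tell the queue we are finished with it.
	got := <-updates
	fmt.Printf("handled update for %v\n", got)
	queue.Done(got)

	queue.ShutDown()
}
```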