Automated cherry pick of #116395: One lock among PodNominator and SchedulingQueue #116438

Merged
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/interface.go
@@ -608,6 +608,9 @@ type Framework interface {

// PercentageOfNodesToScore returns percentageOfNodesToScore associated to a profile.
PercentageOfNodesToScore() *int32
+
+// SetPodNominator sets the PodNominator
+SetPodNominator(nominator PodNominator)
}

// Handle provides data and some tools that plugins can use. It is
4 changes: 4 additions & 0 deletions pkg/scheduler/framework/runtime/framework.go
@@ -368,6 +368,10 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha
return f, nil
}
+
+func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) {
+f.PodNominator = n
+}

// getScoreWeights makes sure that, between MultiPoint-Score plugin weights and individual Score
// plugin weights there is not an overflow of MaxTotalScore.
func getScoreWeights(f *frameworkImpl, pluginsMap map[string]framework.Plugin, plugins []config.Plugin) error {
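For context, a minimal sketch of how this new hook can be wired during scheduler construction (the actual call site is outside this diff; `profiles`, `lessFn`, and the `internalqueue` alias are assumed names). The queue is built after the framework profiles, and because PriorityQueue embeds the nominator (see the next file), it satisfies framework.PodNominator directly:

// Sketch only; not part of this diff.
podQueue := internalqueue.NewPriorityQueue(lessFn, informerFactory)
for _, fwk := range profiles {
	fwk.SetPodNominator(podQueue) // every profile shares the queue's nominator (and its lock)
}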
77 changes: 42 additions & 35 deletions pkg/scheduler/internal/queue/scheduling_queue.go
@@ -34,7 +34,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -141,8 +140,7 @@ func NominatedNodeName(pod *v1.Pod) string {
// - unschedulablePods holds pods that were already attempted for scheduling and
// are currently determined to be unschedulable.
type PriorityQueue struct {
-// PodNominator abstracts the operations to maintain nominated Pods.
-framework.PodNominator
+*nominator

stop chan struct{}
clock clock.Clock
@@ -154,7 +152,6 @@ type PriorityQueue struct {
// the maximum time a pod can stay in the unschedulablePods.
podMaxInUnschedulablePodsDuration time.Duration

-lock sync.RWMutex
cond sync.Cond

// activeQ is heap structure that scheduler actively looks at to find pods to
@@ -190,7 +187,7 @@ type priorityQueueOptions struct {
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
podMaxInUnschedulablePodsDuration time.Duration
-podNominator framework.PodNominator
+podLister listersv1.PodLister
clusterEventMap map[framework.ClusterEvent]sets.String
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
}
@@ -219,10 +216,10 @@ func WithPodMaxBackoffDuration(duration time.Duration) Option {
}
}

-// WithPodNominator sets pod nominator for PriorityQueue.
-func WithPodNominator(pn framework.PodNominator) Option {
+// WithPodLister sets pod lister for PriorityQueue.
+func WithPodLister(pl listersv1.PodLister) Option {
return func(o *priorityQueueOptions) {
-o.podNominator = pn
+o.podLister = pl
}
}
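The new option rides on NewPriorityQueue's existing variadic-options pattern; a minimal usage sketch, assuming a test supplies its own fakePodLister implementing listersv1.PodLister:

q := NewPriorityQueue(
	lessFn,          // any framework.LessFunc
	informerFactory, // informers.SharedInformerFactory
	WithPodLister(fakePodLister), // fakePodLister is an assumed test double
)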

@@ -274,6 +271,9 @@ func NewPriorityQueue(
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
+if options.podLister == nil {
+options.podLister = informerFactory.Core().V1().Pods().Lister()
+}
for _, opt := range opts {
opt(&options)
}
@@ -284,12 +284,8 @@
return lessFn(pInfo1, pInfo2)
}

-if options.podNominator == nil {
-options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
-}
-
pq := &PriorityQueue{
-PodNominator: options.podNominator,
+nominator: newPodNominator(options.podLister),
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
@@ -382,7 +378,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
}
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
-p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
+p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
p.cond.Broadcast()

return nil
@@ -436,7 +432,7 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
p.unschedulablePods.delete(pInfo.Pod, gated)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
-p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
+p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return true
}

@@ -497,7 +493,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI

}

-p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
+p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return nil
}

@@ -606,22 +602,22 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is already in the active queue, just update it there.
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
-p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
+p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.activeQ.Update(pInfo)
}

// If the pod is in the backoff queue, update it there.
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
-p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
+p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.podBackoffQ.Update(pInfo)
}
}

// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
pInfo := updatePod(usPodInfo, newPod)
-p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
+p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
if isPodUpdated(oldPod, newPod) {
gated := usPodInfo.Gated
if p.isPodBackingoff(usPodInfo) {
@@ -650,7 +646,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
-p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
+p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", activeQName)
p.cond.Broadcast()
return nil
@@ -661,7 +657,7 @@
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
-p.PodNominator.DeleteNominatedPodIfExists(pod)
+p.deleteNominatedPodIfExistsUnlocked(pod)
pInfo := newQueuedPodInfoForLookup(pod)
if err := p.activeQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
@@ -745,8 +741,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
-var nsLabels labels.Set
-nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
+nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)

var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulablePods.podInfoMap {
@@ -793,26 +788,30 @@ func (p *PriorityQueue) Close() {

// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
-npm.Lock()
+npm.lock.Lock()
+npm.deleteNominatedPodIfExistsUnlocked(pod)
+npm.lock.Unlock()
+}
+
+func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) {
npm.delete(pod)
-npm.Unlock()
}

// AddNominatedPod adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
-npm.Lock()
-npm.add(pi, nominatingInfo)
-npm.Unlock()
+npm.lock.Lock()
+npm.addNominatedPodUnlocked(pi, nominatingInfo)
+npm.lock.Unlock()
}
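This locked/unlocked split is the point of the refactor: sync.RWMutex is not reentrant, and PriorityQueue methods such as Add and Update already hold the shared lock when they touch nominator state, so they must call the *Unlocked variants. A minimal, self-contained illustration of the pattern (hypothetical names, not scheduler code):

package main

import "sync"

type state struct {
	lock sync.Mutex
	n    int
}

// AddNominated is the exported entry point: it takes the lock itself.
func (s *state) AddNominated() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.addNominatedUnlocked()
}

// addNominatedUnlocked assumes the caller already holds s.lock.
func (s *state) addNominatedUnlocked() { s.n++ }

// Add already holds the lock, so it must use the unlocked variant;
// calling AddNominated here would self-deadlock on the non-reentrant mutex.
func (s *state) Add() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.addNominatedUnlocked()
}

func main() {
	s := &state{}
	s.Add()
	s.AddNominated()
}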

// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
-npm.RLock()
-defer npm.RUnlock()
+npm.lock.RLock()
+defer npm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName]))
for i := 0; i < len(pods); i++ {
@@ -954,10 +953,10 @@ type nominator struct {
// nominated.
nominatedPodToNode map[types.UID]string

-sync.RWMutex
+lock sync.RWMutex
}

-func (npm *nominator) add(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
+func (npm *nominator) addNominatedPodUnlocked(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
// Always delete the pod if it already exists, to ensure we never store more than
// one instance of the pod.
npm.delete(pi.Pod)
@@ -1014,8 +1013,12 @@ func (npm *nominator) delete(p *v1.Pod) {

// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
-npm.Lock()
-defer npm.Unlock()
+npm.lock.Lock()
+defer npm.lock.Unlock()
+npm.updateNominatedPodUnlocked(oldPod, newPodInfo)
+}
+
+func (npm *nominator) updateNominatedPodUnlocked(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
// In some cases, an Update event with no "NominatedNode" present is received right
// after a node("NominatedNode") is reserved for this pod in memory.
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
@@ -1036,13 +1039,17 @@
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
-npm.add(newPodInfo, nominatingInfo)
+npm.addNominatedPodUnlocked(newPodInfo, nominatingInfo)
}

// NewPodNominator creates a nominator as a backing of framework.PodNominator.
// A podLister is passed in so as to check if the pod exists
// before adding its nominatedNode info.
func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
+return newPodNominator(podLister)
+}
+
+func newPodNominator(podLister listersv1.PodLister) *nominator {
return &nominator{
podLister: podLister,
nominatedPods: make(map[string][]*framework.PodInfo),
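Stepping back, the single lock promised in the PR title falls out of Go field promotion: PriorityQueue now embeds *nominator and its own lock field is gone, so every p.lock reference above resolves to the nominator's mutex. A compilable sketch of the resulting shape (field lists abridged; bodies are placeholders, not the real implementations):

package main

import "sync"

type nominator struct {
	lock sync.RWMutex
	// nominatedPods, nominatedPodToNode, podLister ...
}

type PriorityQueue struct {
	*nominator // promoted: p.lock is p.nominator.lock
	// activeQ, podBackoffQ, unschedulablePods ...
}

// Add guards queue state and nominator state with the one shared mutex.
func (p *PriorityQueue) Add() {
	p.lock.Lock()
	defer p.lock.Unlock()
	// mutate activeQ and nominated-pod bookkeeping atomically
}

func main() {
	p := &PriorityQueue{nominator: &nominator{}}
	p.Add()
}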