Skip to content

Commit

Permalink
One lock among PodNominator and SchedulingQueue
Browse files · Browse the repository at this point in the history
Change-Id: I17fe5da40250e42c04124c25b530ce6c8dea4154
  • Loading branch information
alculquicondor committed Mar 9, 2023
1 parent c6f8a85 commit 42c5d12
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 63 deletions.
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/interface.go
Expand Up @@ -573,6 +573,9 @@ type Framework interface {

// ProfileName returns the profile name associated to this framework.
ProfileName() string

// SetPodNominator sets the PodNominator
SetPodNominator(nominator PodNominator)
}

// Handle provides data and some tools that plugins can use. It is
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/framework/runtime/framework.go
Expand Up @@ -363,6 +363,10 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha
return f, nil
}

// SetPodNominator sets the PodNominator used by this framework instance.
// It implements the SetPodNominator method of the framework.Framework
// interface by storing n in the embedded PodNominator field.
func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) {
	f.PodNominator = n
}

// getScoreWeights makes sure that, between MultiPoint-Score plugin weights and individual Score
// plugin weights there is not an overflow of MaxTotalScore.
func getScoreWeights(f *frameworkImpl, pluginsMap map[string]framework.Plugin, plugins []config.Plugin) error {
Expand Down
77 changes: 42 additions & 35 deletions pkg/scheduler/internal/queue/scheduling_queue.go
Expand Up @@ -33,7 +33,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -133,8 +132,7 @@ func NominatedNodeName(pod *v1.Pod) string {
// - unschedulablePods holds pods that were already attempted for scheduling and
// are currently determined to be unschedulable.
type PriorityQueue struct {
// PodNominator abstracts the operations to maintain nominated Pods.
framework.PodNominator
*nominator

stop chan struct{}
clock clock.Clock
Expand All @@ -146,7 +144,6 @@ type PriorityQueue struct {
// the maximum time a pod can stay in the unschedulablePods.
podMaxInUnschedulablePodsDuration time.Duration

lock sync.RWMutex
cond sync.Cond

// activeQ is heap structure that scheduler actively looks at to find pods to
Expand Down Expand Up @@ -180,7 +177,7 @@ type priorityQueueOptions struct {
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
podMaxInUnschedulablePodsDuration time.Duration
podNominator framework.PodNominator
podLister listersv1.PodLister
clusterEventMap map[framework.ClusterEvent]sets.String
}

Expand Down Expand Up @@ -208,10 +205,10 @@ func WithPodMaxBackoffDuration(duration time.Duration) Option {
}
}

// WithPodNominator sets pod nominator for PriorityQueue.
func WithPodNominator(pn framework.PodNominator) Option {
// WithPodLister sets pod lister for PriorityQueue.
func WithPodLister(pl listersv1.PodLister) Option {
return func(o *priorityQueueOptions) {
o.podNominator = pn
o.podLister = pl
}
}

Expand Down Expand Up @@ -256,6 +253,9 @@ func NewPriorityQueue(
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
if options.podLister == nil {
options.podLister = informerFactory.Core().V1().Pods().Lister()
}
for _, opt := range opts {
opt(&options)
}
Expand All @@ -266,12 +266,8 @@ func NewPriorityQueue(
return lessFn(pInfo1, pInfo2)
}

if options.podNominator == nil {
options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
}

pq := &PriorityQueue{
PodNominator: options.podNominator,
nominator: newPodNominator(options.podLister),
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
Expand Down Expand Up @@ -314,7 +310,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
p.cond.Broadcast()

return nil
Expand Down Expand Up @@ -368,7 +364,7 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
p.unschedulablePods.delete(pod)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return true
}

Expand Down Expand Up @@ -424,7 +420,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI

}

p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return nil
}

Expand Down Expand Up @@ -531,22 +527,22 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is already in the active queue, just update it there.
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.activeQ.Update(pInfo)
}

// If the pod is in the backoff queue, update it there.
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.podBackoffQ.Update(pInfo)
}
}

// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
pInfo := updatePod(usPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
if isPodUpdated(oldPod, newPod) {
if p.isPodBackingoff(usPodInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
Expand All @@ -572,7 +568,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if err := p.activeQ.Add(pInfo); err != nil {
return err
}
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
p.cond.Broadcast()
return nil
}
Expand All @@ -582,7 +578,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.PodNominator.DeleteNominatedPodIfExists(pod)
p.deleteNominatedPodIfExistsUnlocked(pod)
if err := p.activeQ.Delete(newQueuedPodInfoForLookup(pod)); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(newQueuedPodInfoForLookup(pod))
Expand Down Expand Up @@ -662,8 +658,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
var nsLabels labels.Set
nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)

var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulablePods.podInfoMap {
Expand Down Expand Up @@ -707,26 +702,30 @@ func (p *PriorityQueue) Close() {

// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
// It takes the nominator's lock and delegates to the unlocked helper so
// that callers already holding the lock (e.g. PriorityQueue.Delete) can
// call deleteNominatedPodIfExistsUnlocked directly without deadlocking.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
	npm.lock.Lock()
	npm.deleteNominatedPodIfExistsUnlocked(pod)
	npm.lock.Unlock()
}

// deleteNominatedPodIfExistsUnlocked removes pod from the nominator's
// bookkeeping. The caller must hold npm.lock; this helper performs no
// locking or unlocking itself (the exported wrapper owns the lock).
func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) {
	npm.delete(pod)
}

// AddNominatedPod adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
	// Lock the named mutex field (the nominator no longer embeds
	// sync.RWMutex) and delegate to the unlocked helper, which callers
	// already holding the lock use directly.
	npm.lock.Lock()
	npm.addNominatedPodUnlocked(pi, nominatingInfo)
	npm.lock.Unlock()
}

// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
npm.RLock()
defer npm.RUnlock()
npm.lock.RLock()
defer npm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName]))
for i := 0; i < len(pods); i++ {
Expand Down Expand Up @@ -852,10 +851,10 @@ type nominator struct {
// nominated.
nominatedPodToNode map[types.UID]string

sync.RWMutex
lock sync.RWMutex
}

func (npm *nominator) add(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
func (npm *nominator) addNominatedPodUnlocked(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
// Always delete the pod if it already exists, to ensure we never store more than
// one instance of the pod.
npm.delete(pi.Pod)
Expand Down Expand Up @@ -912,8 +911,12 @@ func (npm *nominator) delete(p *v1.Pod) {

// UpdateNominatedPod updates the <oldPod> with <newPod>.
// It acquires the nominator's named lock field and delegates to the
// unlocked helper so callers that already hold npm.lock can call
// updateNominatedPodUnlocked directly.
func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
	npm.lock.Lock()
	defer npm.lock.Unlock()
	npm.updateNominatedPodUnlocked(oldPod, newPodInfo)
}

func (npm *nominator) updateNominatedPodUnlocked(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
// In some cases, an Update event with no "NominatedNode" present is received right
// after a node("NominatedNode") is reserved for this pod in memory.
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
Expand All @@ -934,13 +937,17 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPodInfo, nominatingInfo)
npm.addNominatedPodUnlocked(newPodInfo, nominatingInfo)
}

// NewPodNominator returns a framework.PodNominator backed by the internal
// nominator type. The supplied podLister lets the nominator confirm that a
// pod still exists before recording its nominatedNode info.
func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
	nom := newPodNominator(podLister)
	return nom
}

func newPodNominator(podLister listersv1.PodLister) *nominator {
return &nominator{
podLister: podLister,
nominatedPods: make(map[string][]*framework.PodInfo),
Expand Down

0 comments on commit 42c5d12

Please sign in to comment.