Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an accuracy issue of scheduler_pending_pods metric #113946

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/scheduler/framework/types.go
Expand Up @@ -115,6 +115,8 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
Timestamp: pqi.Timestamp,
Attempts: pqi.Attempts,
InitialAttemptTimestamp: pqi.InitialAttemptTimestamp,
UnschedulablePlugins: pqi.UnschedulablePlugins.Clone(),
Gated: pqi.Gated,
}
}

Expand Down
37 changes: 22 additions & 15 deletions pkg/scheduler/internal/queue/scheduling_queue.go
Expand Up @@ -368,12 +368,13 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
defer p.lock.Unlock()

pInfo := p.newQueuedPodInfo(pod)
gated := pInfo.Gated
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
if p.unschedulablePods.get(pod) != nil {
klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
p.unschedulablePods.delete(pInfo)
p.unschedulablePods.delete(pod, gated)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pInfo); err == nil {
Expand Down Expand Up @@ -428,10 +429,11 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
return false
}

gated := pInfo.Gated
if added, _ := p.addToActiveQ(pInfo); !added {
return false
}
p.unschedulablePods.delete(pInfo)
p.unschedulablePods.delete(pInfo.Pod, gated)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
Expand Down Expand Up @@ -621,17 +623,18 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
pInfo := updatePod(usPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
if isPodUpdated(oldPod, newPod) {
gated := usPodInfo.Gated
if p.isPodBackingoff(usPodInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return err
}
p.unschedulablePods.delete(usPodInfo)
p.unschedulablePods.delete(usPodInfo.Pod, gated)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQName)
} else {
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
p.unschedulablePods.delete(usPodInfo)
p.unschedulablePods.delete(usPodInfo.Pod, gated)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQName)
p.cond.Broadcast()
}
Expand Down Expand Up @@ -663,7 +666,9 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
if err := p.activeQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(pInfo)
p.unschedulablePods.delete(pInfo)
if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
}
return nil
}
Expand Down Expand Up @@ -718,14 +723,15 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
} else {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
p.unschedulablePods.delete(pInfo)
p.unschedulablePods.delete(pod, pInfo.Gated)
}
} else {
gated := pInfo.Gated
if added, _ := p.addToActiveQ(pInfo); added {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)
activated = true
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulablePods.delete(pInfo)
p.unschedulablePods.delete(pod, gated)
}
}
}
Expand Down Expand Up @@ -875,7 +881,7 @@ type UnschedulablePods struct {
unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}

// Add adds a pod to the unschedulable podInfoMap.
// addOrUpdate adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) {
podID := u.keyFunc(pInfo.Pod)
if _, exists := u.podInfoMap[podID]; !exists {
Expand All @@ -888,20 +894,21 @@ func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) {
u.podInfoMap[podID] = pInfo
}

// Delete deletes a pod from the unschedulable podInfoMap.
func (u *UnschedulablePods) delete(pInfo *framework.QueuedPodInfo) {
podID := u.keyFunc(pInfo.Pod)
// delete deletes a pod from the unschedulable podInfoMap.
// The `gated` parameter is used to figure out which metric should be decreased.
func (u *UnschedulablePods) delete(pod *v1.Pod, gated bool) {
podID := u.keyFunc(pod)
if _, exists := u.podInfoMap[podID]; exists {
if pInfo.Gated && u.gatedRecorder != nil {
if gated && u.gatedRecorder != nil {
u.gatedRecorder.Dec()
} else if !pInfo.Gated && u.unschedulableRecorder != nil {
} else if !gated && u.unschedulableRecorder != nil {
u.unschedulableRecorder.Dec()
}
}
delete(u.podInfoMap, podID)
}

// Get returns the QueuedPodInfo if a pod with the same key as the key of the given "pod"
// get returns the QueuedPodInfo if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise.
func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo {
podKey := u.keyFunc(pod)
Expand All @@ -911,7 +918,7 @@ func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo {
return nil
}

// Clear removes all the entries from the unschedulable podInfoMap.
// clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePods) clear() {
u.podInfoMap = make(map[string]*framework.QueuedPodInfo)
if u.unschedulableRecorder != nil {
Expand Down
85 changes: 72 additions & 13 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
Expand Up @@ -456,8 +456,10 @@ func (pl *preEnqueuePlugin) Name() string {

func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
for _, allowed := range pl.allowlists {
if strings.Contains(p.Name, allowed) {
return nil
for label := range p.Labels {
if label == allowed {
return nil
}
}
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists")
Expand All @@ -473,14 +475,14 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
}{
{
name: "no plugins registered",
pod: st.MakePod().Name("p").Obj(),
pod: st.MakePod().Name("p").Label("p", "").Obj(),
wantUnschedulablePods: 0,
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod name not in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Obj(),
pod: st.MakePod().Name("p").Label("p", "").Obj(),
wantUnschedulablePods: 1,
wantSuccess: false,
},
Expand All @@ -490,7 +492,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Obj(),
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
wantUnschedulablePods: 1,
wantSuccess: false,
},
Expand All @@ -500,7 +502,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"bar"}},
},
pod: st.MakePod().Name("bar").Obj(),
pod: st.MakePod().Name("bar").Label("bar", "").Obj(),
wantUnschedulablePods: 0,
wantSuccess: true,
},
Expand Down Expand Up @@ -1056,7 +1058,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
}
}
for _, p := range test.podsToDelete {
upm.delete(newQueuedPodInfoForLookup(p))
upm.delete(p, false)
}
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterDelete) {
t.Errorf("Unexpected map after deleting pods. Expected: %v, got: %v",
Expand Down Expand Up @@ -1406,6 +1408,14 @@ var (
}
queue.unschedulablePods.addOrUpdate(pInfo)
}
deletePod = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.Delete(pInfo.Pod)
}
updatePodQueueable = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
newPod := pInfo.Pod.DeepCopy()
newPod.Labels = map[string]string{"queueable": ""}
queue.Update(pInfo.Pod, newPod)
}
addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.podBackoffQ.Add(pInfo)
}
Expand Down Expand Up @@ -1526,18 +1536,18 @@ func TestPendingPodsMetric(t *testing.T) {
metrics.Register()
total := 60
queueableNum := 50
queueable := "queueable"
queueable, failme := "queueable", "failme"
// First 50 Pods are queueable.
pInfos := makeQueuedPodInfos(queueableNum, queueable, timestamp)
pInfos := makeQueuedPodInfos(queueableNum, "x", queueable, timestamp)
// The last 10 Pods are not queueable.
gated := makeQueuedPodInfos(total-queueableNum, "fail-me", timestamp)
gated := makeQueuedPodInfos(total-queueableNum, "y", failme, timestamp)
// Manually mark them as gated=true.
for _, pInfo := range gated {
pInfo.Gated = true
}
pInfos = append(pInfos, gated...)
totalWithDelay := 20
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, queueable, timestamp.Add(2*time.Second))
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, "z", queueable, timestamp.Add(2*time.Second))

tests := []struct {
name string
Expand Down Expand Up @@ -1656,6 +1666,54 @@ scheduler_pending_pods{queue="active"} 50
scheduler_pending_pods{queue="backoff"} 0
scheduler_pending_pods{queue="gated"} 0
scheduler_pending_pods{queue="unschedulable"} 0
`,
},
{
name: "add pods to activeQ/unschedulablePods and then delete some Pods",
operations: []operation{
addPodActiveQ,
addPodUnschedulablePods,
deletePod,
deletePod,
deletePod,
},
operands: [][]*framework.QueuedPodInfo{
pInfos[:30],
pInfos[30:],
pInfos[:2],
pInfos[30:33],
pInfos[50:54],
},
metricsName: "scheduler_pending_pods",
wants: `
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.
# TYPE scheduler_pending_pods gauge
scheduler_pending_pods{queue="active"} 28
scheduler_pending_pods{queue="backoff"} 0
scheduler_pending_pods{queue="gated"} 6
scheduler_pending_pods{queue="unschedulable"} 17
`,
},
{
name: "add pods to activeQ/unschedulablePods and then update some Pods as queueable",
operations: []operation{
addPodActiveQ,
addPodUnschedulablePods,
updatePodQueueable,
},
operands: [][]*framework.QueuedPodInfo{
pInfos[:30],
pInfos[30:],
pInfos[50:55],
},
metricsName: "scheduler_pending_pods",
wants: `
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.
# TYPE scheduler_pending_pods gauge
scheduler_pending_pods{queue="active"} 35
scheduler_pending_pods{queue="backoff"} 0
scheduler_pending_pods{queue="gated"} 5
scheduler_pending_pods{queue="unschedulable"} 20
`,
},
}
Expand Down Expand Up @@ -2094,11 +2152,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
}
}

func makeQueuedPodInfos(num int, namePrefix string, timestamp time.Time) []*framework.QueuedPodInfo {
func makeQueuedPodInfos(num int, namePrefix, label string, timestamp time.Time) []*framework.QueuedPodInfo {
var pInfos = make([]*framework.QueuedPodInfo, 0, num)
for i := 1; i <= num; i++ {
p := &framework.QueuedPodInfo{
PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("%v-%d", namePrefix, i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()),
PodInfo: mustNewPodInfo(
st.MakePod().Name(fmt.Sprintf("%v-%d", namePrefix, i)).Namespace(fmt.Sprintf("ns%d", i)).Label(label, "").UID(fmt.Sprintf("tp-%d", i)).Obj()),
Timestamp: timestamp,
UnschedulablePlugins: sets.NewString(),
}
Expand Down