From 55a4525c3e745f685aea4758ac6b790c6d5eae95 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Wed, 8 May 2019 09:55:08 +0800 Subject: [PATCH 1/4] Implement queue Capability: donot allow podgroup enqueue when queue capability reached --- pkg/scheduler/actions/enqueue/enqueue.go | 12 ++++++------ pkg/scheduler/api/queue_info.go | 3 ++- pkg/scheduler/plugins/proportion/proportion.go | 4 ++++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index 70ac372914..60a11cce71 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -100,14 +100,14 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { job := jobs.Pop().(*api.JobInfo) inqueue := false - if len(job.TaskStatusIndex[api.Pending]) != 0 { + + if job.PodGroup.Spec.MinResources == nil { inqueue = true } else { - if job.PodGroup.Spec.MinResources == nil { - inqueue = true - } else { - pgResource := api.NewResource(*job.PodGroup.Spec.MinResources) - + pgResource := api.NewResource(*job.PodGroup.Spec.MinResources) + // The queue resource quota limit has not reached + if pgResource.Clone().Add(ssn.Queues[queue.UID].Allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) { + // the pod group required minimal resources less than cluster idle if pgResource.LessEqual(nodesIdleRes) { nodesIdleRes.Sub(pgResource) inqueue = true diff --git a/pkg/scheduler/api/queue_info.go b/pkg/scheduler/api/queue_info.go index e0014c30a3..3131051f04 100644 --- a/pkg/scheduler/api/queue_info.go +++ b/pkg/scheduler/api/queue_info.go @@ -30,7 +30,8 @@ type QueueInfo struct { UID QueueID Name string - Weight int32 + Weight int32 + Allocated *Resource Queue *arbcorev1.Queue } diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 2059f78de7..8fd380a3e9 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -98,6 +98,10 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { } } + for id, attr := range pp.queueOpts { + ssn.Queues[id].Allocated = attr.allocated + } + remaining := pp.totalResource.Clone() meet := map[api.QueueID]struct{}{} for { From b1607c9b263793cb4fb9501aa9bd103353fd3175 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Thu, 9 May 2019 19:29:57 +0800 Subject: [PATCH 2/4] fix comments --- pkg/scheduler/actions/enqueue/enqueue.go | 10 ++---- pkg/scheduler/api/queue_info.go | 3 +- pkg/scheduler/framework/session.go | 31 ++++++++++--------- pkg/scheduler/framework/session_plugins.go | 23 ++++++++++++++ .../plugins/proportion/proportion.go | 17 +++++++--- 5 files changed, 56 insertions(+), 28 deletions(-) diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index 60a11cce71..7e1927347e 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -105,13 +105,9 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { inqueue = true } else { pgResource := api.NewResource(*job.PodGroup.Spec.MinResources) - // The queue resource quota limit has not reached - if pgResource.Clone().Add(ssn.Queues[queue.UID].Allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) { - // the pod group required minimal resources less than cluster idle - if pgResource.LessEqual(nodesIdleRes) { - nodesIdleRes.Sub(pgResource) - inqueue = true - } + if ssn.JobEnqueueable(job) && pgResource.LessEqual(nodesIdleRes) { + nodesIdleRes.Sub(pgResource) + inqueue = true } } diff --git a/pkg/scheduler/api/queue_info.go b/pkg/scheduler/api/queue_info.go index 3131051f04..e0014c30a3 100644 --- a/pkg/scheduler/api/queue_info.go +++ b/pkg/scheduler/api/queue_info.go @@ -30,8 +30,7 @@ type QueueInfo struct { UID QueueID Name string - Weight int32 - Allocated *Resource + Weight int32 Queue *arbcorev1.Queue } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 3c23f20db8..46dbac1517 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -45,21 +45,22 @@ type Session struct { Backlog []*api.JobInfo Tiers []conf.Tier - plugins map[string]Plugin - eventHandlers []*EventHandler - jobOrderFns map[string]api.CompareFn - queueOrderFns map[string]api.CompareFn - taskOrderFns map[string]api.CompareFn - predicateFns map[string]api.PredicateFn - nodeOrderFns map[string]api.NodeOrderFn - nodeMapFns map[string]api.NodeMapFn - nodeReduceFns map[string]api.NodeReduceFn - preemptableFns map[string]api.EvictableFn - reclaimableFns map[string]api.EvictableFn - overusedFns map[string]api.ValidateFn - jobReadyFns map[string]api.ValidateFn - jobPipelinedFns map[string]api.ValidateFn - jobValidFns map[string]api.ValidateExFn + plugins map[string]Plugin + eventHandlers []*EventHandler + jobOrderFns map[string]api.CompareFn + queueOrderFns map[string]api.CompareFn + taskOrderFns map[string]api.CompareFn + predicateFns map[string]api.PredicateFn + nodeOrderFns map[string]api.NodeOrderFn + nodeMapFns map[string]api.NodeMapFn + nodeReduceFns map[string]api.NodeReduceFn + preemptableFns map[string]api.EvictableFn + reclaimableFns map[string]api.EvictableFn + overusedFns map[string]api.ValidateFn + jobReadyFns map[string]api.ValidateFn + jobPipelinedFns map[string]api.ValidateFn + jobValidFns map[string]api.ValidateExFn + jobEnqueueableFns map[string]api.ValidateFn } func openSession(cache cache.Cache) *Session { diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 448c956280..2605795f00 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -86,6 +86,11 @@ func (ssn *Session) AddJobValidFn(name string, fn api.ValidateExFn) { ssn.jobValidFns[name] = fn } +// AddJobValidFn add jobvalid function +func (ssn *Session) AddJobEnqueueableFn(name string, fn api.ValidateFn) { + ssn.jobEnqueueableFns[name] = fn +} + // Reclaimable invoke reclaimable function of the plugins func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo @@ -249,6 +254,24 @@ func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult { return nil } +// JobEnqueueable invoke jobEnqueueableFns function of the plugins +func (ssn *Session) JobEnqueueable(obj interface{}) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + fn, found := ssn.jobEnqueueableFns[plugin.Name] + if !found { + continue + } + + if res := fn(obj); !res { + return res + } + } + } + + return true +} + // JobOrderFn invoke joborder function of the plugins func (ssn *Session) JobOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 8fd380a3e9..75bb1537ed 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -98,10 +98,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { } } - for id, attr := range pp.queueOpts { - ssn.Queues[id].Allocated = attr.allocated - } - remaining := pp.totalResource.Clone() meet := map[api.QueueID]struct{}{} for { @@ -202,6 +198,19 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { return overused }) + ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) bool { + job := obj.(*api.JobInfo) + queueID := job.Queue + attr := pp.queueOpts[queueID] + queue := ssn.Queues[queueID] + pgResource := api.NewResource(*job.PodGroup.Spec.MinResources) + // The queue resource quota limit has not reached + if pgResource.Clone().Add(attr.allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) { + return true + } + return false + }) + // Register event handlers. ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { From 0ff0bc5b1ced03c8039ed28d339ea1398a5b28b3 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Tue, 14 May 2019 19:18:21 +0800 Subject: [PATCH 3/4] fix panic --- pkg/scheduler/framework/session.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 46dbac1517..d8b19608ca 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -72,20 +72,21 @@ func openSession(cache cache.Cache) *Session { Nodes: map[string]*api.NodeInfo{}, Queues: map[api.QueueID]*api.QueueInfo{}, - plugins: map[string]Plugin{}, - jobOrderFns: map[string]api.CompareFn{}, - queueOrderFns: map[string]api.CompareFn{}, - taskOrderFns: map[string]api.CompareFn{}, - predicateFns: map[string]api.PredicateFn{}, - nodeOrderFns: map[string]api.NodeOrderFn{}, - nodeMapFns: map[string]api.NodeMapFn{}, - nodeReduceFns: map[string]api.NodeReduceFn{}, - preemptableFns: map[string]api.EvictableFn{}, - reclaimableFns: map[string]api.EvictableFn{}, - overusedFns: map[string]api.ValidateFn{}, - jobReadyFns: map[string]api.ValidateFn{}, - jobPipelinedFns: map[string]api.ValidateFn{}, - jobValidFns: map[string]api.ValidateExFn{}, + plugins: map[string]Plugin{}, + jobOrderFns: map[string]api.CompareFn{}, + queueOrderFns: map[string]api.CompareFn{}, + taskOrderFns: map[string]api.CompareFn{}, + predicateFns: map[string]api.PredicateFn{}, + nodeOrderFns: map[string]api.NodeOrderFn{}, + nodeMapFns: map[string]api.NodeMapFn{}, + nodeReduceFns: map[string]api.NodeReduceFn{}, + preemptableFns: map[string]api.EvictableFn{}, + reclaimableFns: map[string]api.EvictableFn{}, + overusedFns: map[string]api.ValidateFn{}, + jobReadyFns: map[string]api.ValidateFn{}, + jobPipelinedFns: map[string]api.ValidateFn{}, + jobValidFns: map[string]api.ValidateExFn{}, + jobEnqueueableFns: map[string]api.ValidateFn{}, } snapshot := cache.Snapshot() From 99f2f5953f527b708a6cf188bb7a6376fabaa6bd Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Wed, 15 May 2019 10:29:23 +0800 Subject: [PATCH 4/4] fix lint --- pkg/scheduler/framework/session_plugins.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 2605795f00..b7421b3d0c 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -86,7 +86,7 @@ func (ssn *Session) AddJobValidFn(name string, fn api.ValidateExFn) { ssn.jobValidFns[name] = fn } -// AddJobValidFn add jobvalid function +// AddJobEnqueueableFn add jobenqueueable function func (ssn *Session) AddJobEnqueueableFn(name string, fn api.ValidateFn) { ssn.jobEnqueueableFns[name] = fn }