Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 71 additions & 21 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package queuejob

import (
"fmt"
"github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager"
dto "github.com/prometheus/client_model/go"
"math"
"math/rand"
Expand All @@ -44,7 +45,6 @@ import (
"github.com/IBM/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
"github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/metrics/adapter"
"github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota"
"github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager"
qmutils "github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager/util"
"k8s.io/apimachinery/pkg/api/equality"

Expand Down Expand Up @@ -159,6 +159,9 @@ type XController struct {

// Quota Manager
quotaManager quota.QuotaManagerInterface

// Active Scheduling AppWrapper
schedulingAW *arbv1.AppWrapper
}

type JobAndClusterAgent struct{
Expand Down Expand Up @@ -371,18 +374,19 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
},
})
cc.queueJobLister = cc.queueJobInformer.Lister()
cc.queueJobSynced = cc.queueJobInformer.Informer().HasSynced

//create sub-resource reference manager
cc.refManager = queuejobresources.NewLabelRefManager()

// Setup Quota
if serverOption.QuotaEnabled {
dispatchedAWDemands := cc.getDispatchedAppWrappers(cc.queueJobLister)
cc.quotaManager, _ = quotamanager.NewQuotaManager(cc.queueJobLister, dispatchedAWDemands, config, serverOption)
dispatchedAWDemands, dispatchedAWs := cc.getDispatchedAppWrappers()
cc.quotaManager, _ = quotamanager.NewQuotaManager(dispatchedAWDemands, dispatchedAWs, cc.queueJobLister,
config, serverOption)
} else {
cc.quotaManager = nil
}
cc.queueJobSynced = cc.queueJobInformer.Informer().HasSynced

//create sub-resource reference manager
cc.refManager = queuejobresources.NewLabelRefManager()

// Set dispatcher mode or agent mode
cc.isDispatcher=serverOption.Dispatcher
Expand All @@ -409,6 +413,8 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
//create (empty) dispatchMap
cc.dispatchMap=map[string]string{}

// Initialize current scheuling active AppWrapper
cc.schedulingAW = nil
return cc
}

Expand Down Expand Up @@ -624,24 +630,62 @@ func (qjm *XController) getProposedPreemptions(requestingJob *arbv1.AppWrapper,
return proposedPreemptions
}

func (qjm *XController) getDispatchedAppWrappers(awJobLister listersv1.AppWrapperLister) map[string]*clusterstateapi.Resource {
retval := make(map[string]*clusterstateapi.Resource)
func (qjm *XController) getDispatchedAppWrappers() (map[string]*clusterstateapi.Resource, map[string]*arbv1.AppWrapper){
awrRetVal := make(map[string]*clusterstateapi.Resource)
awsRetVal := make(map[string]*arbv1.AppWrapper)
// Setup and break down an informer to get a list of appwrappers bofore controllerinitialization completes
appwrapperJobClient, _, err := clients.NewClient(qjm.config)
if err != nil {
klog.Errorf("[getDispatchedAppWrappers] Failure creating client for initialization informer err=%#v", err)
return awrRetVal, awsRetVal
}
queueJobInformer := arbinformers.NewSharedInformerFactory(appwrapperJobClient, 0).AppWrapper().AppWrappers()
queueJobInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *arbv1.AppWrapper:
klog.V(10).Infof("[getDispatchedAppWrappers] Filtered name=%s/%s",
t.Namespace, t.Name)
return true
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: qjm.addQueueJob,
UpdateFunc: qjm.updateQueueJob,
DeleteFunc: qjm.deleteQueueJob,
},
})
queueJobLister := queueJobInformer.Lister()
queueJobSynced := queueJobInformer.Informer().HasSynced

stopCh := make(chan struct{})
defer close(stopCh)

go queueJobInformer.Informer().Run(stopCh)

cache.WaitForCacheSync(stopCh, queueJobSynced)

appwrappers, err := queueJobLister.AppWrappers("").List(labels.Everything())

appwrappers, err := awJobLister.AppWrappers("").List(labels.Everything())
if err != nil {
klog.Errorf("[getDispatchedAppWrappers] List of AppWrappers err=%+v", err)
return retval
return awrRetVal, awsRetVal
}

for _, aw := range appwrappers {
// Get dispatched jobs
if aw.Status.CanRun == true {
id := qmutils.CreateId(aw.Namespace, aw.Name)
retval[id] = qjm.GetAggregatedResources(aw)
awrRetVal[id] = qjm.GetAggregatedResources(aw)
awsRetVal[id] = aw
}
}
klog.V(10).Infof("[getDispatchedAppWrappers] List of runnable AppWrappers dispatched or to be dispatched: %+v", retval)
return retval
klog.V(10).Infof("[getDispatchedAppWrappers] List of runnable AppWrappers dispatched or to be dispatched: %+v",
awrRetVal)
return awrRetVal, awsRetVal
}

func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClusterResources *clusterstateapi.
Expand Down Expand Up @@ -858,6 +902,7 @@ func (qjm *XController) ScheduleNext() {
// if we have enough compute resources then we set the AllocatedReplicas to the total
// amount of resources asked by the job
qj, err := qjm.qjqueue.Pop()
qjm.schedulingAW = qj
if err != nil {
klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", err)
return // Try to pop qjqueue again
Expand Down Expand Up @@ -906,8 +951,9 @@ func (qjm *XController) ScheduleNext() {
}
}

// Retrive HeadOfLine after priority update
// Retrieve HeadOfLine after priority update
qj, err = qjm.qjqueue.Pop()
qjm.schedulingAW = qj
if err != nil {
klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", err)
} else {
Expand Down Expand Up @@ -1216,7 +1262,7 @@ func (qjm *XController) UpdateAgent() {

func (qjm *XController) UpdateQueueJobs() {
firstTime := metav1.NowMicro()
// retrive queueJobs from local cache. no guarantee queueJobs contain up-to-date information
// retrieve queueJobs from local cache. no guarantee queueJobs contain up-to-date information
queueJobs, err := qjm.queueJobLister.AppWrappers("").List(labels.Everything())
if err != nil {
klog.Errorf("[UpdateQueueJobs] List of queueJobs err=%+v", err)
Expand Down Expand Up @@ -1601,12 +1647,16 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool

//Handle recovery condition
if !qj.Status.CanRun && qj.Status.State == arbv1.AppWrapperStateEnqueued &&
!cc.qjqueue.IfExistUnschedulableQ(qj) && !cc.qjqueue.IfExistActiveQ(qj){
!cc.qjqueue.IfExistUnschedulableQ(qj) && !cc.qjqueue.IfExistActiveQ(qj) {
// One more check to ensure AW is not the current active schedule object
if cc.schedulingAW == nil ||
(strings.Compare(cc.schedulingAW.Namespace, qj.Namespace) != 0 &&
strings.Compare(cc.schedulingAW.Name, qj.Name) != 0) {
cc.qjqueue.AddIfNotPresent(qj)
klog.V(3).Infof("[manageQueueJob] Recovered AppWrapper %s%s - added to active queue, Status=%+v",
qj.Namespace, qj.Name, qj.Status)

return nil
klog.V(3).Infof("[manageQueueJob] Recovered AppWrapper %s%s - added to active queue, Status=%+v",
qj.Namespace, qj.Name, qj.Status)
return nil
}
}

// add qj to Etcd for dispatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager/util"
qmbackend "github.ibm.com/ai-foundation/quota-manager/quota"
qmbackendutils "github.ibm.com/ai-foundation/quota-manager/quota/utils"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/rest"
"strings"

Expand Down Expand Up @@ -93,31 +92,17 @@ type TreeNode struct {
// Making sure that QuotaManager implements QuotaManager.
var _ = quota.QuotaManagerInterface(&QuotaManager{})

func getDispatchedAppWrapper(awJobLister listersv1.AppWrapperLister, awId string) *arbv1.AppWrapper {
targetAWIdNamespace, targetAWIdName := util.ParseId(awId)

// Get all AWs
appwrappers, err := awJobLister.AppWrappers("").List(labels.Everything())
if err != nil {
klog.Errorf("[getDispatchedAppWrapper] List of AppWrappers err=%#v", err)
return nil
}

// Find Appwrappers that can run (runnable)
for _, aw := range appwrappers {
// Get dispatched jobs
if aw.Status.CanRun == true {
if strings.Compare(aw.Name, targetAWIdName) == 0 &&
strings.Compare(aw.Namespace, targetAWIdNamespace) == 0 {
return aw
}
}
func getDispatchedAppWrapper(dispatchedAWs map[string]*arbv1.AppWrapper, awId string) *arbv1.AppWrapper {
// Find Appwrapper that is run (runnable)
aw := dispatchedAWs[awId]
if aw != nil && aw.Status.CanRun == true {
return aw
}
return nil
}

func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands map[string]*clusterstateapi.Resource,
config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) {
func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, dispatchedAWs map[string]*arbv1.AppWrapper,
awJobLister listersv1.AppWrapperLister, config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) {

if serverOptions.QuotaEnabled == false {
klog.
Expand All @@ -126,8 +111,8 @@ func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands
}

qm := &QuotaManager{
appwrapperLister: awJobLister,
url: serverOptions.QuotaRestURL,
appwrapperLister: awJobLister,
preemptionEnabled: serverOptions.Preemption,
quotaManagerBackend: qmbackend.NewManager(),
initializationDone: false,
Expand All @@ -147,7 +132,7 @@ func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands
}

// Add AppWrappers that have been evaluated as runnable to QuotaManager
err2 := qm.loadDispatchedAWs(awJobLister, dispatchedAWDemands)
err2 := qm.loadDispatchedAWs(dispatchedAWDemands, dispatchedAWs)
if err2 != nil {
klog.Errorf("[dispatchedAWDemands] Failure during Quota Manager Backend Cache refresh, err=%#v",
err2)
Expand All @@ -171,19 +156,22 @@ func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands
return qm, err
}

func (qm *QuotaManager) loadDispatchedAWs(awJobLister listersv1.AppWrapperLister,
dispatchedAWDemands map[string]*clusterstateapi.Resource) error {
func (qm *QuotaManager) loadDispatchedAWs(dispatchedAWDemands map[string]*clusterstateapi.Resource,
dispatchedAWs map[string]*arbv1.AppWrapper) error {

// Nothing to do
if dispatchedAWDemands == nil {
if dispatchedAWDemands == nil || len(dispatchedAWDemands) <= 0 {
klog.V(4).Infof("[loadDispatchedAWs] No dispatched AppWrappers found to preload.")
return nil
}


// Process list of AppWrappers that are already dispatched
var err error
err = nil

for k, v := range dispatchedAWDemands{
aw := getDispatchedAppWrapper(awJobLister, k)
aw := getDispatchedAppWrapper(dispatchedAWs, k)
if aw != nil {

doesFit, preemptionIds, err2:= qm.Fits(aw, v, nil)
Expand All @@ -205,6 +193,7 @@ func (qm *QuotaManager) loadDispatchedAWs(awJobLister listersv1.AppWrapperLister
err, aw.Namespace, aw.Name, preemptionIds)
}
}
klog.V(4).Infof("[loadDispatchedAWs] Dispatched AppWrappers %s/%s found to preload.", aw.Namespace, aw.Name)
} else {
klog.Warningf("[loadDispatchedAWs] Unable to obtain AppWrapper from key: %s. Loading of AppWrapper will be skipped.",
k)
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/quota/quotamanager/quota_rest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ func createId(ns string, n string) string {
return id
}

func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands map[string]*clusterstateapi.Resource,
config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) {
func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, dispatchedAWs map[string]*arbv1.AppWrapper,
awJobLister listersv1.AppWrapperLister, config *rest.Config,
serverOptions *options.ServerOption) (*QuotaManager, error) {
if serverOptions.QuotaEnabled == false {
klog.Infof("[NewQuotaManager] Quota management is not enabled.")
return nil, nil
}

qm := &QuotaManager{
appwrapperLister: awJobLister,
url: serverOptions.QuotaRestURL,
appwrapperLister: awJobLister,
preemptionEnabled: serverOptions.Preemption,
}

Expand Down