From cce904eeb3efe4d2027dc9bc9c3c3d336d8483c4 Mon Sep 17 00:00:00 2001 From: dmatch01 Date: Thu, 26 Aug 2021 08:28:48 -0500 Subject: [PATCH 1/2] Fix to recovery for QM. Signed-off-by: dmatch01 --- .../queuejob/queuejob_controller_ex.go | 88 +++++++++++++++---- .../qm_lib_backend_with_resplan_mgr.go | 45 ++++------ .../quota/quotamanager/quota_rest_manager.go | 7 +- 3 files changed, 90 insertions(+), 50 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 731c02ab2..2b72efa47 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -32,6 +32,7 @@ package queuejob import ( "fmt" + "github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager" dto "github.com/prometheus/client_model/go" "math" "math/rand" @@ -44,7 +45,6 @@ import ( "github.com/IBM/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" "github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/metrics/adapter" "github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota" - "github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager" qmutils "github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager/util" "k8s.io/apimachinery/pkg/api/equality" @@ -159,6 +159,9 @@ type XController struct { // Quota Manager quotaManager quota.QuotaManagerInterface + + // Active Scheduling AppWrapper + schedulingAW *arbv1.AppWrapper } type JobAndClusterAgent struct{ @@ -371,18 +374,19 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * }, }) cc.queueJobLister = cc.queueJobInformer.Lister() + cc.queueJobSynced = cc.queueJobInformer.Informer().HasSynced + + //create sub-resource reference manager + cc.refManager = queuejobresources.NewLabelRefManager() // Setup Quota if serverOption.QuotaEnabled { - dispatchedAWDemands := cc.getDispatchedAppWrappers(cc.queueJobLister) - cc.quotaManager, _ = quotamanager.NewQuotaManager(cc.queueJobLister, dispatchedAWDemands, config, serverOption) + dispatchedAWDemands, dispatchedAWs := cc.getDispatchedAppWrappers() + cc.quotaManager, _ = quotamanager.NewQuotaManager(dispatchedAWDemands, dispatchedAWs, cc.queueJobLister, + config, serverOption) } else { cc.quotaManager = nil } - cc.queueJobSynced = cc.queueJobInformer.Informer().HasSynced - - //create sub-resource reference manager - cc.refManager = queuejobresources.NewLabelRefManager() // Set dispatcher mode or agent mode cc.isDispatcher=serverOption.Dispatcher @@ -409,6 +413,8 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * //create (empty) dispatchMap cc.dispatchMap=map[string]string{} + // Initialize current scheuling active AppWrapper + cc.schedulingAW = nil return cc } @@ -624,24 +630,62 @@ func (qjm *XController) getProposedPreemptions(requestingJob *arbv1.AppWrapper, return proposedPreemptions } -func (qjm *XController) getDispatchedAppWrappers(awJobLister listersv1.AppWrapperLister) map[string]*clusterstateapi.Resource { - retval := make(map[string]*clusterstateapi.Resource) +func (qjm *XController) getDispatchedAppWrappers() (map[string]*clusterstateapi.Resource, map[string]*arbv1.AppWrapper){ + awrRetVal := make(map[string]*clusterstateapi.Resource) + awsRetVal := make(map[string]*arbv1.AppWrapper) + // Setup and break down an informer to get a list of appwrappers bofore controllerinitialization completes + appwrapperJobClient, _, err := clients.NewClient(qjm.config) + if err != nil { + klog.Errorf("[getDispatchedAppWrappers] Failure creating client for initialization informer err=%#v", err) + return awrRetVal, awsRetVal + } + queueJobInformer := arbinformers.NewSharedInformerFactory(appwrapperJobClient, 0).AppWrapper().AppWrappers() + queueJobInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *arbv1.AppWrapper: + klog.V(10).Infof("[getDispatchedAppWrappers] Filtered name=%s/%s", + t.Namespace, t.Name) + return true + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: qjm.addQueueJob, + UpdateFunc: qjm.updateQueueJob, + DeleteFunc: qjm.deleteQueueJob, + }, + }) + queueJobLister := queueJobInformer.Lister() + queueJobSynced := queueJobInformer.Informer().HasSynced + + stopCh := make(chan struct{}) + defer close(stopCh) + + go queueJobInformer.Informer().Run(stopCh) + + cache.WaitForCacheSync(stopCh, queueJobSynced) + + appwrappers, err := queueJobLister.AppWrappers("").List(labels.Everything()) - appwrappers, err := awJobLister.AppWrappers("").List(labels.Everything()) if err != nil { klog.Errorf("[getDispatchedAppWrappers] List of AppWrappers err=%+v", err) - return retval + return awrRetVal, awsRetVal } for _, aw := range appwrappers { // Get dispatched jobs if aw.Status.CanRun == true { id := qmutils.CreateId(aw.Namespace, aw.Name) - retval[id] = qjm.GetAggregatedResources(aw) + awrRetVal[id] = qjm.GetAggregatedResources(aw) + awsRetVal[id] = aw } } - klog.V(10).Infof("[getDispatchedAppWrappers] List of runnable AppWrappers dispatched or to be dispatched: %+v", retval) - return retval + klog.V(10).Infof("[getDispatchedAppWrappers] List of runnable AppWrappers dispatched or to be dispatched: %+v", + awrRetVal) + return awrRetVal, awsRetVal } func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClusterResources *clusterstateapi. @@ -858,6 +902,7 @@ func (qjm *XController) ScheduleNext() { // if we have enough compute resources then we set the AllocatedReplicas to the total // amount of resources asked by the job qj, err := qjm.qjqueue.Pop() + qjm.schedulingAW = qj if err != nil { klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", err) return // Try to pop qjqueue again @@ -908,6 +953,7 @@ func (qjm *XController) ScheduleNext() { // Retrive HeadOfLine after priority update qj, err = qjm.qjqueue.Pop() + qjm.schedulingAW = qj if err != nil { klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", err) } else { @@ -1601,12 +1647,16 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool //Handle recovery condition if !qj.Status.CanRun && qj.Status.State == arbv1.AppWrapperStateEnqueued && - !cc.qjqueue.IfExistUnschedulableQ(qj) && !cc.qjqueue.IfExistActiveQ(qj){ + !cc.qjqueue.IfExistUnschedulableQ(qj) && !cc.qjqueue.IfExistActiveQ(qj) { + // One more check to ensure AW is not the current active schedule object + if cc.schedulingAW == nil || + (strings.Compare(cc.schedulingAW.Namespace, qj.Namespace) != 0 && + strings.Compare(cc.schedulingAW.Name, qj.Name) != 0) { cc.qjqueue.AddIfNotPresent(qj) - klog.V(3).Infof("[manageQueueJob] Recovered AppWrapper %s%s - added to active queue, Status=%+v", - qj.Namespace, qj.Name, qj.Status) - - return nil + klog.V(3).Infof("[manageQueueJob] Recovered AppWrapper %s%s - added to active queue, Status=%+v", + qj.Namespace, qj.Name, qj.Status) + return nil + } } // add qj to Etcd for dispatch diff --git a/pkg/controller/quota/quotamanager/qm_lib_backend_with_resplan_mgr.go b/pkg/controller/quota/quotamanager/qm_lib_backend_with_resplan_mgr.go index 1f8f7fe34..da7929f1c 100644 --- a/pkg/controller/quota/quotamanager/qm_lib_backend_with_resplan_mgr.go +++ b/pkg/controller/quota/quotamanager/qm_lib_backend_with_resplan_mgr.go @@ -25,7 +25,6 @@ import ( "github.com/IBM/multi-cluster-app-dispatcher/pkg/controller/quota/quotamanager/util" qmbackend "github.ibm.com/ai-foundation/quota-manager/quota" qmbackendutils "github.ibm.com/ai-foundation/quota-manager/quota/utils" - "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/rest" "strings" @@ -93,31 +92,17 @@ type TreeNode struct { // Making sure that QuotaManager implements QuotaManager. var _ = quota.QuotaManagerInterface(&QuotaManager{}) -func getDispatchedAppWrapper(awJobLister listersv1.AppWrapperLister, awId string) *arbv1.AppWrapper { - targetAWIdNamespace, targetAWIdName := util.ParseId(awId) - - // Get all AWs - appwrappers, err := awJobLister.AppWrappers("").List(labels.Everything()) - if err != nil { - klog.Errorf("[getDispatchedAppWrapper] List of AppWrappers err=%#v", err) - return nil - } - - // Find Appwrappers that can run (runnable) - for _, aw := range appwrappers { - // Get dispatched jobs - if aw.Status.CanRun == true { - if strings.Compare(aw.Name, targetAWIdName) == 0 && - strings.Compare(aw.Namespace, targetAWIdNamespace) == 0 { - return aw - } - } +func getDispatchedAppWrapper(dispatchedAWs map[string]*arbv1.AppWrapper, awId string) *arbv1.AppWrapper { + // Find Appwrapper that is run (runnable) + aw := dispatchedAWs[awId] + if aw != nil && aw.Status.CanRun == true { + return aw } return nil } -func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands map[string]*clusterstateapi.Resource, - config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) { +func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, dispatchedAWs map[string]*arbv1.AppWrapper, + awJobLister listersv1.AppWrapperLister, config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) { if serverOptions.QuotaEnabled == false { klog. @@ -126,8 +111,8 @@ func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands } qm := &QuotaManager{ - appwrapperLister: awJobLister, url: serverOptions.QuotaRestURL, + appwrapperLister: awJobLister, preemptionEnabled: serverOptions.Preemption, quotaManagerBackend: qmbackend.NewManager(), initializationDone: false, @@ -147,7 +132,7 @@ func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands } // Add AppWrappers that have been evaluated as runnable to QuotaManager - err2 := qm.loadDispatchedAWs(awJobLister, dispatchedAWDemands) + err2 := qm.loadDispatchedAWs(dispatchedAWDemands, dispatchedAWs) if err2 != nil { klog.Errorf("[dispatchedAWDemands] Failure during Quota Manager Backend Cache refresh, err=%#v", err2) @@ -171,19 +156,22 @@ func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands return qm, err } -func (qm *QuotaManager) loadDispatchedAWs(awJobLister listersv1.AppWrapperLister, - dispatchedAWDemands map[string]*clusterstateapi.Resource) error { +func (qm *QuotaManager) loadDispatchedAWs(dispatchedAWDemands map[string]*clusterstateapi.Resource, + dispatchedAWs map[string]*arbv1.AppWrapper) error { + // Nothing to do - if dispatchedAWDemands == nil { + if dispatchedAWDemands == nil || len(dispatchedAWDemands) <= 0 { + klog.V(4).Infof("[loadDispatchedAWs] No dispatched AppWrappers found to preload.") return nil } + // Process list of AppWrappers that are already dispatched var err error err = nil for k, v := range dispatchedAWDemands{ - aw := getDispatchedAppWrapper(awJobLister, k) + aw := getDispatchedAppWrapper(dispatchedAWs, k) if aw != nil { doesFit, preemptionIds, err2:= qm.Fits(aw, v, nil) @@ -205,6 +193,7 @@ func (qm *QuotaManager) loadDispatchedAWs(awJobLister listersv1.AppWrapperLister err, aw.Namespace, aw.Name, preemptionIds) } } + klog.V(4).Infof("[loadDispatchedAWs] Dispatched AppWrappers %s/%s found to preload.", aw.Namespace, aw.Name) } else { klog.Warningf("[loadDispatchedAWs] Unable to obtain AppWrapper from key: %s. Loading of AppWrapper will be skipped.", k) diff --git a/pkg/controller/quota/quotamanager/quota_rest_manager.go b/pkg/controller/quota/quotamanager/quota_rest_manager.go index 0a7c88322..fe5e00650 100644 --- a/pkg/controller/quota/quotamanager/quota_rest_manager.go +++ b/pkg/controller/quota/quotamanager/quota_rest_manager.go @@ -109,16 +109,17 @@ func createId(ns string, n string) string { return id } -func NewQuotaManager(awJobLister listersv1.AppWrapperLister, dispatchedAWDemands map[string]*clusterstateapi.Resource, - config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) { +func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, dispatchedAWs map[string]*arbv1.AppWrapper, + awJobLister listersv1.AppWrapperLister, config *rest.Config, + serverOptions *options.ServerOption) (*QuotaManager, error) { if serverOptions.QuotaEnabled == false { klog.Infof("[NewQuotaManager] Quota management is not enabled.") return nil, nil } qm := &QuotaManager{ - appwrapperLister: awJobLister, url: serverOptions.QuotaRestURL, + appwrapperLister: awJobLister, preemptionEnabled: serverOptions.Preemption, } From c68353c2adb690e6d58f7f4c2dcea98c2472841a Mon Sep 17 00:00:00 2001 From: dmatch01 Date: Thu, 26 Aug 2021 10:42:39 -0500 Subject: [PATCH 2/2] Update queuejob_controller_ex.go --- pkg/controller/queuejob/queuejob_controller_ex.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 2b72efa47..2676a4656 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -951,7 +951,7 @@ func (qjm *XController) ScheduleNext() { } } - // Retrive HeadOfLine after priority update + // Retrieve HeadOfLine after priority update qj, err = qjm.qjqueue.Pop() qjm.schedulingAW = qj if err != nil { @@ -1262,7 +1262,7 @@ func (qjm *XController) UpdateAgent() { func (qjm *XController) UpdateQueueJobs() { firstTime := metav1.NowMicro() - // retrive queueJobs from local cache. no guarantee queueJobs contain up-to-date information + // retrieve queueJobs from local cache. no guarantee queueJobs contain up-to-date information queueJobs, err := qjm.queueJobLister.AppWrappers("").List(labels.Everything()) if err != nil { klog.Errorf("[UpdateQueueJobs] List of queueJobs err=%+v", err)