diff --git a/CONTROLLER_VERSION b/CONTROLLER_VERSION index cad6e08a2..93f9beed3 100644 --- a/CONTROLLER_VERSION +++ b/CONTROLLER_VERSION @@ -1 +1 @@ -1.29.3 +1.29.4 diff --git a/pkg/apis/controller/v1alpha1/appwrapper.go b/pkg/apis/controller/v1alpha1/appwrapper.go index 241ef5731..7513b7a31 100644 --- a/pkg/apis/controller/v1alpha1/appwrapper.go +++ b/pkg/apis/controller/v1alpha1/appwrapper.go @@ -188,7 +188,7 @@ type AppWrapperStatus struct { SystemPriority float64 `json:"systempriority,omitempty"` // State of QueueJob - Init, Queueing, HeadOfLine, Rejoining, ... - QueueJobState QueueJobState `json:"queuejobstate,omitempty"` + QueueJobState AppWrapperConditionType `json:"queuejobstate,omitempty"` // Microsecond level timestamp when controller first sees QueueJob (by Informer) ControllerFirstTimestamp metav1.MicroTime `json:"controllerfirsttimestamp,omitempty"` @@ -201,6 +201,10 @@ type AppWrapperStatus struct { // Indicate if message is a duplicate (for Informer to recognize duplicate messages) Local bool `json:"local,omitempty"` + + // Represents the latest available observations of an AppWrapper's current condition. 
+ Conditions []AppWrapperCondition `json:"conditions,omitempty"` + } type AppWrapperState string @@ -213,15 +217,33 @@ const ( AppWrapperStateFailed AppWrapperState = "Failed" ) -type QueueJobState string +type AppWrapperConditionType string const ( - QueueJobStateInit QueueJobState = "Init" - QueueJobStateQueueing QueueJobState = "Queueing" - QueueJobStateHeadOfLine QueueJobState = "HeadOfLine" - QueueJobStateRejoining QueueJobState = "Rejoining" - QueueJobStateDispatched QueueJobState = "Dispatched" - QueueJobStateRunning QueueJobState = "Running" - QueueJobStateDeleted QueueJobState = "Deleted" - QueueJobStateFailed QueueJobState = "Failed" + AppWrapperCondInit AppWrapperConditionType = "Init" + AppWrapperCondQueueing AppWrapperConditionType = "Queueing" + AppWrapperCondHeadOfLine AppWrapperConditionType = "HeadOfLine" + AppWrapperCondBackoff AppWrapperConditionType = "Backoff" + AppWrapperCondDispatched AppWrapperConditionType = "Dispatched" + AppWrapperCondRunning AppWrapperConditionType = "Running" + AppWrapperCondPreemptCandidate AppWrapperConditionType = "PreemptCandidate" + AppWrapperCondPreempted AppWrapperConditionType = "Preempted" + AppWrapperCondDeleted AppWrapperConditionType = "Deleted" + AppWrapperCondFailed AppWrapperConditionType = "Failed" ) + +// AppWrapperCondition describes the state of an AppWrapper at a certain point. +type AppWrapperCondition struct { + // Type of appwrapper condition. + Type AppWrapperConditionType `json:"type"` + // Status of the condition, one of True, False, Unknown. + Status v1.ConditionStatus `json:"status"` + // The last time this condition was updated. + LastUpdateMicroTime metav1.MicroTime `json:"lastUpdateMicroTime,omitempty"` + // Last time the condition transitioned from one status to another. + LastTransitionMicroTime metav1.MicroTime `json:"lastTransitionMicroTime,omitempty"` + // The reason for the condition's last transition. 
+ Reason string `json:"reason,omitempty"` + // A human readable message indicating details about the transition. + Message string `json:"message,omitempty"` +} diff --git a/pkg/apis/controller/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/controller/v1alpha1/zz_generated.deepcopy.go index 6f10bc1da..337925c66 100644 --- a/pkg/apis/controller/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/controller/v1alpha1/zz_generated.deepcopy.go @@ -54,6 +54,24 @@ func (in *AppWrapper) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AppWrapperCondition) DeepCopyInto(out *AppWrapperCondition) { + *out = *in + in.LastUpdateMicroTime.DeepCopyInto(&out.LastUpdateMicroTime) + in.LastTransitionMicroTime.DeepCopyInto(&out.LastTransitionMicroTime) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AppWrapperCondition. +func (in *AppWrapperCondition) DeepCopy() *AppWrapperCondition { + if in == nil { + return nil + } + out := new(AppWrapperCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *AppWrapperGenericResource) DeepCopyInto(out *AppWrapperGenericResource) { *out = *in @@ -219,6 +237,13 @@ func (in *AppWrapperSpec) DeepCopy() *AppWrapperSpec { func (in *AppWrapperStatus) DeepCopyInto(out *AppWrapperStatus) { *out = *in in.ControllerFirstTimestamp.DeepCopyInto(&out.ControllerFirstTimestamp) + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]AppWrapperCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index def5e0adf..371ea4e8c 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -397,6 +397,11 @@ func (qjm *XController) PreemptQueueJobs() { continue } newjob.Status.CanRun = false + + message := fmt.Sprintf("Insufficient number of Running pods, minimum=%d, running=%v.", q.Spec.SchedSpec.MinAvailable, q.Status.Running) + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message) + newjob.Status.Conditions = append(newjob.Status.Conditions, cond) + if err := qjm.updateEtcd(newjob, "PreemptQueueJobs - CanRun: false"); err != nil { glog.Errorf("Failed to update status of AppWrapper %v/%v: %v", q.Namespace, q.Name, err) @@ -442,13 +447,38 @@ func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper continue } } - if value.Status.State == arbv1.AppWrapperStateEnqueued { + + // Skip if AW Pending or just entering the system and does not have a state yet. + if (value.Status.State == arbv1.AppWrapperStateEnqueued) || (value.Status.State == ""){ continue } if int(value.Status.Running) < replicas { - if (replicas>0) { - glog.V(3).Infof("XQJ %s is eligible for preemption %v - %v , %v !!! 
\n", value.Name, value.Status.Running, replicas, value.Status.Succeeded) + + //Check to see if if this AW job has been dispatched for a time window before preempting + conditionsLen := len(value.Status.Conditions) + var dispatchConditionExists bool + dispatchConditionExists = false + var condition arbv1.AppWrapperCondition + // Get the last time the AppWrapper was dispatched + for i := (conditionsLen - 1); i > 0; i-- { + condition = value.Status.Conditions[i] + if (condition.Type != arbv1.AppWrapperCondDispatched) { + continue + } + dispatchConditionExists = true + break + } + + // Now check for 0 running pods and for the minimum age and then + // skip preempt if current time is not beyond minimum age + minAge := condition.LastTransitionMicroTime.Add(60 * time.Second) + if (value.Status.Running <= 0) && (dispatchConditionExists && (time.Now().Before(minAge))) { + continue + } + + if (replicas > 0) { + glog.V(3).Infof("AppWrapper %s is eligible for preemption %v - %v , %v !!! \n", value.Name, value.Status.Running, replicas, value.Status.Succeeded) qjobs = append(qjobs, value) } } @@ -784,7 +814,10 @@ func (qjm *XController) ScheduleNext() { return } - qj.Status.QueueJobState = arbv1.QueueJobStateHeadOfLine + qj.Status.QueueJobState = arbv1.AppWrapperCondHeadOfLine + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondHeadOfLine, v1.ConditionTrue, "FrontOfQueue.", "") + qj.Status.Conditions = append(qj.Status.Conditions, cond) + qj.Status.FilterIgnore = true // update QueueJobState only qjm.updateEtcd(qj, "ScheduleNext - setHOL") qjm.qjqueue.AddUnschedulableIfNotPresent(qj) // working on qj, avoid other threads putting it back to activeQ @@ -796,6 +829,8 @@ func (qjm *XController) ScheduleNext() { glog.V(2).Infof("[ScheduleNext] [Agent Mode] Deploy Next QueueJob: %s Status=%+v\n", qj.Name, qj.Status) } + dispatchFailedReason := "AppWrapperNotRunnable." 
+ dispatchFailedMessage := "" if qjm.isDispatcher { // Dispatcher Mode agentId:=qjm.chooseAgent(qj) if agentId != "" { // A proper agent is found. @@ -835,8 +870,9 @@ func (qjm *XController) ScheduleNext() { glog.V(10).Infof("[TTime] %s, %s: ScheduleNextAfterEtcd", qj.Name, time.Now().Sub(qj.CreationTimestamp.Time)) return } else { - glog.V(2).Infof("[ScheduleNext: Dispatcher Mode] Cannot find an Agent with enough Resources\n") - go qjm.backoff(qj) + dispatchFailedMessage = "Cannot find an cluster with enough resources to dispatch AppWrapper." + glog.V(2).Infof("[Controller: Dispatcher Mode] %s %s\n", dispatchFailedReason, dispatchFailedMessage) + go qjm.backoff(qj, dispatchFailedReason, dispatchFailedMessage) } } else { // Agent Mode aggqj := qjm.GetAggregatedResources(qj) @@ -892,10 +928,12 @@ func (qjm *XController) ScheduleNext() { } } } else { // Not enough free resources to dispatch HOL + dispatchFailedMessage = "Insufficient quota to dispatch AppWrapper." glog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) } } else { // Not enough free resources to dispatch HOL - glog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to resource limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." 
+ glog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) } // stop trying to dispatch after HeadOfLineHoldingTime if (forwarded || time.Now().After(HOLStartTime.Add(time.Duration(qjm.serverOption.HeadOfLineHoldingTime)*time.Second))) { @@ -906,7 +944,7 @@ func (qjm *XController) ScheduleNext() { } if !forwarded { // start thread to backoff glog.V(3).Infof("[ScheduleNext] HOL backoff %s after waiting for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - go qjm.backoff(qj) + go qjm.backoff(qj, dispatchFailedReason, dispatchFailedMessage) } } } @@ -914,32 +952,72 @@ func (qjm *XController) ScheduleNext() { // Update AppWrappers in etcd // todo: This is a current workaround for duplicate message bug. 
func (cc *XController) updateEtcd(qj *arbv1.AppWrapper, at string) error { - qj.Status.Sender = "before "+ at // set Sender string to indicate code location - qj.Status.Local = false // for Informer FilterFunc to pickup - if _, err := cc.arbclients.ArbV1().AppWrappers(qj.Namespace).Update(qj); err != nil { - glog.Errorf("[updateEtcd] Failed to update status of AppWrapper %s, namespace: %s at %s err=%v", qj.Name, qj.Namespace, at, err) + //apiCacheAWJob, e := cc.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) + // + //if (e != nil) { + // glog.Errorf("[updateEtcd] Failed to update status of AppWrapper %s, namespace: %s at %s err=%v", + // apiCacheAWJob.Name, apiCacheAWJob.Namespace, at, e) + // return e + //} + + //TODO: Remove next line + var apiCacheAWJob*arbv1.AppWrapper + //TODO: Remove next line + apiCacheAWJob = qj + apiCacheAWJob.Status.Sender = "before "+ at // set Sender string to indicate code location + apiCacheAWJob.Status.Local = false // for Informer FilterFunc to pickup + if _, err := cc.arbclients.ArbV1().AppWrappers(apiCacheAWJob.Namespace).Update(apiCacheAWJob); err != nil { + glog.Errorf("[updateEtcd] Failed to update status of AppWrapper %s, namespace: %s at %s err=%v", + apiCacheAWJob.Name, apiCacheAWJob.Namespace, at, err) return err // } else { // qjj should be the same as qj except with newer ResourceVersion // qj.ResourceVersion = qjj.ResourceVersion // update new ResourceVersion from etcd } - glog.V(10).Infof("[updateEtcd] AppWrapperUpdate success %s at %s &qj=%p qj=%+v", qj.Name, at, qj, qj) + glog.V(10).Infof("[updateEtcd] AppWrapperUpdate success %s at %s &qj=%p qj=%+v", + apiCacheAWJob.Name, at, apiCacheAWJob, apiCacheAWJob) //qj.Status.Local = true // for Informer FilterFunc to ignore duplicate //qj.Status.Sender = "after "+ at // set Sender string to indicate code location return nil } -func (qjm *XController) backoff(q *arbv1.AppWrapper) { - q.Status.QueueJobState = arbv1.QueueJobStateRejoining - q.Status.FilterIgnore = true // 
update QueueJobState only, no work needed - qjm.updateEtcd(q, "backoff - Rejoining") - qjm.qjqueue.AddUnschedulableIfNotPresent(q) - glog.V(3).Infof("[backoff] %s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", q.Name, qjm.serverOption.BackoffTime, qjm.qjqueue.IfExistActiveQ((q)), qjm.qjqueue.IfExistUnschedulableQ((q)), q, q.ResourceVersion, q.Status) +func (cc *XController) updateStatusInEtcd(qj *arbv1.AppWrapper, at string) error { + var apiCacheAWJob*arbv1.AppWrapper + apiCacheAWJob = qj + if _, err := cc.arbclients.ArbV1().AppWrappers(apiCacheAWJob.Namespace).UpdateStatus(apiCacheAWJob); err != nil { + glog.Errorf("[updateEtcd] Failed to update status of AppWrapper %s, namespace: %s at %s err=%v", + apiCacheAWJob.Name, apiCacheAWJob.Namespace, at, err) + return err + } + glog.V(10).Infof("[updateEtcd] AppWrapperUpdate success %s at %s &qj=%p qj=%+v", + apiCacheAWJob.Name, at, apiCacheAWJob, apiCacheAWJob) + return nil +} + +func (qjm *XController) backoff(q *arbv1.AppWrapper, reason string, message string) { + var workingAW *arbv1.AppWrapper + apiCacheAWJob, e := qjm.queueJobLister.AppWrappers(q.Namespace).Get(q.Name) + // Update condition + if (e == nil) { + workingAW = apiCacheAWJob + apiCacheAWJob.Status.QueueJobState = arbv1.AppWrapperCondBackoff + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondBackoff, v1.ConditionTrue, reason, message) + workingAW.Status.Conditions = append(workingAW.Status.Conditions, cond) + workingAW.Status.FilterIgnore = true // update QueueJobState only, no work needed + //qjm.updateEtcd(workingAW, "backoff - Rejoining") + qjm.updateStatusInEtcd(workingAW, "backoff - Rejoining") + } else { + workingAW = q + glog.Errorf("[backoff] Failed to retrieve cached object for %s/%s. 
Continuing with possible stale object without updating conditions.", workingAW.Namespace,workingAW.Name) + + } + qjm.qjqueue.AddUnschedulableIfNotPresent(workingAW) + glog.V(3).Infof("[backoff] %s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", workingAW.Name, + qjm.serverOption.BackoffTime, qjm.qjqueue.IfExistActiveQ((workingAW)), qjm.qjqueue.IfExistUnschedulableQ((workingAW)), workingAW, workingAW.ResourceVersion, workingAW.Status) time.Sleep(time.Duration(qjm.serverOption.BackoffTime) * time.Second) - qjm.qjqueue.MoveToActiveQueueIfExists(q) - q.Status.QueueJobState = arbv1.QueueJobStateQueueing - q.Status.FilterIgnore = true // update QueueJobState only, no work needed - qjm.updateEtcd(q, "backoff - Queueing") - glog.V(3).Infof("[backoff] %s activeQ.Add after sleep for %d seconds. activeQ=%t Unsched=%t &aw=%p Version=%s Status=%+v", q.Name, qjm.serverOption.BackoffTime, qjm.qjqueue.IfExistActiveQ((q)), qjm.qjqueue.IfExistUnschedulableQ((q)), q, q.ResourceVersion, q.Status) + qjm.qjqueue.MoveToActiveQueueIfExists(workingAW) + + glog.V(3).Infof("[backoff] %s activeQ.Add after sleep for %d seconds. 
activeQ=%t Unsched=%t &aw=%p Version=%s Status=%+v", workingAW.Name, + qjm.serverOption.BackoffTime, qjm.qjqueue.IfExistActiveQ((workingAW)), qjm.qjqueue.IfExistUnschedulableQ((workingAW)), workingAW, workingAW.ResourceVersion, workingAW.Status) } // Run start AppWrapper Controller @@ -1008,19 +1086,28 @@ func (qjm *XController) UpdateQueueJobs() { if newjob.Status.QueueJobState == "" { newjob.Status.ControllerFirstTimestamp = firstTime newjob.Status.SystemPriority = float64(newjob.Spec.Priority) - newjob.Status.QueueJobState = arbv1.QueueJobStateInit - glog.V(3).Infof("[UpdateQueueJobs] %s 0Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s", + newjob.Status.QueueJobState = arbv1.AppWrapperCondInit + newjob.Status.Conditions = []arbv1.AppWrapperCondition{ + arbv1.AppWrapperCondition{ + Type: arbv1.AppWrapperCondInit, + Status: v1.ConditionTrue, + LastUpdateMicroTime: metav1.NowMicro(), + LastTransitionMicroTime: metav1.NowMicro(), + }, + } + glog.V(4).Infof("[UpdateQueueJobs] %s 0Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob.CreationTimestamp, newjob.Status.ControllerFirstTimestamp) } glog.V(10).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status) // check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue if _, exists, _ := qjm.eventQueue.Get(newjob); exists { continue } // do not enqueue if already in eventQueue if qjm.qjqueue.IfExist(newjob) { continue } // do not enqueue if already in qjqueue - err := qjm.eventQueue.AddIfNotPresent(newjob) // add to eventQueue if not in, otherwise, keep position without updating object, as object in eventQueue should be more recent + + err = qjm.enqueueIfNotPresent(newjob) if err != nil { glog.Errorf("[UpdateQueueJobs] Fail to enqueue %s to eventQueue, ignore. 
*Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status, err) } else { - glog.V(3).Infof("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status) + glog.V(4).Infof("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status) } } } @@ -1036,7 +1123,15 @@ func (cc *XController) addQueueJob(obj interface{}) { if qj.Status.QueueJobState == "" { qj.Status.ControllerFirstTimestamp = firstTime qj.Status.SystemPriority = float64(qj.Spec.Priority) - qj.Status.QueueJobState = arbv1.QueueJobStateInit + qj.Status.QueueJobState = arbv1.AppWrapperCondInit + qj.Status.Conditions = []arbv1.AppWrapperCondition{ + arbv1.AppWrapperCondition{ + Type: arbv1.AppWrapperCondInit, + Status: v1.ConditionTrue, + LastUpdateMicroTime: metav1.NowMicro(), + LastTransitionMicroTime: metav1.NowMicro(), + }, + } } else { glog.Warningf("[Informer-addQJ] Received and add by the informer for AppWrapper job %s which already has been seen and initialized current state %s with timestamp: %s, elapsed time of %.6f", qj.Name, qj.Status.State, qj.Status.ControllerFirstTimestamp, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds()) @@ -1064,8 +1159,8 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { } // AppWrappers may come out of order. Ignore old ones. 
if (oldQJ.Name == newQJ.Name) && (larger(oldQJ.ResourceVersion, newQJ.ResourceVersion)) { - glog.V(10).Infof("[Informer-updateQJ] ignore OutOfOrder arrival &oldQJ=%p oldQJ=%+v", oldQJ, oldQJ) - glog.V(10).Infof("[Informer-updateQJ] ignore OutOfOrder arrival &newQJ=%p newQJ=%+v", newQJ, newQJ) + glog.V(10).Infof("[Informer-updateQJ] %s ignored OutOfOrder arrival &oldQJ=%p oldQJ=%+v", oldQJ.Name, oldQJ, oldQJ) + glog.V(10).Infof("[Informer-updateQJ] %s ignored OutOfOrder arrival &newQJ=%p newQJ=%+v", newQJ.Name, newQJ, newQJ) return } @@ -1103,11 +1198,10 @@ func (cc *XController) deleteQueueJob(obj interface{}) { cc.enqueue(qj) } -func (cc *XController) enqueue(obj interface{}) { +func (cc *XController) enqueue(obj interface{}) error { qj, ok := obj.(*arbv1.AppWrapper) if !ok { - glog.Errorf("[enqueue] obj is not AppWrapper. obj=%+v", obj) - return + return fmt.Errorf("[enqueue] obj is not AppWrapper. obj=%+v", obj) } err := cc.eventQueue.Add(qj) // add to FIFO queue if not in, update object & keep position if already in FIFO queue @@ -1116,6 +1210,17 @@ func (cc *XController) enqueue(obj interface{}) { } else { glog.V(10).Infof("[enqueue] %s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status) } + return err +} + +func (cc *XController) enqueueIfNotPresent(obj interface{}) error { + aw, ok := obj.(*arbv1.AppWrapper) + if !ok { + return fmt.Errorf("[enqueueIfNotPresent] obj is not AppWrapper. 
obj=%+v", obj) + } + + err := cc.eventQueue.AddIfNotPresent(aw) // add to FIFO queue if not in; otherwise keep position without updating the object + return err +} func (cc *XController) agentEventQueueWorker() { @@ -1197,6 +1302,7 @@ func (cc *XController) worker() { return nil } + // sync AppWrapper if err := cc.syncQueueJob(queuejob); err != nil { glog.Warningf("[worker] Failed to sync AppWrapper %s, err %#v", queuejob.Name, err) @@ -1241,20 +1347,32 @@ func (cc *XController) syncQueueJob(qj *arbv1.AppWrapper) error { //Make a copy first to not update cache object and to use for comparing awNew := qj.DeepCopy() // we call sync to update pods running, pending,... - err := cc.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(awNew) - if err != nil { - glog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s, err=%+v", qj.Name, err) - } - glog.V(10).Infof("[syncQueueJob] AW popped from event queue %s &qj=%p Version=%s Status=%+v", awNew.Name, awNew, awNew.ResourceVersion, awNew.Status) - - if ! 
reflect.DeepEqual(awNew.Status, qj.Status) { - podPhaseChanges = true - // Using DeepCopy before DeepCopyInto as it seems that DeepCopyInto does not alloc a new memory object - awNewStatus := awNew.Status.DeepCopy() - awNewStatus.DeepCopyInto(&qj.Status) - //awNew.Status.DeepCopy().DeepCopyInto(&qj.Status) - glog.V(10).Infof("[syncQueueJob] AW pod phase change(s) detected %s &eventqueueaw=%p eventqueueawVersion=%s eventqueueawStatus=%+v; &newaw=%p newawVersion=%s newawStatus=%+v", - qj.Name, qj, qj.ResourceVersion, qj.Status, awNew, awNew.ResourceVersion, awNew.Status) + if (qj.Status.State == arbv1.AppWrapperStateActive) { + err := cc.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(awNew) + if err != nil { + glog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s, err=%+v", qj.Name, err) + } + glog.V(10).Infof("[syncQueueJob] AW popped from event queue %s &qj=%p Version=%s Status=%+v", awNew.Name, awNew, awNew.ResourceVersion, awNew.Status) + + // Update etcd conditions if AppWrapper Job has at least 1 running pod and transitioning from dispatched to running. + if (awNew.Status.QueueJobState != arbv1.AppWrapperCondRunning ) && (awNew.Status.Running > 0) { + awNew.Status.QueueJobState = arbv1.AppWrapperCondRunning + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunning, v1.ConditionTrue, "PodsRunning", "") + awNew.Status.Conditions = append(awNew.Status.Conditions, cond) + awNew.Status.FilterIgnore = true // Update AppWrapperCondRunning + cc.updateEtcd(awNew, "[syncQueueJob] setRunning") + } + + //For debugging? + if ! 
reflect.DeepEqual(awNew.Status, qj.Status) { + podPhaseChanges = true + // Using DeepCopy before DeepCopyInto as it seems that DeepCopyInto does not alloc a new memory object + awNewStatus := awNew.Status.DeepCopy() + awNewStatus.DeepCopyInto(&qj.Status) + //awNew.Status.DeepCopy().DeepCopyInto(&qj.Status) + glog.V(10).Infof("[syncQueueJob] AW pod phase change(s) detected %s &eventqueueaw=%p eventqueueawVersion=%s eventqueueawStatus=%+v; &newaw=%p newawVersion=%s newawStatus=%+v", + qj.Name, qj, qj.ResourceVersion, qj.Status, awNew, awNew.ResourceVersion, awNew.Status) + } } } @@ -1302,12 +1420,17 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool if !qj.Status.CanRun && (qj.Status.State != arbv1.AppWrapperStateEnqueued && qj.Status.State != arbv1.AppWrapperStateDeleted) { // if there are running resources for this job then delete them because the job was put in // pending state... - glog.V(2).Infof("[manageQueueJob] Deleting resources for AppWrapper Job %s because it was preempted (newjob) status=%+v\n", qj.Name, qj.Status) - err = cc.Cleanup(qj) - glog.V(8).Infof("[manageQueueJob] Validation after deleting resources for AppWrapper Job %s because it was be preempted (newjob) status=%+v\n", qj.Name, qj.Status) - if err != nil { - glog.Errorf("[manageQueueJob] Fail to delete resources for AppWrapper Job %s, err=%#v", qj.Name, err) - return err + + // If this the first time seeing this AW, no need to delete. 
+ stateLen := len(qj.Status.State) + if (stateLen > 0) { + glog.V(2).Infof("[manageQueueJob] Deleting resources for AppWrapper Job %s because it was preempted, status=%+v\n", qj.Name, qj.Status) + err = cc.Cleanup(qj) + glog.V(8).Infof("[manageQueueJob] Validation after deleting resources for AppWrapper Job %s because it was be preempted, status=%+v\n", qj.Name, qj.Status) + if err != nil { + glog.Errorf("[manageQueueJob] Fail to delete resources for AppWrapper Job %s, err=%#v", qj.Name, err) + return err + } } qj.Status.State = arbv1.AppWrapperStateEnqueued @@ -1315,7 +1438,11 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool if cc.qjqueue.IfExistUnschedulableQ(qj) { glog.V(10).Infof("[worker-manageQJ] leaving %s to qjqueue.UnschedulableQ activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) } else { - qj.Status.QueueJobState = arbv1.QueueJobStateQueueing + glog.V(10).Infof("[worker-manageQJ] before add to activeQ %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + qj.Status.QueueJobState = arbv1.AppWrapperCondQueueing + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondQueueing, v1.ConditionTrue, "AwaitingHeadOfLine", "") + qj.Status.Conditions = append(qj.Status.Conditions, cond) + qj.Status.FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext cc.updateEtcd(qj, "manageQueueJob - setQueueing") glog.V(10).Infof("[worker-manageQJ] before add to activeQ %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) @@ -1349,11 +1476,14 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool glog.V(3).Infof("[worker-manageQJ] %s 3Delay=%.6f 
seconds BeforeDispatchingToEtcd Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status) dispatched := true + dispatchFailureReason := "ItemCreationFailure." + dispatchFailureMessage := "" for _, ar := range qj.Spec.AggrResources.Items { glog.V(10).Infof("[worker-manageQJ] before dispatch [%v].SyncQueueJob %s &qj=%p Version=%s Status=%+v", ar.Type, qj.Name, qj, qj.ResourceVersion, qj.Status) // Call Resource Controller of ar.Type to issue REST call to Etcd for resource creation err00 := cc.qjobResControls[ar.Type].SyncQueueJob(qj, &ar) if err00 != nil { + dispatchFailureMessage = fmt.Sprintf("%s/%s creation failure: %+v", qj.Namespace, qj.Name, err00) glog.V(3).Infof("[worker-manageQJ] Error dispatching job=%s type=%v Status=%+v err=%+v", qj.Name, ar.Type, qj.Status, err00) dispatched = false break @@ -1364,18 +1494,26 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool glog.V(10).Infof("[worker-manageQJ] before dispatch Generic.SyncQueueJob %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status) _, err00 := cc.genericresources.SyncQueueJob(qj, &ar) if err00 != nil { + dispatchFailureMessage = fmt.Sprintf("%s/%s creation failure: %+v", qj.Namespace, qj.Name, err00) glog.Errorf("[worker-manageQJ] Error dispatching job=%s Status=%+v err=%+v", qj.Name, qj.Status, err00) dispatched = false } } - if dispatched { // set QueueJobStateRunning if all resources are successfully dispatched - qj.Status.QueueJobState = arbv1.QueueJobStateDispatched + if dispatched { // set AppWrapperCondRunning if all resources are successfully dispatched + qj.Status.QueueJobState = arbv1.AppWrapperCondDispatched + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondDispatched, v1.ConditionTrue, "AppWrapperRunnable", "") + qj.Status.Conditions = append(qj.Status.Conditions, cond) + glog.V(3).Infof("[worker-manageQJ] %s 4Delay=%.6f seconds 
AllResourceDispatchedToEtcd Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status) } else { qj.Status.State = arbv1.AppWrapperStateFailed - qj.Status.QueueJobState = arbv1.QueueJobStateFailed + qj.Status.QueueJobState = arbv1.AppWrapperCondFailed + if ( !isLastConditionDuplicate(qj,arbv1.AppWrapperCondFailed, v1.ConditionTrue, dispatchFailureReason, dispatchFailureMessage) ) { + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondFailed, v1.ConditionTrue, dispatchFailureReason, dispatchFailureMessage) + qj.Status.Conditions = append(qj.Status.Conditions, cond) + } cc.Cleanup(qj) } @@ -1429,7 +1567,7 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool glog.V(10).Infof("[worker-manageQJ] leaving %s to qjqueue.UnschedulableQ activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) } else { glog.V(10).Infof("[worker-manageQJ] before add to activeQ %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - qj.Status.QueueJobState = arbv1.QueueJobStateQueueing + qj.Status.QueueJobState = arbv1.AppWrapperCondQueueing qj.Status.FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext cc.updateEtcd(qj, "manageQueueJob - setQueueing") if err = cc.qjqueue.AddIfNotPresent(qj); err != nil { @@ -1492,7 +1630,7 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool //Cleanup function func (cc *XController) Cleanup(queuejob *arbv1.AppWrapper) error { - glog.V(3).Infof("[Cleanup] begin AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, queuejob.Status) + glog.V(4).Infof("[Cleanup] begin AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, 
queuejob.Status) if !cc.isDispatcher { if queuejob.Spec.AggrResources.Items != nil { @@ -1517,7 +1655,7 @@ func (cc *XController) Cleanup(queuejob *arbv1.AppWrapper) error { queuejob.Status.Running = 0 queuejob.Status.Succeeded = 0 queuejob.Status.Failed = 0 - glog.V(3).Infof("[Cleanup] end AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, queuejob.Status) + glog.V(10).Infof("[Cleanup] end AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, queuejob.Status) return nil } diff --git a/pkg/controller/queuejob/utils.go b/pkg/controller/queuejob/utils.go index d47f4ec8f..07b3892cc 100644 --- a/pkg/controller/queuejob/utils.go +++ b/pkg/controller/queuejob/utils.go @@ -8,7 +8,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/rest" + "k8s.io/client-go/rest" arbv1 "github.com/IBM/multi-cluster-app-dispatcher/pkg/apis/controller/v1alpha1" "github.com/IBM/multi-cluster-app-dispatcher/pkg/client/clientset/controller-versioned/clients" @@ -146,4 +146,37 @@ func createAppWrapperKind(config *rest.Config) error { return nil } +// GenerateAppWrapperCondition returns an AppWrapperCondition with the given type, status, reason and message; both timestamps are set to the current time. +func GenerateAppWrapperCondition(condType arbv1.AppWrapperConditionType, condStatus corev1.ConditionStatus, condReason string, condMsg string) arbv1.AppWrapperCondition { + return arbv1.AppWrapperCondition{ + Type: condType, + Status: condStatus, + LastUpdateMicroTime: metav1.NowMicro(), + LastTransitionMicroTime: metav1.NowMicro(), + Reason: condReason, + Message: condMsg, + } +} + +// isLastConditionDuplicate reports whether the most recent condition recorded on aw matches the given type, status, reason and message. 
+func isLastConditionDuplicate(aw *arbv1.AppWrapper, condType arbv1.AppWrapperConditionType, condStatus corev1.ConditionStatus, condReason string, condMsg string) bool { + if (aw.Status.Conditions == nil) { + return false + } + lastIndex := len(aw.Status.Conditions) - 1 + + if (lastIndex < 0) { + return false + } + + lastCond := aw.Status.Conditions[lastIndex] + if (lastCond.Type == condType) && + (lastCond.Status == condStatus) && + (lastCond.Reason == condReason) && + (lastCond.Message == condMsg) { + return true + } else { + return false + } +} \ No newline at end of file diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 902cca15f..03b8bfa29 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -29,7 +29,7 @@ var _ = Describe("AppWrapper E2E Test", func() { It("Create AppWrapper - Generic 100 Deployment Only - 2 pods each", func() { context := initTestContext() - defer cleanupTestContextExtendedTime(context, (240 * time.Second)) + defer cleanupTestContextExtendedTime(context, (300 * time.Second)) const ( awCount = 100