Skip to content

Commit

Permalink
modify podgroup events
Browse files Browse the repository at this point in the history
  • Loading branch information
sivanzcw committed Nov 9, 2019
1 parent 22c00ac commit e19c365
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 22 deletions.
13 changes: 13 additions & 0 deletions pkg/apis/scheduling/types.go
Expand Up @@ -57,7 +57,20 @@ const (
type PodGroupConditionType string

const (
// PodGroupUnschedulableType is Unschedulable event type
PodGroupUnschedulableType PodGroupConditionType = "Unschedulable"

// PodGroupScheduled is scheduled event type
PodGroupScheduled PodGroupConditionType = "Scheduled"
)

type PodGroupSchedulingStatus string

const (
// PodGroupReady is that PodGroup has reached scheduling restriction
PodGroupReady PodGroupSchedulingStatus = "pod group is ready"
// PodGroupNotReady is that PodGroup has not yet reached the scheduling restriction
PodGroupNotReady PodGroupSchedulingStatus = "pod group is not ready"
)

// PodGroupCondition contains details for the current state of this pod group.
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/api/job_info.go
Expand Up @@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"volcano.sh/volcano/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
)

Expand Down Expand Up @@ -335,7 +336,7 @@ func (ji *JobInfo) FitError() string {
sort.Strings(reasonStrings)
return reasonStrings
}
reasonMsg := fmt.Sprintf("job is not ready, %v.", strings.Join(sortReasonsHistogram(), ", "))
reasonMsg := fmt.Sprintf("%v, %v.", scheduling.PodGroupNotReady, strings.Join(sortReasonsHistogram(), ", "))
return reasonMsg
}

Expand Down
74 changes: 53 additions & 21 deletions pkg/scheduler/cache/cache.go
Expand Up @@ -586,11 +586,29 @@ func (sc *SchedulerCache) Bind(taskInfo *schedulingapi.TaskInfo, hostname string

p := task.Pod

var pgCopy schedulingapi.PodGroup
if job.PodGroup != nil {
pgCopy = schedulingapi.PodGroup{
Version: job.PodGroup.Version,
PodGroup: *job.PodGroup.PodGroup.DeepCopy(),
}
}

go func() {
if err := sc.Binder.Bind(p, hostname); err != nil {
sc.resyncTask(task)
} else {
sc.Recorder.Eventf(p, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", p.Namespace, p.Name, hostname)

if job.PodGroup != nil {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v, %v minAvailable, %v Pending",
len(job.TaskStatusIndex[schedulingapi.Pending]),
len(job.Tasks),
scheduling.PodGroupReady,
job.MinAvailable,
len(job.TaskStatusIndex[schedulingapi.Pending]))
sc.recordPodGroupEvent(&pgCopy, v1.EventTypeNormal, string(scheduling.PodGroupScheduled), msg)
}
}
}()

Expand Down Expand Up @@ -821,31 +839,17 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo) {

pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == scheduling.PodGroupUnknown ||
job.PodGroup.Status.Phase == scheduling.PodGroupPending)
job.PodGroup.Status.Phase == scheduling.PodGroupPending ||
job.PodGroup.Status.Phase == scheduling.PodGroupInqueue)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[schedulingapi.Pending]) != 0

// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[schedulingapi.Pending]), len(job.Tasks), job.FitError())
if job.PodGroup.Version == schedulingapi.PodGroupVersionV1Alpha1 {
podgroup := &v1alpha1.PodGroup{}
if err := schedulingscheme.Scheme.Convert(&job.PodGroup.PodGroup, podgroup, nil); err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err)
return
}
sc.Recorder.Eventf(podgroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}

if job.PodGroup.Version == schedulingapi.PodGroupVersionV1Alpha2 {
podgroup := &v1alpha2.PodGroup{}
if err := schedulingscheme.Scheme.Convert(&job.PodGroup.PodGroup, podgroup, nil); err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err)
return
}
sc.Recorder.Eventf(podgroup, v1.EventTypeWarning,
string(v1alpha2.PodGroupUnschedulableType), msg)
}
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
len(job.TaskStatusIndex[schedulingapi.Pending]),
len(job.Tasks),
job.FitError())
sc.recordPodGroupEvent(job.PodGroup, v1.EventTypeWarning, string(scheduling.PodGroupUnschedulableType), msg)
}

// Update podCondition for tasks Allocated and Pending before job discarded
Expand Down Expand Up @@ -878,3 +882,31 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b

return job, nil
}

func (sc *SchedulerCache) recordPodGroupEvent(podGroup *schedulingapi.PodGroup, eventType, reason, msg string) {
if podGroup == nil {
return
}

if podGroup.Version == schedulingapi.PodGroupVersionV1Alpha1 {
pg := &v1alpha1.PodGroup{}
if err := schedulingscheme.Scheme.Convert(&podGroup.PodGroup, pg, nil); err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err)
return
}

sc.Recorder.Eventf(pg, eventType, reason, msg)
}

if podGroup.Version == schedulingapi.PodGroupVersionV1Alpha2 {
pg := &v1alpha2.PodGroup{}
if err := schedulingscheme.Scheme.Convert(&podGroup.PodGroup, pg, nil); err != nil {
glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err)
return
}

sc.Recorder.Eventf(pg, eventType, reason, msg)
}

return
}

0 comments on commit e19c365

Please sign in to comment.