Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove inqueue job phase #421

Merged
merged 1 commit into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/design/job-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,6 @@ const (
Terminated JobPhase = "Terminated"
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
Failed JobPhase = "Failed"
// Inqueue is the phase that cluster have idle resource to schedule the job
Inqueue JobPhase = "Inqueue"
)

// JobState contains details for the current state of the job.
Expand Down
2 changes: 0 additions & 2 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,6 @@ const (
Terminated JobPhase = "Terminated"
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
Failed JobPhase = "Failed"
// Inqueue is the phase that cluster have idle resource to schedule the job
Inqueue JobPhase = "Inqueue"
)

// JobState contains details for the current state of the job.
Expand Down
1 change: 0 additions & 1 deletion pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func NewJobController(
// Register actions
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob
state.CreateJob = cc.createJob

return cc
}
Expand Down
43 changes: 13 additions & 30 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,58 +143,34 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
return nil
}

func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

func (cc *Controller) createJob(job *vkv1.Job) (*vkv1.Job, error) {
job, err := cc.initJobStatus(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.JobStatusError),
fmt.Sprintf("Failed to initialize job status, err: %v", err))
return err
return nil, err
}

if err := cc.pluginOnJobAdd(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
return err
return nil, err
}

if err := cc.createPodGroupIfNotExist(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return err
return nil, err
}

newJob, err := cc.createJobIOIfNotExist(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError),
fmt.Sprintf("Failed to create PVC, err: %v", err))
return err
}

if updateStatus != nil {
if updateStatus(&newJob.Status) {
newJob.Status.State.LastTransitionTime = metav1.Now()
}
}

newJob2, err := cc.vkClients.BatchV1alpha1().Jobs(newJob.Namespace).UpdateStatus(newJob)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if err = cc.cache.Update(newJob2); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
newJob2.Namespace, newJob2.Name, err)
return err
return nil, err
}

return nil
return newJob, nil
}

func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
Expand All @@ -210,6 +186,11 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return nil
}

var err error
if job, err = cc.createJob(job); err != nil {
return err
}

var running, pending, terminating, succeeded, failed, unknown int32

var podToCreate []*v1.Pod
Expand Down Expand Up @@ -404,6 +385,8 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (*vkv1.Job, error) {
job.Namespace, job.Name, err)
return nil, err
}
newJob.Status = job.Status

return newJob, err
}
return job, nil
Expand Down
118 changes: 5 additions & 113 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,119 +165,6 @@ func TestKillJobFunc(t *testing.T) {
}
}

func TestCreateJobFunc(t *testing.T) {
namespace := "test"

testcases := []struct {
Name string
Job *v1alpha1.Job
PodGroup *schedulingv1alpha2.PodGroup
UpdateStatus state.UpdateStatusFn
JobInfo *apis.JobInfo
Plugins []string
ExpextVal error
}{
{
Name: "CreateJob success Case",
Job: &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
Status: v1alpha1.JobStatus{
State: v1alpha1.JobState{
Phase: v1alpha1.Pending,
},
},
},
PodGroup: &schedulingv1alpha2.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
},
UpdateStatus: nil,
JobInfo: &apis.JobInfo{
Namespace: namespace,
Name: "jobinfo1",
},
Plugins: []string{"svc", "ssh", "env"},
ExpextVal: nil,
},
}

for i, testcase := range testcases {

t.Run(testcase.Name, func(t *testing.T) {
fakeController := newFakeController()
jobPlugins := make(map[string][]string)

for _, plugin := range testcase.Plugins {
jobPlugins[plugin] = make([]string, 0)
}
testcase.JobInfo.Job = testcase.Job
testcase.JobInfo.Job.Spec.Plugins = jobPlugins

_, err := fakeController.vkClients.BatchV1alpha1().Jobs(namespace).Create(testcase.Job)
if err != nil {
t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error())
}

err = fakeController.cache.Add(testcase.Job)
if err != nil {
t.Error("Error While Adding Job in cache")
}

err = fakeController.createJob(testcase.JobInfo, testcase.UpdateStatus)
if err != nil {
t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error())
}

job, err := fakeController.vkClients.BatchV1alpha1().Jobs(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error())
}
for _, plugin := range testcase.Plugins {

if plugin == "svc" {
_, err = fakeController.kubeClients.CoreV1().Services(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: Service to be created, but not created because of error %s", i, testcase.Name, err.Error())
}

_, err = fakeController.kubeClients.CoreV1().ConfigMaps(namespace).Get(fmt.Sprint(testcase.Job.Name, "-svc"), metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: Service to be created, but not created because of error %s", i, testcase.Name, err.Error())
}

exist := job.Status.ControlledResources["plugin-svc"]
if exist == "" {
t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name)
}
}

if plugin == "ssh" {
_, err := fakeController.kubeClients.CoreV1().ConfigMaps(namespace).Get(fmt.Sprint(testcase.Job.Name, "-ssh"), metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: ConfigMap to be created, but not created because of error %s", i, testcase.Name, err.Error())
}
exist := job.Status.ControlledResources["plugin-ssh"]
if exist == "" {
t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name)
}
}
if plugin == "env" {
exist := job.Status.ControlledResources["plugin-env"]
if exist == "" {
t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name)
}
}
}
})

}
}

func TestSyncJobFunc(t *testing.T) {
namespace := "test"

Expand Down Expand Up @@ -321,6 +208,11 @@ func TestSyncJobFunc(t *testing.T) {
},
},
},
Status: v1alpha1.JobStatus{
State: v1alpha1.JobState{
Phase: v1alpha1.Pending,
},
},
},
PodGroup: &schedulingv1alpha2.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Expand Down
4 changes: 1 addition & 3 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
}

_, err := cc.cache.Get(vkcache.JobKeyByName(newPG.Namespace, newPG.Name))
if err != nil {
if err != nil && newPG.Annotations != nil {
glog.Warningf(
"Failed to find job in cache by PodGroup, this may not be a PodGroup for volcano job.")
}
Expand All @@ -423,8 +423,6 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
switch newPG.Status.Phase {
case kbtype.PodGroupUnknown:
req.Event = vkbatchv1.JobUnknownEvent
case kbtype.PodGroupInqueue:
req.Action = vkbatchv1.EnqueueAction
}
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
Expand Down
Loading