Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing4 authored and lminzhw committed Jul 3, 2019
1 parent d38f70d commit 1559537
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 20 deletions.
31 changes: 17 additions & 14 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
}

// Update Job status
job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
newjob, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(job); e != nil {
if e := cc.cache.Update(newjob); e != nil {
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
Expand Down Expand Up @@ -157,10 +157,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update
job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

if err := cc.initJobStatus(job); err != nil {
if update, err := cc.initJobStatus(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.JobStatusError),
fmt.Sprintf("Failed to initialize job status, err: %v", err))
return err
} else if update {
return nil
}

if err := cc.pluginOnJobAdd(job); err != nil {
Expand Down Expand Up @@ -188,13 +190,13 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update
}
}

job, err = cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
newjob, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if err = cc.cache.Update(job); err != nil {
if err = cc.cache.Update(newjob); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, err)
return err
Expand Down Expand Up @@ -350,13 +352,14 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
job.Status.State.LastTransitionTime = metav1.Now()
}
}
job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)

newjob, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(job); e != nil {
if e := cc.cache.Update(newjob); e != nil {
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
Expand Down Expand Up @@ -537,24 +540,24 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {
return &minAvailableTasksRes
}

func (cc *Controller) initJobStatus(job *vkv1.Job) error {
func (cc *Controller) initJobStatus(job *vkv1.Job) (bool, error) {
if job.Status.State.Phase != "" {
return nil
return false, nil
}

job.Status.State.Phase = vkv1.Pending
job.Status.MinAvailable = int32(job.Spec.MinAvailable)
job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
newjob, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
return false, err
}
if err := cc.cache.Update(job); err != nil {
if err := cc.cache.Update(newjob); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, err)
return err
return false, err
}

return nil
return true, nil
}
9 changes: 4 additions & 5 deletions test/e2e/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package e2e
import (
"bytes"
"fmt"
v1 "k8s.io/api/core/v1"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -98,9 +98,8 @@ var _ = Describe("Job E2E Test: Test Job Command", func() {

//Pod is gone
podName := jobUtil.MakePodName(jobName, taskName, 0)
_, err = context.kubeclient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
Expect(apierrors.IsNotFound(err)).To(BeTrue(),
"Job related pod should be deleted when aborting job.")
err = waitPodGone(context, podName, job.Namespace)
Expect(err).NotTo(HaveOccurred())

//Resume job
ResumeJob(jobName, namespace)
Expand All @@ -116,7 +115,7 @@ var _ = Describe("Job E2E Test: Test Job Command", func() {
It("Suspend pending job", func() {
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU) * 2
rep := clusterSize(context, oneCPU)

jobName := "test-suspend-pending-job"
namespace := "test"
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/job_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ var _ = Describe("Job E2E Test", func() {

job.name = "gang-fq-qj2"
job2 := createJob(context, job)
err = waitJobPending(context, job2)
err = waitJobStatePending(context, job2)
Expect(err).NotTo(HaveOccurred())

err = waitJobReady(context, job1)
Expand Down
17 changes: 17 additions & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,3 +1005,20 @@ func readyNodeAmount(ctx *context) int {
}
return amount
}

func waitPodGone(ctx *context, podName, namespace string) error {
var additionalError error
err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) {
_, err := ctx.kubeclient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
expected := errors.IsNotFound(err)
if !expected {
additionalError = fmt.Errorf("Job related pod should be deleted when aborting job.")
}

return expected, nil
})
if err != nil && strings.Contains(err.Error(), timeOutMessage) {
return fmt.Errorf("[Wait time out]: %s", additionalError)
}
return err
}

0 comments on commit 1559537

Please sign in to comment.