diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index d1a132eb00..f29814acaa 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -97,6 +97,8 @@ const ( OutOfSyncEvent Event = "OutOfSync" // CommandIssuedEvent is triggered if a command is raised by user CommandIssuedEvent Event = "CommandIssued" + // TaskCompletedEvent is triggered when 'Replicas' pods of one task have succeeded + TaskCompletedEvent Event = "TaskCompleted" ) // Action is the action that Job controller will take according to the event. @@ -114,6 +116,8 @@ const ( // TerminateJobAction if this action is set, the whole job wil be terminated // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated. TerminateJobAction Action = "TerminateJob" + // CompleteJobAction if this action is set, the unfinished pods will be killed and the job will be completed. + CompleteJobAction Action = "CompleteJob" // ResumeJobAction is the action to resume an aborted job. 
ResumeJobAction Action = "ResumeJob" @@ -170,6 +174,8 @@ const ( Running JobPhase = "Running" // Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating Restarting JobPhase = "Restarting" + // Completing is the phase that required tasks of job are completed, job starts to clean up + Completing JobPhase = "Completing" // Completed is the phase that all tasks of Job are completed Completed JobPhase = "Completed" // Terminating is the phase that the Job is terminated, waiting for releasing pods diff --git a/pkg/controllers/job/cache/cache.go b/pkg/controllers/job/cache/cache.go index d9866c2588..ab34966f7c 100644 --- a/pkg/controllers/job/cache/cache.go +++ b/pkg/controllers/job/cache/cache.go @@ -230,6 +230,42 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) { wait.Until(jc.processCleanupJob, 0, stopCh) } +// TaskCompleted reports whether the task taskName of the cached job jobKey has at least +// 'Replicas' pods in the Succeeded phase. It returns false when the job or the task's pods +// are not in the cache, or when no task of that name with a positive Replicas is in the job spec. +// NOTE(review): jc.jobs is read without any locking visible here; confirm whether +// callers must hold a cache lock. +func (jc *jobCache) TaskCompleted(jobKey, taskName string) bool { + var taskReplicas, completed int32 + + jobInfo, found := jc.jobs[jobKey] + if !found { + return false + } + + taskPods, found := jobInfo.Pods[taskName] + + if !found { + return false + } + + for _, task := range jobInfo.Job.Spec.Tasks { + if task.Name == taskName { + taskReplicas = task.Replicas + } + } + if taskReplicas <= 0 { + return false + } + + for _, pod := range taskPods { + if pod.Status.Phase == v1.PodSucceeded { + completed++ + } + } + return completed >= taskReplicas +} + func (jc *jobCache) processCleanupJob() { obj, shutdown := jc.deletedJobs.Get() if shutdown { diff --git a/pkg/controllers/job/cache/interface.go b/pkg/controllers/job/cache/interface.go index e9a4506289..7cda0e084d 100644 --- a/pkg/controllers/job/cache/interface.go +++ b/pkg/controllers/job/cache/interface.go @@ -35,4 +35,6 @@ type Cache interface { AddPod(pod *v1.Pod) error UpdatePod(pod *v1.Pod) error DeletePod(pod *v1.Pod) error + + TaskCompleted(jobKey, taskName string) bool } diff --git a/pkg/controllers/job/job_controller_handler.go 
b/pkg/controllers/job/job_controller_handler.go index 4672262b9f..3e512f4cdc 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -197,12 +197,24 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { return } + if err := cc.cache.UpdatePod(newPod); err != nil { + glog.Errorf("Failed to update Pod <%s/%s>: %v in cache", + newPod.Namespace, newPod.Name, err) + } + event := vkbatchv1.OutOfSyncEvent if oldPod.Status.Phase != v1.PodFailed && newPod.Status.Phase == v1.PodFailed { event = vkbatchv1.PodFailedEvent } + if oldPod.Status.Phase != v1.PodSucceeded && + newPod.Status.Phase == v1.PodSucceeded { + if cc.cache.TaskCompleted(vkcache.JobKeyByName(newPod.Namespace, jobName), taskName) { + event = vkbatchv1.TaskCompletedEvent + } + } + req := apis.Request{ Namespace: newPod.Namespace, JobName: jobName, @@ -212,11 +224,6 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { JobVersion: int32(dVersion), } - if err := cc.cache.UpdatePod(newPod); err != nil { - glog.Errorf("Failed to update Pod <%s/%s>: %v in cache", - newPod.Namespace, newPod.Name, err) - } - cc.queue.Add(req) } diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go new file mode 100644 index 0000000000..c1317c2a78 --- /dev/null +++ b/pkg/controllers/job/state/completing.go @@ -0,0 +1,41 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package state + +import ( + vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/controllers/job/apis" +) + +type completingState struct { + job *apis.JobInfo +} + +func (ps *completingState) Execute(action vkv1.Action) error { + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + // Stay in the Completing phase while any pod is still terminating, pending or running; otherwise the job is Completed. + if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { + return vkv1.JobState{ + Phase: vkv1.Completing, + } + } + + return vkv1.JobState{ + Phase: vkv1.Completed, + } + }) +} diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index be740b3ca4..8394584cce 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -53,6 +53,8 @@ func NewState(jobInfo *apis.JobInfo) State { return &abortingState{job: jobInfo} case vkv1.Aborted: return &abortedState{job: jobInfo} + case vkv1.Completing: + return &completingState{job: jobInfo} } // It's pending by default. 
diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index dea311b381..0aa471325f 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -46,6 +46,17 @@ func (ps *pendingState) Execute(action vkv1.Action) error { phase = vkv1.Aborting } + return vkv1.JobState{ + Phase: phase, + } + }) + case vkv1.CompleteJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Completed + if status.Terminating != 0 { + phase = vkv1.Completing + } + return vkv1.JobState{ Phase: phase, } diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index 2cf128a6ba..f25dbda394 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -56,6 +56,17 @@ func (ps *runningState) Execute(action vkv1.Action) error { phase = vkv1.Terminating } + return vkv1.JobState{ + Phase: phase, + } + }) + case vkv1.CompleteJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Completed + if status.Terminating != 0 { + phase = vkv1.Completing + } + return vkv1.JobState{ Phase: phase, } diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index ee9712c7d7..185993196a 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -429,4 +429,44 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) }) + It("job level LifecyclePolicy, Event: TaskCompleted; Action: CompletedJob", func() { + By("init test context") + context := initTestContext() + defer cleanupTestContext(context) + + By("create job") + job := createJob(context, &jobSpec{ + name: "any-restart-job", + policies: []vkv1.LifecyclePolicy{ + { + Action: vkv1.CompleteJobAction, + Event: vkv1.TaskCompletedEvent, + }, + }, + tasks: []taskSpec{ + { + name: "completed-task", + img: defaultBusyBoxImage, + min: 2, + rep: 2, + //Sleep 5 seconds ensure job in 
running state + command: "sleep 5", + }, + { + name: "terminating-task", + img: defaultNginxImage, + min: 2, + rep: 2, + }, + }, + }) + + By("job scheduled, then task 'completed_task' finished and job finally complete") + // job phase: pending -> running -> completing -> completed + err := waitJobStates(context, job, []vkv1.JobPhase{ + vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) + Expect(err).NotTo(HaveOccurred()) + + }) + }) diff --git a/test/e2e/util.go b/test/e2e/util.go index 4ac3595efe..d64e6c75b2 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -455,6 +455,16 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { return nil } +func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { + for _, phase := range phases { + err := wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, phase)) + if err != nil { + return err + } + } + return nil +} + func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error { return wait.Poll(100*time.Millisecond, twoMinute, func() (bool, error) { newJob, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})