From 1e8edc0837fd2cd050807af8deed1c6b24de31f5 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Tue, 19 Mar 2019 17:38:59 +0800 Subject: [PATCH 1/4] Support complete job when task finished --- pkg/apis/batch/v1alpha1/job.go | 6 +++ pkg/controllers/job/cache/cache.go | 31 ++++++++++++++ pkg/controllers/job/cache/interface.go | 4 ++ pkg/controllers/job/job_controller_handler.go | 17 +++++--- pkg/controllers/job/state/completing.go | 41 +++++++++++++++++++ pkg/controllers/job/state/factory.go | 2 + pkg/controllers/job/state/running.go | 11 +++++ test/e2e/job_error_handling.go | 37 +++++++++++++++++ 8 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 pkg/controllers/job/state/completing.go diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index d1a132eb00..f11ac8e66f 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -97,6 +97,8 @@ const ( OutOfSyncEvent Event = "OutOfSync" // CommandIssuedEvent is triggered if a command is raised by user CommandIssuedEvent Event = "CommandIssued" + // TaskCompletedEvent is triggered if the 'Replicas' amount of pods are succeed + TaskCompletedEvent Event = "TaskCompleted" ) // Action is the action that Job controller will take according to the event. @@ -114,6 +116,8 @@ const ( // TerminateJobAction if this action is set, the whole job wil be terminated // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated. TerminateJobAction Action = "TerminateJob" + //CompleteJobAction if this action is set, the unfinished pods will be killed, job completed. + CompleteJobAction Action = "CompleteJobAction" // ResumeJobAction is the action to resume an aborted job. ResumeJobAction Action = "ResumeJob" @@ -170,6 +174,8 @@ const ( Running JobPhase = "Running" // Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating Restarting JobPhase = "Restarting" + // Completing is the phase that required tasks of job are completed, job starts to clean up + Completing JobPhase = "Completing" // Completed is the phase that all tasks of Job are completed Completed JobPhase = "Completed" // Terminating is the phase that the Job is terminated, waiting for releasing pods diff --git a/pkg/controllers/job/cache/cache.go b/pkg/controllers/job/cache/cache.go index d9866c2588..0358f476f7 100644 --- a/pkg/controllers/job/cache/cache.go +++ b/pkg/controllers/job/cache/cache.go @@ -230,6 +230,37 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) { wait.Until(jc.processCleanupJob, 0, stopCh) } +func (jc jobCache) TaskCompleted(jobName, taskName string) bool { + var taskReplicas, completed int32 + + jobInfo, found := jc.jobs[jobName] + if !found { + return false + } + + taskPods, found := jobInfo.Pods[taskName] + + if !found { + return false + } + + for _, task := range jobInfo.Job.Spec.Tasks { + if task.Name == taskName { + taskReplicas = task.Replicas + } + } + if taskReplicas <= 0 { + return false + } + + for _, pod := range taskPods { + if pod.Status.Phase == v1.PodSucceeded { + completed += 1 + } + } + return completed >= taskReplicas +} + func (jc *jobCache) processCleanupJob() { obj, shutdown := jc.deletedJobs.Get() if shutdown { diff --git a/pkg/controllers/job/cache/interface.go b/pkg/controllers/job/cache/interface.go index e9a4506289..932c90b914 100644 --- a/pkg/controllers/job/cache/interface.go +++ b/pkg/controllers/job/cache/interface.go @@ -36,3 +36,7 @@ type Cache interface { UpdatePod(pod *v1.Pod) error DeletePod(pod *v1.Pod) error } + +type JobStateStore interface { + TaskCompleted(jobName, taskName string) bool +} diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 4672262b9f..d9512f2ad6 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -197,12 +197,24 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { return } + if err := cc.cache.UpdatePod(newPod); err != nil { + glog.Errorf("Failed to update Pod <%s/%s>: %v in cache", + newPod.Namespace, newPod.Name, err) + } + event := vkbatchv1.OutOfSyncEvent if oldPod.Status.Phase != v1.PodFailed && newPod.Status.Phase == v1.PodFailed { event = vkbatchv1.PodFailedEvent } + if oldPod.Status.Phase != v1.PodSucceeded && + newPod.Status.Phase == v1.PodSucceeded { + if cc.cache.(vkcache.JobStateStore).TaskCompleted(jobName, taskName) { + event = vkbatchv1.TaskCompletedEvent + } + } + req := apis.Request{ Namespace: newPod.Namespace, JobName: jobName, @@ -212,11 +224,6 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { JobVersion: int32(dVersion), } - if err := cc.cache.UpdatePod(newPod); err != nil { - glog.Errorf("Failed to update Pod <%s/%s>: %v in cache", - newPod.Namespace, newPod.Name, err) - } - cc.queue.Add(req) } diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go new file mode 100644 index 0000000000..c1317c2a78 --- /dev/null +++ b/pkg/controllers/job/state/completing.go @@ -0,0 +1,41 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/controllers/job/apis" +) + +type completingState struct { + job *apis.JobInfo +} + +func (ps *completingState) Execute(action vkv1.Action) error { + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + // If any "alive" pods, still in Completing phase + if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { + return vkv1.JobState{ + Phase: vkv1.Completing, + } + } + + return vkv1.JobState{ + Phase: vkv1.Completed, + } + }) +} diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index be740b3ca4..8394584cce 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -53,6 +53,8 @@ func NewState(jobInfo *apis.JobInfo) State { return &abortingState{job: jobInfo} case vkv1.Aborted: return &abortedState{job: jobInfo} + case vkv1.Completing: + return &completingState{job: jobInfo} } // It's pending by default. diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index 2cf128a6ba..f25dbda394 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -56,6 +56,17 @@ func (ps *runningState) Execute(action vkv1.Action) error { phase = vkv1.Terminating } + return vkv1.JobState{ + Phase: phase, + } + }) + case vkv1.CompleteJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Completed + if status.Terminating != 0 { + phase = vkv1.Completing + } + return vkv1.JobState{ Phase: phase, } diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index ee9712c7d7..b7a9dd5e8c 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -429,4 +429,41 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) }) + It("job level LifecyclePolicy, Event: TaskCompleted; Action: CompletedJob", func() { + By("init test context") + context := initTestContext() + defer cleanupTestContext(context) + + By("create job") + job := createJob(context, &jobSpec{ + name: "any-restart-job", + policies: []vkv1.LifecyclePolicy{ + { + Action: vkv1.CompleteJobAction, + Event: vkv1.TaskCompletedEvent, + }, + }, + tasks: []taskSpec{ + { + name: "completed_task", + img: defaultBusyBoxImage, + min: 2, + rep: 2, + }, + { + name: "terminating_task", + img: defaultNginxImage, + min: 2, + rep: 2, + }, + }, + }) + + By("job scheduled, then task 'completed_task' finished and job finally complete") + // job phase: pending -> running -> completing -> completed + err := waitJobPhases(context, job, []vkv1.JobPhase{ + vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) + Expect(err).NotTo(HaveOccurred()) + + }) }) From d9bb6128a472450a5d06d5cc7387e18b0eeab634 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Tue, 19 Mar 2019 22:45:04 +0800 Subject: [PATCH 2/4] Fix test issue --- pkg/controllers/job/job_controller_handler.go | 2 +- pkg/controllers/job/state/pending.go | 11 +++++++++++ test/e2e/job_error_handling.go | 9 ++++++--- test/e2e/util.go | 10 ++++++++++ 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index d9512f2ad6..79b7e1cd53 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -210,7 +210,7 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { if oldPod.Status.Phase != v1.PodSucceeded && newPod.Status.Phase == v1.PodSucceeded { - if cc.cache.(vkcache.JobStateStore).TaskCompleted(jobName, taskName) { + if cc.cache.(vkcache.JobStateStore).TaskCompleted(vkcache.JobKeyByName(newPod.Namespace, jobName), taskName) { event = vkbatchv1.TaskCompletedEvent } } diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index dea311b381..0aa471325f 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -46,6 +46,17 @@ func (ps *pendingState) Execute(action vkv1.Action) error { phase = vkv1.Aborting } + return vkv1.JobState{ + Phase: phase, + } + }) + case vkv1.CompleteJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Completed + if status.Terminating != 0 { + phase = vkv1.Completing + } + return vkv1.JobState{ Phase: phase, } diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index b7a9dd5e8c..185993196a 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -445,13 +445,15 @@ var _ = Describe("Job Error Handling", func() { }, tasks: []taskSpec{ { - name: "completed_task", + name: "completed-task", img: defaultBusyBoxImage, min: 2, rep: 2, + //Sleep 5 seconds ensure job in running state + command: "sleep 5", }, { - name: "terminating_task", + name: "terminating-task", img: defaultNginxImage, min: 2, rep: 2, @@ -461,9 +463,10 @@ var _ = Describe("Job Error Handling", func() { By("job scheduled, then task 'completed_task' finished and job finally complete") // job phase: pending -> running -> completing -> completed - err := waitJobPhases(context, job, []vkv1.JobPhase{ + err := waitJobStates(context, job, []vkv1.JobPhase{ vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) }) + }) diff --git a/test/e2e/util.go b/test/e2e/util.go index 4ac3595efe..d64e6c75b2 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -455,6 +455,16 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { return nil } +func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error { + for _, phase := range phases { + err := wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, phase)) + if err != nil { + return err + } + } + return nil +} + func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error { return wait.Poll(100*time.Millisecond, twoMinute, func() (bool, error) { newJob, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) From 70c1ad047ecdafa8fbbf8900fee6792c999eba88 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Thu, 21 Mar 2019 19:30:45 +0800 Subject: [PATCH 3/4] Fix comment issue --- pkg/apis/batch/v1alpha1/job.go | 4 ++-- pkg/controllers/job/cache/cache.go | 4 ++-- pkg/controllers/job/cache/interface.go | 5 ++--- pkg/controllers/job/job_controller_handler.go | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index f11ac8e66f..f29814acaa 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -97,7 +97,7 @@ const ( OutOfSyncEvent Event = "OutOfSync" // CommandIssuedEvent is triggered if a command is raised by user CommandIssuedEvent Event = "CommandIssued" - // TaskCompletedEvent is triggered if the 'Replicas' amount of pods are succeed + // TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed TaskCompletedEvent Event = "TaskCompleted" ) @@ -117,7 +117,7 @@ const ( // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated. TerminateJobAction Action = "TerminateJob" //CompleteJobAction if this action is set, the unfinished pods will be killed, job completed. - CompleteJobAction Action = "CompleteJobAction" + CompleteJobAction Action = "CompleteJob" // ResumeJobAction is the action to resume an aborted job. ResumeJobAction Action = "ResumeJob" diff --git a/pkg/controllers/job/cache/cache.go b/pkg/controllers/job/cache/cache.go index 0358f476f7..ab34966f7c 100644 --- a/pkg/controllers/job/cache/cache.go +++ b/pkg/controllers/job/cache/cache.go @@ -230,10 +230,10 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) { wait.Until(jc.processCleanupJob, 0, stopCh) } -func (jc jobCache) TaskCompleted(jobName, taskName string) bool { +func (jc jobCache) TaskCompleted(jobKey, taskName string) bool { var taskReplicas, completed int32 - jobInfo, found := jc.jobs[jobName] + jobInfo, found := jc.jobs[jobKey] if !found { return false } diff --git a/pkg/controllers/job/cache/interface.go b/pkg/controllers/job/cache/interface.go index 932c90b914..5cf191d438 100644 --- a/pkg/controllers/job/cache/interface.go +++ b/pkg/controllers/job/cache/interface.go @@ -35,8 +35,7 @@ type Cache interface { AddPod(pod *v1.Pod) error UpdatePod(pod *v1.Pod) error DeletePod(pod *v1.Pod) error -} -type JobStateStore interface { - TaskCompleted(jobName, taskName string) bool + TaskCompleted(jobKey, taskName string) bool } + diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 79b7e1cd53..3e512f4cdc 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -210,7 +210,7 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { if oldPod.Status.Phase != v1.PodSucceeded && newPod.Status.Phase == v1.PodSucceeded { - if cc.cache.(vkcache.JobStateStore).TaskCompleted(vkcache.JobKeyByName(newPod.Namespace, jobName), taskName) { + if cc.cache.TaskCompleted(vkcache.JobKeyByName(newPod.Namespace, jobName), taskName) { event = vkbatchv1.TaskCompletedEvent } } From 4e0bf834f66379d61fc7850be05a4ca5ea6f2fc3 Mon Sep 17 00:00:00 2001 From: Hu Sheng Date: Sat, 23 Mar 2019 19:45:07 +0800 Subject: [PATCH 4/4] Update interface.go --- pkg/controllers/job/cache/interface.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controllers/job/cache/interface.go b/pkg/controllers/job/cache/interface.go index 5cf191d438..7cda0e084d 100644 --- a/pkg/controllers/job/cache/interface.go +++ b/pkg/controllers/job/cache/interface.go @@ -38,4 +38,3 @@ type Cache interface { TaskCompleted(jobKey, taskName string) bool } -