Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TaskComplete in LifecyclePolicy #36

Merged
merged 4 commits into from
Mar 24, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ const (
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods are succeed
TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the action that Job controller will take according to the event.
Expand All @@ -114,6 +116,8 @@ const (
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
//CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJobAction"
TommyLike marked this conversation as resolved.
Show resolved Hide resolved

// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
Expand Down Expand Up @@ -170,6 +174,8 @@ const (
Running JobPhase = "Running"
// Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
Restarting JobPhase = "Restarting"
// Completing is the phase that required tasks of job are completed, job starts to clean up
Completing JobPhase = "Completing"
// Completed is the phase that all tasks of Job are completed
Completed JobPhase = "Completed"
// Terminating is the phase that the Job is terminated, waiting for releasing pods
Expand Down
31 changes: 31 additions & 0 deletions pkg/controllers/job/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,37 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) {
wait.Until(jc.processCleanupJob, 0, stopCh)
}

func (jc jobCache) TaskCompleted(jobName, taskName string) bool {
var taskReplicas, completed int32

jobInfo, found := jc.jobs[jobName]
if !found {
return false
}

taskPods, found := jobInfo.Pods[taskName]

if !found {
return false
}

for _, task := range jobInfo.Job.Spec.Tasks {
if task.Name == taskName {
taskReplicas = task.Replicas
}
}
if taskReplicas <= 0 {
return false
}

for _, pod := range taskPods {
if pod.Status.Phase == v1.PodSucceeded {
completed += 1
}
}
return completed >= taskReplicas
}

func (jc *jobCache) processCleanupJob() {
obj, shutdown := jc.deletedJobs.Get()
if shutdown {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/job/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ type Cache interface {
UpdatePod(pod *v1.Pod) error
DeletePod(pod *v1.Pod) error
}

type JobStateStore interface {
TaskCompleted(jobName, taskName string) bool
TommyLike marked this conversation as resolved.
Show resolved Hide resolved
}
17 changes: 12 additions & 5 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,24 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
return
}

if err := cc.cache.UpdatePod(newPod); err != nil {
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
newPod.Namespace, newPod.Name, err)
}

event := vkbatchv1.OutOfSyncEvent
if oldPod.Status.Phase != v1.PodFailed &&
newPod.Status.Phase == v1.PodFailed {
event = vkbatchv1.PodFailedEvent
}

if oldPod.Status.Phase != v1.PodSucceeded &&
newPod.Status.Phase == v1.PodSucceeded {
if cc.cache.(vkcache.JobStateStore).TaskCompleted(vkcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
event = vkbatchv1.TaskCompletedEvent
}
}

req := apis.Request{
Namespace: newPod.Namespace,
JobName: jobName,
Expand All @@ -212,11 +224,6 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
JobVersion: int32(dVersion),
}

if err := cc.cache.UpdatePod(newPod); err != nil {
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
newPod.Namespace, newPod.Name, err)
}

cc.queue.Add(req)
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/controllers/job/state/completing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2019 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package state

import (
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/job/apis"
)

type completingState struct {
job *apis.JobInfo
}

func (ps *completingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
// If any "alive" pods, still in Completing phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return vkv1.JobState{
Phase: vkv1.Completing,
}
}

return vkv1.JobState{
Phase: vkv1.Completed,
}
})
}
2 changes: 2 additions & 0 deletions pkg/controllers/job/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func NewState(jobInfo *apis.JobInfo) State {
return &abortingState{job: jobInfo}
case vkv1.Aborted:
return &abortedState{job: jobInfo}
case vkv1.Completing:
return &completingState{job: jobInfo}
}

// It's pending by default.
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/job/state/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
phase = vkv1.Aborting
}

return vkv1.JobState{
Phase: phase,
}
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
}

return vkv1.JobState{
Phase: phase,
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ func (ps *runningState) Execute(action vkv1.Action) error {
phase = vkv1.Terminating
}

return vkv1.JobState{
Phase: phase,
}
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
}

return vkv1.JobState{
Phase: phase,
}
Expand Down
40 changes: 40 additions & 0 deletions test/e2e/job_error_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,44 @@ var _ = Describe("Job Error Handling", func() {
Expect(err).NotTo(HaveOccurred())
})

It("job level LifecyclePolicy, Event: TaskCompleted; Action: CompletedJob", func() {
By("init test context")
context := initTestContext()
defer cleanupTestContext(context)

By("create job")
job := createJob(context, &jobSpec{
name: "any-restart-job",
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.CompleteJobAction,
Event: vkv1.TaskCompletedEvent,
},
},
tasks: []taskSpec{
{
name: "completed-task",
img: defaultBusyBoxImage,
min: 2,
rep: 2,
//Sleep 5 seconds ensure job in running state
command: "sleep 5",
},
{
name: "terminating-task",
img: defaultNginxImage,
min: 2,
rep: 2,
},
},
})

By("job scheduled, then task 'completed_task' finished and job finally complete")
// job phase: pending -> running -> completing -> completed
err := waitJobStates(context, job, []vkv1.JobPhase{
vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed})
Expect(err).NotTo(HaveOccurred())

})

})
10 changes: 10 additions & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,16 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
return nil
}

func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
for _, phase := range phases {
err := wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, phase))
if err != nil {
return err
}
}
return nil
}

func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error {
return wait.Poll(100*time.Millisecond, twoMinute, func() (bool, error) {
newJob, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
Expand Down