
add ActiveDeadlineSeconds and BackoffLimit features #963

Merged (12 commits) on Mar 26, 2019
11 changes: 11 additions & 0 deletions pkg/apis/tensorflow/v1beta2/types.go
@@ -42,6 +42,17 @@ type TFJob struct {

// TFJobSpec is a desired state description of the TFJob.
type TFJobSpec struct {
// Specifies the duration in seconds relative to the startTime that the job may be active
// before the system tries to terminate it; value must be a positive integer.
// This applies only to pods with restartPolicy == OnFailure or Always.
// +optional
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`

// Optional number of retries before marking this job failed.
// Defaults to 6
// +optional
BackoffLimit *int32 `json:"backoffLimit,omitempty"`

// CleanPodPolicy defines the policy to kill pods after TFJob is
// succeeded.
// Default to Running.
10 changes: 10 additions & 0 deletions pkg/apis/tensorflow/v1beta2/zz_generated.deepcopy.go

Generated file; diff not rendered.

7 changes: 5 additions & 2 deletions pkg/common/util/v1beta2/testutil/pod.go
@@ -64,15 +64,18 @@ func NewPodList(count int32, status v1.PodPhase, tfJob *tfv1beta2.TFJob, typ str
return pods
}

func SetPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1beta2.TFJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, t *testing.T) {
func SetPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1beta2.TFJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, restartCounts []int32, t *testing.T) {
var index int32
for _, pod := range NewPodList(pendingPods, v1.PodPending, tfJob, typ, index, t) {
if err := podIndexer.Add(pod); err != nil {
t.Errorf("%s: unexpected error when adding pod %v", tfJob.Name, err)
}
}
index += pendingPods
for _, pod := range NewPodList(activePods, v1.PodRunning, tfJob, typ, index, t) {
for i, pod := range NewPodList(activePods, v1.PodRunning, tfJob, typ, index, t) {
if restartCounts != nil {
pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: restartCounts[i]}}
}
if err := podIndexer.Add(pod); err != nil {
t.Errorf("%s: unexpected error when adding pod %v", tfJob.Name, err)
}
32 changes: 32 additions & 0 deletions pkg/common/util/v1beta2/testutil/tfjob.go
@@ -50,6 +50,38 @@ func NewTFJobWithCleanupJobDelay(chief, worker, ps int, ttl *int32) *tfv1beta2.T
return tfJob
}

func NewTFJobWithActiveDeadlineSeconds(chief, worker, ps int, ads *int64) *tfv1beta2.TFJob {
if chief == 1 {
tfJob := NewTFJobWithChief(worker, ps)
tfJob.Spec.ActiveDeadlineSeconds = ads
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}
tfJob := NewTFJob(worker, ps)
tfJob.Spec.ActiveDeadlineSeconds = ads
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}

func NewTFJobWithBackoffLimit(chief, worker, ps int, backoffLimit *int32) *tfv1beta2.TFJob {
if chief == 1 {
tfJob := NewTFJobWithChief(worker, ps)
tfJob.Spec.BackoffLimit = backoffLimit
tfJob.Spec.TFReplicaSpecs["Worker"].RestartPolicy = "OnFailure"
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}
tfJob := NewTFJob(worker, ps)
tfJob.Spec.BackoffLimit = backoffLimit
tfJob.Spec.TFReplicaSpecs["Worker"].RestartPolicy = "OnFailure"
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}

func NewTFJobWithChief(worker, ps int) *tfv1beta2.TFJob {
tfJob := NewTFJob(worker, ps)
tfJob.Spec.TFReplicaSpecs[tfv1beta2.TFReplicaTypeChief] = &common.ReplicaSpec{
129 changes: 120 additions & 9 deletions pkg/controller.v1beta2/tensorflow/controller.go
@@ -17,6 +17,7 @@ package tensorflow

import (
"fmt"
"strings"
"time"

kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
@@ -31,6 +32,7 @@ import (
"k8s.io/client-go/tools/cache"

"github.com/kubeflow/tf-operator/cmd/tf-operator.v1beta2/app/options"
common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2"
tfv1beta2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
tfjobscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme"
@@ -39,6 +41,7 @@ import (
tfjoblisters "github.com/kubeflow/tf-operator/pkg/client/listers/tensorflow/v1beta2"
"github.com/kubeflow/tf-operator/pkg/common/jobcontroller"
tflogger "github.com/kubeflow/tf-operator/pkg/logger"
"github.com/kubeflow/tf-operator/pkg/util/k8sutil"
"k8s.io/apimachinery/pkg/runtime/schema"
)

@@ -325,17 +328,14 @@ func (tc *TFController) syncTFJob(key string) (bool, error) {
return true, err
}

func getTotalReplicas(tfjob *tfv1beta2.TFJob) int32 {
tfjobReplicas := int32(0)
for _, r := range tfjob.Spec.TFReplicaSpecs {
tfjobReplicas += *r.Replicas
}
return tfjobReplicas
}

// reconcileTFJobs checks and updates replicas for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods/services.
func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
tfjobKey, err := KeyFunc(tfjob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
return err
}
logger := tflogger.LoggerForJob(tfjob)
logger.Infof("Reconcile TFJobs %s", tfjob.Name)

@@ -353,8 +353,46 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
return err
}

// retrieve the previous number of retries
previousRetry := tc.WorkQueue.NumRequeues(tfjobKey)

activePods := k8sutil.FilterActivePods(pods)
active := int32(len(activePods))
_, failed := getSucceededAndFailedCount(pods)
totalReplicas := getTotalReplicas(tfjob)
prevReplicasFailedNum := getTotalFailedReplicas(tfjob)

tfJobExceedsLimit := false
var failureMessage string
var exceedsBackoffLimit bool = false
var pastBackoffLimit bool = false

if tfjob.Spec.BackoffLimit != nil {
jobHasNewFailure := failed > prevReplicasFailedNum
// New failures happen when the status does not yet reflect them and active
// differs from the total replica count; otherwise the previous controller loop
// failed to update the status, so even if we pick up a failure it is not a new one.
exceedsBackoffLimit = jobHasNewFailure && (active != totalReplicas) &&
(int32(previousRetry)+1 > *tfjob.Spec.BackoffLimit)

pastBackoffLimit, err = tc.pastBackoffLimit(tfjob, pods)
if err != nil {
return err
}
}

if exceedsBackoffLimit || pastBackoffLimit {
// The number of pod restarts exceeds the backoff limit (restartPolicy == OnFailure only),
// or the number of failed pods has increased since the last sync.
tfJobExceedsLimit = true
failureMessage = fmt.Sprintf("TFJob %s has failed because it has reached the specified backoff limit", tfjob.Name)
} else if tc.pastActiveDeadline(tfjob) {
failureMessage = fmt.Sprintf("TFJob %s has failed because it was active longer than specified deadline", tfjob.Name)
tfJobExceedsLimit = true
}

// If the TFJob is terminated, delete all pods and services.
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) {
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit {
if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
return err
}
@@ -374,6 +412,19 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
}
}

if tfJobExceedsLimit {
tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage)
if tfjob.Status.CompletionTime == nil {
now := metav1.Now()
tfjob.Status.CompletionTime = &now
}
err := updateTFJobConditions(tfjob, common.JobFailed, tfJobFailedReason, failureMessage)
if err != nil {
tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

// At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
// If any replicas are still Active, set their status to succeeded.
if isSucceeded(tfjob.Status) {
@@ -432,6 +483,66 @@ func (tc *TFController) satisfiedExpectations(tfjob *tfv1beta2.TFJob) bool {
return satisfied
}

// pastBackoffLimit checks if the sum of container restart counts exceeds BackoffLimit.
// This method applies only to pods with restartPolicy == OnFailure or Always.
func (tc *TFController) pastBackoffLimit(tfjob *tfv1beta2.TFJob, pods []*v1.Pod) (bool, error) {
if tfjob.Spec.BackoffLimit == nil {
return false, nil
}
logger := tflogger.LoggerForJob(tfjob)
result := int32(0)
for rtype, spec := range tfjob.Spec.TFReplicaSpecs {
if spec.RestartPolicy != common.RestartPolicyOnFailure && spec.RestartPolicy != common.RestartPolicyAlways {
logger.Warnf("The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit.", rtype, tfjob.Name)
continue
}
// Convert TFReplicaType to lower string.
rt := strings.ToLower(string(rtype))
pods, err := tc.FilterPodsForReplicaType(pods, rt)
if err != nil {
return false, err
}
for i := range pods {
po := pods[i]
if po.Status.Phase != v1.PodRunning {
continue
}
for j := range po.Status.InitContainerStatuses {
stat := po.Status.InitContainerStatuses[j]
result += stat.RestartCount
}
for j := range po.Status.ContainerStatuses {
stat := po.Status.ContainerStatuses[j]
result += stat.RestartCount
}
}
}

if *tfjob.Spec.BackoffLimit == 0 {
return result > 0, nil
}
return result >= *tfjob.Spec.BackoffLimit, nil
}

// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func (tc *TFController) pastActiveDeadline(tfjob *tfv1beta2.TFJob) bool {
if tfjob.Spec.ActiveDeadlineSeconds == nil || tfjob.Status.StartTime == nil {
return false
}
now := metav1.Now()
start := tfjob.Status.StartTime.Time
duration := now.Time.Sub(start)
allowedDuration := time.Duration(*tfjob.Spec.ActiveDeadlineSeconds) * time.Second
return duration >= allowedDuration
}

// getSucceededAndFailedCount returns the number of succeeded and failed pods for a job
func getSucceededAndFailedCount(pods []*v1.Pod) (succeeded, failed int32) {
succeeded = int32(k8sutil.FilterPods(pods, v1.PodSucceeded))
failed = int32(k8sutil.FilterPods(pods, v1.PodFailed))
return
}

func (tc *TFController) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
return tc.getTFJobFromName(namespace, name)
}
4 changes: 2 additions & 2 deletions pkg/controller.v1beta2/tensorflow/controller_test.go
@@ -259,8 +259,8 @@ func TestNormalPath(t *testing.T) {
}

podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t)
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, t)
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t)
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, t)

serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer()
testutil.SetServices(serviceIndexer, tfJob, testutil.LabelWorker, tc.activeWorkerServices, t)
45 changes: 45 additions & 0 deletions pkg/controller.v1beta2/tensorflow/job.go
@@ -106,8 +106,37 @@ func (tc *TFController) updateTFJob(old, cur interface{}) {
if err != nil {
return
}
curTFJob, err := tfJobFromUnstructured(cur)
if err != nil {
return
}

// KeyFunc should never return an error here
key, err := KeyFunc(curTFJob)
if err != nil {
return
}

log.Infof("Updating tfjob: %s", oldTFJob.Name)
tc.enqueueTFJob(cur)

// check whether a new resync needs to be scheduled for ActiveDeadlineSeconds
if curTFJob.Status.StartTime != nil {
curTFJobADS := curTFJob.Spec.ActiveDeadlineSeconds
if curTFJobADS == nil {
return
}
oldTFJobADS := oldTFJob.Spec.ActiveDeadlineSeconds
if oldTFJobADS == nil || *oldTFJobADS != *curTFJobADS {
now := metav1.Now()
start := curTFJob.Status.StartTime.Time
passed := now.Time.Sub(start)
total := time.Duration(*curTFJobADS) * time.Second
// AddAfter will handle total < passed
tc.WorkQueue.AddAfter(key, total-passed)
log.Infof("job ActiveDeadlineSeconds updated, will rsync after %d seconds", total-passed)
}
}
}

func (tc *TFController) deletePodsAndServices(tfJob *tfv1beta2.TFJob, pods []*v1.Pod) error {
@@ -164,3 +193,19 @@ func (tc *TFController) cleanupTFJob(tfJob *tfv1beta2.TFJob) error {
func (tc *TFController) deleteTFJob(tfJob *tfv1beta2.TFJob) error {
return tc.tfJobClientSet.KubeflowV1beta2().TFJobs(tfJob.Namespace).Delete(tfJob.Name, &metav1.DeleteOptions{})
}

func getTotalReplicas(tfjob *tfv1beta2.TFJob) int32 {
tfjobReplicas := int32(0)
for _, r := range tfjob.Spec.TFReplicaSpecs {
tfjobReplicas += *r.Replicas
}
return tfjobReplicas
}

func getTotalFailedReplicas(tfjob *tfv1beta2.TFJob) int32 {
totalFailedReplicas := int32(0)
for rtype := range tfjob.Status.ReplicaStatuses {
totalFailedReplicas += tfjob.Status.ReplicaStatuses[rtype].Failed
}
return totalFailedReplicas
}