Automated cherry pick of #119434: Include ignored pods when computing backoff delay for Job pod #119466

Merged
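Before this change, failed pods whose failures were ignored by the Job's pod failure policy were also left out of the backoff bookkeeping (see the `getNewFinishedPods` hunk below), so a disrupted pod could be recreated immediately in a tight loop. The cherry pick keeps ignored failures out of `status.failed` but still feeds them into the backoff record. The sketch below is not the controller's actual code; it only shows roughly how a `backoffRecord`'s `failuresAfterLastSuccess` and `lastFailureTime` could translate into a wait before the next pod is created. The doubling-with-cap shape matches `getRemainingTime`'s signature (`defaultBackoff`, `maxBackoff`); the constants used in `main` are assumptions, not the controller's defaults.

```go
// Sketch only: turning a backoffRecord's failure count and last failure time
// into a remaining wait before pod recreation. Constants are illustrative.
package main

import (
	"fmt"
	"time"
)

func remainingBackoff(failuresAfterLastSuccess int32, lastFailureTime, now time.Time, defaultBackoff, maxBackoff time.Duration) time.Duration {
	if failuresAfterLastSuccess == 0 || lastFailureTime.IsZero() {
		return 0
	}
	// Exponential backoff: defaultBackoff * 2^(failures-1), capped at maxBackoff.
	delay := defaultBackoff
	for i := int32(1); i < failuresAfterLastSuccess; i++ {
		delay *= 2
		if delay >= maxBackoff {
			delay = maxBackoff
			break
		}
	}
	if remaining := lastFailureTime.Add(delay).Sub(now); remaining > 0 {
		return remaining
	}
	return 0
}

func main() {
	now := time.Now()
	// Two failures since the last success, the most recent one 5s ago:
	// 10s * 2 = 20s of backoff, 5s already elapsed, so 15s remain.
	fmt.Println(remainingBackoff(2, now.Add(-5*time.Second), now, 10*time.Second, 6*time.Minute))
}
```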
75 changes: 53 additions & 22 deletions pkg/controller/job/backoff_utils.go
@@ -23,7 +23,9 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
)

type backoffRecord struct {
@@ -86,8 +88,7 @@ var backoffRecordKeyFunc = func(obj interface{}) (string, error) {
return "", fmt.Errorf("could not find key for obj %#v", obj)
}

-func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker, key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord {
-	now := clock.Now()
+func (backoffRecordStore *backoffStore) newBackoffRecord(key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord {
var backoff *backoffRecord

if b, exists, _ := backoffRecordStore.store.GetByKey(key); exists {
@@ -105,16 +106,16 @@ func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker,
}
}

-	sortByFinishedTime(newSucceededPods, now)
-	sortByFinishedTime(newFailedPods, now)
+	sortByFinishedTime(newSucceededPods)
+	sortByFinishedTime(newFailedPods)

if len(newSucceededPods) == 0 {
if len(newFailedPods) == 0 {
return *backoff
}

backoff.failuresAfterLastSuccess = backoff.failuresAfterLastSuccess + int32(len(newFailedPods))
-	lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1], now)
+	lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1])
backoff.lastFailureTime = &lastFailureTime
return *backoff

@@ -128,9 +129,9 @@ func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker,
backoff.failuresAfterLastSuccess = 0
backoff.lastFailureTime = nil

-	lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1], now)
+	lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1])
for i := len(newFailedPods) - 1; i >= 0; i-- {
-		failedTime := getFinishedTime(newFailedPods[i], now)
+		failedTime := getFinishedTime(newFailedPods[i])
if !failedTime.After(lastSuccessTime) {
break
}
@@ -146,39 +147,69 @@ func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker,

}

-func sortByFinishedTime(pods []*v1.Pod, currentTime time.Time) {
+func sortByFinishedTime(pods []*v1.Pod) {
sort.Slice(pods, func(i, j int) bool {
p1 := pods[i]
p2 := pods[j]
-		p1FinishTime := getFinishedTime(p1, currentTime)
-		p2FinishTime := getFinishedTime(p2, currentTime)
+		p1FinishTime := getFinishedTime(p1)
+		p2FinishTime := getFinishedTime(p2)

return p1FinishTime.Before(p2FinishTime)
})
}

-func getFinishedTime(p *v1.Pod, currentTime time.Time) time.Time {
+// Returns the pod finish time using the following lookups:
+// 1. if all containers finished, use the latest time
+// 2. if the pod has Ready=False condition, use the last transition time
+// 3. if the pod has been deleted, use the `deletionTimestamp - grace_period` to estimate the moment of deletion
+// 4. fallback to pod's creation time
+//
+// Pods owned by Kubelet are marked with Ready=False condition when
+// transitioning to terminal phase, thus being handled by (1.) or (2.).
+// Orphaned pods are deleted by PodGC, thus being handled by (3.).
+func getFinishedTime(p *v1.Pod) time.Time {
+	if finishTime := getFinishTimeFromContainers(p); finishTime != nil {
+		return *finishTime
+	}
+	if finishTime := getFinishTimeFromPodReadyFalseCondition(p); finishTime != nil {
+		return *finishTime
+	}
+	if finishTime := getFinishTimeFromDeletionTimestamp(p); finishTime != nil {
+		return *finishTime
+	}
+	// This should not happen in clusters with Kubelet and PodGC running.
+	return p.CreationTimestamp.Time
+}

+func getFinishTimeFromContainers(p *v1.Pod) *time.Time {
	var finishTime *time.Time
	for _, containerState := range p.Status.ContainerStatuses {
		if containerState.State.Terminated == nil {
-			finishTime = nil
-			break
+			return nil
		}

-		if finishTime == nil {
+		if containerState.State.Terminated.FinishedAt.Time.IsZero() {
+			return nil
+		}
+		if finishTime == nil || finishTime.Before(containerState.State.Terminated.FinishedAt.Time) {
			finishTime = &containerState.State.Terminated.FinishedAt.Time
-		} else {
-			if finishTime.Before(containerState.State.Terminated.FinishedAt.Time) {
-				finishTime = &containerState.State.Terminated.FinishedAt.Time
-			}
		}
	}
+	return finishTime
+}

-	if finishTime == nil || finishTime.IsZero() {
-		return currentTime
+func getFinishTimeFromPodReadyFalseCondition(p *v1.Pod) *time.Time {
+	if _, c := apipod.GetPodCondition(&p.Status, v1.PodReady); c != nil && c.Status == v1.ConditionFalse && !c.LastTransitionTime.Time.IsZero() {
+		return &c.LastTransitionTime.Time
	}
+	return nil
}

-	return *finishTime
+func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time {
+	if p.DeletionTimestamp != nil {
+		finishTime := p.DeletionTimestamp.Time.Add(-time.Duration(pointer.Int64Deref(p.DeletionGracePeriodSeconds, 0)) * time.Second)
+		return &finishTime
	}
+	return nil
}

func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration {
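The new lookup order documented above can be exercised in isolation. The snippet below is illustrative only and not the controller's code: it re-derives step (3.), the `deletionTimestamp - grace_period` estimate, against the public `k8s.io/api` types, and shows why the test file below expects `defaultTestTimeMinus30s` for a pod deleted with a 30-second grace period.

```go
// Illustrative sketch of lookup (3.): estimate when a deleted pod terminated
// as deletionTimestamp minus its deletion grace period.
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

func estimateDeletionFinishTime(p *v1.Pod) *time.Time {
	if p.DeletionTimestamp == nil {
		return nil
	}
	grace := time.Duration(pointer.Int64Deref(p.DeletionGracePeriodSeconds, 0)) * time.Second
	t := p.DeletionTimestamp.Time.Add(-grace)
	return &t
}

func main() {
	deletedAt := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			DeletionTimestamp:          &metav1.Time{Time: deletedAt},
			DeletionGracePeriodSeconds: pointer.Int64(30),
		},
	}
	// Prints 2009-11-10 22:59:30 +0000 UTC, i.e. 30s before the deletion timestamp.
	fmt.Println(*estimateDeletionFinishTime(pod))
}
```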
87 changes: 80 additions & 7 deletions pkg/controller/job/backoff_utils_test.go
@@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)

func TestNewBackoffRecord(t *testing.T) {
@@ -189,9 +190,7 @@ func TestNewBackoffRecord(t *testing.T) {
})
}

-	fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
-
-	backoffRecord := backoffRecordStore.newBackoffRecord(fakeClock, "key", newSucceededPods, newFailedPods)
+	backoffRecord := backoffRecordStore.newBackoffRecord("key", newSucceededPods, newFailedPods)
if diff := cmp.Diff(tc.wantBackoffRecord, backoffRecord, cmp.AllowUnexported(backoffRecord)); diff != "" {
t.Errorf("backoffRecord not matching; (-want,+got): %v", diff)
}
@@ -201,6 +200,7 @@

func TestGetFinishedTime(t *testing.T) {
defaultTestTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
defaultTestTimeMinus30s := defaultTestTime.Add(-30 * time.Second)
testCases := map[string]struct {
pod v1.Pod
wantFinishTime time.Time
@@ -229,7 +229,7 @@ func TestGetFinishedTime(t *testing.T) {
},
wantFinishTime: defaultTestTime,
},
"Pod with multiple containers; two containers in terminated state and one in running state": {
"Pod with multiple containers; two containers in terminated state and one in running state; fallback to deletionTimestamp": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
@@ -250,10 +250,13 @@ func TestGetFinishedTime(t *testing.T) {
},
},
},
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &metav1.Time{Time: defaultTestTime},
},
},
wantFinishTime: defaultTestTime,
},
"Pod with single container in running state": {
"fallback to deletionTimestamp": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
@@ -264,10 +267,53 @@ func TestGetFinishedTime(t *testing.T) {
},
},
},
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &metav1.Time{Time: defaultTestTime},
},
},
wantFinishTime: defaultTestTime,
},
"fallback to deletionTimestamp, decremented by grace period": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &metav1.Time{Time: defaultTestTime},
DeletionGracePeriodSeconds: pointer.Int64(30),
},
},
wantFinishTime: defaultTestTimeMinus30s,
},
"fallback to PodReady.LastTransitionTime when status of the condition is False": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
},
},
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
Reason: "PodFailed",
LastTransitionTime: metav1.Time{Time: defaultTestTime},
},
},
},
},
wantFinishTime: defaultTestTime,
},
"Pod with single container with zero finish time": {
"skip fallback to PodReady.LastTransitionTime when status of the condition is True": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -277,6 +323,33 @@ func TestGetFinishedTime(t *testing.T) {
},
},
},
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: defaultTestTimeMinus30s},
},
},
},
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &metav1.Time{Time: defaultTestTime},
},
},
wantFinishTime: defaultTestTime,
},
"fallback to creationTimestamp": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.Time{Time: defaultTestTime},
},
},
wantFinishTime: defaultTestTime,
@@ -285,7 +358,7 @@ func TestGetFinishedTime(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
-			f := getFinishedTime(&tc.pod, defaultTestTime)
+			f := getFinishedTime(&tc.pod)
if !f.Equal(tc.wantFinishTime) {
t.Errorf("Expected value of finishedTime %v; got %v", tc.wantFinishTime, f)
}
27 changes: 16 additions & 11 deletions pkg/controller/job/job_controller.go
@@ -750,7 +750,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
active := int32(len(activePods))
newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers)
succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
-	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
+	failed := job.Status.Failed + int32(nonIgnoredFailedPodsCount(&job, newFailedPods)) + int32(len(uncounted.failed))
var ready *int32
if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
ready = pointer.Int32(countReadyPods(activePods))
@@ -762,7 +762,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
job.Status.StartTime = &now
}

-	newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(jm.clock, key, newSucceededPods, newFailedPods)
+	newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(key, newSucceededPods, newFailedPods)

var manageJobErr error
var finishedCondition *batch.JobCondition
@@ -911,6 +911,19 @@ func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods
return successfulDeletes, errorFromChannel(errCh)
}

func nonIgnoredFailedPodsCount(job *batch.Job, failedPods []*v1.Pod) int {
result := len(failedPods)
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
for _, p := range failedPods {
_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
if !countFailed {
result--
}
}
}
return result
}

// deleteJobPods deletes the pods, returns the number of successful removals
// and any error.
func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
@@ -1340,15 +1353,7 @@ func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerm
return p.Status.Phase == v1.PodSucceeded
})
failedPods = getValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
-		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
-			if !isPodFailed(p, job) {
-				return false
-			}
-			_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
-			return countFailed
-		} else {
-			return isPodFailed(p, job)
-		}
+		return isPodFailed(p, job)
})
return succeededPods, failedPods
}
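`nonIgnoredFailedPodsCount` is the other half of the fix: `getNewFinishedPods` now returns every failed pod (so the backoff record above sees them), and the ignored ones are subtracted only from the Job's `status.failed`. Below is a simplified, hypothetical sketch of that split. It uses the public `k8s.io/api/batch/v1` types but a hand-rolled `ignoredByPolicy` predicate in place of the controller's `matchPodFailurePolicy`, which also handles exit codes and the other rule actions.

```go
// Sketch: every failed pod feeds the backoff record, but only non-ignored
// failures are counted toward job.Status.Failed.
package main

import (
	"fmt"

	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
)

// ignoredByPolicy reports whether a failed pod matches an Ignore rule's
// OnPodConditions patterns (simplified stand-in for matchPodFailurePolicy).
func ignoredByPolicy(policy *batch.PodFailurePolicy, pod *v1.Pod) bool {
	if policy == nil {
		return false
	}
	for _, rule := range policy.Rules {
		if rule.Action != batch.PodFailurePolicyActionIgnore {
			continue
		}
		for _, pattern := range rule.OnPodConditions {
			for _, cond := range pod.Status.Conditions {
				if cond.Type == pattern.Type && cond.Status == pattern.Status {
					return true
				}
			}
		}
	}
	return false
}

func main() {
	policy := &batch.PodFailurePolicy{
		Rules: []batch.PodFailurePolicyRule{{
			Action: batch.PodFailurePolicyActionIgnore,
			OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
				{Type: v1.DisruptionTarget, Status: v1.ConditionTrue},
			},
		}},
	}
	failed := []*v1.Pod{
		{Status: v1.PodStatus{Phase: v1.PodFailed, Conditions: []v1.PodCondition{
			{Type: v1.DisruptionTarget, Status: v1.ConditionTrue},
		}}},
		{Status: v1.PodStatus{Phase: v1.PodFailed}},
	}
	counted := 0
	for _, p := range failed {
		if !ignoredByPolicy(policy, p) {
			counted++
		}
	}
	fmt.Printf("backoff sees %d failed pods, status.Failed increments by %d\n", len(failed), counted)
}
```

Running it prints `backoff sees 2 failed pods, status.Failed increments by 1`, which mirrors the new test case in job_controller_test.go below: a disrupted pod is ignored (`wantStatusFailed: 0`) yet its failure still delays pod recreation.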
47 changes: 47 additions & 0 deletions pkg/controller/job/job_controller_test.go
@@ -2909,6 +2909,53 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
wantStatusFailed: 0,
wantStatusSucceeded: 0,
},
"ignore pod failure based on OnPodConditions, ignored failures delays pod recreation": {
enableJobPodFailurePolicy: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(1),
Completions: pointer.Int32(1),
BackoffLimit: pointer.Int32(0),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionIgnore,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
},
},
},
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &now,
},
Status: v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
},
wantConditions: nil,
wantStatusActive: 0,
wantStatusFailed: 0,
wantStatusSucceeded: 0,
},
"fail job based on OnPodConditions": {
enableJobPodFailurePolicy: true,
job: batch.Job{