Skip to content

Commit

Permalink
Extend metrics with the new labels (kubernetes#113324)
Browse files Browse the repository at this point in the history
* Extend job metrics

* Refactor TestMetrics to extract its checks into dedicated tests per feature
  • Loading branch information
mimowo authored and jaehnri committed Jan 3, 2023
1 parent 39e0115 commit f05af74
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 42 deletions.
26 changes: 19 additions & 7 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
needsFlush = true
}
podFailureCountByPolicyAction := map[string]int{}
for _, pod := range pods {
if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) {
continue
Expand Down Expand Up @@ -1061,7 +1062,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
ix := getCompletionIndex(pod.Annotations)
if !uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*job.Spec.Completions))) {
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
_, countFailed := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
_, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
if action != nil {
podFailureCountByPolicyAction[string(*action)] += 1
}
if countFailed {
needsFlush = true
uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
Expand Down Expand Up @@ -1102,7 +1106,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
}
}
var err error
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil {
return err
}
jobFinished := jm.enactJobFinished(job, finishedCond)
Expand Down Expand Up @@ -1132,7 +1136,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
//
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
var err error
if needsFlush {
if job, err = jm.updateStatusHandler(ctx, job); err != nil {
Expand All @@ -1143,6 +1147,8 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
*oldCounters = job.Status
needsFlush = false
}
recordJobPodFailurePolicyActions(job, podFailureCountByPolicyAction)

jobKey, err := controller.KeyFunc(job)
if err != nil {
return job, needsFlush, fmt.Errorf("getting job key: %w", err)
Expand Down Expand Up @@ -1263,10 +1269,10 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC
jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
}
jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded", "").Inc()
} else {
jm.recorder.Event(job, v1.EventTypeWarning, finishedCond.Reason, finishedCond.Message)
metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc()
metrics.JobFinishedNum.WithLabelValues(completionMode, "failed", finishedCond.Reason).Inc()
}
return true
}
Expand Down Expand Up @@ -1345,7 +1351,7 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
}
for _, p := range pods {
if isPodFailed(p, uncounted != nil) {
jobFailureMessage, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
jobFailureMessage, _, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
if jobFailureMessage != nil {
return jobFailureMessage
}
Expand All @@ -1369,7 +1375,7 @@ func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPod
if !isPodFailed(p, uncounted != nil) {
return false
}
_, countFailed := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
return countFailed
} else {
return isPodFailed(p, uncounted != nil)
Expand Down Expand Up @@ -1768,6 +1774,12 @@ func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
}

func recordJobPodFailurePolicyActions(job *batch.Job, podFailureCountByPolicyAction map[string]int) {
for action, count := range podFailureCountByPolicyAction {
metrics.PodFailuresHandledByFailurePolicy.WithLabelValues(action).Add(float64(count))
}
}

func countReadyPods(pods []*v1.Pod) int32 {
cnt := int32(0)
for _, p := range pods {
Expand Down
25 changes: 22 additions & 3 deletions pkg/controller/job/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,20 @@ var (
},
[]string{"completion_mode", "result", "action"},
)
// JobFinishedNum tracks the number of Jobs that finish. Possible label
// values:
// JobFinishedNum tracks the number of Jobs that finish. Empty reason label
// is used to count successful jobs.
// Possible label values:
// completion_mode: Indexed, NonIndexed
// result: failed, succeeded
// reason: "BackoffLimitExceeded", "DeadlineExceeded", "PodFailurePolicy", ""
JobFinishedNum = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "job_finished_total",
Help: "The number of finished job",
StabilityLevel: metrics.ALPHA,
},
[]string{"completion_mode", "result"},
[]string{"completion_mode", "result", "reason"},
)

// JobPodsFinished records the number of finished Pods that the job controller
Expand All @@ -84,6 +86,22 @@ var (
},
[]string{"completion_mode", "result"})

// PodFailuresHandledByFailurePolicy records the number of finished Pods
// handled by pod failure policy.
// Possible label values:
// action: FailJob, Ignore, Count
PodFailuresHandledByFailurePolicy = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "pod_failures_handled_by_failure_policy_total",
Help: `The number of failed Pods handled by failure policy with
respect to the failure policy action applied based on the matched
rule. Possible values of the action label correspond to the
possible values for the failure policy rule action, which are:
"FailJob", "Ignore" and "Count".`,
},
[]string{"action"})

// TerminatedPodsWithTrackingFinalizer records the addition and removal of
// terminated pods that have the finalizer batch.kubernetes.io/job-tracking,
// regardless of whether they are owned by a Job.
Expand Down Expand Up @@ -137,6 +155,7 @@ func Register() {
legacyregistry.MustRegister(JobSyncNum)
legacyregistry.MustRegister(JobFinishedNum)
legacyregistry.MustRegister(JobPodsFinished)
legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy)
legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
})
}
26 changes: 15 additions & 11 deletions pkg/controller/job/pod_failure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,46 @@ import (
// matchPodFailurePolicy returns information about matching a given failed pod
// against the pod failure policy rules. The information is represented as an
// optional job failure message (present in case the pod matched a 'FailJob'
// rule) and a boolean indicating if the failure should be counted towards
// backoffLimit (it should not be counted if the pod matched an 'Ignore' rule).
func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *v1.Pod) (*string, bool) {
// rule), a boolean indicating if the failure should be counted towards
// backoffLimit (it should not be counted if the pod matched an 'Ignore' rule),
// and a pointer to the matched pod failure policy action.
func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *v1.Pod) (*string, bool, *batch.PodFailurePolicyAction) {
if podFailurePolicy == nil {
return nil, true
return nil, true, nil
}
ignore := batch.PodFailurePolicyActionIgnore
failJob := batch.PodFailurePolicyActionFailJob
count := batch.PodFailurePolicyActionCount
for index, podFailurePolicyRule := range podFailurePolicy.Rules {
if podFailurePolicyRule.OnExitCodes != nil {
if containerStatus := matchOnExitCodes(&failedPod.Status, podFailurePolicyRule.OnExitCodes); containerStatus != nil {
switch podFailurePolicyRule.Action {
case batch.PodFailurePolicyActionIgnore:
return nil, false
return nil, false, &ignore
case batch.PodFailurePolicyActionCount:
return nil, true
return nil, true, &count
case batch.PodFailurePolicyActionFailJob:
msg := fmt.Sprintf("Container %s for pod %s/%s failed with exit code %v matching %v rule at index %d",
containerStatus.Name, failedPod.Namespace, failedPod.Name, containerStatus.State.Terminated.ExitCode, podFailurePolicyRule.Action, index)
return &msg, true
return &msg, true, &failJob
}
}
} else if podFailurePolicyRule.OnPodConditions != nil {
if podCondition := matchOnPodConditions(&failedPod.Status, podFailurePolicyRule.OnPodConditions); podCondition != nil {
switch podFailurePolicyRule.Action {
case batch.PodFailurePolicyActionIgnore:
return nil, false
return nil, false, &ignore
case batch.PodFailurePolicyActionCount:
return nil, true
return nil, true, &count
case batch.PodFailurePolicyActionFailJob:
msg := fmt.Sprintf("Pod %s/%s has condition %v matching %v rule at index %d",
failedPod.Namespace, failedPod.Name, podCondition.Type, podFailurePolicyRule.Action, index)
return &msg, true
return &msg, true, &failJob
}
}
}
}
return nil, true
return nil, true, nil
}

func matchOnExitCodes(podStatus *v1.PodStatus, requirement *batch.PodFailurePolicyOnExitCodesRequirement) *v1.ContainerStatus {
Expand Down
36 changes: 25 additions & 11 deletions pkg/controller/job/pod_failure_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package job
import (
"testing"

"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -31,12 +32,16 @@ func TestMatchPodFailurePolicy(t *testing.T) {
Namespace: "default",
Name: "mypod",
}
ignore := batch.PodFailurePolicyActionIgnore
failJob := batch.PodFailurePolicyActionFailJob
count := batch.PodFailurePolicyActionCount

testCases := map[string]struct {
podFailurePolicy *batch.PodFailurePolicy
failedPod *v1.Pod
wantJobFailureMessage *string
wantCountFailed bool
wantAction *batch.PodFailurePolicyAction
}{
"unknown action for rule matching by exit codes - skip rule with unknown action": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -75,6 +80,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 1"),
wantCountFailed: true,
wantAction: &failJob,
},
"unknown action for rule matching by pod conditions - skip rule with unknown action": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -113,6 +119,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: false,
wantAction: &ignore,
},
"unknown operator - rule with unknown action is skipped for onExitCodes": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -151,6 +158,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 1"),
wantCountFailed: true,
wantAction: &failJob,
},
"no policy rules": {
podFailurePolicy: nil,
Expand Down Expand Up @@ -201,6 +209,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: false,
wantAction: &ignore,
},
"FailJob rule matched for exit codes": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -232,6 +241,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 0"),
wantCountFailed: true,
wantAction: &failJob,
},
"successful containers are skipped by the rules": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -320,6 +330,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 1 matching FailJob rule at index 0"),
wantCountFailed: true,
wantAction: &failJob,
},
"second jobfail rule matched for exit codes": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -358,6 +369,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 6 matching FailJob rule at index 1"),
wantCountFailed: true,
wantAction: &failJob,
},
"count rule matched for exit codes": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -389,6 +401,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: true,
wantAction: &count,
},
"ignore rule matched for pod conditions": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -418,6 +431,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: false,
wantAction: &ignore,
},
"ignore rule matches by the status=False": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -447,6 +461,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: false,
wantAction: &ignore,
},
"ignore rule matches by the status=Unknown": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -476,6 +491,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: false,
wantAction: &ignore,
},
"ignore rule does not match when status for pattern is False, but actual True": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -592,6 +608,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: pointer.String("Pod default/mypod has condition DisruptionTarget matching FailJob rule at index 0"),
wantCountFailed: true,
wantAction: &failJob,
},
"count rule matched for pod conditions": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -621,6 +638,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: true,
wantAction: &count,
},
"no rule matched": {
podFailurePolicy: &batch.PodFailurePolicy{
Expand Down Expand Up @@ -683,25 +701,21 @@ func TestMatchPodFailurePolicy(t *testing.T) {
},
wantJobFailureMessage: nil,
wantCountFailed: true,
wantAction: &count,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
jobFailMessage, countFailed := matchPodFailurePolicy(tc.podFailurePolicy, tc.failedPod)
if tc.wantJobFailureMessage == nil {
if jobFailMessage != nil {
t.Errorf("Unexpected job fail message. Got: %q", *jobFailMessage)
}
} else {
if jobFailMessage == nil {
t.Errorf("Missing job fail message. want: %q", *tc.wantJobFailureMessage)
} else if *tc.wantJobFailureMessage != *jobFailMessage {
t.Errorf("Unexpected job fail message. want: %q. got: %q", *tc.wantJobFailureMessage, *jobFailMessage)
}
jobFailMessage, countFailed, action := matchPodFailurePolicy(tc.podFailurePolicy, tc.failedPod)
if diff := cmp.Diff(tc.wantJobFailureMessage, jobFailMessage); diff != "" {
t.Errorf("Unexpected job failure message: %s", diff)
}
if tc.wantCountFailed != countFailed {
t.Errorf("Unexpected count failed. want: %v. got: %v", tc.wantCountFailed, countFailed)
}
if diff := cmp.Diff(tc.wantAction, action); diff != "" {
t.Errorf("Unexpected failure policy action: %s", diff)
}
})
}
}

0 comments on commit f05af74

Please sign in to comment.