Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JobDeletionDurationSeconds metric in TTLAfterFinished controller #98676

Merged
merged 1 commit into from Feb 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/controller/ttlafterfinished/BUILD
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/job:go_default_library",
"//pkg/controller/ttlafterfinished/metrics:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
Expand Down Expand Up @@ -52,6 +53,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/controller/ttlafterfinished/config:all-srcs",
"//pkg/controller/ttlafterfinished/metrics:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
Expand Down
26 changes: 26 additions & 0 deletions pkg/controller/ttlafterfinished/metrics/BUILD
@@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["metrics.go"],
importpath = "k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
51 changes: 51 additions & 0 deletions pkg/controller/ttlafterfinished/metrics/metrics.go
@@ -0,0 +1,51 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"sync"

"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)

// TTLAfterFinishedSubsystem - subsystem name used for this controller.
const TTLAfterFinishedSubsystem = "ttl_after_finished_controller"

var (
// JobDeletionDurationSeconds tracks the time it took to delete the job since it
// became eligible for deletion.
JobDeletionDurationSeconds = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: TTLAfterFinishedSubsystem,
Name: "job_deletion_duration_seconds",
Help: "The time it took to delete the job since it became eligible for deletion",
StabilityLevel: metrics.ALPHA,
// Start with 100ms with the last bucket being [~27m, Inf).
Buckets: metrics.ExponentialBuckets(0.1, 2, 14),
},
)
)

var registerMetrics sync.Once

// Register registers TTL after finished controller metrics.
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(JobDeletionDurationSeconds)
})
}
48 changes: 28 additions & 20 deletions pkg/controller/ttlafterfinished/ttlafterfinished_controller.go
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubernetes/pkg/controller"
jobutil "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics"
)

// Controller watches for changes of Jobs API objects. Triggered by Job creation
Expand Down Expand Up @@ -79,6 +80,8 @@ func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Co
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
}

metrics.Register()

tc := &Controller{
client: client,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
Expand Down Expand Up @@ -205,9 +208,9 @@ func (tc *Controller) processJob(key string) error {
return err
}

if expired, err := tc.processTTL(job); err != nil {
if expiredAt, err := tc.processTTL(job); err != nil {
return err
} else if !expired {
} else if expiredAt == nil {
return nil
}

Expand All @@ -223,9 +226,10 @@ func (tc *Controller) processJob(key string) error {
return err
}
// Use the latest Job TTL to see if the TTL truly expires.
if expired, err := tc.processTTL(fresh); err != nil {
expiredAt, err := tc.processTTL(fresh)
if err != nil {
return err
} else if !expired {
} else if expiredAt == nil {
return nil
}
// Cascade deletes the Jobs if TTL truly expires.
Expand All @@ -235,30 +239,34 @@ func (tc *Controller) processJob(key string) error {
Preconditions: &metav1.Preconditions{UID: &fresh.UID},
}
klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
return tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options)
if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options); err != nil {
return err
}
metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
return nil
}

// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
// if the TTL will expire later.
func (tc *Controller) processTTL(job *batch.Job) (expired bool, err error) {
func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) {
// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
if job.DeletionTimestamp != nil || !needsCleanup(job) {
return false, nil
return nil, nil
}

now := tc.clock.Now()
t, err := timeLeft(job, &now)
t, e, err := timeLeft(job, &now)
if err != nil {
return false, err
return nil, err
}

// TTL has expired
if *t <= 0 {
return true, nil
return e, nil
}

tc.enqueueAfter(job, *t)
return false, nil
return nil, nil
}

// needsCleanup checks whether a Job has finished and has a TTL set.
Expand All @@ -270,26 +278,26 @@ func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
if !needsCleanup(j) {
return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name)
}
finishAt, err := jobFinishTime(j)
t, err := jobFinishTime(j)
if err != nil {
return nil, nil, err
}
finishAtUTC := finishAt.UTC()
expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
return &finishAtUTC, &expireAtUTC, nil
finishAt := t.Time
expireAt := finishAt.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
return &finishAt, &expireAt, nil
}

func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, error) {
func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) {
finishAt, expireAt, err := getFinishAndExpireTime(j)
if err != nil {
return nil, err
return nil, nil, err
}
if finishAt.UTC().After(since.UTC()) {
if finishAt.After(*since) {
klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
}
remaining := expireAt.UTC().Sub(since.UTC())
remaining := expireAt.Sub(*since)
ahg-g marked this conversation as resolved.
Show resolved Hide resolved
klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
return &remaining, nil
return &remaining, expireAt, nil
}

// jobFinishTime takes an already finished Job and returns the time it finishes.
Expand Down
Expand Up @@ -87,6 +87,7 @@ func TestTimeLeft(t *testing.T) {
expectErr bool
expectErrStr string
expectedTimeLeft *time.Duration
expectedExpireAt time.Time
}{
{
name: "Error case: Job unfinished",
Expand All @@ -108,20 +109,23 @@ func TestTimeLeft(t *testing.T) {
ttl: utilpointer.Int32Ptr(0),
since: &now.Time,
expectedTimeLeft: durationPointer(0),
expectedExpireAt: now.Time,
},
{
name: "Job completed now, 10s TTL",
completionTime: now,
ttl: utilpointer.Int32Ptr(10),
since: &now.Time,
expectedTimeLeft: durationPointer(10),
expectedExpireAt: now.Add(10 * time.Second),
},
{
name: "Job completed 10s ago, 15s TTL",
completionTime: metav1.NewTime(now.Add(-10 * time.Second)),
ttl: utilpointer.Int32Ptr(15),
since: &now.Time,
expectedTimeLeft: durationPointer(5),
expectedExpireAt: now.Add(5 * time.Second),
},
{
name: "Error case: Job failed now, no TTL",
Expand All @@ -136,26 +140,29 @@ func TestTimeLeft(t *testing.T) {
ttl: utilpointer.Int32Ptr(0),
since: &now.Time,
expectedTimeLeft: durationPointer(0),
expectedExpireAt: now.Time,
},
{
name: "Job failed now, 10s TTL",
failedTime: now,
ttl: utilpointer.Int32Ptr(10),
since: &now.Time,
expectedTimeLeft: durationPointer(10),
expectedExpireAt: now.Add(10 * time.Second),
},
{
name: "Job failed 10s ago, 15s TTL",
failedTime: metav1.NewTime(now.Add(-10 * time.Second)),
ttl: utilpointer.Int32Ptr(15),
since: &now.Time,
expectedTimeLeft: durationPointer(5),
expectedExpireAt: now.Add(5 * time.Second),
},
}

for _, tc := range testCases {
job := newJob(tc.completionTime, tc.failedTime, tc.ttl)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off the logic, but we may migrate this to a pure subtest style.

gotTimeLeft, gotErr := timeLeft(job, tc.since)
gotTimeLeft, gotExpireAt, gotErr := timeLeft(job, tc.since)
if tc.expectErr != (gotErr != nil) {
t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr)
}
Expand All @@ -169,6 +176,9 @@ func TestTimeLeft(t *testing.T) {
if *gotTimeLeft != *tc.expectedTimeLeft {
t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft)
}
if *gotExpireAt != tc.expectedExpireAt {
t.Errorf("%s: expected expire at %v, got %v", tc.name, tc.expectedExpireAt, *gotExpireAt)
}
}
}
}