From d714d5529f5cac2b15d981ab3e361375bfe1aa0f Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Fri, 29 Dec 2023 02:23:25 +0100 Subject: [PATCH] add support for ttl after finished controller for cleaning up finished jobsets --- Dockerfile | 1 + api/jobset/v1alpha2/jobset_types.go | 12 + api/jobset/v1alpha2/openapi_generated.go | 7 + api/jobset/v1alpha2/zz_generated.deepcopy.go | 5 + .../jobset/v1alpha2/jobsetspec.go | 19 +- .../crd/bases/jobset.x-k8s.io_jobsets.yaml | 13 + go.mod | 2 +- hack/python-sdk/swagger.json | 5 + main.go | 28 +- pkg/controllers/metrics/metrics.go | 28 ++ .../ttlafterfinished_controller.go | 319 ++++++++++++++++++ .../ttlafterfinished_controller_test.go | 289 ++++++++++++++++ pkg/util/testing/wrappers.go | 6 + sdk/python/docs/JobsetV1alpha2JobSetSpec.md | 1 + .../models/jobset_v1alpha2_job_set_spec.py | 34 +- .../test/test_jobset_v1alpha2_job_set.py | 3 +- .../test/test_jobset_v1alpha2_job_set_list.py | 6 +- .../test/test_jobset_v1alpha2_job_set_spec.py | 3 +- test/integration/controller/suite_test.go | 18 + .../ttlafterfinished_controller_test.go | 84 +++++ 20 files changed, 867 insertions(+), 16 deletions(-) create mode 100644 pkg/controllers/metrics/metrics.go create mode 100644 pkg/controllers/ttlafterfinished_controller.go create mode 100644 pkg/controllers/ttlafterfinished_controller_test.go create mode 100644 test/integration/controller/ttlafterfinished_controller_test.go diff --git a/Dockerfile b/Dockerfile index 423017d2..59828763 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,7 @@ RUN go mod download # Copy the go source COPY main.go main.go COPY api/ api/ +COPY client-go/ client-go/ COPY pkg/controllers/ pkg/controllers/ COPY pkg/util/ pkg/util/ COPY pkg/webhooks pkg/webhooks diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index 5f43f277..f9072551 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -76,6 +76,18 @@ type JobSetSpec struct { // Suspend suspends all running child Jobs when set to true. Suspend *bool `json:"suspend,omitempty"` + + // TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished + // execution (either Complete or Failed). If this field is set, + // TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be + // automatically deleted. When the JobSet is being deleted, its lifecycle + // guarantees (e.g. finalizers) will be honored. If this field is unset, + // the JobSet won't be automatically deleted. If this field is set to zero, + // the JobSet becomes eligible to be deleted immediately after it finishes. + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:default=0 + // +optional + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` } // JobSetStatus defines the observed state of JobSet diff --git a/api/jobset/v1alpha2/openapi_generated.go b/api/jobset/v1alpha2/openapi_generated.go index 2ea2e6e5..914ee369 100644 --- a/api/jobset/v1alpha2/openapi_generated.go +++ b/api/jobset/v1alpha2/openapi_generated.go @@ -207,6 +207,13 @@ func schema_jobset_api_jobset_v1alpha2_JobSetSpec(ref common.ReferenceCallback) Format: "", }, }, + "ttlSecondsAfterFinished": { + SchemaProps: spec.SchemaProps{ + Description: "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. 
When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.", + Type: []string{"integer"}, + Format: "int32", + }, + }, }, }, }, diff --git a/api/jobset/v1alpha2/zz_generated.deepcopy.go b/api/jobset/v1alpha2/zz_generated.deepcopy.go index ee4f51ea..36f1111c 100644 --- a/api/jobset/v1alpha2/zz_generated.deepcopy.go +++ b/api/jobset/v1alpha2/zz_generated.deepcopy.go @@ -127,6 +127,11 @@ func (in *JobSetSpec) DeepCopyInto(out *JobSetSpec) { *out = new(bool) **out = **in } + if in.TTLSecondsAfterFinished != nil { + in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSetSpec. diff --git a/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go b/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go index a8a2b778..09ecf1f0 100644 --- a/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go +++ b/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go @@ -17,11 +17,12 @@ package v1alpha2 // JobSetSpecApplyConfiguration represents an declarative configuration of the JobSetSpec type for use // with apply. type JobSetSpecApplyConfiguration struct { - ReplicatedJobs []ReplicatedJobApplyConfiguration `json:"replicatedJobs,omitempty"` - Network *NetworkApplyConfiguration `json:"network,omitempty"` - SuccessPolicy *SuccessPolicyApplyConfiguration `json:"successPolicy,omitempty"` - FailurePolicy *FailurePolicyApplyConfiguration `json:"failurePolicy,omitempty"` - Suspend *bool `json:"suspend,omitempty"` + ReplicatedJobs []ReplicatedJobApplyConfiguration `json:"replicatedJobs,omitempty"` + Network *NetworkApplyConfiguration `json:"network,omitempty"` + SuccessPolicy *SuccessPolicyApplyConfiguration `json:"successPolicy,omitempty"` + FailurePolicy *FailurePolicyApplyConfiguration `json:"failurePolicy,omitempty"` + Suspend *bool `json:"suspend,omitempty"` + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` } // JobSetSpecApplyConfiguration constructs an declarative configuration of the JobSetSpec type for use with @@ -74,3 +75,11 @@ func (b *JobSetSpecApplyConfiguration) WithSuspend(value bool) *JobSetSpecApplyC b.Suspend = &value return b } + +// WithTTLSecondsAfterFinished sets the TTLSecondsAfterFinished field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the TTLSecondsAfterFinished field is set to the value of the last call. +func (b *JobSetSpecApplyConfiguration) WithTTLSecondsAfterFinished(value int32) *JobSetSpecApplyConfiguration { + b.TTLSecondsAfterFinished = &value + return b +} diff --git a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml index 3b55a61b..dd131686 100644 --- a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml +++ b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml @@ -9711,6 +9711,19 @@ spec: suspend: description: Suspend suspends all running child Jobs when set to true. type: boolean + ttlSecondsAfterFinished: + default: 0 + description: TTLSecondsAfterFinished limits the lifetime of a JobSet + that has finished execution (either Complete or Failed). 
If this + field is set, TTLSecondsAfterFinished after the JobSet finishes, + it is eligible to be automatically deleted. When the JobSet is being + deleted, its lifecycle guarantees (e.g. finalizers) will be honored. + If this field is unset, the JobSet won't be automatically deleted. + If this field is set to zero, the JobSet becomes eligible to be + deleted immediately after it finishes. + format: int32 + minimum: 0 + type: integer type: object status: description: JobSetStatus defines the observed state of JobSet diff --git a/go.mod b/go.mod index 4af54334..f078754c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/onsi/ginkgo/v2 v2.13.2 github.com/onsi/gomega v1.30.0 github.com/open-policy-agent/cert-controller v0.10.1 + github.com/prometheus/client_golang v1.16.0 github.com/stretchr/testify v1.8.4 k8s.io/api v0.28.5 k8s.io/apimachinery v0.28.5 @@ -51,7 +52,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index bba30e71..6b561d1e 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -102,6 +102,11 @@ "suspend": { "description": "Suspend suspends all running child Jobs when set to true.", "type": "boolean" + }, + "ttlSecondsAfterFinished": { + "description": "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.", + "type": "integer", + "format": "int32" } } }, diff --git a/main.go b/main.go index c79f8403..4185d30a 100644 --- a/main.go +++ b/main.go @@ -17,8 +17,15 @@ limitations under the License. package main import ( + "context" "flag" "os" + "time" + + "sigs.k8s.io/jobset/pkg/util/cert" + + "sigs.k8s.io/jobset/client-go/clientset/versioned" + "sigs.k8s.io/jobset/client-go/informers/externalversions" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -35,7 +42,6 @@ import ( jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" "sigs.k8s.io/jobset/pkg/controllers" - "sigs.k8s.io/jobset/pkg/util/cert" "sigs.k8s.io/jobset/pkg/webhooks" //+kubebuilder:scaffold:imports ) @@ -125,7 +131,7 @@ func main() { // Cert won't be ready until manager starts, so start a goroutine here which // will block until the cert is ready before setting up the controllers. // Controllers who register after manager starts will start directly. 
- go setupControllers(mgr, certsReady) + go setupControllers(ctx, mgr, certsReady) setupHealthzAndReadyzCheck(mgr) @@ -136,7 +142,7 @@ func main() { } } -func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) { +func setupControllers(ctx context.Context, mgr ctrl.Manager, certsReady chan struct{}) { // The controllers won't work until the webhooks are operating, // and the webhook won't work until the certs are all in places. setupLog.Info("waiting for the cert generation to complete") @@ -157,6 +163,22 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) { os.Exit(1) } + clientset := versioned.NewForConfigOrDie(mgr.GetConfig()) + sharedInformers := externalversions.NewSharedInformerFactory(clientset, 30*time.Minute) + jobSetInformer := sharedInformers.Jobset().V1alpha2().JobSets() + ttlAfterFinishedController := controllers.NewTTLAfterFinishedReconciler( + mgr.GetClient(), + mgr.GetScheme(), + jobSetInformer, + ctrl.Log.WithValues("controller", "TTLAfterFinished"), + ) + if err := ttlAfterFinishedController.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "TTLAfterFinished") + os.Exit(1) + } + + go ttlAfterFinishedController.Run(ctx, 1) + // Set up JobSet validating/defaulting webhook. if err := (&jobset.JobSet{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "JobSet") diff --git a/pkg/controllers/metrics/metrics.go b/pkg/controllers/metrics/metrics.go new file mode 100644 index 00000000..0ce99a14 --- /dev/null +++ b/pkg/controllers/metrics/metrics.go @@ -0,0 +1,28 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +// TTLAfterFinishedSubsystem - subsystem name used for this controller. +const TTLAfterFinishedSubsystem = "ttl_after_finished_controller" + +var ( + // JobSetDeletionDurationSeconds tracks the time it took to delete the jobset since it + // became eligible for deletion. + JobSetDeletionDurationSeconds = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Subsystem: TTLAfterFinishedSubsystem, + Name: "jobset_deletion_duration_seconds", + Help: "The time it took to delete the jobset since it became eligible for deletion", + // Start with 100ms with the last bucket being [~27m, Inf). + Buckets: prometheus.ExponentialBuckets(0.1, 2, 14), + }, + ) +) + +func init() { + // Register custom metrics with the global prometheus registry + metrics.Registry.MustRegister(JobSetDeletionDurationSeconds) +} diff --git a/pkg/controllers/ttlafterfinished_controller.go b/pkg/controllers/ttlafterfinished_controller.go new file mode 100644 index 00000000..edf323ac --- /dev/null +++ b/pkg/controllers/ttlafterfinished_controller.go @@ -0,0 +1,319 @@ +/* +Copyright 2023 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/go-logr/logr"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+	"k8s.io/utils/clock"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+	jobsetinformerv1alpha2 "sigs.k8s.io/jobset/client-go/informers/externalversions/jobset/v1alpha2"
+	"sigs.k8s.io/jobset/pkg/controllers/metrics"
+)
+
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;delete
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get
+
+// TTLAfterFinishedReconciler watches finished JobSets and deletes them once their
+// TTLSecondsAfterFinished has expired.
+type TTLAfterFinishedReconciler struct {
+	client.Client
+	Scheme *runtime.Scheme
+	// listerSynced returns true if the JobSet store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	listerSynced cache.InformerSynced
+	// JobSets whose TTL the controller will check, attempting to delete each one when its TTL expires.
+	queue workqueue.RateLimitingInterface
+	// The clock for tracking time.
+	clock clock.Clock
+	// log is the logger for the controller.
+	log logr.Logger
+}
+
+// NewTTLAfterFinishedReconciler creates an instance of the TTL-after-finished controller.
+func NewTTLAfterFinishedReconciler(
+	client client.Client,
+	scheme *runtime.Scheme,
+	jobSetInformer jobsetinformerv1alpha2.JobSetInformer,
+	log logr.Logger,
+) *TTLAfterFinishedReconciler {
+	config := workqueue.RateLimitingQueueConfig{Name: "ttl_jobsets_to_delete"}
+	tc := &TTLAfterFinishedReconciler{
+		Client: client,
+		Scheme: scheme,
+		queue:  workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), config),
+		log:    log,
+	}
+
+	_, _ = jobSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			tc.addJobSet(obj)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			tc.updateJobSet(oldObj, newObj)
+		},
+	})
+
+	tc.listerSynced = jobSetInformer.Informer().HasSynced
+
+	tc.clock = clock.RealClock{}
+
+	return tc
+}
+
+// Reconcile is part of the main kubernetes reconciliation loop which aims to
+// move the current state of the cluster closer to the desired state.
+func (r *TTLAfterFinishedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	return ctrl.Result{}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *TTLAfterFinishedReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&jobsetv1alpha2.JobSet{}).
+		Complete(r)
+}
+
+// Run starts the workers to clean up JobSets.
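+// The Reconcile method above is intentionally a no-op: registering the type with the
+// manager wires up RBAC generation and the manager lifecycle, while the informer event
+// handlers and the rate-limited workqueue drive the actual cleanup, mirroring the
+// upstream Kubernetes TTL-after-finished controller for Jobs.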
+func (r *TTLAfterFinishedReconciler) Run(ctx context.Context, workers int) {
+	defer utilruntime.HandleCrash()
+	defer r.queue.ShutDown()
+
+	r.log.V(2).Info("Starting TTL after finished controller")
+	defer r.log.V(2).Info("Shutting down TTL after finished controller")
+
+	if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), r.listerSynced) {
+		return
+	}
+
+	for i := 0; i < workers; i++ {
+		go wait.UntilWithContext(ctx, r.worker, time.Second)
+	}
+
+	<-ctx.Done()
+}
+
+func (r *TTLAfterFinishedReconciler) addJobSet(obj interface{}) {
+	jobSet := obj.(*jobsetv1alpha2.JobSet)
+	r.log.V(2).Info("Adding jobset", "jobset", klog.KObj(jobSet))
+
+	if jobSet.DeletionTimestamp == nil && needsCleanup(jobSet) {
+		r.enqueue(jobSet)
+	}
+}
+
+func (r *TTLAfterFinishedReconciler) updateJobSet(old, cur interface{}) {
+	jobSet := cur.(*jobsetv1alpha2.JobSet)
+	r.log.V(2).Info("Updating jobset", "jobset", klog.KObj(jobSet))
+
+	if jobSet.DeletionTimestamp == nil && needsCleanup(jobSet) {
+		r.enqueue(jobSet)
+	}
+}
+
+func (r *TTLAfterFinishedReconciler) enqueue(jobSet *jobsetv1alpha2.JobSet) {
+	r.log.V(2).Info("Adding jobset to cleanup queue", "jobset", klog.KObj(jobSet))
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(jobSet)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", jobSet, err))
+		return
+	}
+
+	r.queue.Add(key)
+}
+
+func (r *TTLAfterFinishedReconciler) enqueueAfter(jobSet *jobsetv1alpha2.JobSet, after time.Duration) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(jobSet)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", jobSet, err))
+		return
+	}
+
+	r.queue.AddAfter(key, after)
+}
+
+func (r *TTLAfterFinishedReconciler) worker(ctx context.Context) {
+	for r.processNextWorkItem(ctx) {
+	}
+}
+
+func (r *TTLAfterFinishedReconciler) processNextWorkItem(ctx context.Context) bool {
+	key, quit := r.queue.Get()
+	if quit {
+		return false
+	}
+	defer r.queue.Done(key)
+
+	err := r.processJobSet(ctx, key.(string))
+	r.handleErr(err, key)
+
+	return true
+}
+
+func (r *TTLAfterFinishedReconciler) handleErr(err error, key interface{}) {
+	if err == nil {
+		r.queue.Forget(key)
+		return
+	}
+
+	utilruntime.HandleError(fmt.Errorf("error cleaning up JobSet %v, will retry: %v", key, err))
+	r.queue.AddRateLimited(key)
+}
+
+// processJobSet checks the JobSet's state and TTL, and deletes the JobSet once it has
+// finished and its TTL has expired. If the JobSet hasn't finished or its TTL hasn't
+// expired, the JobSet is re-added to the queue at the time the TTL is expected to expire.
+// This function is not meant to be invoked concurrently with the same key.
+func (r *TTLAfterFinishedReconciler) processJobSet(ctx context.Context, key string) error {
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	// Ignore the JobSets that are already deleted or being deleted, or the ones that don't need cleanup.
+	var jobSet jobsetv1alpha2.JobSet
+	err = r.Client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, &jobSet)
+
+	r.log.V(2).Info("Checking if JobSet is ready for cleanup", "jobset", klog.KRef(namespace, name))
+
+	if errors.IsNotFound(err) {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	if expiredAt, err := r.processTTL(&jobSet); err != nil {
+		return err
+	} else if expiredAt == nil {
+		return nil
+	}
+
+	// The JobSet's TTL is assumed to have expired, but the JobSet TTL might be stale.
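+	// (Another actor may have updated spec.ttlSecondsAfterFinished since this key was enqueued.)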
+	// Before deleting the JobSet, do a final sanity check.
+	// If the TTL was modified before this check, we cannot be sure whether it has truly expired.
+	// The latest JobSet may have a different UID, but it's fine because the checks will be run again.
+	var fresh jobsetv1alpha2.JobSet
+	err = r.Client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, &fresh)
+	if errors.IsNotFound(err) {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	// Use the latest JobSet TTL to see if the TTL has truly expired.
+	expiredAt, err := r.processTTL(&fresh)
+	if err != nil {
+		return err
+	} else if expiredAt == nil {
+		return nil
+	}
+	// Cascade delete the JobSet if the TTL has truly expired.
+	policy := metav1.DeletePropagationForeground
+	options := []client.DeleteOption{client.PropagationPolicy(policy), client.Preconditions{UID: &fresh.UID}}
+	r.log.V(2).Info("Cleaning up JobSet", "jobset", klog.KObj(&fresh))
+
+	if err := r.Client.Delete(ctx, &fresh, options...); err != nil {
+		return err
+	}
+	metrics.JobSetDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
+	return nil
+}
+
+// processTTL checks whether a given JobSet's TTL has expired; if it has not yet done so,
+// it re-adds the JobSet to the queue at the time the TTL is expected to expire.
+func (r *TTLAfterFinishedReconciler) processTTL(jobSet *jobsetv1alpha2.JobSet) (expiredAt *time.Time, err error) {
+	// We don't care about the JobSets that are going to be deleted, or the ones that don't need cleanup.
+	if jobSet.DeletionTimestamp != nil || !needsCleanup(jobSet) {
+		return nil, nil
+	}
+
+	now := r.clock.Now()
+	t, e, err := timeLeft(r.log, jobSet, &now)
+	if err != nil {
+		return nil, err
+	}
+
+	// TTL has expired
+	if *t <= 0 {
+		return e, nil
+	}
+
+	r.enqueueAfter(jobSet, *t)
+	return nil, nil
+}
+
+// needsCleanup checks whether a JobSet has finished and has a TTL set.
+func needsCleanup(js *jobsetv1alpha2.JobSet) bool {
+	return js.Spec.TTLSecondsAfterFinished != nil && jobSetFinished(js)
+}
+
+func getFinishAndExpireTime(js *jobsetv1alpha2.JobSet) (*time.Time, *time.Time, error) {
+	if !needsCleanup(js) {
+		return nil, nil, fmt.Errorf("jobset %s/%s should not be cleaned up", js.Namespace, js.Name)
+	}
+	t, err := jobSetFinishTime(js)
+	if err != nil {
+		return nil, nil, err
+	}
+	finishAt := t.Time
+	expireAt := finishAt.Add(time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second)
+	return &finishAt, &expireAt, nil
+}
+
+func timeLeft(log logr.Logger, js *jobsetv1alpha2.JobSet, since *time.Time) (*time.Duration, *time.Time, error) {
+	finishAt, expireAt, err := getFinishAndExpireTime(js)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	if finishAt.After(*since) {
+		log.V(2).Info("Warning: Found JobSet finished in the future. This is likely due to time skew in the cluster. JobSet cleanup will be deferred.", "jobset", klog.KObj(js))
+	}
+	remaining := expireAt.Sub(*since)
+	log.V(2).Info("Found JobSet finished", "jobset", klog.KObj(js), "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", since.UTC(), "deadlineTTL", expireAt.UTC())
+	return &remaining, expireAt, nil
+}
+
+// jobSetFinishTime takes an already finished JobSet and returns the time it finished.
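+// The finish time is read from the LastTransitionTime of the JobSet's Completed or
+// Failed condition; a zero LastTransitionTime is reported as an error.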
+func jobSetFinishTime(finishedJobSet *jobsetv1alpha2.JobSet) (metav1.Time, error) { + for _, c := range finishedJobSet.Status.Conditions { + if (c.Type == string(jobsetv1alpha2.JobSetCompleted) || c.Type == string(jobsetv1alpha2.JobSetFailed)) && c.Status == metav1.ConditionTrue { + finishAt := c.LastTransitionTime + if finishAt.IsZero() { + return metav1.Time{}, fmt.Errorf("unable to find the time when the JobSet %s/%s finished", finishedJobSet.Namespace, finishedJobSet.Name) + } + return c.LastTransitionTime, nil + } + } + + // This should never happen if the JobSets have finished + return metav1.Time{}, fmt.Errorf("unable to find the status of the finished JobSet %s/%s", finishedJobSet.Namespace, finishedJobSet.Name) +} diff --git a/pkg/controllers/ttlafterfinished_controller_test.go b/pkg/controllers/ttlafterfinished_controller_test.go new file mode 100644 index 00000000..2e25a88b --- /dev/null +++ b/pkg/controllers/ttlafterfinished_controller_test.go @@ -0,0 +1,289 @@ +package controllers + +import ( + "context" + "strings" + "testing" + "time" + + ctrl "sigs.k8s.io/controller-runtime" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" + clocktesting "k8s.io/utils/clock/testing" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + utiltesting "sigs.k8s.io/jobset/pkg/util/testing" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" + "k8s.io/utils/ptr" + + jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2" +) + +func TestTTLAfterFinishedReconciler_Run(t *testing.T) { + var ( + jobSetName = "test-jobset" + ns = "default" + ) + + tests := []struct { + name string + clock *clocktesting.FakeClock + expectDeletion bool + ttl int32 + }{ + { + name: "jobset completed 10s ago, 15s ttl", + clock: clocktesting.NewFakeClock(time.Now().Add(10 * time.Second)), + ttl: 15, + }, + { + name: "jobset completed 10s ago, 5s ttl", + clock: clocktesting.NewFakeClock(time.Now().Add(10 * time.Second)), + ttl: 5, + expectDeletion: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + + fakeClient, controller := startTTLAfterFinishedController(ctx, t, tc.clock) + + js := utiltesting.MakeJobSet(jobSetName, ns).TTLSecondsAfterFinished(tc.ttl).Obj() + jobSetCompletedCondition := metav1.Condition{ + Type: string(jobsetv1alpha2.JobSetCompleted), + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } + js.Status.Conditions = append(js.Status.Conditions, jobSetCompletedCondition) + if err := fakeClient.Create(ctx, js); err != nil { + t.Fatalf("failed to create jobset %s/%s: %v", ns, jobSetName, err) + } + controller.enqueue(js) + + err := wait.PollUntilContextTimeout( + ctx, + 10*time.Millisecond, + 100*time.Millisecond, + false, + func(ctx context.Context) (bool, error) { + var fresh jobsetv1alpha2.JobSet + err := fakeClient.Get(ctx, client.ObjectKeyFromObject(js), &fresh) + if tc.expectDeletion { + return errors.IsNotFound(err), nil + } else { + if err != nil { + t.Fatalf("failed to get jobset %s/%s: %v", ns, jobSetName, err) + } + return fresh.DeletionTimestamp.IsZero(), nil + } + }, + ) + if err != nil { + t.Fatalf("failed to wait for jobset %s/%s to be deleted: 
%v", ns, jobSetName, err) + } + }) + } +} + +func TestTimeLeft(t *testing.T) { + now := metav1.Now() + + testCases := []struct { + name string + completionTime metav1.Time + failedTime metav1.Time + ttl *int32 + since *time.Time + expectErr bool + expectErrStr string + expectedTimeLeft *time.Duration + expectedExpireAt time.Time + }{ + { + name: "Error case: JobSet unfinished", + ttl: ptr.To[int32](100), + since: &now.Time, + expectErr: true, + expectErrStr: "should not be cleaned up", + }, + { + name: "Error case: JobSet completed now, no TTL", + completionTime: now, + since: &now.Time, + expectErr: true, + expectErrStr: "should not be cleaned up", + }, + { + name: "JobSet completed now, 0s TTL", + completionTime: now, + ttl: ptr.To[int32](0), + since: &now.Time, + expectedTimeLeft: ptr.To(0 * time.Second), + expectedExpireAt: now.Time, + }, + { + name: "JobSet completed now, 10s TTL", + completionTime: now, + ttl: ptr.To[int32](10), + since: &now.Time, + expectedTimeLeft: ptr.To(10 * time.Second), + expectedExpireAt: now.Add(10 * time.Second), + }, + { + name: "JobSet completed 10s ago, 15s TTL", + completionTime: metav1.NewTime(now.Add(-10 * time.Second)), + ttl: ptr.To[int32](15), + since: &now.Time, + expectedTimeLeft: ptr.To(5 * time.Second), + expectedExpireAt: now.Add(5 * time.Second), + }, + { + name: "Error case: JobSet failed now, no TTL", + failedTime: now, + since: &now.Time, + expectErr: true, + expectErrStr: "should not be cleaned up", + }, + { + name: "JobSet failed now, 0s TTL", + failedTime: now, + ttl: ptr.To[int32](0), + since: &now.Time, + expectedTimeLeft: ptr.To(0 * time.Second), + expectedExpireAt: now.Time, + }, + { + name: "JobSet failed now, 10s TTL", + failedTime: now, + ttl: ptr.To[int32](10), + since: &now.Time, + expectedTimeLeft: ptr.To(10 * time.Second), + expectedExpireAt: now.Add(10 * time.Second), + }, + { + name: "JobSet failed 10s ago, 15s TTL", + failedTime: metav1.NewTime(now.Add(-10 * time.Second)), + ttl: ptr.To[int32](15), + since: &now.Time, + expectedTimeLeft: ptr.To(5 * time.Second), + expectedExpireAt: now.Add(5 * time.Second), + }, + } + + for _, tc := range testCases { + + jobSet := newJobSet(tc.completionTime, tc.failedTime, tc.ttl) + _, ctx := ktesting.NewTestContext(t) + logger := klog.FromContext(ctx) + gotTimeLeft, gotExpireAt, gotErr := timeLeft(logger, jobSet, tc.since) + if tc.expectErr != (gotErr != nil) { + t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr) + } + if tc.expectErr && len(tc.expectErrStr) == 0 { + t.Errorf("%s: invalid test setup; error message must not be empty for error cases", tc.name) + } + if tc.expectErr && !strings.Contains(gotErr.Error(), tc.expectErrStr) { + t.Errorf("%s: expected error message contains %q, got %v", tc.name, tc.expectErrStr, gotErr) + } + if !tc.expectErr { + if *gotTimeLeft != *tc.expectedTimeLeft { + t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft) + } + if *gotExpireAt != tc.expectedExpireAt { + t.Errorf("%s: expected expire at %v, got %v", tc.name, tc.expectedExpireAt, *gotExpireAt) + } + } + } +} + +func newJobSet(completionTime, failedTime metav1.Time, ttl *int32) *jobsetv1alpha2.JobSet { + js := &jobsetv1alpha2.JobSet{ + TypeMeta: metav1.TypeMeta{Kind: "JobSet"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "foobar", + Namespace: metav1.NamespaceDefault, + }, + Spec: jobsetv1alpha2.JobSetSpec{ + ReplicatedJobs: []jobsetv1alpha2.ReplicatedJob{ + { + Name: "foobar-job", + Template: 
batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if !completionTime.IsZero() { + c := metav1.Condition{Type: string(jobsetv1alpha2.JobSetCompleted), Status: metav1.ConditionTrue, LastTransitionTime: completionTime} + js.Status.Conditions = append(js.Status.Conditions, c) + } + + if !failedTime.IsZero() { + c := metav1.Condition{Type: string(jobsetv1alpha2.JobSetFailed), Status: metav1.ConditionTrue, LastTransitionTime: failedTime} + js.Status.Conditions = append(js.Status.Conditions, c) + } + + if ttl != nil { + js.Spec.TTLSecondsAfterFinished = ttl + } + + return js +} + +func startTTLAfterFinishedController(ctx context.Context, t *testing.T, clock *clocktesting.FakeClock) (client.WithWatch, *TTLAfterFinishedReconciler) { + t.Helper() + + scheme := runtime.NewScheme() + if err := clientgoscheme.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add client-go scheme: %v", err) + } + if err := jobsetv1alpha2.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add resource scheme: %v", err) + } + + queueConfig := workqueue.RateLimitingQueueConfig{Name: "ttl_jobsets_to_delete"} + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + controller := TTLAfterFinishedReconciler{ + Client: fakeClient, + Scheme: scheme, + listerSynced: func() bool { return true }, + queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), queueConfig), + clock: clock, + log: ctrl.Log.WithValues("controller", "TTLAfterFinished"), + } + + go controller.Run(ctx, 1) + + return fakeClient, &controller +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 7401de64..c9b54dc2 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -54,6 +54,12 @@ func MakeJobSet(name, ns string) *JobSetWrapper { } } +// TTLSecondsAfterFinished sets the value of jobSet.spec.ttlSecondsAfterFinished +func (j *JobSetWrapper) TTLSecondsAfterFinished(seconds int32) *JobSetWrapper { + j.Spec.TTLSecondsAfterFinished = &seconds + return j +} + // SuccessPolicy sets the value of jobSet.spec.successPolicy func (j *JobSetWrapper) SuccessPolicy(policy *jobset.SuccessPolicy) *JobSetWrapper { j.Spec.SuccessPolicy = policy diff --git a/sdk/python/docs/JobsetV1alpha2JobSetSpec.md b/sdk/python/docs/JobsetV1alpha2JobSetSpec.md index 151be66f..9ce010ef 100644 --- a/sdk/python/docs/JobsetV1alpha2JobSetSpec.md +++ b/sdk/python/docs/JobsetV1alpha2JobSetSpec.md @@ -9,6 +9,7 @@ Name | Type | Description | Notes **replicated_jobs** | [**list[JobsetV1alpha2ReplicatedJob]**](JobsetV1alpha2ReplicatedJob.md) | ReplicatedJobs is the group of jobs that will form the set. | [optional] **success_policy** | [**JobsetV1alpha2SuccessPolicy**](JobsetV1alpha2SuccessPolicy.md) | | [optional] **suspend** | **bool** | Suspend suspends all running child Jobs when set to true. | [optional] +**ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. 
finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py b/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py index 645c927f..60d58154 100644 --- a/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py +++ b/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py @@ -37,7 +37,8 @@ class JobsetV1alpha2JobSetSpec(object): 'network': 'JobsetV1alpha2Network', 'replicated_jobs': 'list[JobsetV1alpha2ReplicatedJob]', 'success_policy': 'JobsetV1alpha2SuccessPolicy', - 'suspend': 'bool' + 'suspend': 'bool', + 'ttl_seconds_after_finished': 'int' } attribute_map = { @@ -45,10 +46,11 @@ class JobsetV1alpha2JobSetSpec(object): 'network': 'network', 'replicated_jobs': 'replicatedJobs', 'success_policy': 'successPolicy', - 'suspend': 'suspend' + 'suspend': 'suspend', + 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' } - def __init__(self, failure_policy=None, network=None, replicated_jobs=None, success_policy=None, suspend=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, failure_policy=None, network=None, replicated_jobs=None, success_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 """JobsetV1alpha2JobSetSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -59,6 +61,7 @@ def __init__(self, failure_policy=None, network=None, replicated_jobs=None, succ self._replicated_jobs = None self._success_policy = None self._suspend = None + self._ttl_seconds_after_finished = None self.discriminator = None if failure_policy is not None: @@ -71,6 +74,8 @@ def __init__(self, failure_policy=None, network=None, replicated_jobs=None, succ self.success_policy = success_policy if suspend is not None: self.suspend = suspend + if ttl_seconds_after_finished is not None: + self.ttl_seconds_after_finished = ttl_seconds_after_finished @property def failure_policy(self): @@ -181,6 +186,29 @@ def suspend(self, suspend): self._suspend = suspend + @property + def ttl_seconds_after_finished(self): + """Gets the ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. # noqa: E501 + + :return: The ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + :rtype: int + """ + return self._ttl_seconds_after_finished + + @ttl_seconds_after_finished.setter + def ttl_seconds_after_finished(self, ttl_seconds_after_finished): + """Sets the ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. + + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). 
If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. # noqa: E501 + + :param ttl_seconds_after_finished: The ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + :type: int + """ + + self._ttl_seconds_after_finished = ttl_seconds_after_finished + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set.py b/sdk/python/test/test_jobset_v1alpha2_job_set.py index d99cc748..3e0ddcd0 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set.py @@ -58,7 +58,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py index d026f015..b02584e5 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py @@ -61,7 +61,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None @@ -104,7 +105,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py index 475721fb..1d9ec8f7 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py @@ -54,7 +54,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True + suspend = True, + ttl_seconds_after_finished = 56 ) else : return JobsetV1alpha2JobSetSpec( diff --git a/test/integration/controller/suite_test.go b/test/integration/controller/suite_test.go index 7929e63a..413c2184 100644 --- a/test/integration/controller/suite_test.go +++ b/test/integration/controller/suite_test.go @@ -21,6 +21,9 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "sigs.k8s.io/jobset/client-go/clientset/versioned" + "sigs.k8s.io/jobset/client-go/informers/externalversions" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" @@ -103,6 +106,21 @@ var _ = BeforeSuite(func() { err = podReconciler.SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) + clientset := versioned.NewForConfigOrDie(k8sManager.GetConfig()) + sharedInformers := externalversions.NewSharedInformerFactory(clientset, 0) + jobSetInformer := sharedInformers.Jobset().V1alpha2().JobSets() + ttlAfterFinishedController := controllers.NewTTLAfterFinishedReconciler( + k8sManager.GetClient(), + k8sManager.GetScheme(), + jobSetInformer, + ctrl.Log.WithValues("controller", "TTLAfterFinished"), + ) + Expect(ttlAfterFinishedController.SetupWithManager(k8sManager)).To(Succeed()) + + sharedInformers.Start(ctx.Done()) + go ttlAfterFinishedController.Run(ctx, 1) + sharedInformers.WaitForCacheSync(ctx.Done()) + go func() { defer GinkgoRecover() err = k8sManager.Start(ctx) diff --git a/test/integration/controller/ttlafterfinished_controller_test.go b/test/integration/controller/ttlafterfinished_controller_test.go new file mode 100644 index 00000000..1977a6ed --- /dev/null +++ b/test/integration/controller/ttlafterfinished_controller_test.go @@ -0,0 +1,84 @@ +/* +Copyright 2023 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllertest + +import ( + "context" + "fmt" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" + testutil "sigs.k8s.io/jobset/test/util" +) + +var _ = ginkgo.Describe("TTLAfterFinished controller", func() { + ginkgo.It("should delete a JobSet with TTLAfterFinished field set after configured seconds pass", func() { + ctx := context.Background() + + // Create test namespace for each entry. + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "jobset-ns-", + }, + } + + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + defer func() { + gomega.Expect(testutil.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }() + + // Create JobSet. + js := testJobSet(ns).TTLSecondsAfterFinished(5).Obj() + + // Verify jobset created successfully. + ginkgo.By(fmt.Sprintf("creating jobSet %s/%s", js.Name, js.Namespace)) + gomega.Eventually(k8sClient.Create(ctx, js), timeout, interval).Should(gomega.Succeed()) + + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(testutil.NumExpectedJobs(js))) + + // Fetch updated job objects, so we always have the latest resource versions to perform mutations on. 
+ var jobList batchv1.JobList + gomega.Expect(k8sClient.List(ctx, &jobList, client.InNamespace(js.Namespace))).Should(gomega.Succeed()) + gomega.Expect(len(jobList.Items)).To(gomega.Equal(testutil.NumExpectedJobs(js))) + completeAllJobs(&jobList) + + // Verify jobset is marked as completed. + testutil.JobSetCompleted(ctx, k8sClient, js, timeout) + + // Verify jobset has not been deleted if ttl has not passed. + ginkgo.By("checking that jobset has not been deleted before configured seconds pass") + var fresh jobset.JobSet + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(js), &fresh)).To(gomega.Succeed()) + gomega.Expect(fresh.DeletionTimestamp).To(gomega.BeNil()) + + ginkgo.By("checking that ttl after finished controller deletes jobset after configured seconds pass") + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(js), &fresh); err != nil { + return false + } + return !fresh.DeletionTimestamp.IsZero() + }, 15*time.Second, 500*time.Millisecond).Should(gomega.BeTrue()) + }) +})
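
Usage sketch (illustrative, not part of the patch): with the field and the wrapper added
above, a client or test can opt a JobSet into automatic cleanup. The names c (a
controller-runtime client) and ctx, plus the elided ReplicatedJobs, are assumptions here;
utiltesting refers to pkg/util/testing, as imported in the unit tests above.

	// Build a JobSet that becomes eligible for deletion 60s after it completes or fails.
	js := utiltesting.MakeJobSet("cleanup-demo", "default").
		TTLSecondsAfterFinished(60).
		Obj()
	// Spec.ReplicatedJobs must still be populated for the JobSet to pass validation.
	if err := c.Create(ctx, js); err != nil {
		// handle the error
	}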