@@ -17,6 +17,8 @@ limitations under the License.
package v2

import (
"strings"

"github.com/robfig/cron"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@@ -27,7 +29,6 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"strings"
)

// log is for logging in this package.
@@ -34,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -87,6 +88,7 @@ var (
scheduledTimeAnnotation = "batch.tutorial.kubebuilder.io/scheduled-at"
)

//nolint:gocyclo
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
@@ -124,11 +126,14 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
### 2: List all active jobs, and update the status

To fully update our status, we'll need to list all child jobs in this namespace that belong to this CronJob.
Similarly to Get, we can use the List method to list the child jobs. Notice that we use variadic options to
set the namespace and field match (which is actually an index lookup that we set up below).

After listing the jobs and determining their status, we'll update the CronJob's status. We use a retry mechanism
to handle potential conflicts that can arise due to concurrent updates in the cluster.
*/
var childJobs kbatch.JobList
if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingLabels{jobOwnerKey: req.Name}); err != nil {
log.Error(err, "unable to list child Jobs")
return ctrl.Result{}, err
}
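For readers new to the helper mentioned above: retry.RetryOnConflict from k8s.io/client-go/util/retry re-runs the supplied function with backoff whenever it returns a Conflict error, and gives up once the backoff steps are exhausted. The sketch below is a hypothetical reconstruction of that behaviour for illustration only, not the client-go source; retryOnConflictSketch and lastConflict are invented names, and it assumes the imports apierrors "k8s.io/apimachinery/pkg/api/errors" and "k8s.io/apimachinery/pkg/util/wait".

func retryOnConflictSketch(backoff wait.Backoff, fn func() error) error {
	var lastConflict error
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		switch err := fn(); {
		case err == nil:
			return true, nil // success, stop retrying
		case apierrors.IsConflict(err):
			lastConflict = err // stale read; try again after the next backoff step
			return false, nil
		default:
			return false, err // any other error aborts immediately
		}
	})
	if err == wait.ErrWaitTimeout {
		return lastConflict // backoff exhausted while still conflicting
	}
	return err
}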
@@ -223,28 +228,36 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

if mostRecentTime != nil {
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
cronJob.Status.LastScheduleTime = nil
}
cronJob.Status.Active = nil
for _, activeJob := range activeJobs {
jobRef, err := ref.GetReference(r.Scheme, activeJob)
if err != nil {
log.Error(err, "unable to make reference to active job", "job", activeJob)
continue
// Retry loop for updating status
updateErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Fetch the latest version of the CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
return err
}
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}

/*
Here, we'll log how many jobs we observed at a slightly higher logging level,
for debugging. Notice how instead of using a format string, we use a fixed message,
and attach key-value pairs with the extra information. This makes it easier to
filter and query log lines.
*/
log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
// Update the status
if mostRecentTime != nil {
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
cronJob.Status.LastScheduleTime = nil
}

cronJob.Status.Active = nil
for _, activeJob := range activeJobs {
jobRef, err := ref.GetReference(r.Scheme, activeJob)
if err != nil {
log.Error(err, "unable to make reference to active job", "job", activeJob)
continue
}
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}

if err := r.Status().Update(ctx, &cronJob); err != nil {
log.Error(err, "unable to update CronJob status")
return err
}
return nil
})

/*
Using the data we've gathered, we'll update the status of our CRD.
@@ -255,9 +268,9 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
The status subresource ignores changes to spec, so it's less likely to conflict
with any other updates, and can have separate permissions.
*/
if err := r.Status().Update(ctx, &cronJob); err != nil {
log.Error(err, "unable to update CronJob status")
return ctrl.Result{}, err
if updateErr != nil {
log.Error(updateErr, "failed to update CronJob status after retrying")
return ctrl.Result{}, updateErr
}
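The "separate permissions" mentioned above exist because the status subresource has its own RBAC rule. For reference, the tutorial's reconciler carries markers roughly like the following (they live above Reconcile in the real file, not at this point; shown here only to illustrate the split between spec and status permissions):

// +kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/status,verbs=get;update;patch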

/*
@@ -414,7 +427,7 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
/*
### 6: Run a new job if it's on schedule, not past the deadline, and not blocked by our concurrency policy

If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
If we've missed a run, and we're still within the deadline to start it, we'll need to run a job. We also have to respect the concurrency policy, blocking or replacing existing jobs as needed; together with the retry logic above, this keeps scheduling consistent (a rough sketch of the policy check follows this comment).
*/
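The blocking/replacing mentioned above happens in a part of the file this diff collapses. The sketch below is a reconstruction for illustration, not code from this PR; scheduledResult, activeJobs, and the ForbidConcurrent/ReplaceConcurrent constants are names from the tutorial's surrounding code, and the details in the actual file may differ.

	// Hypothetical sketch of the concurrency-policy handling:
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
		// a run is already in flight and overlaps are forbidden: wait for the next tick
		log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
		return scheduledResult, nil
	}
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
		for _, activeJob := range activeJobs {
			// delete outstanding jobs so the new run can replace them
			if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
				log.Error(err, "unable to delete active job", "job", activeJob)
				return ctrl.Result{}, err
			}
		}
	}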
if missedRun.IsZero() {
log.V(1).Info("no upcoming scheduled times, sleeping until next")
@@ -508,7 +521,7 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

// ...and create it on the cluster
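// client.FieldOwner records "cronjob-controller" as the field manager in the created Job's managedFields, identifying this controller's writes to the API server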
if err := r.Create(ctx, job); err != nil {
if err := r.Create(ctx, job, client.FieldOwner("cronjob-controller")); err != nil {
log.Error(err, "unable to create Job for CronJob", "job", job)
return ctrl.Result{}, err
}
@@ -542,7 +555,7 @@ will automatically call Reconcile on the underlying CronJob when a Job changes,
deleted, etc.
*/
var (
jobOwnerKey = ".metadata.controller"
jobOwnerKey = "owner-key"
apiGVStr = batchv1.GroupVersion.String()
)

@@ -555,7 +568,10 @@ func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj client.Object) []string {
// grab the job object, extract the owner...
job := rawObj.(*kbatch.Job)
job, ok := rawObj.(*kbatch.Job)
if !ok {
return nil
}
owner := metav1.GetControllerOf(job)
if owner == nil {
return nil
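For context, the remainder of this indexer and the builder call are collapsed in the diff. Reconstructed for illustration from the CronJob tutorial (not part of this PR's changes, and details may differ): the extractor confirms the controller owner really is one of our CronJobs before returning its name, and SetupWithManager then watches owned Jobs.

	// inside the index extractor, after the owner == nil guard shown above:
	if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
		return nil
	}
	// index the Job under its owning CronJob's name
	return []string{owner.Name}

	// and at the end of SetupWithManager, the builder watches owned Jobs so
	// Job events trigger reconciliation of the owning CronJob:
	return ctrl.NewControllerManagedBy(mgr).
		For(&batchv1.CronJob{}).
		Owns(&kbatch.Job{}).
		Complete(r)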
@@ -18,86 +18,166 @@ package controller

import (
"context"
kbatch "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"fmt"
ctrl "sigs.k8s.io/controller-runtime"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

kbatch "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
batchv1 "tutorial.kubebuilder.io/project/api/v1"
)

var _ = Describe("CronJob Controller", func() {
Context("When reconciling a resource", func() {
const resourceName = "test-resource"
const (
resourceName = "test-resource"
namespace = "default"
timeout = time.Second * 10
duration = time.Second * 10
interval = time.Millisecond * 250
)

ctx := context.Background()
var (
ctx = context.Background()
)

typeNamespacedName := types.NamespacedName{
Name: resourceName,
Namespace: namespace,
}

cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Namespace: "default", // TODO(user):Modify as needed
}
cronjob := &batchv1.CronJob{}

BeforeEach(func() {
By("creating the custom resource for the Kind CronJob")
err := k8sClient.Get(ctx, typeNamespacedName, cronjob)
if err != nil && errors.IsNotFound(err) {
resource := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Namespace: "default",
},
Spec: batchv1.CronJobSpec{
Schedule: "*/1 * * * *", // Example: runs every minute
JobTemplate: kbatch.JobTemplateSpec{
Spec: kbatch.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "example-container",
Image: "example-image",
Command: []string{"echo", "Hello World"},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
Namespace: namespace,
},
Spec: batchv1.CronJobSpec{
Schedule: "*/1 * * * *",
JobTemplate: kbatch.JobTemplateSpec{
Spec: kbatch.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "example-container",
Image: "example-image",
Command: []string{"echo", "Hello World"},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
},
}
Expect(k8sClient.Create(ctx, resource)).To(Succeed())
}
},
},
},
}

BeforeEach(func() {
By("Creating the custom resource for the Kind CronJob")
err := k8sClient.Get(ctx, typeNamespacedName, cronJob)
if err != nil && errors.IsNotFound(err) {
cronJob.ResourceVersion = "" // Ensure resourceVersion is not set
Expect(k8sClient.Create(ctx, cronJob)).To(Succeed())
}
})

AfterEach(func() {
By("Cleaning up the CronJob resource")
Expect(k8sClient.Delete(ctx, cronJob)).To(Succeed())
})

It("Should reconcile successfully and match the expected CronJob data", func() {
By("Reconciling the CronJob")
controllerReconciler := &CronJobReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Clock: realClock{},
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
Expect(err).NotTo(HaveOccurred())

AfterEach(func() {
// TODO(user): Cleanup logic after each test, like removing the resource instance.
resource := &batchv1.CronJob{}
err := k8sClient.Get(ctx, typeNamespacedName, resource)
Expect(err).NotTo(HaveOccurred())
By("Verifying the CronJob spec schedule")
createdCronJob := &batchv1.CronJob{}
Eventually(func() bool {
err := k8sClient.Get(ctx, typeNamespacedName, createdCronJob)
return err == nil
}, timeout, interval).Should(BeTrue())
Expect(createdCronJob.Spec.Schedule).To(Equal("*/1 * * * *"))

By("Cleanup the specific resource instance CronJob")
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
By("Checking the CronJob has zero active jobs")
Consistently(func() (int, error) {
err := k8sClient.Get(ctx, typeNamespacedName, createdCronJob)
if err != nil {
return -1, err
}
return len(createdCronJob.Status.Active), nil
}, duration, interval).Should(Equal(0))
})

It("Should successfully manage job history according to CronJob policy", func() {
By("Creating a job for the CronJob")
scheduledTime := time.Now().Add(-time.Minute)
controllerReconciler := &CronJobReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Clock: realClock{},
}
constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

// actually make the job...
job := &kbatch.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: make(map[string]string),
Annotations: make(map[string]string),
Name: name,
Namespace: cronJob.Namespace,
},
Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
}

for k, v := range cronJob.Spec.JobTemplate.Annotations {
job.Annotations[k] = v
}
job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
for k, v := range cronJob.Spec.JobTemplate.Labels {
job.Labels[k] = v
}

if err := ctrl.SetControllerReference(cronJob, job, k8sClient.Scheme()); err != nil {
return nil, err
}

return job, nil
}
job, err := constructJobForCronJob(cronJob, scheduledTime)
Expect(err).NotTo(HaveOccurred())

job.ResourceVersion = "" // Ensure resourceVersion is not set for new job creation
Expect(k8sClient.Create(ctx, job)).To(Succeed())

By("Re-running the reconciler to clean up old jobs")
_, err = controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: typeNamespacedName,
})
// TODO: Fix me. We need to implement the tests and ensure
// that the controller implementation of multi-version tutorial is accurate
//It("should successfully reconcile the resource", func() {
// By("Reconciling the created resource")
// controllerReconciler := &CronJobReconciler{
// Client: k8sClient,
// Scheme: k8sClient.Scheme(),
// }
//
// _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
// NamespacedName: typeNamespacedName,
// })
// Expect(err).NotTo(HaveOccurred())
// // TODO(user): Add more specific assertions depending on your controller's reconciliation logic.
// // Example: If you expect a certain status condition after reconciliation, verify it here.
//})
Expect(err).NotTo(HaveOccurred())

By("Verifying that the job history is managed correctly")
var childJobs kbatch.JobList
Expect(k8sClient.List(ctx, &childJobs, client.InNamespace(namespace), client.MatchingLabels{"owner-key": resourceName})).To(Succeed())
if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
Expect(len(childJobs.Items)).To(BeNumerically("<=", *cronJob.Spec.SuccessfulJobsHistoryLimit))
}
})
})