From 27833234446dfae135b294ea6e14af745fda5fa1 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 15 Apr 2024 18:11:23 +0300 Subject: [PATCH] Add new task refresh, which ensures CassandraDatacenter is up to date --- apis/control/v1alpha1/cassandratask_types.go | 3 ++ .../control/cassandratask_controller.go | 12 ++++- .../control/cassandratask_controller_test.go | 46 ++++++++++++++++++- internal/controllers/control/jobs.go | 40 ++++++++++++++++ 4 files changed, 98 insertions(+), 3 deletions(-) diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index b54eb213..d02703b3 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -80,6 +80,7 @@ const ( CommandMove CassandraCommand = "move" CommandGarbageCollect CassandraCommand = "garbagecollect" CommandFlush CassandraCommand = "flush" + CommandRefresh CassandraCommand = "refresh" ) type CassandraJob struct { @@ -167,6 +168,8 @@ const ( JobFailed JobConditionType = "Failed" // JobRunning means the job is currently executing JobRunning JobConditionType = "Running" + // DatacenterUpdated + DatacenterUpdated JobConditionType = "DatacenterUpdated" ) //+kubebuilder:object:root=true diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index c99207df..73b6d8d0 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -308,12 +308,20 @@ JobDefinition: flush(taskConfig) case api.CommandGarbageCollect: gc(taskConfig) + case api.CommandRefresh: + // This targets the Datacenter only + res, err = r.refreshDatacenter(ctx, dc, &cassTask) + if err != nil { + return ctrl.Result{}, err + } + completed = taskConfig.Completed + break JobDefinition default: err = fmt.Errorf("unknown job command: %s", job.Command) return ctrl.Result{}, err } - if !r.HasCondition(cassTask, api.JobRunning, metav1.ConditionTrue) { + if !r.HasCondition(&cassTask, api.JobRunning, metav1.ConditionTrue) { valid, errValidate := taskConfig.Validate() if errValidate != nil && valid { // Retry, this is a transient error @@ -399,7 +407,7 @@ func (r *CassandraTaskReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus) bool { +func (r *CassandraTaskReconciler) HasCondition(task *api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus) bool { for _, cond := range task.Status.Conditions { if cond.Type == string(condition) { return cond.Status == status diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index b157bd3e..523af020 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -10,6 +10,7 @@ import ( "github.com/k8ssandra/cass-operator/pkg/httphelper" + cassapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" api "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" . "github.com/onsi/ginkgo/v2" @@ -683,7 +684,6 @@ var _ = Describe("CassandraTask controller tests", func() { AfterEach(func() { deleteDatacenter(testNamespaceName) - // Expect(k8sClient.Delete(context.TODO(), testDc)).Should(Succeed()) }) Context("Restart", func() { @@ -781,4 +781,48 @@ var _ = Describe("CassandraTask controller tests", func() { }) }) }) + Describe("Execute jobs against Datacenters", func() { + var testNamespaceName string + BeforeEach(func() { + testNamespaceName = fmt.Sprintf("test-task-%d", rand.Int31()) + By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName)) + }) + + AfterEach(func() { + deleteDatacenter(testNamespaceName) + }) + + Context("Refresh", func() { + It("Adds an annotation if CassandraDatacenter does not have one and waits for completion", func() { + taskKey, task := buildTask(api.CommandRefresh, testNamespaceName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + dc := &cassdcapi.CassandraDatacenter{} + Eventually(func() bool { + if err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: testDatacenterName, Namespace: testNamespaceName}, dc); err != nil { + return false + } + if metav1.HasAnnotation(dc.ObjectMeta, cassapi.UpdateAllowedAnnotation) { + return dc.Annotations[cassapi.UpdateAllowedAnnotation] == "once" + } + return false + }, "5s", "50ms").Should(BeTrue()) + + delete(dc.Annotations, cassapi.UpdateAllowedAnnotation) + Expect(k8sClient.Update(context.Background(), dc)).Should(Succeed()) + + _ = waitForTaskCompletion(taskKey) + }) + It("Completes if autoupdate-spec is always allowed", func() { + dc := &cassdcapi.CassandraDatacenter{} + Expect(k8sClient.Get(context.TODO(), types.NamespacedName{Name: testDatacenterName, Namespace: testNamespaceName}, dc)).To(Succeed()) + metav1.SetMetaDataAnnotation(&dc.ObjectMeta, cassapi.UpdateAllowedAnnotation, string(cassapi.AllowUpdateAlways)) + Expect(k8sClient.Update(context.Background(), dc)).Should(Succeed()) + + taskKey, task := buildTask(api.CommandRefresh, testNamespaceName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + _ = waitForTaskCompletion(taskKey) + }) + }) + }) }) diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index 0cc5a039..54f53f70 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -13,6 +13,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" cassapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" api "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" @@ -401,6 +402,45 @@ func compact(taskConfig *TaskConfiguration) { taskConfig.AsyncFunc = compactAsync } +// Refresh CassandraDatacenter + +func (r *CassandraTaskReconciler) refreshDatacenter(ctx context.Context, dc *cassapi.CassandraDatacenter, task *api.CassandraTask) (ctrl.Result, error) { + // If there's no "always" annotation, add "once" annotation and check that it's removed (that indicates finished) + if metav1.HasAnnotation(dc.ObjectMeta, cassapi.UpdateAllowedAnnotation) { + // No need to add anything, process is still going or it was always allowed + val := cassapi.AllowUpdateType(dc.Annotations[cassapi.UpdateAllowedAnnotation]) + if val == cassapi.AllowUpdateAlways { + // Nothing to do here, the autoupdate is set + return ctrl.Result{}, nil + } else { + // Still waiting for the refresh to happen + return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil + } + } + + if r.HasCondition(task, api.DatacenterUpdated, metav1.ConditionTrue) { + // The refresh has completed, since the annotation is gone + return ctrl.Result{}, nil + } + + // Lets start the process + patch := client.MergeFrom(dc.DeepCopy()) + + metav1.SetMetaDataAnnotation(&dc.ObjectMeta, cassapi.UpdateAllowedAnnotation, string(cassapi.AllowUpdateOnce)) + + if err := r.Patch(ctx, dc, patch); err != nil { + return ctrl.Result{}, err + } + + taskPatch := client.MergeFrom(task.DeepCopy()) + if modified := SetCondition(task, api.DatacenterUpdated, metav1.ConditionTrue, "Datacenter updated to update spec once"); modified { + if err := r.Client.Status().Patch(ctx, task, taskPatch); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil +} + // Common functions func isCassandraUp(pod *corev1.Pod) bool {