Skip to content

Commit

Permalink
Add new task refresh, which ensures CassandraDatacenter is up to date
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Apr 15, 2024
1 parent 5de8d7f commit 2783323
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 3 deletions.
3 changes: 3 additions & 0 deletions apis/control/v1alpha1/cassandratask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const (
CommandMove CassandraCommand = "move"
CommandGarbageCollect CassandraCommand = "garbagecollect"
CommandFlush CassandraCommand = "flush"
CommandRefresh CassandraCommand = "refresh"
)

type CassandraJob struct {
Expand Down Expand Up @@ -167,6 +168,8 @@ const (
JobFailed JobConditionType = "Failed"
// JobRunning means the job is currently executing
JobRunning JobConditionType = "Running"
// DatacenterUpdated
DatacenterUpdated JobConditionType = "DatacenterUpdated"
)

//+kubebuilder:object:root=true
Expand Down
12 changes: 10 additions & 2 deletions internal/controllers/control/cassandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,20 @@ JobDefinition:
flush(taskConfig)
case api.CommandGarbageCollect:
gc(taskConfig)
case api.CommandRefresh:
// This targets the Datacenter only
res, err = r.refreshDatacenter(ctx, dc, &cassTask)
if err != nil {
return ctrl.Result{}, err
}
completed = taskConfig.Completed
break JobDefinition
default:
err = fmt.Errorf("unknown job command: %s", job.Command)
return ctrl.Result{}, err
}

if !r.HasCondition(cassTask, api.JobRunning, metav1.ConditionTrue) {
if !r.HasCondition(&cassTask, api.JobRunning, metav1.ConditionTrue) {
valid, errValidate := taskConfig.Validate()
if errValidate != nil && valid {
// Retry, this is a transient error
Expand Down Expand Up @@ -399,7 +407,7 @@ func (r *CassandraTaskReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus) bool {
func (r *CassandraTaskReconciler) HasCondition(task *api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus) bool {
for _, cond := range task.Status.Conditions {
if cond.Type == string(condition) {
return cond.Status == status
Expand Down
46 changes: 45 additions & 1 deletion internal/controllers/control/cassandratask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/k8ssandra/cass-operator/pkg/httphelper"

cassapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
api "github.com/k8ssandra/cass-operator/apis/control/v1alpha1"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -683,7 +684,6 @@ var _ = Describe("CassandraTask controller tests", func() {

AfterEach(func() {
deleteDatacenter(testNamespaceName)
// Expect(k8sClient.Delete(context.TODO(), testDc)).Should(Succeed())
})

Context("Restart", func() {
Expand Down Expand Up @@ -781,4 +781,48 @@ var _ = Describe("CassandraTask controller tests", func() {
})
})
})
Describe("Execute jobs against Datacenters", func() {
var testNamespaceName string
BeforeEach(func() {
testNamespaceName = fmt.Sprintf("test-task-%d", rand.Int31())
By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName))
})

AfterEach(func() {
deleteDatacenter(testNamespaceName)
})

Context("Refresh", func() {
It("Adds an annotation if CassandraDatacenter does not have one and waits for completion", func() {
taskKey, task := buildTask(api.CommandRefresh, testNamespaceName)
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

dc := &cassdcapi.CassandraDatacenter{}
Eventually(func() bool {
if err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: testDatacenterName, Namespace: testNamespaceName}, dc); err != nil {
return false
}
if metav1.HasAnnotation(dc.ObjectMeta, cassapi.UpdateAllowedAnnotation) {
return dc.Annotations[cassapi.UpdateAllowedAnnotation] == "once"
}
return false
}, "5s", "50ms").Should(BeTrue())

delete(dc.Annotations, cassapi.UpdateAllowedAnnotation)
Expect(k8sClient.Update(context.Background(), dc)).Should(Succeed())

_ = waitForTaskCompletion(taskKey)
})
It("Completes if autoupdate-spec is always allowed", func() {
dc := &cassdcapi.CassandraDatacenter{}
Expect(k8sClient.Get(context.TODO(), types.NamespacedName{Name: testDatacenterName, Namespace: testNamespaceName}, dc)).To(Succeed())
metav1.SetMetaDataAnnotation(&dc.ObjectMeta, cassapi.UpdateAllowedAnnotation, string(cassapi.AllowUpdateAlways))
Expect(k8sClient.Update(context.Background(), dc)).Should(Succeed())

taskKey, task := buildTask(api.CommandRefresh, testNamespaceName)
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())
_ = waitForTaskCompletion(taskKey)
})
})
})
})
40 changes: 40 additions & 0 deletions internal/controllers/control/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

cassapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
api "github.com/k8ssandra/cass-operator/apis/control/v1alpha1"
Expand Down Expand Up @@ -401,6 +402,45 @@ func compact(taskConfig *TaskConfiguration) {
taskConfig.AsyncFunc = compactAsync
}

// Refresh CassandraDatacenter

func (r *CassandraTaskReconciler) refreshDatacenter(ctx context.Context, dc *cassapi.CassandraDatacenter, task *api.CassandraTask) (ctrl.Result, error) {
// If there's no "always" annotation, add "once" annotation and check that it's removed (that indicates finished)
if metav1.HasAnnotation(dc.ObjectMeta, cassapi.UpdateAllowedAnnotation) {
// No need to add anything, process is still going or it was always allowed
val := cassapi.AllowUpdateType(dc.Annotations[cassapi.UpdateAllowedAnnotation])
if val == cassapi.AllowUpdateAlways {
// Nothing to do here, the autoupdate is set
return ctrl.Result{}, nil
} else {
// Still waiting for the refresh to happen
return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil
}
}

if r.HasCondition(task, api.DatacenterUpdated, metav1.ConditionTrue) {
// The refresh has completed, since the annotation is gone
return ctrl.Result{}, nil
}

// Lets start the process
patch := client.MergeFrom(dc.DeepCopy())

metav1.SetMetaDataAnnotation(&dc.ObjectMeta, cassapi.UpdateAllowedAnnotation, string(cassapi.AllowUpdateOnce))

if err := r.Patch(ctx, dc, patch); err != nil {
return ctrl.Result{}, err
}

taskPatch := client.MergeFrom(task.DeepCopy())
if modified := SetCondition(task, api.DatacenterUpdated, metav1.ConditionTrue, "Datacenter updated to update spec once"); modified {
if err := r.Client.Status().Patch(ctx, task, taskPatch); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil
}

// Common functions

func isCassandraUp(pod *corev1.Pod) bool {
Expand Down

0 comments on commit 2783323

Please sign in to comment.