Skip to content

Commit

Permalink
Add a Scylla Manager integration E2E test for error propagation for i…
Browse files Browse the repository at this point in the history
…nvalid tasks
  • Loading branch information
rzetelskik committed Jun 5, 2024
1 parent 3e45e81 commit 8ec06d6
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 55 deletions.
58 changes: 3 additions & 55 deletions test/e2e/set/scyllacluster/scyllamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,58 +188,6 @@ var _ = g.Describe("Scylla Manager integration", func() {
})
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Updating the repair task with invalid properties")
sc, err = f.ScyllaClient().ScyllaV1().ScyllaClusters(f.Namespace()).Patch(
ctx,
sc.Name,
types.JSONPatchType,
[]byte(`[{"op":"replace","path":"/spec/repairs/0/host","value":"invalid"}]`),
metav1.PatchOptions{},
)
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(sc.Spec.Repairs).To(o.HaveLen(1))
o.Expect(sc.Spec.Repairs[0].Name).To(o.Equal("repair"))
o.Expect(sc.Spec.Repairs[0].Host).NotTo(o.BeNil())
o.Expect(*sc.Spec.Repairs[0].Host).To(o.Equal("invalid"))
o.Expect(sc.Spec.Repairs[0].Parallel).To(o.Equal(int64(1)))
o.Expect(sc.Spec.Backups).To(o.BeEmpty())

framework.By("Waiting for ScyllaCluster to sync repair task error with Scylla Manager")
repairTaskFailedCond := func(cluster *scyllav1.ScyllaCluster) (bool, error) {
for _, r := range cluster.Status.Repairs {
if r.Name == sc.Spec.Repairs[0].Name {
return r.Error != nil && len(*r.Error) != 0, nil
}
}
return false, nil
}

waitCtx5, waitCtx5Cancel := utils.ContextForManagerSync(ctx, sc)
defer waitCtx5Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx5, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, repairTaskFailedCond)
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Verifying that repair task error was propagated and properties were retained in status")
o.Expect(sc.Status.Repairs).To(o.HaveLen(1))
o.Expect(sc.Status.Repairs[0].Name).To(o.Equal(sc.Spec.Repairs[0].Name))
o.Expect(sc.Status.Repairs[0].ID).NotTo(o.BeNil())
o.Expect(*sc.Status.Repairs[0].ID).To(o.Equal(repairTask.ID))
o.Expect(sc.Status.Repairs[0].Parallel).NotTo(o.BeNil())
o.Expect(*sc.Status.Repairs[0].Parallel).To(o.Equal(sc.Spec.Repairs[0].Parallel))
o.Expect(sc.Status.Repairs[0].Error).NotTo(o.BeNil())
o.Expect(*sc.Status.Repairs[0].Error).NotTo(o.BeEmpty())
o.Expect(sc.Status.Backups).To(o.BeEmpty())

previousRepairTask := repairTask
framework.By("Verifying that repair task in manager state wasn't modified")
tasks, err = managerClient.ListTasks(ctx, *sc.Status.ManagerID, "repair", false, "", "")
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(tasks.TaskListItemSlice).To(o.HaveLen(1))
repairTask = tasks.TaskListItemSlice[0]
o.Expect(repairTask.Name).To(o.Equal(sc.Status.Repairs[0].Name))
o.Expect(repairTask.ID).To(o.Equal(*sc.Status.Repairs[0].ID))
o.Expect(repairTask.Properties).To(o.Equal(previousRepairTask.Properties))

framework.By("Deleting the repair task")
sc, err = f.ScyllaClient().ScyllaV1().ScyllaClusters(f.Namespace()).Patch(
ctx,
Expand All @@ -256,9 +204,9 @@ var _ = g.Describe("Scylla Manager integration", func() {
return len(cluster.Status.Repairs) == 0, nil
}

waitCtx6, waitCtx6Cancel := utils.ContextForManagerSync(ctx, sc)
defer waitCtx6Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx6, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, repairTaskDeletedCond)
waitCtx5, waitCtx5Cancel := utils.ContextForManagerSync(ctx, sc)
defer waitCtx5Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx5, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, repairTaskDeletedCond)
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Verifying that repair task deletion was synchronized")
Expand Down
299 changes: 299 additions & 0 deletions test/e2e/set/scyllacluster/scyllamanager_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/pointer"
"github.com/scylladb/scylla-operator/test/e2e/framework"
"github.com/scylladb/scylla-operator/test/e2e/utils"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -438,6 +439,304 @@ var _ = g.Describe("Scylla Manager integration", framework.RequiresObjectStorage

verifyCQLData(ctx, di)
})

g.It("should discover cluster and sync errors for invalid tasks and invalid updates to existing tasks", func() {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

sc := f.GetDefaultScyllaCluster()
sc.Spec.Datacenter.Racks[0].Members = 1

objectStorageType := f.GetObjectStorageType()
switch objectStorageType {
case framework.ObjectStorageTypeGCS:
gcServiceAccountKey := f.GetGCSServiceAccountKey()
o.Expect(gcServiceAccountKey).NotTo(o.BeEmpty())

sc = setUpGCSCredentials(ctx, f.KubeClient().CoreV1(), sc, f.Namespace(), gcServiceAccountKey)
default:
g.Fail("unsupported object storage type")
}

validObjectStorageLocation := fmt.Sprintf("%s:%s", f.GetObjectStorageProvider(), f.GetObjectStorageBucket())

framework.By("Creating a ScyllaCluster")
sc, err := f.ScyllaClient().ScyllaV1().ScyllaClusters(f.Namespace()).Create(ctx, sc, metav1.CreateOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Waiting for the ScyllaCluster to roll out (RV=%s)", sc.ResourceVersion)
waitCtx1, waitCtx1Cancel := utils.ContextForRollout(ctx, sc)
defer waitCtx1Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx1, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, utils.IsScyllaClusterRolledOut)
o.Expect(err).NotTo(o.HaveOccurred())

verifyScyllaCluster(ctx, f.KubeClient(), sc)
waitForFullQuorum(ctx, f.KubeClient().CoreV1(), sc)

hosts, err := utils.GetBroadcastRPCAddresses(ctx, f.KubeClient().CoreV1(), sc)
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(hosts).To(o.HaveLen(1))
di := insertAndVerifyCQLData(ctx, hosts)
defer di.Close()

framework.By("Waiting for ScyllaCluster to register with Scylla Manager")
registeredInManagerCond := func(sc *scyllav1.ScyllaCluster) (bool, error) {
return sc.Status.ManagerID != nil, nil
}

waitCtx2, waitCtx2Cancel := utils.ContextForManagerSync(ctx, sc)
defer waitCtx2Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx2, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, registeredInManagerCond)
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Scheduling invalid tasks for ScyllaCluster")
scCopy := sc.DeepCopy()
scCopy.Spec.Backups = append(scCopy.Spec.Backups, scyllav1.BackupTaskSpec{
TaskSpec: scyllav1.TaskSpec{
Name: "backup",
},
Location: []string{"invalid:invalid"},
})
scCopy.Spec.Repairs = append(scCopy.Spec.Repairs, scyllav1.RepairTaskSpec{
TaskSpec: scyllav1.TaskSpec{
Name: "repair",
},
Host: pointer.Ptr("invalid"),
})

patchData, err := controllerhelpers.GenerateMergePatch(sc, scCopy)
o.Expect(err).NotTo(o.HaveOccurred())

sc, err = f.ScyllaClient().ScyllaV1().ScyllaClusters(f.Namespace()).Patch(ctx, sc.Name, types.MergePatchType, patchData, metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(sc.Spec.Backups).To(o.HaveLen(1))
o.Expect(sc.Spec.Backups[0].Name).To(o.Equal("backup"))
o.Expect(sc.Spec.Backups[0].Location).To(o.Equal([]string{"invalid:invalid"}))
o.Expect(sc.Spec.Repairs).To(o.HaveLen(1))
o.Expect(sc.Spec.Repairs[0].Name).To(o.Equal("repair"))
o.Expect(sc.Spec.Repairs[0].Host).To(o.Equal(pointer.Ptr("invalid")))

framework.By("Waiting for ScyllaCluster to sync task errors with Scylla Manager")
backupTaskFailedCond := func(cluster *scyllav1.ScyllaCluster) (bool, error) {
for _, b := range cluster.Status.Backups {
if b.Name == sc.Spec.Backups[0].Name {
return b.Error != nil && len(*b.Error) != 0, nil
}
}

return false, nil
}
repairTaskFailedCond := func(cluster *scyllav1.ScyllaCluster) (bool, error) {
for _, r := range cluster.Status.Repairs {
if r.Name == sc.Spec.Repairs[0].Name {
return r.Error != nil && len(*r.Error) != 0, nil
}
}

return false, nil
}

waitCtx3, waitCtx3Cancel := utils.ContextForManagerSync(ctx, sc)
defer waitCtx3Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx3, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, repairTaskFailedCond, backupTaskFailedCond)
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Verifying that only task names and errors were propagated to status")
o.Expect(sc.Status.Repairs).To(o.HaveLen(1))
o.Expect(sc.Status.Repairs[0].Name).To(o.Equal(sc.Spec.Repairs[0].Name))
o.Expect(sc.Status.Repairs[0].Error).NotTo(o.BeNil())
o.Expect(sc.Status.Repairs[0].Host).To(o.BeNil())
o.Expect(sc.Status.Backups).To(o.HaveLen(1))
o.Expect(sc.Status.Backups[0].Name).To(o.Equal(sc.Spec.Backups[0].Name))
o.Expect(sc.Status.Backups[0].Error).NotTo(o.BeNil())
o.Expect(sc.Status.Backups[0].Location).To(o.BeNil())

managerClient, err := utils.GetManagerClient(ctx, f.KubeAdminClient().CoreV1())
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Verifying that tasks are not in manager state")
// Sanity check.
o.Expect(sc.Status.ManagerID).NotTo(o.BeNil())

repairTasks, err := managerClient.ListTasks(ctx, *sc.Status.ManagerID, "repair", false, "", "")
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(repairTasks.TaskListItemSlice).To(o.BeEmpty())
backupTasks, err := managerClient.ListTasks(ctx, *sc.Status.ManagerID, "backup", false, "", "")
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(backupTasks.TaskListItemSlice).To(o.BeEmpty())

framework.By("Scheduling valid backup and repair tasks")
scCopy = sc.DeepCopy()
scCopy.Spec.Backups = []scyllav1.BackupTaskSpec{
{
TaskSpec: scyllav1.TaskSpec{
Name: "backup",
},
Location: []string{validObjectStorageLocation},
},
}
scCopy.Spec.Repairs = []scyllav1.RepairTaskSpec{
{
TaskSpec: scyllav1.TaskSpec{
Name: "repair",
},
},
}

patchData, err = controllerhelpers.GenerateMergePatch(sc, scCopy)
o.Expect(err).NotTo(o.HaveOccurred())

sc, err = f.ScyllaClient().ScyllaV1().ScyllaClusters(f.Namespace()).Patch(ctx, scCopy.Name, types.MergePatchType, patchData, metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(sc.Spec.Backups).To(o.HaveLen(1))
o.Expect(sc.Spec.Backups[0].Name).To(o.Equal("backup"))
o.Expect(sc.Spec.Backups[0].Location).To(o.Equal([]string{validObjectStorageLocation}))
o.Expect(sc.Spec.Repairs).To(o.HaveLen(1))
o.Expect(sc.Spec.Repairs[0].Name).To(o.Equal("repair"))
o.Expect(sc.Spec.Repairs[0].Host).To(o.BeNil())

framework.By("Waiting for ScyllaCluster to sync valid tasks with Scylla Manager")
backupTaskScheduledCond := func(cluster *scyllav1.ScyllaCluster) (bool, error) {
for _, b := range cluster.Status.Backups {
if b.Name == sc.Spec.Backups[0].Name {
if b.ID == nil || len(*b.ID) == 0 {
return false, nil
}

if b.Error != nil {
return false, nil
}

return true, nil
}
}
return false, nil
}
repairTaskScheduledCond := func(cluster *scyllav1.ScyllaCluster) (bool, error) {
for _, r := range cluster.Status.Repairs {
if r.Name == sc.Spec.Repairs[0].Name {
if r.ID == nil || len(*r.ID) == 0 {
return false, nil
}

if r.Error != nil {
return false, nil
}

return true, nil
}
}
return false, nil
}

waitCtx4, waitCtx4Cancel := utils.ContextForManagerSync(ctx, sc)
defer waitCtx4Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx4, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, backupTaskScheduledCond, repairTaskScheduledCond)
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(sc.Status.Backups).To(o.HaveLen(1))
o.Expect(sc.Status.Backups[0].ID).NotTo(o.BeNil())
o.Expect(sc.Status.Backups[0].Name).To(o.Equal(sc.Spec.Backups[0].Name))
o.Expect(sc.Status.Backups[0].Error).To(o.BeNil())
o.Expect(sc.Status.Backups[0].Location).To(o.Equal(sc.Spec.Backups[0].Location))
o.Expect(sc.Status.Repairs).To(o.HaveLen(1))
o.Expect(sc.Status.Repairs[0].ID).NotTo(o.BeNil())
o.Expect(sc.Status.Repairs[0].Name).To(o.Equal(sc.Spec.Repairs[0].Name))
o.Expect(sc.Status.Repairs[0].Error).To(o.BeNil())
o.Expect(sc.Status.Repairs[0].Host).To(o.BeNil())

framework.By("Verifying that valid task statuses were synchronized")
// Sanity check.
o.Expect(sc.Status.ManagerID).NotTo(o.BeNil())

backupTasks, err = managerClient.ListTasks(ctx, *sc.Status.ManagerID, "backup", false, "", "")
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(backupTasks.TaskListItemSlice).To(o.HaveLen(1))
backupTask := backupTasks.TaskListItemSlice[0]
o.Expect(backupTask.Name).To(o.Equal(sc.Status.Backups[0].Name))
o.Expect(backupTask.ID).To(o.Equal(*sc.Status.Backups[0].ID))

repairTasks, err = managerClient.ListTasks(ctx, *sc.Status.ManagerID, "repair", false, "", "")
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(repairTasks.TaskListItemSlice).To(o.HaveLen(1))
repairTask := repairTasks.TaskListItemSlice[0]
o.Expect(repairTask.Name).To(o.Equal(sc.Status.Repairs[0].Name))
o.Expect(repairTask.ID).To(o.Equal(*sc.Status.Repairs[0].ID))

framework.By("Updating the tasks with invalid properties")
scCopy = sc.DeepCopy()
scCopy.Spec.Backups = []scyllav1.BackupTaskSpec{
{
TaskSpec: scyllav1.TaskSpec{
Name: "backup",
},
Location: []string{"invalid:invalid"},
},
}
scCopy.Spec.Repairs = []scyllav1.RepairTaskSpec{
{
TaskSpec: scyllav1.TaskSpec{
Name: "repair",
},
Host: pointer.Ptr("invalid"),
},
}

patchData, err = controllerhelpers.GenerateMergePatch(sc, scCopy)
o.Expect(err).NotTo(o.HaveOccurred())

sc, err = f.ScyllaClient().ScyllaV1().ScyllaClusters(f.Namespace()).Patch(ctx, sc.Name, types.MergePatchType, patchData, metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(sc.Spec.Backups).To(o.HaveLen(1))
o.Expect(sc.Spec.Backups[0].Name).To(o.Equal("backup"))
o.Expect(sc.Spec.Backups[0].Location).To(o.Equal([]string{"invalid:invalid"}))
o.Expect(sc.Spec.Repairs).To(o.HaveLen(1))
o.Expect(sc.Spec.Repairs[0].Name).To(o.Equal("repair"))
o.Expect(sc.Spec.Repairs[0].Host).To(o.Equal(pointer.Ptr("invalid")))

framework.By("Waiting for ScyllaCluster to sync task errors with Scylla Manager")
waitCtx5, waitCtx5Cancel := utils.ContextForManagerSync(ctx, sc)
defer waitCtx5Cancel()
sc, err = controllerhelpers.WaitForScyllaClusterState(waitCtx5, f.ScyllaClient().ScyllaV1().ScyllaClusters(sc.Namespace), sc.Name, controllerhelpers.WaitForStateOptions{}, backupTaskFailedCond, repairTaskFailedCond)
o.Expect(err).NotTo(o.HaveOccurred())

framework.By("Verifying that task errors were propagated and task properties retained in status")
o.Expect(sc.Status.Backups).To(o.HaveLen(1))
o.Expect(sc.Status.Backups[0].Name).To(o.Equal(sc.Spec.Backups[0].Name))
o.Expect(sc.Status.Backups[0].ID).NotTo(o.BeNil())
o.Expect(*sc.Status.Backups[0].ID).To(o.Equal(backupTask.ID))
o.Expect(sc.Status.Backups[0].Error).NotTo(o.BeNil())
o.Expect(*sc.Status.Backups[0].Error).NotTo(o.BeEmpty())
o.Expect(sc.Status.Backups[0].Location).To(o.Equal([]string{validObjectStorageLocation}))
o.Expect(sc.Status.Repairs).To(o.HaveLen(1))
o.Expect(sc.Status.Repairs[0].Name).To(o.Equal(sc.Spec.Repairs[0].Name))
o.Expect(sc.Status.Repairs[0].ID).NotTo(o.BeNil())
o.Expect(*sc.Status.Repairs[0].ID).To(o.Equal(repairTask.ID))
o.Expect(sc.Status.Repairs[0].Error).NotTo(o.BeNil())
o.Expect(*sc.Status.Repairs[0].Error).NotTo(o.BeEmpty())
o.Expect(sc.Status.Repairs[0].Host).To(o.BeNil())

framework.By("Verifying that tasks in manager state weren't modified")
// Sanity check.
o.Expect(sc.Status.ManagerID).NotTo(o.BeNil())

previousBackupTask := backupTask
backupTasks, err = managerClient.ListTasks(ctx, *sc.Status.ManagerID, "backup", false, "", "")
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(backupTasks.TaskListItemSlice).To(o.HaveLen(1))
backupTask = backupTasks.TaskListItemSlice[0]
o.Expect(backupTask.Name).To(o.Equal(sc.Status.Backups[0].Name))
o.Expect(backupTask.ID).To(o.Equal(*sc.Status.Backups[0].ID))
o.Expect(backupTask.Properties).To(o.Equal(previousBackupTask.Properties))

previousRepairTask := repairTask
repairTasks, err = managerClient.ListTasks(ctx, *sc.Status.ManagerID, "repair", false, "", "")
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(repairTasks.TaskListItemSlice).To(o.HaveLen(1))
repairTask = repairTasks.TaskListItemSlice[0]
o.Expect(repairTask.Name).To(o.Equal(sc.Status.Repairs[0].Name))
o.Expect(repairTask.ID).To(o.Equal(*sc.Status.Repairs[0].ID))
o.Expect(repairTask.Properties).To(o.Equal(previousRepairTask.Properties))
})
})

func setUpGCSCredentials(ctx context.Context, coreClient corev1client.CoreV1Interface, sc *scyllav1.ScyllaCluster, namespace string, serviceAccountKey []byte) *scyllav1.ScyllaCluster {
Expand Down

0 comments on commit 8ec06d6

Please sign in to comment.