Skip to content

Commit

Permalink
upgrade: update rack sequentially (#340)
Browse files Browse the repository at this point in the history
Wait until all racks are ready before upgrading next rack
during patch upgrade.

Fixes #340
  • Loading branch information
zimnx committed Jan 27, 2021
1 parent 00ff27a commit 666a29c
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 83 deletions.
42 changes: 39 additions & 3 deletions pkg/controllers/cluster/actions/upgrade_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,46 @@ func (a *ClusterVersionUpgrade) genericUpgrade(ctx context.Context) error {

func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
c := a.Cluster

for _, r := range c.Spec.Datacenter.Racks {
a.logger.Debug(ctx, fmt.Sprintf("Rack: %s, Rack Members: %d, Spec members: %d\n", r.Name, r.Members, c.Status.Racks[r.Name].Members))
sts := &appsv1.StatefulSet{}
if err := a.cc.Get(ctx, naming.NamespacedName(naming.StatefulSetNameForRack(r, c), c.Namespace), sts); err != nil {
return errors.Wrap(err, "get statefulset")
}

scyllaVersion, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers)
if err != nil {
return errors.Wrap(err, "get scylla container version")
}

if c.Spec.Version == scyllaVersion {
pods := &corev1.PodList{}
if err := a.cc.List(ctx, pods, &client.ListOptions{
LabelSelector: naming.RackSelector(r, a.Cluster),
}); err != nil {
return errors.Wrap(err, "get pods")
}

for _, p := range pods.Items {
scyllaVersion, err := naming.ScyllaImage(p.Spec.Containers)
if err != nil {
return errors.Wrap(err, "get scylla container version")
}

if c.Spec.Version != scyllaVersion || !podReady(&p) {
a.logger.Info(ctx, "Waiting until rack is updated", "rack", r.Name, "pod", p.Name, "pod_version", scyllaVersion, "spec_version", c.Spec.Version)
return nil
}
}
}
}

for _, r := range c.Spec.Datacenter.Racks {
a.logger.Info(ctx, "Checking if rack needs to be upgraded", "rack", r.Name, "rack_version", c.Status.Racks[r.Name].Version, "spec_version", c.Spec.Version)
if c.Status.Racks[r.Name].Version != c.Spec.Version {
sts := &appsv1.StatefulSet{}
if err := a.cc.Get(ctx, naming.NamespacedName(naming.StatefulSetNameForRack(r, c), c.Namespace), sts); err != nil {
return errors.Wrap(err, "failed to get statefulset")
return errors.Wrap(err, "get statefulset")
}

scyllaVersion, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers)
Expand All @@ -200,10 +234,12 @@ func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
}

if c.Spec.Version != scyllaVersion {
a.logger.Info(ctx, "Upgrading rack", "rack", r.Name, "rack_version", scyllaVersion, "spec_version", c.Spec.Version)
image := resource.ImageForCluster(c)
if err := util.UpgradeStatefulSetScyllaImage(ctx, sts, image, a.kubeClient); err != nil {
return errors.Wrap(err, "failed to upgrade statefulset")
return errors.Wrap(err, "upgrade Scylla image in statefulset")
}

// Record event for successful version upgrade
a.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, fmt.Sprintf("Rack %s upgraded up to version %s", r.Name, c.Spec.Version))
}
Expand Down
130 changes: 104 additions & 26 deletions pkg/controllers/cluster/actions/upgrade_version_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,69 @@ var _ = Describe("Cluster controller", func() {
Expect(testEnv.Delete(ctx, ns)).To(Succeed())
})

// Verifies that a patch version upgrade of a two-rack cluster is rolled out
// one rack at a time: the second rack's StatefulSet must keep the old image
// until the pod of the first rack has been synced and marked ready.
It("Multi rack patch upgrade is sequential over racks", func() {
	const (
		preUpdateVersion  = "4.2.0"
		postUpdateVersion = "4.2.1"
	)
	scylla := testEnv.MultiRackCluster(ns, 1, 1)

	Expect(testEnv.Create(ctx, scylla)).To(Succeed())
	Expect(testEnv.WaitForCluster(ctx, scylla)).To(Succeed())
	Expect(testEnv.Refresh(ctx, scylla)).To(Succeed())

	sstStub := integration.NewStatefulSetOperatorStub(testEnv)

	// Cluster should be scaled sequentially up to member count
	for _, rack := range scylla.Spec.Datacenter.Racks {
		for _, replicas := range testEnv.ClusterScaleSteps(rack.Members) {
			Expect(sstStub.CreatePods(ctx, scylla)).To(Succeed())
			Expect(testEnv.AssertRackScaled(ctx, rack, scylla, replicas)).To(Succeed())
			Expect(sstStub.CreatePods(ctx, scylla)).To(Succeed())
		}
	}

	By("When: patch version upgrade is requested")
	Expect(testEnv.Refresh(ctx, scylla)).To(Succeed())
	scylla.Spec.Version = postUpdateVersion
	Expect(testEnv.Update(ctx, scylla)).To(Succeed())

	firstRack := scylla.Spec.Datacenter.Racks[0]
	secondRack := scylla.Spec.Datacenter.Racks[1]

	By("Then: image is updated in first rack")
	Eventually(func() string {
		ver, err := scyllaImageInRackStatefulSet(ctx, firstRack, scylla)
		Expect(err).ToNot(HaveOccurred())

		return ver
	}).Should(Equal(postUpdateVersion))

	By("Then: Scylla image stays the same in second rack")
	// Consistently (not Eventually) is required here: the second rack already
	// carries the old image, so Eventually would succeed on the first poll
	// without proving that the image is left untouched.
	Consistently(func() string {
		ver, err := scyllaImageInRackStatefulSet(ctx, secondRack, scylla)
		Expect(err).ToNot(HaveOccurred())

		return ver
	}).Should(Equal(preUpdateVersion))

	By("When: first rack pods enters ready state")

	Expect(sstStub.SyncPods(ctx, firstRack, scylla)).To(Succeed())
	Expect(sstStub.SyncStatus(ctx, scylla)).To(Succeed())
	pods := &corev1.PodList{}
	Expect(testEnv.List(ctx, pods, &client.ListOptions{LabelSelector: naming.RackSelector(firstRack, scylla)})).To(Succeed())
	// An Expect without a matcher asserts nothing; make a failed status
	// update fail the test instead of being silently dropped.
	Expect(markPodReady(pods, 0)).To(Succeed())

	By("Then: Scylla image is updated in second rack")
	Eventually(func() string {
		ver, err := scyllaImageInRackStatefulSet(ctx, secondRack, scylla)
		Expect(err).ToNot(HaveOccurred())

		return ver
	}).Should(Equal(postUpdateVersion))
})

Context("Cluster upgrade", func() {
var (
scylla *scyllav1.ScyllaCluster
Expand Down Expand Up @@ -149,13 +212,7 @@ var _ = Describe("Cluster controller", func() {
By("Then: Scylla image is upgraded")
rack := scylla.Spec.Datacenter.Racks[0]
Eventually(func() string {
sts, err := testEnv.StatefulSetOfRack(ctx, rack, scylla)
Expect(err).ToNot(HaveOccurred())

idx, err := naming.FindScyllaContainer(sts.Spec.Template.Spec.Containers)
Expect(err).ToNot(HaveOccurred())

ver, err := naming.ImageToVersion(sts.Spec.Template.Spec.Containers[idx].Image)
ver, err := scyllaImageInRackStatefulSet(ctx, rack, scylla)
Expect(err).ToNot(HaveOccurred())

return ver
Expand Down Expand Up @@ -219,25 +276,7 @@ var _ = Describe("Cluster controller", func() {
scyllaFake.SetOperationalMode(scyllaclient.OperationalModeNormal)

By("When: node pod is ready")
for _, p := range podList.Items {
if strings.HasSuffix(p.Name, fmt.Sprintf("%d", nodeUnderUpgradeIdx)) {
found := false
for i, c := range p.Status.Conditions {
if c.Type == corev1.PodReady {
p.Status.Conditions[i].Status = corev1.ConditionTrue
found = true
}
}
if !found {
p.Status.Conditions = append(p.Status.Conditions, corev1.PodCondition{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
})
}

Expect(testEnv.Status().Update(ctx, &p)).To(Succeed())
}
}
Expect(markPodReady(podList, nodeUnderUpgradeIdx)).To(Succeed())

By("Then: data snapshot is removed")
Eventually(scyllaFake.KeyspaceSnapshots, shortWait).Should(ConsistOf(systemKeyspaces))
Expand All @@ -256,6 +295,45 @@ var _ = Describe("Cluster controller", func() {

})

// scyllaImageInRackStatefulSet looks up the StatefulSet of the given rack of
// cluster through testEnv and returns whatever naming.ScyllaImage reports for
// the containers of its pod template. Tests in this file compare the result
// against Scylla version strings.
//
// On any failure the error is passed through unchanged together with an
// empty string.
func scyllaImageInRackStatefulSet(ctx context.Context, rack scyllav1.RackSpec, cluster *scyllav1.ScyllaCluster) (string, error) {
	rackSts, err := testEnv.StatefulSetOfRack(ctx, rack, cluster)
	if err != nil {
		return "", err
	}

	containers := rackSts.Spec.Template.Spec.Containers
	image, err := naming.ScyllaImage(containers)
	if err != nil {
		// Never hand back a partial value alongside an error.
		image = ""
	}
	return image, err
}

// markPodReady sets the PodReady condition to True on the first pod in pods
// whose name ends with "-<idx>" and persists the change through the status
// client of the package-level testEnv, using the package-level ctx.
//
// The pod is modified in place inside pods.Items. The function returns the
// error of the status update, or nil when no pod name matches.
func markPodReady(pods *corev1.PodList, idx int) error {
	// Match on "-<idx>" instead of the bare number so that, for example,
	// idx 0 does not also select a pod whose name ends in "10".
	// NOTE(review): assumes StatefulSet-style "<name>-<ordinal>" pod names — confirm against the pod stub.
	suffix := fmt.Sprintf("-%d", idx)

	for i := range pods.Items {
		// Work on the list element itself rather than a per-iteration copy.
		pod := &pods.Items[i]
		if !strings.HasSuffix(pod.Name, suffix) {
			continue
		}

		found := false
		for j := range pod.Status.Conditions {
			if pod.Status.Conditions[j].Type == corev1.PodReady {
				pod.Status.Conditions[j].Status = corev1.ConditionTrue
				found = true
			}
		}
		if !found {
			// The pod has no readiness condition yet; add one.
			pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
				Type:   corev1.PodReady,
				Status: corev1.ConditionTrue,
			})
		}

		return testEnv.Status().Update(ctx, pod)
	}

	return nil
}

// cqlSessionStub is a field-less test double.
// NOTE(review): presumably its methods, defined further down in this file,
// let it stand in for a CQL session — confirm.
type cqlSessionStub struct{}

Expand Down

0 comments on commit 666a29c

Please sign in to comment.