From 629e325a75ce5a70ec82adea4c4e60d424ad7058 Mon Sep 17 00:00:00 2001 From: Maciej Zimnoch Date: Mon, 16 Nov 2020 16:41:11 +0100 Subject: [PATCH] controller: added replace dead node feature (#48) Dead node can be replaced using special label added to member service. When label "scylla/replace" with empty string value is added to member service, Operator will remove PVC attached to dead node, delete dead pod and remove member service. New pod will be scheduled on different node, and scylla will be run with additional parameter `--replace-address-first-boot` together with IP address of deleted node. Fixes #48 --- .../scylla.scylladb.com_scyllaclusters.yaml | 5 + examples/eks/operator.yaml | 5 + examples/generic/operator.yaml | 5 + examples/gke/operator.yaml | 5 + pkg/api/v1alpha1/cluster_types.go | 7 +- pkg/api/v1alpha1/zz_generated.deepcopy.go | 7 + pkg/controllers/cluster/actions/replace.go | 217 ++++++++++++++++++ .../cluster/controller_integration_test.go | 196 ++++++++++++++-- pkg/controllers/cluster/resource/resource.go | 4 + pkg/controllers/cluster/status.go | 46 +++- pkg/controllers/cluster/sync.go | 6 + pkg/controllers/sidecar/config/config.go | 9 +- pkg/controllers/sidecar/config/config_test.go | 75 ++++++ pkg/controllers/sidecar/identity/member.go | 25 +- pkg/controllers/sidecar/sidecar_controller.go | 3 +- pkg/naming/constants.go | 3 + pkg/test/integration/statefulset.go | 135 ++++++----- 17 files changed, 666 insertions(+), 87 deletions(-) create mode 100644 pkg/controllers/cluster/actions/replace.go diff --git a/config/operator/crd/bases/scylla.scylladb.com_scyllaclusters.yaml b/config/operator/crd/bases/scylla.scylladb.com_scyllaclusters.yaml index 82f5ff7ce0..902bbcdcf5 100644 --- a/config/operator/crd/bases/scylla.scylladb.com_scyllaclusters.yaml +++ b/config/operator/crd/bases/scylla.scylladb.com_scyllaclusters.yaml @@ -1549,6 +1549,11 @@ spec: description: ReadyMembers is the number of ready members in the specific Rack format: int32 type: integer + replace_address_first_boot: + additionalProperties: + type: string + description: Pool of addresses which should be replaced by new nodes. + type: object version: description: Version is the current version of Scylla in use. type: string diff --git a/examples/eks/operator.yaml b/examples/eks/operator.yaml index eea41fcb54..f28ec0fe75 100644 --- a/examples/eks/operator.yaml +++ b/examples/eks/operator.yaml @@ -1564,6 +1564,11 @@ spec: description: ReadyMembers is the number of ready members in the specific Rack format: int32 type: integer + replace_address_first_boot: + additionalProperties: + type: string + description: Pool of addresses which should be replaced by new nodes. + type: object version: description: Version is the current version of Scylla in use. type: string diff --git a/examples/generic/operator.yaml b/examples/generic/operator.yaml index eea41fcb54..f28ec0fe75 100644 --- a/examples/generic/operator.yaml +++ b/examples/generic/operator.yaml @@ -1564,6 +1564,11 @@ spec: description: ReadyMembers is the number of ready members in the specific Rack format: int32 type: integer + replace_address_first_boot: + additionalProperties: + type: string + description: Pool of addresses which should be replaced by new nodes. + type: object version: description: Version is the current version of Scylla in use. 
type: string diff --git a/examples/gke/operator.yaml b/examples/gke/operator.yaml index eea41fcb54..f28ec0fe75 100644 --- a/examples/gke/operator.yaml +++ b/examples/gke/operator.yaml @@ -1564,6 +1564,11 @@ spec: description: ReadyMembers is the number of ready members in the specific Rack format: int32 type: integer + replace_address_first_boot: + additionalProperties: + type: string + description: Pool of addresses which should be replaced by new nodes. + type: object version: description: Version is the current version of Scylla in use. type: string diff --git a/pkg/api/v1alpha1/cluster_types.go b/pkg/api/v1alpha1/cluster_types.go index fbc240f65e..d62042e861 100644 --- a/pkg/api/v1alpha1/cluster_types.go +++ b/pkg/api/v1alpha1/cluster_types.go @@ -234,6 +234,8 @@ type RackStatus struct { ReadyMembers int32 `json:"readyMembers"` // Conditions are the latest available observations of a rack's state. Conditions []RackCondition `json:"conditions,omitempty"` + // Pool of addresses which should be replaced by new nodes. + ReplaceAddressFirstBoot map[string]string `json:"replace_address_first_boot,omitempty"` } // RackCondition is an observation about the state of a rack. @@ -245,8 +247,9 @@ type RackCondition struct { type RackConditionType string const ( - RackConditionTypeMemberLeaving RackConditionType = "MemberLeaving" - RackConditionTypeUpgrading RackConditionType = "RackUpgrading" + RackConditionTypeMemberLeaving RackConditionType = "MemberLeaving" + RackConditionTypeUpgrading RackConditionType = "RackUpgrading" + RackConditionTypeMemberReplacing RackConditionType = "MemberReplacing" ) // +kubebuilder:object:root=true diff --git a/pkg/api/v1alpha1/zz_generated.deepcopy.go b/pkg/api/v1alpha1/zz_generated.deepcopy.go index 21619cd6cd..550816821d 100644 --- a/pkg/api/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha1/zz_generated.deepcopy.go @@ -357,6 +357,13 @@ func (in *RackStatus) DeepCopyInto(out *RackStatus) { *out = make([]RackCondition, len(*in)) copy(*out, *in) } + if in.ReplaceAddressFirstBoot != nil { + in, out := &in.ReplaceAddressFirstBoot, &out.ReplaceAddressFirstBoot + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RackStatus. diff --git a/pkg/controllers/cluster/actions/replace.go b/pkg/controllers/cluster/actions/replace.go new file mode 100644 index 0000000000..d6db5d6feb --- /dev/null +++ b/pkg/controllers/cluster/actions/replace.go @@ -0,0 +1,217 @@ +// Copyright (C) 2017 ScyllaDB + +package actions + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/scylladb/go-log" + scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1" + "github.com/scylladb/scylla-operator/pkg/controllers/cluster/util" + "github.com/scylladb/scylla-operator/pkg/naming" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const RackReplaceNodeAction = "rack-replace-node" + +type RackReplaceNode struct { + Rack scyllav1alpha1.RackSpec + Cluster *scyllav1alpha1.ScyllaCluster + Logger log.Logger +} + +var _ Action = &RackReplaceNode{} + +// NewRackReplaceNodeAction returns action used for Scylla node replacement. 
+func NewRackReplaceNodeAction(r scyllav1alpha1.RackSpec, c *scyllav1alpha1.ScyllaCluster, l log.Logger) *RackReplaceNode { + return &RackReplaceNode{ + Rack: r, + Cluster: c, + Logger: l, + } +} + +// Name returns name of the action. +func (a *RackReplaceNode) Name() string { + return RackReplaceNodeAction +} + +// Execute performs replace node operation. +// This action should be executed when at least one member service contain replace label. +// It will save IP address in Cluster status, delete PVC associated with Pod bound to marked member service +// which will release node affinity. +// Then Pod and member service itself will be deleted. +// Once StatefulSet controller creates new Pod and this Pod will enter ready state +// this action will cleanup replace label from member service, and replacement IP +// will be removed from Cluster status. +func (a *RackReplaceNode) Execute(ctx context.Context, s *State) error { + a.Logger.Debug(ctx, "Replace action executed") + + r, c := a.Rack, a.Cluster + + // Find the member to decommission + memberServices := &corev1.ServiceList{} + + err := s.List(ctx, memberServices, &client.ListOptions{ + LabelSelector: naming.RackSelector(r, c), + }) + if err != nil { + return errors.Wrap(err, "failed to list Member Service") + } + + for _, member := range memberServices.Items { + if value, ok := member.Labels[naming.ReplaceLabel]; ok { + if value == "" { + a.Logger.Debug(ctx, "Member needs to be replaced", "member", member.Name) + if err := a.replaceNode(ctx, s, &member); err != nil { + return errors.WithStack(err) + } + } else { + a.Logger.Debug(ctx, "Member is being replaced", "member", member.Name) + if err := a.maybeFinishReplaceNode(ctx, s, &member); err != nil { + return errors.WithStack(err) + } + } + } + } + + return nil +} + +func (a *RackReplaceNode) maybeFinishReplaceNode(ctx context.Context, state *State, member *corev1.Service) error { + r, c, cc := a.Rack, a.Cluster, state.Client + + pod := &corev1.Pod{} + err := cc.Get(ctx, naming.NamespacedName(member.Name, member.Namespace), pod) + if err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrap(err, "get pod") + } + a.Logger.Info(ctx, "Member Pod not found", "member", member.Name) + } else { + if replaceAddr := member.Labels[naming.ReplaceLabel]; replaceAddr != "" { + a.Logger.Info(ctx, "Replace member Pod found", "member", member.Name, "replace_address", replaceAddr, "ready", podReady(pod)) + if podReady(pod) { + a.Logger.Info(ctx, "Replace member Pod ready, removing replace label", "member", member.Name, "replace_address", replaceAddr) + + old := member.DeepCopy() + delete(member.Labels, naming.ReplaceLabel) + if err := util.PatchService(ctx, old, member, state.kubeclient); err != nil { + return errors.Wrap(err, "error patching member service") + } + + a.Logger.Info(ctx, "Removing replace IP from Cluster status", "member", member.Name) + delete(c.Status.Racks[r.Name].ReplaceAddressFirstBoot, member.Name) + + state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, + fmt.Sprintf("Rack %q replaced %q node", r.Name, member.Name), + ) + } + } + } + + return nil +} + +func podReady(pod *corev1.Pod) bool { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { + return true + } + } + return false +} + +const ( + retryInterval = time.Second + waitForPVCTimeout = 30 * time.Second +) + +func waitForPVC(ctx context.Context, cc client.Client, name, namespace string) error { + pvc := &corev1.PersistentVolumeClaim{} + 
return wait.PollImmediate(retryInterval, waitForPVCTimeout, func() (bool, error) { + err := cc.Get(ctx, naming.NamespacedName(name, namespace), pvc) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil + }) +} + +func (a *RackReplaceNode) replaceNode(ctx context.Context, state *State, member *corev1.Service) error { + r, c := a.Rack, a.Cluster + + cc := state.Client + + // Save replace address in RackStatus + rackStatus := c.Status.Racks[r.Name] + rackStatus.ReplaceAddressFirstBoot[member.Name] = member.Spec.ClusterIP + a.Logger.Debug(ctx, "Adding member address to replace address list", "member", member.Name, "ip", member.Spec.ClusterIP, "replace_addresses", rackStatus.ReplaceAddressFirstBoot) + + // Proceed to destructive operations only when IP address is saved in cluster Status. + if err := cc.Status().Update(ctx, c); err != nil { + return errors.Wrap(err, "failed to delete pvc") + } + + // Delete PVC if it exists + pvc := &corev1.PersistentVolumeClaim{} + err := cc.Get(ctx, naming.NamespacedName(naming.PVCNameForPod(member.Name), member.Namespace), pvc) + if err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrap(err, "failed to get pvc") + } + a.Logger.Info(ctx, "Member PVC not found", "member", member.Name) + } else { + a.Logger.Info(ctx, "Deleting member PVC", "member", member.Name, "pvc", pvc.Name) + if err = cc.Delete(ctx, pvc); err != nil { + return errors.Wrap(err, "failed to delete pvc") + } + + // Wait until PVC is deleted, ignore error + a.Logger.Info(ctx, "Waiting for PVC deletion", "member", member.Name, "pvc", pvc.Name) + _ = waitForPVC(ctx, cc, naming.PVCNameForPod(member.Name), member.Namespace) + + state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, + fmt.Sprintf("Rack %q removed %q PVC", r.Name, member.Name), + ) + } + + // Delete Pod if it exists + pod := &corev1.Pod{} + err = cc.Get(ctx, naming.NamespacedName(member.Name, member.Namespace), pod) + if err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrap(err, "get pod") + } + a.Logger.Info(ctx, "Member Pod not found", "member", member.Name) + } else { + a.Logger.Info(ctx, "Deleting member Pod", "member", member.Name) + if err = cc.Delete(ctx, pod, client.GracePeriodSeconds(0)); err != nil { + return errors.Wrap(err, "delete pod") + } + state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, + fmt.Sprintf("Rack %q removed %q Pod", r.Name, member.Name), + ) + } + + // Delete member Service + a.Logger.Info(ctx, "Deleting member Service", "member", member.Name) + if err := cc.Delete(ctx, member); err != nil { + return errors.Wrap(err, "delete member service") + } + + state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, + fmt.Sprintf("Rack %q removed %q Service", r.Name, member.Name), + ) + + return nil +} diff --git a/pkg/controllers/cluster/controller_integration_test.go b/pkg/controllers/cluster/controller_integration_test.go index d5062f55f4..95c4dc609a 100644 --- a/pkg/controllers/cluster/controller_integration_test.go +++ b/pkg/controllers/cluster/controller_integration_test.go @@ -5,6 +5,7 @@ package cluster_test import ( "context" + "strings" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -18,24 +19,25 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" ) var _ = Describe("Cluster controller", func() { - Context("Cluster is scaled sequentially", func() { - var ( - ns *corev1.Namespace - ) + var ( + ns *corev1.Namespace + ) - BeforeEach(func() { - var err error - ns, err = testEnv.CreateNamespace(ctx, "ns") - Expect(err).To(BeNil()) - }) + BeforeEach(func() { + var err error + ns, err = testEnv.CreateNamespace(ctx, "ns") + Expect(err).To(BeNil()) + }) - AfterEach(func() { - Expect(testEnv.Delete(ctx, ns)).To(Succeed()) - }) + AfterEach(func() { + Expect(testEnv.Delete(ctx, ns)).To(Succeed()) + }) + Context("Cluster is scaled sequentially", func() { It("single rack", func() { scylla := singleRackCluster(ns) @@ -45,21 +47,187 @@ var _ = Describe("Cluster controller", func() { Expect(waitForCluster(ctx, scylla)).To(Succeed()) Expect(testEnv.Refresh(ctx, scylla)).To(Succeed()) - sst := integration.NewStatefulSetOperatorStub(GinkgoT(), testEnv, retryInterval) - sst.Start(ctx, scylla.Name, scylla.Namespace) + sst := integration.NewStatefulSetOperatorStub(testEnv) // Cluster should be scaled sequentially up to 3 for _, rack := range scylla.Spec.Datacenter.Racks { for _, replicas := range clusterScaleSteps(rack.Members) { Expect(assertRackScaled(ctx, rack, scylla, replicas)).To(Succeed()) + Expect(sst.CreatePods(ctx, scylla)).To(Succeed()) } } Expect(assertClusterStatusReflectsSpec(ctx, scylla)).To(Succeed()) }) }) + + Context("Node replace", func() { + var ( + scylla *scyllav1alpha1.ScyllaCluster + sstStub *integration.StatefulSetOperatorStub + ) + + BeforeEach(func() { + scylla = singleRackCluster(ns) + + Expect(testEnv.Create(ctx, scylla)).To(Succeed()) + Expect(waitForCluster(ctx, scylla)).To(Succeed()) + Expect(testEnv.Refresh(ctx, scylla)).To(Succeed()) + + sstStub = integration.NewStatefulSetOperatorStub(testEnv) + + // Cluster should be scaled sequentially up to member count + rack := scylla.Spec.Datacenter.Racks[0] + for _, replicas := range clusterScaleSteps(rack.Members) { + Expect(assertRackScaled(ctx, rack, scylla, replicas)).To(Succeed()) + Expect(sstStub.CreatePods(ctx, scylla)).To(Succeed()) + } + }) + + AfterEach(func() { + Expect(testEnv.Delete(ctx, scylla)).To(Succeed()) + }) + + It("replace non seed node", func() { + rack := scylla.Spec.Datacenter.Racks[0] + + services, err := nonSeedServices(ns.Namespace, rack, scylla) + Expect(err).To(BeNil()) + Expect(services).To(Not(BeEmpty())) + + serviceToReplace := services[0] + replacedServiceIP := serviceToReplace.Spec.ClusterIP + + serviceToReplace.Labels[naming.ReplaceLabel] = "" + Expect(testEnv.Update(ctx, &serviceToReplace)).To(Succeed()) + + By("Service IP should appear in ReplaceAddressFirstBoot rack status") + Eventually(func() (map[string]string, error) { + if err := testEnv.Refresh(ctx, scylla); err != nil { + return nil, err + } + + return scylla.Status.Racks[rack.Name].ReplaceAddressFirstBoot, nil + }).Should(HaveKeyWithValue(serviceToReplace.Name, replacedServiceIP)) + + By("Old Pod should be removed") + Eventually(func() (bool, error) { + if err := sstStub.SyncStatus(ctx, scylla); err != nil { + return false, err + } + + if err := testEnv.Refresh(ctx, scylla); err != nil { + return false, err + } + readyMembers := scylla.Status.Racks[rack.Name].ReadyMembers + expectedMembers := scylla.Spec.Datacenter.Racks[0].Members + return readyMembers == expectedMembers-1, nil + 
}).Should(BeTrue()) + + By("When new pod is scheduled") + Expect(sstStub.CreatePods(ctx, scylla)).To(Succeed()) + + By("New service should be created with replace label pointing to old one") + Eventually(func() (map[string]string, error) { + if err := testEnv.Refresh(ctx, scylla); err != nil { + return nil, err + } + + service := &corev1.Service{} + key := client.ObjectKey{ + Namespace: scylla.Namespace, + Name: serviceToReplace.Name, + } + if err := testEnv.Get(ctx, key, service); err != nil { + return nil, err + } + + return service.Labels, nil + }).Should(HaveKeyWithValue(naming.ReplaceLabel, replacedServiceIP)) + }) + + It("replace seed node", func() { + rack := scylla.Spec.Datacenter.Racks[0] + + services, err := seedServices(ns.Namespace, rack, scylla) + Expect(err).To(BeNil()) + Expect(services).To(Not(BeEmpty())) + + By("When replace label is added to seed member service") + service := services[0] + service.Labels[naming.ReplaceLabel] = "" + Expect(testEnv.Update(ctx, &service)).To(Succeed()) + + By("There should be an error event generated about replace failure") + Eventually(func() (done bool, err error) { + events := &corev1.EventList{} + err = testEnv.List(ctx, events, &client.ListOptions{ + Namespace: ns.Name, + }) + Expect(err).To(BeNil()) + + found := false + for _, e := range events.Items { + if e.Reason == naming.ErrSyncFailed && strings.Contains(e.Message, "replace") && strings.Contains(e.Message, "seed node") { + found = true + break + } + } + + return found, nil + }).Should(BeTrue()) + }) + }) }) +func rackMemberService(namespace string, rack scyllav1alpha1.RackSpec, cluster *scyllav1alpha1.ScyllaCluster) ([]corev1.Service, error) { + services := &corev1.ServiceList{} + Expect(wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + err := testEnv.List(ctx, services, &client.ListOptions{ + Namespace: namespace, + LabelSelector: naming.RackSelector(rack, cluster), + }) + if err != nil { + return false, err + } + return len(services.Items) == int(rack.Members), nil + })).To(Succeed()) + + return services.Items, nil +} + +func nonSeedServices(namespace string, rack scyllav1alpha1.RackSpec, cluster *scyllav1alpha1.ScyllaCluster) ([]corev1.Service, error) { + services, err := rackMemberService(namespace, rack, cluster) + if err != nil { + return nil, err + } + + var nonSeedServices []corev1.Service + for _, s := range services { + if _, ok := s.Labels[naming.SeedLabel]; !ok { + nonSeedServices = append(nonSeedServices, s) + } + } + + return nonSeedServices, nil +} + +func seedServices(namespace string, rack scyllav1alpha1.RackSpec, cluster *scyllav1alpha1.ScyllaCluster) ([]corev1.Service, error) { + services, err := rackMemberService(namespace, rack, cluster) + if err != nil { + return nil, err + } + + var seedServices []corev1.Service + for _, s := range services { + if _, ok := s.Labels[naming.SeedLabel]; ok { + seedServices = append(seedServices, s) + } + } + + return seedServices, nil +} + func singleRackCluster(ns *corev1.Namespace) *scyllav1alpha1.ScyllaCluster { return &scyllav1alpha1.ScyllaCluster{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controllers/cluster/resource/resource.go b/pkg/controllers/cluster/resource/resource.go index b1020963b9..34f96d6016 100644 --- a/pkg/controllers/cluster/resource/resource.go +++ b/pkg/controllers/cluster/resource/resource.go @@ -50,6 +50,10 @@ func MemberServiceForPod(pod *corev1.Pod, cluster *scyllav1alpha1.ScyllaCluster) if strings.HasSuffix(pod.Name, "-0") || strings.HasSuffix(pod.Name, "-1") { 
labels[naming.SeedLabel] = "" } + rackName := pod.Labels[naming.RackNameLabel] + if replaceAddr, ok := cluster.Status.Racks[rackName].ReplaceAddressFirstBoot[pod.Name]; ok && replaceAddr != "" { + labels[naming.ReplaceLabel] = replaceAddr + } return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, diff --git a/pkg/controllers/cluster/status.go b/pkg/controllers/cluster/status.go index a3f437d4ef..91b8ceb1b3 100644 --- a/pkg/controllers/cluster/status.go +++ b/pkg/controllers/cluster/status.go @@ -18,13 +18,43 @@ import ( // It doesn't post the result to the API Server yet. // That will be done at the end of the sync loop. func (cc *ClusterReconciler) updateStatus(ctx context.Context, cluster *scyllav1alpha1.ScyllaCluster) error { - cluster.Status.Racks = map[string]scyllav1alpha1.RackStatus{} + if cluster.Status.Racks == nil { + cluster.Status.Racks = map[string]scyllav1alpha1.RackStatus{} + } sts := &appsv1.StatefulSet{} - for _, rack := range cluster.Spec.Datacenter.Racks { - rackStatus := scyllav1alpha1.RackStatus{} + // Find rack which are no longer specified + var unknownRacks []string + for rackName := range cluster.Status.Racks { + found := false + for _, rack := range cluster.Spec.Datacenter.Racks { + if rack.Name == rackName { + found = true + break + } + } + if !found { + unknownRacks = append(unknownRacks, rackName) + } + } + // Remove unknown rack from status + for _, rackName := range unknownRacks { + delete(cluster.Status.Racks, rackName) + } + // Update each rack status + for _, rack := range cluster.Spec.Datacenter.Racks { + rackStatus := scyllav1alpha1.RackStatus{ + ReplaceAddressFirstBoot: make(map[string]string), + } + // ReplaceAddress may keep address of non-existing service. + // Copy it from previous cluster status to persist this state. 
+ if existingStatus, ok := cluster.Status.Racks[rack.Name]; ok { + for k, v := range existingStatus.ReplaceAddressFirstBoot { + rackStatus.ReplaceAddressFirstBoot[k] = v + } + } // Get corresponding StatefulSet from lister err := cc.Get(ctx, naming.NamespacedName(naming.StatefulSetNameForRack(rack, cluster), cluster.Namespace), sts) // If it wasn't found, continue @@ -96,6 +126,16 @@ func (cc *ClusterReconciler) updateStatus(ctx context.Context, cluster *scyllav1 return errors.New(fmt.Sprintf("only last member of each rack should be decommissioning, but %d-th member of %s found decommissioning while rack had %d members", index, rack.Name, rackStatus.Members)) } } + if replaceAddr, ok := svc.Labels[naming.ReplaceLabel]; ok { + if _, ok := svc.Labels[naming.SeedLabel]; ok { + return errors.New("seed node replace is not supported") + } + if replaceAddr == "" { + rackStatus.ReplaceAddressFirstBoot[svc.Name] = svc.Spec.ClusterIP + } + cc.Logger.Info(ctx, "Rack member is being replaced", "rack", rack.Name, "member", svc.Name) + scyllav1alpha1.SetRackCondition(&rackStatus, scyllav1alpha1.RackConditionTypeMemberReplacing) + } } // Update Status for Rack diff --git a/pkg/controllers/cluster/sync.go b/pkg/controllers/cluster/sync.go index d88aa7d748..3359620250 100644 --- a/pkg/controllers/cluster/sync.go +++ b/pkg/controllers/cluster/sync.go @@ -99,6 +99,12 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1al // Check if there is a scale-down in progress for _, rack := range cluster.Spec.Datacenter.Racks { rackStatus := cluster.Status.Racks[rack.Name] + if scyllav1alpha1.IsRackConditionTrue(&rackStatus, scyllav1alpha1.RackConditionTypeMemberReplacing) { + // Perform node replace + logger.Info(ctx, "Next Action: Node replace rack", "name", rack.Name) + return actions.NewRackReplaceNodeAction(rack, cluster, logger.Named("replace")) + } + if scyllav1alpha1.IsRackConditionTrue(&rackStatus, scyllav1alpha1.RackConditionTypeMemberLeaving) { // Resume scale down logger.Info(ctx, "Next Action: Scale-Down rack", "name", rack.Name) diff --git a/pkg/controllers/sidecar/config/config.go b/pkg/controllers/sidecar/config/config.go index 8155ae995a..339a768ae3 100644 --- a/pkg/controllers/sidecar/config/config.go +++ b/pkg/controllers/sidecar/config/config.go @@ -3,15 +3,16 @@ package config import ( "context" "fmt" - "github.com/blang/semver" "io/ioutil" - "k8s.io/utils/pointer" "os" "os/exec" "regexp" "strconv" "strings" + "github.com/blang/semver" + "k8s.io/utils/pointer" + "github.com/ghodss/yaml" "github.com/magiconair/properties" "github.com/pkg/errors" @@ -220,6 +221,10 @@ func (s *ScyllaConfig) setupEntrypoint(ctx context.Context) (*exec.Cmd, error) { if cluster.Spec.Alternator.Enabled() { args["alternator-port"] = pointer.StringPtr(strconv.Itoa(int(cluster.Spec.Alternator.Port))) } + // If node is being replaced + if addr, ok := m.ServiceLabels[naming.ReplaceLabel]; ok { + args["replace-address-first-boot"] = pointer.StringPtr(addr) + } // See if we need to use cpu-pinning // TODO: Add more checks to make sure this is valid. // eg. 
parse the cpuset and check the number of cpus is the same as cpu limits diff --git a/pkg/controllers/sidecar/config/config_test.go b/pkg/controllers/sidecar/config/config_test.go index b919484765..c0c5e8efea 100644 --- a/pkg/controllers/sidecar/config/config_test.go +++ b/pkg/controllers/sidecar/config/config_test.go @@ -2,6 +2,8 @@ package config import ( "bytes" + "context" + "fmt" "io" "io/ioutil" "os" @@ -9,8 +11,19 @@ import ( "github.com/google/go-cmp/cmp" "github.com/magiconair/properties" + "github.com/scylladb/go-log" + "github.com/scylladb/scylla-operator/pkg/api/v1alpha1" + "github.com/scylladb/scylla-operator/pkg/cmd/options" "github.com/scylladb/scylla-operator/pkg/controllers/sidecar/identity" + "github.com/scylladb/scylla-operator/pkg/naming" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubefake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" ) func TestCreateRackDCProperties(t *testing.T) { @@ -254,3 +267,65 @@ func TestScyllaArgumentsEmpty(t *testing.T) { require.Equal(t, "", argumentsMap["not_existing_key"]) require.Equal(t, 0, len(argumentsMap)) } + +func TestReplaceNodeLabelInMemberService(t *testing.T) { + atom := zap.NewAtomicLevelAt(zapcore.InfoLevel) + logger, _ := log.NewProduction(log.Config{ + Level: atom, + }) + if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil { + t.Fatal(err) + } + + replaceAddr := "1.2.3.4" + options.GetSidecarOptions().CPU = "1" + + m := &identity.Member{ + Namespace: "namespace", + Cluster: "cluster", + ServiceLabels: map[string]string{ + naming.ReplaceLabel: replaceAddr, + }, + } + + fakeSeedService := &corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Namespace: m.Namespace, + Labels: map[string]string{ + naming.SeedLabel: "", + naming.ClusterNameLabel: m.Cluster, + }, + }, + } + fakeCluster := &v1alpha1.ScyllaCluster{ + ObjectMeta: v1.ObjectMeta{ + Name: m.Cluster, + Namespace: m.Namespace, + }, + } + clientFake := fake.NewFakeClientWithScheme(scheme.Scheme, fakeCluster) + kubeClientFake := kubefake.NewSimpleClientset(fakeSeedService) + + cfg := NewForMember(m, kubeClientFake, clientFake, logger) + + cmd, err := cfg.setupEntrypoint(context.Background()) + if err != nil { + t.Errorf("entrypoint setup, err: %s", err) + } + + expectedArg := fmt.Sprintf("--replace-address-first-boot=%s", replaceAddr) + + if !contains(cmd.Args, expectedArg) { + t.Errorf("missing Scylla parameter %s", expectedArg) + } + +} + +func contains(arr []string, v string) bool { + for _, elem := range arr { + if elem == v { + return true + } + } + return false +} diff --git a/pkg/controllers/sidecar/identity/member.go b/pkg/controllers/sidecar/identity/member.go index de845937f4..571cd9bdf3 100644 --- a/pkg/controllers/sidecar/identity/member.go +++ b/pkg/controllers/sidecar/identity/member.go @@ -22,10 +22,11 @@ type Member struct { // IP of the Pod IP string // ClusterIP of the member's Service - StaticIP string - Rack string - Datacenter string - Cluster string + StaticIP string + Rack string + Datacenter string + Cluster string + ServiceLabels map[string]string } func Retrieve(ctx context.Context, name, namespace string, kubeclient kubernetes.Interface) (*Member, error) { @@ -51,18 +52,18 @@ func Retrieve(ctx context.Context, name, namespace string, kubeclient kubernetes } return &Member{ - Name: name, - Namespace: namespace, - IP: pod.Status.PodIP, - StaticIP: 
memberService.Spec.ClusterIP, - Rack: pod.Labels[naming.RackNameLabel], - Datacenter: pod.Labels[naming.DatacenterNameLabel], - Cluster: pod.Labels[naming.ClusterNameLabel], + Name: name, + Namespace: namespace, + IP: pod.Status.PodIP, + StaticIP: memberService.Spec.ClusterIP, + Rack: pod.Labels[naming.RackNameLabel], + Datacenter: pod.Labels[naming.DatacenterNameLabel], + Cluster: pod.Labels[naming.ClusterNameLabel], + ServiceLabels: memberService.Labels, }, nil } func (m *Member) GetSeeds(ctx context.Context, kubeClient kubernetes.Interface) ([]string, error) { - var services *corev1.ServiceList var err error diff --git a/pkg/controllers/sidecar/sidecar_controller.go b/pkg/controllers/sidecar/sidecar_controller.go index 56c92f2f3f..cfb57eed5b 100644 --- a/pkg/controllers/sidecar/sidecar_controller.go +++ b/pkg/controllers/sidecar/sidecar_controller.go @@ -182,7 +182,8 @@ func (mc *MemberReconciler) onStartup(ctx context.Context) error { // Setup config files mc.logger.Info(ctx, "Setting up config files") - cmd, err := config.NewForMember(mc.member, mc.kubeClient, mc.Client, mc.logger).Setup(ctx) + cfg := config.NewForMember(mc.member, mc.kubeClient, mc.Client, mc.logger) + cmd, err := cfg.Setup(ctx) if err != nil { return errors.Wrap(err, "failed to setup config files") } diff --git a/pkg/naming/constants.go b/pkg/naming/constants.go index eee969bc90..666f35df6d 100644 --- a/pkg/naming/constants.go +++ b/pkg/naming/constants.go @@ -19,6 +19,9 @@ const ( // Values: {true, false} DecommissionLabel = "scylla/decommissioned" + // ReplaceLabel express the intent to replace pod under the specific member. + ReplaceLabel = "scylla/replace" + LabelValueTrue = "true" LabelValueFalse = "false" ) diff --git a/pkg/test/integration/statefulset.go b/pkg/test/integration/statefulset.go index 2dfe2fd89c..e039265650 100644 --- a/pkg/test/integration/statefulset.go +++ b/pkg/test/integration/statefulset.go @@ -5,9 +5,7 @@ package integration import ( "context" "fmt" - "time" - "github.com/onsi/ginkgo" "github.com/scylladb/go-log" scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1" "github.com/scylladb/scylla-operator/pkg/naming" @@ -15,68 +13,52 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -type statefulSetOperatorStub struct { - t ginkgo.GinkgoTInterface - env *TestEnvironment - interval time.Duration - logger log.Logger +type StatefulSetOperatorStub struct { + env *TestEnvironment + logger log.Logger + + stopCh chan struct{} } -func NewStatefulSetOperatorStub(t ginkgo.GinkgoTInterface, env *TestEnvironment, interval time.Duration) *statefulSetOperatorStub { - return &statefulSetOperatorStub{ - t: t, - env: env, - interval: interval, - logger: env.logger.Named("sts_stub"), +func NewStatefulSetOperatorStub(env *TestEnvironment) *StatefulSetOperatorStub { + + return &StatefulSetOperatorStub{ + env: env, + logger: env.logger.Named("sts_stub"), } } -func (s *statefulSetOperatorStub) Start(ctx context.Context, name, namespace string) { - go func() { - ticker := time.NewTicker(s.interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): +func WithPodCondition(condition corev1.PodCondition) func(pod *corev1.Pod) { + return func(pod *corev1.Pod) { + for i, pc := range pod.Status.Conditions { + if pc.Type == condition.Type { + pod.Status.Conditions[i].Status = condition.Status return - case <-ticker.C: - cluster := 
&scyllav1alpha1.ScyllaCluster{} - if err := s.env.Get(ctx, client.ObjectKey{ - Name: name, - Namespace: namespace, - }, cluster); err != nil { - s.t.Errorf("refresh scylla cluster obj, err: %s", err) - continue - } - - if err := s.syncStatefulSet(ctx, cluster); err != nil { - s.t.Errorf("sync statefulset, err: %s", err) - } } } - }() + pod.Status.Conditions = append(pod.Status.Conditions, condition) + } + } -func (s *statefulSetOperatorStub) syncStatefulSet(ctx context.Context, cluster *scyllav1alpha1.ScyllaCluster) error { - stss := &appsv1.StatefulSetList{} - err := s.env.List(ctx, stss, &client.ListOptions{Namespace: cluster.Namespace}) - if err != nil { - return err - } +type PodOption func(pod *corev1.Pod) - for _, sts := range stss.Items { - var rack scyllav1alpha1.RackSpec - for _, r := range cluster.Spec.Datacenter.Racks { - if sts.Name == naming.StatefulSetNameForRack(r, cluster) { - rack = r - break - } +func (s *StatefulSetOperatorStub) CreatePods(ctx context.Context, cluster *scyllav1alpha1.ScyllaCluster, options ...PodOption) error { + for _, rack := range cluster.Spec.Datacenter.Racks { + sts := &appsv1.StatefulSet{} + + err := s.env.Get(ctx, client.ObjectKey{ + Name: naming.StatefulSetNameForRack(rack, cluster), + Namespace: cluster.Namespace, + }, sts) + if err != nil { + return err } - podTemplate := corev1.Pod{ + podTemplate := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", @@ -100,14 +82,29 @@ func (s *statefulSetOperatorStub) syncStatefulSet(ctx context.Context, cluster * }) } - for i := sts.Status.Replicas; i < *sts.Spec.Replicas; i++ { + for _, opt := range options { + opt(podTemplate) + } + + mutateFn := func() error { + return nil + } + + for i := 0; i < int(*sts.Spec.Replicas); i++ { pod := podTemplate.DeepCopy() pod.Name = fmt.Sprintf("%s-%d", sts.Name, i) pod.Spec.Hostname = pod.Name pod.Spec.Subdomain = cluster.Name - s.logger.Info(ctx, "Spawning fake Pod", "sts", sts.Name, "pod", pod.Name) - if err := s.env.Create(ctx, pod); err != nil { + + if op, err := controllerutil.CreateOrUpdate(ctx, s.env, pod, mutateFn); err != nil { return err + } else { + switch op { + case controllerutil.OperationResultCreated: + s.logger.Info(ctx, "Spawned fake Pod", "sts", sts.Name, "pod", pod.Name) + case controllerutil.OperationResultUpdated: + s.logger.Info(ctx, "Updated fake Pod", "sts", sts.Name, "pod", pod.Name) + } } } @@ -115,7 +112,39 @@ func (s *statefulSetOperatorStub) syncStatefulSet(ctx context.Context, cluster * sts.Status.ReadyReplicas = *sts.Spec.Replicas sts.Status.ObservedGeneration = sts.Generation s.logger.Info(ctx, "Updating StatefulSet status", "replicas", sts.Status.Replicas, "observed_generation", sts.Status.ObservedGeneration) - if err := s.env.Status().Update(ctx, &sts); err != nil { + if err := s.env.Status().Update(ctx, sts); err != nil { + return err + } + } + + return nil +} + +func (s *StatefulSetOperatorStub) SyncStatus(ctx context.Context, cluster *scyllav1alpha1.ScyllaCluster) error { + for _, rack := range cluster.Spec.Datacenter.Racks { + sts := &appsv1.StatefulSet{} + + err := s.env.Get(ctx, client.ObjectKey{ + Name: naming.StatefulSetNameForRack(rack, cluster), + Namespace: cluster.Namespace, + }, sts) + if err != nil { + return err + } + + pods := &corev1.PodList{} + if err := s.env.List(ctx, pods, &client.ListOptions{ + Namespace: cluster.Namespace, + LabelSelector: naming.RackSelector(rack, cluster), + }); err != nil { + return err + } + + sts.Status.Replicas = int32(len(pods.Items)) + sts.Status.ReadyReplicas = 
int32(len(pods.Items)) + sts.Status.ObservedGeneration = sts.Generation + s.logger.Info(ctx, "Updating StatefulSet status", "replicas", sts.Status.Replicas, "observed_generation", sts.Status.ObservedGeneration) + if err := s.env.Status().Update(ctx, sts); err != nil { return err } }
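Usage sketch (not part of the patch): the snippet below illustrates the flow this commit introduces — a client adds the "scylla/replace" label with an empty value to a dead member's Service, after which the Operator deletes the PVC, Pod and Service and boots the replacement with --replace-address-first-boot set to the old ClusterIP. It assumes a configured controller-runtime client; the package, function, namespace and Service names are placeholders, not part of the Operator code.

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ReplaceLabel mirrors the constant added in pkg/naming/constants.go.
const ReplaceLabel = "scylla/replace"

// triggerReplace marks a dead node's member Service for replacement. The
// Operator then removes the PVC, Pod and Service; once the new Pod is ready,
// the replace label is cleared and the address is dropped from
// Status.Racks[rack].ReplaceAddressFirstBoot.
func triggerReplace(ctx context.Context, c client.Client, namespace, memberService string) error {
	svc := &corev1.Service{}
	if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: memberService}, svc); err != nil {
		return fmt.Errorf("get member service: %w", err)
	}
	if svc.Labels == nil {
		svc.Labels = map[string]string{}
	}
	// An empty value asks the Operator to start the replacement; the Operator
	// later re-creates the Service with this label set to the old ClusterIP,
	// which the sidecar turns into --replace-address-first-boot.
	svc.Labels[ReplaceLabel] = ""
	return c.Update(ctx, svc)
}

For example, calling triggerReplace(ctx, c, "scylla", "my-cluster-dc1-rack1-1") (a hypothetical member Service name) against a non-seed member mirrors what the new integration test does before asserting that the old ClusterIP appears in ReplaceAddressFirstBoot; labeling a seed member instead produces the "seed node replace is not supported" error event.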