Commit
controller: added replace dead node feature (#48)
A dead node can be replaced using a special label added to its member service.

When the label "scylla/replace" with an empty string value is added to a member
service, the Operator will remove the PVC attached to the dead node, delete the
dead Pod, and remove the member service.
A new Pod will be scheduled on a different node, and Scylla will run with the
additional parameter `--replace-address-first-boot` set to the IP address of
the deleted node.
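
A rough sketch of what this looks like on the member service (the service name
and namespace below are hypothetical and depend on the cluster layout):

apiVersion: v1
kind: Service
metadata:
  name: example-cluster-us-east-1-us-east-1a-1  # member service of the dead node (hypothetical)
  namespace: scylla
  labels:
    scylla/replace: ""  # empty value asks the Operator to start the replacement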

Fixes #48
zimnx committed Nov 20, 2020
1 parent 29448c6 commit 629e325
Showing 17 changed files with 666 additions and 87 deletions.
@@ -1549,6 +1549,11 @@ spec:
  description: ReadyMembers is the number of ready members in the specific Rack
  format: int32
  type: integer
replace_address_first_boot:
  additionalProperties:
    type: string
  description: Pool of addresses which should be replaced by new nodes.
  type: object
version:
  description: Version is the current version of Scylla in use.
  type: string
5 changes: 5 additions & 0 deletions examples/eks/operator.yaml
@@ -1564,6 +1564,11 @@ spec:
  description: ReadyMembers is the number of ready members in the specific Rack
  format: int32
  type: integer
replace_address_first_boot:
  additionalProperties:
    type: string
  description: Pool of addresses which should be replaced by new nodes.
  type: object
version:
  description: Version is the current version of Scylla in use.
  type: string
5 changes: 5 additions & 0 deletions examples/generic/operator.yaml
@@ -1564,6 +1564,11 @@ spec:
  description: ReadyMembers is the number of ready members in the specific Rack
  format: int32
  type: integer
replace_address_first_boot:
  additionalProperties:
    type: string
  description: Pool of addresses which should be replaced by new nodes.
  type: object
version:
  description: Version is the current version of Scylla in use.
  type: string
5 changes: 5 additions & 0 deletions examples/gke/operator.yaml
@@ -1564,6 +1564,11 @@ spec:
  description: ReadyMembers is the number of ready members in the specific Rack
  format: int32
  type: integer
replace_address_first_boot:
  additionalProperties:
    type: string
  description: Pool of addresses which should be replaced by new nodes.
  type: object
version:
  description: Version is the current version of Scylla in use.
  type: string
7 changes: 5 additions & 2 deletions pkg/api/v1alpha1/cluster_types.go
@@ -234,6 +234,8 @@ type RackStatus struct {
	ReadyMembers int32 `json:"readyMembers"`
	// Conditions are the latest available observations of a rack's state.
	Conditions []RackCondition `json:"conditions,omitempty"`
	// Pool of addresses which should be replaced by new nodes.
	ReplaceAddressFirstBoot map[string]string `json:"replace_address_first_boot,omitempty"`
}

// RackCondition is an observation about the state of a rack.
@@ -245,8 +247,9 @@ type RackCondition struct {
type RackConditionType string

 const (
-	RackConditionTypeMemberLeaving RackConditionType = "MemberLeaving"
-	RackConditionTypeUpgrading     RackConditionType = "RackUpgrading"
+	RackConditionTypeMemberLeaving   RackConditionType = "MemberLeaving"
+	RackConditionTypeUpgrading       RackConditionType = "RackUpgrading"
+	RackConditionTypeMemberReplacing RackConditionType = "MemberReplacing"
 )

// +kubebuilder:object:root=true
7 changes: 7 additions & 0 deletions pkg/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

217 changes: 217 additions & 0 deletions pkg/controllers/cluster/actions/replace.go
@@ -0,0 +1,217 @@
// Copyright (C) 2017 ScyllaDB

package actions

import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
	"github.com/scylladb/go-log"
	scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1"
	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
	"github.com/scylladb/scylla-operator/pkg/naming"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const RackReplaceNodeAction = "rack-replace-node"

type RackReplaceNode struct {
	Rack    scyllav1alpha1.RackSpec
	Cluster *scyllav1alpha1.ScyllaCluster
	Logger  log.Logger
}

var _ Action = &RackReplaceNode{}

// NewRackReplaceNodeAction returns an action used for Scylla node replacement.
func NewRackReplaceNodeAction(r scyllav1alpha1.RackSpec, c *scyllav1alpha1.ScyllaCluster, l log.Logger) *RackReplaceNode {
	return &RackReplaceNode{
		Rack:    r,
		Cluster: c,
		Logger:  l,
	}
}

// Name returns the name of the action.
func (a *RackReplaceNode) Name() string {
	return RackReplaceNodeAction
}

// Execute performs the replace node operation.
// This action should be executed when at least one member service contains the replace label.
// It saves the member IP address in the Cluster status and deletes the PVC associated with the Pod
// bound to the marked member service, which releases the node affinity.
// Then the Pod and the member service itself are deleted.
// Once the StatefulSet controller creates a new Pod and that Pod becomes ready,
// this action removes the replace label from the member service and the replacement IP
// from the Cluster status.
func (a *RackReplaceNode) Execute(ctx context.Context, s *State) error {
	a.Logger.Debug(ctx, "Replace action executed")

	r, c := a.Rack, a.Cluster

	// Find the members marked for replacement.
	memberServices := &corev1.ServiceList{}

	err := s.List(ctx, memberServices, &client.ListOptions{
		LabelSelector: naming.RackSelector(r, c),
	})
	if err != nil {
		return errors.Wrap(err, "failed to list Member Service")
	}

	for _, member := range memberServices.Items {
		if value, ok := member.Labels[naming.ReplaceLabel]; ok {
			if value == "" {
				a.Logger.Debug(ctx, "Member needs to be replaced", "member", member.Name)
				if err := a.replaceNode(ctx, s, &member); err != nil {
					return errors.WithStack(err)
				}
			} else {
				a.Logger.Debug(ctx, "Member is being replaced", "member", member.Name)
				if err := a.maybeFinishReplaceNode(ctx, s, &member); err != nil {
					return errors.WithStack(err)
				}
			}
		}
	}

	return nil
}

func (a *RackReplaceNode) maybeFinishReplaceNode(ctx context.Context, state *State, member *corev1.Service) error {
	r, c, cc := a.Rack, a.Cluster, state.Client

	pod := &corev1.Pod{}
	err := cc.Get(ctx, naming.NamespacedName(member.Name, member.Namespace), pod)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return errors.Wrap(err, "get pod")
		}
		a.Logger.Info(ctx, "Member Pod not found", "member", member.Name)
	} else {
		if replaceAddr := member.Labels[naming.ReplaceLabel]; replaceAddr != "" {
			a.Logger.Info(ctx, "Replace member Pod found", "member", member.Name, "replace_address", replaceAddr, "ready", podReady(pod))
			if podReady(pod) {
				a.Logger.Info(ctx, "Replace member Pod ready, removing replace label", "member", member.Name, "replace_address", replaceAddr)

				old := member.DeepCopy()
				delete(member.Labels, naming.ReplaceLabel)
				if err := util.PatchService(ctx, old, member, state.kubeclient); err != nil {
					return errors.Wrap(err, "error patching member service")
				}

				a.Logger.Info(ctx, "Removing replace IP from Cluster status", "member", member.Name)
				delete(c.Status.Racks[r.Name].ReplaceAddressFirstBoot, member.Name)

				state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced,
					fmt.Sprintf("Rack %q replaced %q node", r.Name, member.Name),
				)
			}
		}
	}

	return nil
}

func podReady(pod *corev1.Pod) bool {
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

const (
	retryInterval     = time.Second
	waitForPVCTimeout = 30 * time.Second
)

func waitForPVC(ctx context.Context, cc client.Client, name, namespace string) error {
	pvc := &corev1.PersistentVolumeClaim{}
	return wait.PollImmediate(retryInterval, waitForPVCTimeout, func() (bool, error) {
		err := cc.Get(ctx, naming.NamespacedName(name, namespace), pvc)
		if err != nil {
			if apierrors.IsNotFound(err) {
				return false, nil
			}
			return false, err
		}
		return true, nil
	})
}

func (a *RackReplaceNode) replaceNode(ctx context.Context, state *State, member *corev1.Service) error {
	r, c := a.Rack, a.Cluster

	cc := state.Client

	// Save replace address in RackStatus
	rackStatus := c.Status.Racks[r.Name]
	rackStatus.ReplaceAddressFirstBoot[member.Name] = member.Spec.ClusterIP
	a.Logger.Debug(ctx, "Adding member address to replace address list", "member", member.Name, "ip", member.Spec.ClusterIP, "replace_addresses", rackStatus.ReplaceAddressFirstBoot)

	// Proceed to destructive operations only when the IP address is saved in the cluster Status.
	if err := cc.Status().Update(ctx, c); err != nil {
		return errors.Wrap(err, "failed to update cluster status")
	}

	// Delete PVC if it exists
	pvc := &corev1.PersistentVolumeClaim{}
	err := cc.Get(ctx, naming.NamespacedName(naming.PVCNameForPod(member.Name), member.Namespace), pvc)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return errors.Wrap(err, "failed to get pvc")
		}
		a.Logger.Info(ctx, "Member PVC not found", "member", member.Name)
	} else {
		a.Logger.Info(ctx, "Deleting member PVC", "member", member.Name, "pvc", pvc.Name)
		if err = cc.Delete(ctx, pvc); err != nil {
			return errors.Wrap(err, "failed to delete pvc")
		}

		// Wait until PVC is deleted, ignore error
		a.Logger.Info(ctx, "Waiting for PVC deletion", "member", member.Name, "pvc", pvc.Name)
		_ = waitForPVC(ctx, cc, naming.PVCNameForPod(member.Name), member.Namespace)

		state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced,
			fmt.Sprintf("Rack %q removed %q PVC", r.Name, member.Name),
		)
	}

	// Delete Pod if it exists
	pod := &corev1.Pod{}
	err = cc.Get(ctx, naming.NamespacedName(member.Name, member.Namespace), pod)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return errors.Wrap(err, "get pod")
		}
		a.Logger.Info(ctx, "Member Pod not found", "member", member.Name)
	} else {
		a.Logger.Info(ctx, "Deleting member Pod", "member", member.Name)
		if err = cc.Delete(ctx, pod, client.GracePeriodSeconds(0)); err != nil {
			return errors.Wrap(err, "delete pod")
		}
		state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced,
			fmt.Sprintf("Rack %q removed %q Pod", r.Name, member.Name),
		)
	}

	// Delete member Service
	a.Logger.Info(ctx, "Deleting member Service", "member", member.Name)
	if err := cc.Delete(ctx, member); err != nil {
		return errors.Wrap(err, "delete member service")
	}

	state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced,
		fmt.Sprintf("Rack %q removed %q Service", r.Name, member.Name),
	)

	return nil
}
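
While a replacement is in flight, the saved address is visible in the rack status under
the new `replace_address_first_boot` field. A hedged sketch of that status (the rack name,
member service name, IP, and version are illustrative, not taken from a real cluster):

status:
  racks:
    us-east-1a:
      readyMembers: 2
      version: 4.2.0
      replace_address_first_boot:
        example-cluster-us-east-1-us-east-1a-1: 10.100.23.42  # member service name -> ClusterIP of the deleted node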
