cluster: automatic replacement of orphaned nodes (#114 #215)
When a k8s node is gone, a PVC might still have node affinity pointing
to the lost node. In this situation, the PVC is deleted by the Operator
and node replacement logic is triggered to restore the cluster RF.

Fixes #114
Fixes #215
zimnx committed Nov 23, 2020
1 parent ee6c2fe commit b5cd5f8
Showing 12 changed files with 327 additions and 25 deletions.
@@ -46,6 +46,9 @@ spec:
format: int32
type: integer
type: object
automaticOrphanedNodeCleanup:
description: AutomaticOrphanedNodeCleanup controls if automatic orphan node cleanup should be performed.
type: boolean
backups:
description: Backups specifies backup task in Scylla Manager. When Scylla Manager is not installed, these will be ignored.
items:
16 changes: 16 additions & 0 deletions config/operator/rbac/role.yaml
@@ -14,6 +14,14 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
@@ -23,6 +31,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
11 changes: 9 additions & 2 deletions docs/source/node_operations.md
@@ -48,7 +48,7 @@ _This procedure is for replacing one dead node. To replace more than one dead no
```bash
$ kubectl -n scylla label svc simple-cluster-us-east-1-us-east-1a-2 scylla/replace=""
```
Your failed pod should be recreated on available node
Your failed Pod should be recreated on an available k8s node
```bash
$ kubectl -n scylla get pods
NAME READY STATUS RESTARTS AGE
@@ -70,4 +70,11 @@ _This procedure is for replacing one dead node. To replace more than one dead no
UN 10.43.231.189 91.03 KB 256 ? 35d0cb19-35ef-482b-92a4-b63eee4527e5 us-east-1a
UN 10.43.191.172 74.77 KB 256 ? 1ffa7a82-c41c-4706-8f5f-4d45a39c7003 us-east-1a
```
1. Run the repair on the cluster to make sure that the data is synced with the other nodes in the cluster. You can use [Scylla Manager](manager.md) to run the repair.
1. Run the repair on the cluster to make sure that the data is synced with the other nodes in the cluster. You can use [Scylla Manager](manager.md) to run the repair.

### Automatic cleanup and replacement when a k8s node is lost

When your k8s cluster loses one of its nodes due to an incident or explicit removal, Scylla Pods may become unschedulable because their PVCs still carry node affinity to the lost node.

When the `automaticOrphanedNodeCleanup` flag is enabled in your ScyllaCluster, Scylla Operator will automatically
replace any Pod whose bound resources were lost.
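
For illustration, a locally attached PV typically pins a member to a single k8s node through required node affinity similar to the sketch below (all names and the `worker-2` hostname are hypothetical); once that node is gone, no remaining node satisfies the selector and the Pod stays unschedulable:

```yaml
# Hypothetical local PV; names and values are illustrative only.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: local-pv-simple-cluster-0
spec:
  capacity:
    storage: 100Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-ssd
  local:
    path: /mnt/disks/ssd0
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - worker-2   # node that no longer exists in the cluster
```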
1 change: 1 addition & 0 deletions docs/source/scylla_cluster_crd.md
Expand Up @@ -18,6 +18,7 @@ spec:
repository: scylladb/scylla
developerMode: true
cpuset: false
automaticOrphanedNodeCleanup: true
repairs:
- name: "weekly us-east-1 repair"
intensity: 2
19 changes: 19 additions & 0 deletions examples/common/operator.yaml
@@ -61,6 +61,9 @@ spec:
format: int32
type: integer
type: object
automaticOrphanedNodeCleanup:
description: AutomaticOrphanedNodeCleanup controls if automatic orphan node cleanup should be performed.
type: boolean
backups:
description: Backups specifies backup task in Scylla Manager. When Scylla Manager is not installed, these will be ignored.
items:
@@ -1683,6 +1686,14 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand All @@ -1692,6 +1703,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
2 changes: 2 additions & 0 deletions pkg/api/v1alpha1/cluster_types.go
@@ -45,6 +45,8 @@ type ClusterSpec struct {
DeveloperMode bool `json:"developerMode,omitempty"`
// CpuSet determines if the cluster will use cpu-pinning for max performance.
CpuSet bool `json:"cpuset,omitempty"`
// AutomaticOrphanedNodeCleanup controls if automatic orphan node cleanup should be performed.
AutomaticOrphanedNodeCleanup bool `json:"automaticOrphanedNodeCleanup,omitempty"`
// Datacenter that will make up this cluster.
Datacenter DatacenterSpec `json:"datacenter"`
// User-provided image for the sidecar that replaces default.
111 changes: 92 additions & 19 deletions pkg/controllers/cluster/cleanup.go
@@ -7,6 +7,7 @@ import (
scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/util/nodeaffinity"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -36,39 +37,111 @@ func (cc *ClusterReconciler) cleanup(ctx context.Context, c *scyllav1alpha1.Scyl
LabelSelector: naming.RackSelector(r, c)},
)
if err != nil {
return errors.Wrap(err, "error listing member services")
return errors.Wrap(err, "listing member services")
}
logger.Debug(ctx, "Cleanup: service list", "items", svcList.Items)

memberCount := *sts.Spec.Replicas
memberServiceCount := int32(len(svcList.Items))
// If there are more services than members, some services need to be cleaned up
if memberServiceCount > memberCount {
// maxIndex is the maximum index that should be present in a
// member service of this rack
maxIndex := memberCount - 1
for _, svc := range svcList.Items {
svcIndex, err := naming.IndexFromName(svc.Name)
logger.Debug(ctx, "Cleanup: service list", "len", len(svcList.Items), "items", svcList.Items)

if err := cc.decommissionCleanup(ctx, c, sts, svcList); err != nil {
return errors.Wrap(err, "decommission cleanup")
}

if c.Spec.AutomaticOrphanedNodeCleanup {
if err := cc.orphanedCleanup(ctx, c, svcList); err != nil {
return errors.Wrap(err, "orphaned cleanup")
}
}

}
return nil
}

// orphanedCleanup verifies if any Pod has a PVC with node affinity pointing to a node that no longer exists.
// This may happen when a node disappears.
// The StatefulSet controller won't be able to schedule a new Pod on the next available node
// due to the orphaned affinity.
// The Operator checks whether any Pod PVC is orphaned, and if so, marks the member service as
// a replacement candidate.
func (cc *ClusterReconciler) orphanedCleanup(ctx context.Context, c *scyllav1alpha1.ScyllaCluster, svcs *corev1.ServiceList) error {
nodes := &corev1.NodeList{}
if err := cc.List(ctx, nodes); err != nil {
return errors.Wrap(err, "list nodes")
}
cc.Logger.Debug(ctx, "Found nodes", "len", len(nodes.Items))

for _, svc := range svcs.Items {
pvc := &corev1.PersistentVolumeClaim{}
if err := cc.Get(ctx, naming.NamespacedName(naming.PVCNameForPod(svc.Name), c.Namespace), pvc); err != nil {
if apierrors.IsNotFound(err) {
cc.Logger.Debug(ctx, "Pod PVC not found", "pod", svc.Name, "name", naming.PVCNameForPod(svc.Name))
continue
}
return errors.Wrap(err, "get pvc")
}

pv := &corev1.PersistentVolume{}
if err := cc.Get(ctx, naming.NamespacedName(pvc.Spec.VolumeName, ""), pv); err != nil {
if apierrors.IsNotFound(err) {
cc.Logger.Debug(ctx, "PV not found", "pv", pvc.Spec.VolumeName)
continue
}
return errors.Wrap(err, "get pv")
}

ns, err := nodeaffinity.NewNodeSelector(pv.Spec.NodeAffinity.Required)
if err != nil {
return errors.Wrap(err, "new node selector")
}

orphanedVolume := true
for _, node := range nodes.Items {
if ns.Match(&node) {
cc.Logger.Debug(ctx, "PVC attachment found", "pvc", pvc.Name, "node", node.Name)
orphanedVolume = false
break
}
}

if orphanedVolume {
cc.Logger.Info(ctx, "Found orphaned PVC, triggering replace node", "member", svc.Name)
if err := util.MarkAsReplaceCandidate(ctx, &svc, cc.KubeClient); err != nil {
return errors.Wrap(err, "mark orphaned service as replace")
}
}
}

return nil
}

func (cc *ClusterReconciler) decommissionCleanup(ctx context.Context, c *scyllav1alpha1.ScyllaCluster, sts *appsv1.StatefulSet, svcs *corev1.ServiceList) error {
memberCount := *sts.Spec.Replicas
memberServiceCount := int32(len(svcs.Items))
// If there are more services than members, some services need to be cleaned up
if memberServiceCount > memberCount {
// maxIndex is the maximum index that should be present in a
// member service of this rack
maxIndex := memberCount - 1
for _, svc := range svcs.Items {
svcIndex, err := naming.IndexFromName(svc.Name)
if err != nil {
return errors.WithStack(err)
}
if svcIndex > maxIndex && svc.Labels[naming.DecommissionLabel] == naming.LabelValueTrue {
err := cc.cleanupMemberResources(ctx, &svc, c)
if err != nil {
return errors.WithStack(err)
}
if svcIndex > maxIndex && svc.Labels[naming.DecommissionLabel] == naming.LabelValueTrue {
err := cc.cleanupMemberResources(ctx, &svc, r, c)
if err != nil {
return errors.WithStack(err)
}
}
}
}
}

return nil
}

// cleanupMemberResources deletes all resources associated with a given member.
// Currently those are:
// - A PVC
// - A ClusterIP Service
func (cc *ClusterReconciler) cleanupMemberResources(ctx context.Context, memberService *corev1.Service, r scyllav1alpha1.RackSpec, c *scyllav1alpha1.ScyllaCluster) error {
func (cc *ClusterReconciler) cleanupMemberResources(ctx context.Context, memberService *corev1.Service, c *scyllav1alpha1.ScyllaCluster) error {
memberName := memberService.Name
logger := util.LoggerForCluster(c)
logger.Info(ctx, "Cleaning up resources for member", "name", memberName)
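
Condensed for reference, the new orphan check boils down to the following standalone sketch. `isOrphaned` is a hypothetical helper name (the commit inlines this logic in `orphanedCleanup`), and the nil-affinity guard is added here for safety:

```go
package cluster

import (
	"github.com/pkg/errors"
	"github.com/scylladb/scylla-operator/pkg/util/nodeaffinity"
	corev1 "k8s.io/api/core/v1"
)

// isOrphaned reports whether no node in the list satisfies the PV's required
// node affinity, i.e. the volume is pinned to a node that no longer exists.
func isOrphaned(pv *corev1.PersistentVolume, nodes []corev1.Node) (bool, error) {
	if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
		// A volume without required node affinity cannot be orphaned.
		return false, nil
	}
	ns, err := nodeaffinity.NewNodeSelector(pv.Spec.NodeAffinity.Required)
	if err != nil {
		return false, errors.Wrap(err, "new node selector")
	}
	for i := range nodes {
		if ns.Match(&nodes[i]) {
			// At least one live node still matches; the volume is reachable.
			return false, nil
		}
	}
	return true, nil
}
```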
2 changes: 2 additions & 0 deletions pkg/controllers/cluster/cluster_controller.go
@@ -89,9 +89,11 @@ func New(ctx context.Context, mgr ctrl.Manager, logger log.Logger) (*ClusterReco
}, nil
}

// +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=scylla.scylladb.com,resources=scyllaclusters,verbs=get;list;watch;create;update;patch;delete
79 changes: 79 additions & 0 deletions pkg/controllers/cluster/controller_integration_test.go
@@ -61,6 +61,49 @@ var _ = Describe("Cluster controller", func() {
})
})

It("When PVC affinity is bound to lost node, node is replaced", func() {
scylla := singleNodeCluster(ns)
scylla.Spec.AutomaticOrphanedNodeCleanup = true

Expect(testEnv.Create(ctx, scylla)).To(Succeed())
defer func() {
Expect(testEnv.Delete(ctx, scylla)).To(Succeed())
}()

Expect(waitForCluster(ctx, scylla)).To(Succeed())
Expect(testEnv.Refresh(ctx, scylla)).To(Succeed())

sstStub := integration.NewStatefulSetOperatorStub(testEnv)
rack := scylla.Spec.Datacenter.Racks[0]

pvOption := integration.WithPVNodeAffinity([]corev1.NodeSelectorRequirement{
{
Key: "some-label",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"some-value"},
},
})

// Cluster should be scaled sequentially up to member count
for _, replicas := range clusterScaleSteps(rack.Members) {
Expect(assertRackScaled(ctx, rack, scylla, replicas)).To(Succeed())
Expect(sstStub.CreatePods(ctx, scylla)).To(Succeed())
Expect(sstStub.CreatePVCs(ctx, scylla, pvOption)).To(Succeed())
}

services, err := rackMemberService(ns.Namespace, rack, scylla)
Expect(err).To(BeNil())
Expect(services).To(Not(BeEmpty()))

service := services[0]

Eventually(func() map[string]string {
Expect(testEnv.Refresh(ctx, &service)).To(Succeed())

return service.Labels
}).Should(HaveKeyWithValue(naming.ReplaceLabel, ""))
})

Context("Node replace", func() {
var (
scylla *scyllav1alpha1.ScyllaCluster
@@ -264,6 +307,42 @@ func singleRackCluster(ns *corev1.Namespace) *scyllav1alpha1.ScyllaCluster {
}
}

func singleNodeCluster(ns *corev1.Namespace) *scyllav1alpha1.ScyllaCluster {
return &scyllav1alpha1.ScyllaCluster{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
Namespace: ns.Name,
},
Spec: scyllav1alpha1.ClusterSpec{
Version: "4.2.0",
AgentVersion: pointer.StringPtr("2.2.0"),
DeveloperMode: true,
Datacenter: scyllav1alpha1.DatacenterSpec{
Name: "dc1",
Racks: []scyllav1alpha1.RackSpec{
{
Name: "rack1",
Members: 1,
Storage: scyllav1alpha1.StorageSpec{
Capacity: "10M",
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("200M"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("200M"),
},
},
},
},
},
},
}
}

func clusterScaleSteps(desiredNodeCount int32) []int32 {
steps := make([]int32, desiredNodeCount+1)
for i := range steps {
14 changes: 14 additions & 0 deletions pkg/controllers/cluster/util/util.go
@@ -4,6 +4,7 @@ import (
"context"

"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes"

"github.com/pkg/errors"
"github.com/scylladb/go-log"
@@ -96,3 +97,16 @@ func NewControllerRef(c *scyllav1alpha1.ScyllaCluster) metav1.OwnerReference {
Kind: "ScyllaCluster",
})
}

// MarkAsReplaceCandidate patches the member service with a special label indicating
// that the service must be replaced.
func MarkAsReplaceCandidate(ctx context.Context, member *corev1.Service, kubeClient kubernetes.Interface) error {
if _, ok := member.Labels[naming.ReplaceLabel]; !ok {
patched := member.DeepCopy()
patched.Labels[naming.ReplaceLabel] = ""
if err := PatchService(ctx, member, patched, kubeClient); err != nil {
return errors.Wrap(err, "patch service as replace")
}
}
return nil
}
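
As a usage illustration (not part of the commit), a caller outside the reconciler could mark a member for replacement as in the sketch below; the kubeconfig plumbing, namespace, and service name are assumptions, and in the operator itself the client is wired through the reconciler's `KubeClient`:

```go
package main

import (
	"context"
	"log"

	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig in the default location; illustrative only.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Hypothetical member service of a ScyllaCluster in the "scylla" namespace.
	svc, err := client.CoreV1().Services("scylla").Get(context.TODO(),
		"simple-cluster-us-east-1-us-east-1a-2", metav1.GetOptions{})
	if err != nil {
		log.Fatal(err)
	}

	// Adds the replace label (a no-op if it is already present).
	if err := util.MarkAsReplaceCandidate(context.TODO(), svc, client); err != nil {
		log.Fatal(err)
	}
}
```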
