cluster: automatic replacement of orphaned nodes (#215 #114)
When a k8s node is gone, a PVC might still have node affinity pointing
to the lost node. In this situation, the PVC is deleted by the Operator
and node replacement logic is triggered to restore the cluster RF (replication factor).

Fixes #215
Fixes #114
zimnx committed Nov 20, 2020
1 parent e64d0a7 commit 2630c1c
Showing 10 changed files with 337 additions and 23 deletions.
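Before the per-file diffs, a brief illustration of the idea behind the change: a PersistentVolume whose required node affinity no longer matches any live node can never be used for scheduling again, so the Operator treats the corresponding member as lost and marks it for replacement. The snippet below is a minimal, self-contained sketch of that orphan check using standard k8s.io/api types; it hand-rolls the In-operator match instead of using the vendored nodeaffinity.NodeSelector helper the real code relies on, and isOrphaned / termMatches are illustrative names, not functions from this commit.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// isOrphaned reports whether the PV's required node affinity matches none of
// the existing nodes. Terms are ORed, requirements within a term are ANDed,
// and only the NodeSelectorOpIn operator is handled in this sketch.
func isOrphaned(pv *corev1.PersistentVolume, nodes []corev1.Node) bool {
	if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
		return false // no affinity, so the volume cannot be orphaned
	}
	for _, node := range nodes {
		for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
			if termMatches(term, node.Labels) {
				return false // at least one live node still satisfies the affinity
			}
		}
	}
	return true
}

// termMatches checks every requirement of a single term against node labels.
func termMatches(term corev1.NodeSelectorTerm, labels map[string]string) bool {
	for _, req := range term.MatchExpressions {
		if req.Operator != corev1.NodeSelectorOpIn {
			return false // unsupported operator in this sketch
		}
		matched := false
		for _, v := range req.Values {
			if labels[req.Key] == v {
				matched = true
				break
			}
		}
		if !matched {
			return false
		}
	}
	return true
}

func main() {
	pv := &corev1.PersistentVolume{
		Spec: corev1.PersistentVolumeSpec{
			NodeAffinity: &corev1.VolumeNodeAffinity{
				Required: &corev1.NodeSelector{
					NodeSelectorTerms: []corev1.NodeSelectorTerm{{
						MatchExpressions: []corev1.NodeSelectorRequirement{{
							Key:      "kubernetes.io/hostname",
							Operator: corev1.NodeSelectorOpIn,
							Values:   []string{"node-that-is-gone"},
						}},
					}},
				},
			},
		},
	}
	nodes := []corev1.Node{} // the lost node is no longer in the node list
	fmt.Println(isOrphaned(pv, nodes)) // true -> mark the member service for replacement
}

The production version of this check is the new orphanedCleanup method in pkg/controllers/cluster/cleanup.go below.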
16 changes: 16 additions & 0 deletions config/operator/rbac/role.yaml
@@ -14,6 +14,14 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
@@ -23,6 +31,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
16 changes: 16 additions & 0 deletions examples/eks/operator.yaml
@@ -1683,6 +1683,14 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
@@ -1692,6 +1700,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
16 changes: 16 additions & 0 deletions examples/generic/operator.yaml
@@ -1683,6 +1683,14 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
@@ -1692,6 +1700,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
16 changes: 16 additions & 0 deletions examples/gke/operator.yaml
@@ -1683,6 +1683,14 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
@@ -1692,6 +1700,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
108 changes: 89 additions & 19 deletions pkg/controllers/cluster/cleanup.go
@@ -7,6 +7,7 @@ import (
scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/util/nodeaffinity"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -36,39 +37,108 @@ func (cc *ClusterReconciler) cleanup(ctx context.Context, c *scyllav1alpha1.Scyl
LabelSelector: naming.RackSelector(r, c)},
)
if err != nil {
return errors.Wrap(err, "error listing member services")
return errors.Wrap(err, "listing member services")
}
logger.Debug(ctx, "Cleanup: service list", "items", svcList.Items)

memberCount := *sts.Spec.Replicas
memberServiceCount := int32(len(svcList.Items))
// If there are more services than members, some services need to be cleaned up
if memberServiceCount > memberCount {
// maxIndex is the maximum index that should be present in a
// member service of this rack
maxIndex := memberCount - 1
for _, svc := range svcList.Items {
svcIndex, err := naming.IndexFromName(svc.Name)
logger.Debug(ctx, "Cleanup: service list", "len", len(svcList.Items), "items", svcList.Items)

if err := cc.decommissionCleanup(ctx, c, sts, svcList); err != nil {
return errors.Wrap(err, "decommission cleanup")
}

if err := cc.orphanedCleanup(ctx, c, svcList); err != nil {
return errors.Wrap(err, "orphaned cleanup")
}
}
return nil
}

// orphanedCleanup checks whether any Pod has a PVC with node affinity pointing to a node that no longer exists.
// This may happen when a node disappears.
// The StatefulSet controller won't be able to schedule a new pod on the next available node
// due to the orphaned affinity.
// The Operator checks whether any Pod PVC is orphaned, and if so, marks the member service as
// a replacement candidate.
func (cc *ClusterReconciler) orphanedCleanup(ctx context.Context, c *scyllav1alpha1.ScyllaCluster, svcs *corev1.ServiceList) error {
nodes := &corev1.NodeList{}
if err := cc.List(ctx, nodes); err != nil {
return errors.Wrap(err, "list nodes")
}
cc.Logger.Debug(ctx, "Found nodes", "len", len(nodes.Items))

for _, svc := range svcs.Items {
pvc := &corev1.PersistentVolumeClaim{}
if err := cc.Get(ctx, naming.NamespacedName(naming.PVCNameForPod(svc.Name), c.Namespace), pvc); err != nil {
if apierrors.IsNotFound(err) {
cc.Logger.Debug(ctx, "Pod PVC not found", "pod", svc.Name, "name", naming.PVCNameForPod(svc.Name))
continue
}
return errors.Wrap(err, "get pvc")
}

pv := &corev1.PersistentVolume{}
if err := cc.Get(ctx, naming.NamespacedName(pvc.Spec.VolumeName, ""), pv); err != nil {
if apierrors.IsNotFound(err) {
cc.Logger.Debug(ctx, "PV not found", "pv", pvc.Spec.VolumeName)
continue
}
return errors.Wrap(err, "get pv")
}

ns, err := nodeaffinity.NewNodeSelector(pv.Spec.NodeAffinity.Required)
if err != nil {
return errors.Wrap(err, "new node selector")
}

orphanedVolume := true
for _, node := range nodes.Items {
if ns.Match(&node) {
cc.Logger.Debug(ctx, "PVC attachment found", "pvc", pvc.Name, "node", node.Name)
orphanedVolume = false
break
}
}

if orphanedVolume {
cc.Logger.Info(ctx, "Found orphaned PVC, triggering replace node", "member", svc.Name)
if err := util.MarkAsReplaceCandidate(ctx, &svc, cc.KubeClient); err != nil {
return errors.Wrap(err, "mark orphaned service as replace")
}
}
}

return nil
}

func (cc *ClusterReconciler) decommissionCleanup(ctx context.Context, c *scyllav1alpha1.ScyllaCluster, sts *appsv1.StatefulSet, svcs *corev1.ServiceList) error {
memberCount := *sts.Spec.Replicas
memberServiceCount := int32(len(svcs.Items))
// If there are more services than members, some services need to be cleaned up
if memberServiceCount > memberCount {
// maxIndex is the maximum index that should be present in a
// member service of this rack
maxIndex := memberCount - 1
for _, svc := range svcs.Items {
svcIndex, err := naming.IndexFromName(svc.Name)
if err != nil {
return errors.WithStack(err)
}
if svcIndex > maxIndex && svc.Labels[naming.DecommissionLabel] == naming.LabelValueTrue {
err := cc.cleanupMemberResources(ctx, &svc, c)
if err != nil {
return errors.WithStack(err)
}
}
}
}

return nil
}

// cleanupMemberResources deletes all resources associated with a given member.
// Currently those are:
// - A PVC
// - A ClusterIP Service
func (cc *ClusterReconciler) cleanupMemberResources(ctx context.Context, memberService *corev1.Service, c *scyllav1alpha1.ScyllaCluster) error {
memberName := memberService.Name
logger := util.LoggerForCluster(c)
logger.Info(ctx, "Cleaning up resources for member", "name", memberName)
2 changes: 2 additions & 0 deletions pkg/controllers/cluster/cluster_controller.go
@@ -83,9 +83,11 @@ func New(ctx context.Context, mgr ctrl.Manager, logger log.Logger) (*ClusterReco
}, nil
}

// +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=scylla.scylladb.com,resources=scyllaclusters,verbs=get;list;watch;create;update;patch;delete
78 changes: 78 additions & 0 deletions pkg/controllers/cluster/controller_integration_test.go
@@ -61,6 +61,48 @@ var _ = Describe("Cluster controller", func() {
})
})

It("When PVC affinity is bound to lost node, node is replaced", func() {
scylla := singleNodeCluster(ns)

Expect(testEnv.Create(ctx, scylla)).To(Succeed())
defer func() {
Expect(testEnv.Delete(ctx, scylla)).To(Succeed())
}()

Expect(waitForCluster(ctx, scylla)).To(Succeed())
Expect(testEnv.Refresh(ctx, scylla)).To(Succeed())

sstStub := integration.NewStatefulSetOperatorStub(testEnv)
rack := scylla.Spec.Datacenter.Racks[0]

pvOption := integration.WithPVNodeAffinity([]corev1.NodeSelectorRequirement{
{
Key: "some-label",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"some-value"},
},
})

// Cluster should be scaled sequentially up to member count
for _, replicas := range clusterScaleSteps(rack.Members) {
Expect(assertRackScaled(ctx, rack, scylla, replicas)).To(Succeed())
Expect(sstStub.CreatePods(ctx, scylla)).To(Succeed())
Expect(sstStub.CreatePVCs(ctx, scylla, pvOption)).To(Succeed())
}

services, err := rackMemberService(ns.Namespace, rack, scylla)
Expect(err).To(BeNil())
Expect(services).To(Not(BeEmpty()))

service := services[0]

Eventually(func() map[string]string {
Expect(testEnv.Refresh(ctx, &service)).To(Succeed())

return service.Labels
}).Should(HaveKeyWithValue(naming.ReplaceLabel, ""))
})

Context("Node replace", func() {
var (
scylla *scyllav1alpha1.ScyllaCluster
@@ -264,6 +306,42 @@ func singleRackCluster(ns *corev1.Namespace) *scyllav1alpha1.ScyllaCluster {
}
}

func singleNodeCluster(ns *corev1.Namespace) *scyllav1alpha1.ScyllaCluster {
return &scyllav1alpha1.ScyllaCluster{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
Namespace: ns.Name,
},
Spec: scyllav1alpha1.ClusterSpec{
Version: "4.2.0",
AgentVersion: pointer.StringPtr("2.2.0"),
DeveloperMode: true,
Datacenter: scyllav1alpha1.DatacenterSpec{
Name: "dc1",
Racks: []scyllav1alpha1.RackSpec{
{
Name: "rack1",
Members: 1,
Storage: scyllav1alpha1.StorageSpec{
Capacity: "10M",
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("200M"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("200M"),
},
},
},
},
},
},
}
}

func clusterScaleSteps(desiredNodeCount int32) []int32 {
steps := make([]int32, desiredNodeCount+1)
for i := range steps {
14 changes: 14 additions & 0 deletions pkg/controllers/cluster/util/util.go
@@ -4,6 +4,7 @@ import (
"context"

"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes"

"github.com/pkg/errors"
"github.com/scylladb/go-log"
@@ -96,3 +97,16 @@ func NewControllerRef(c *scyllav1alpha1.ScyllaCluster) metav1.OwnerReference {
Kind: "ScyllaCluster",
})
}

// MarkAsReplaceCandidate patches the member service with a special label indicating
// that the service must be replaced.
func MarkAsReplaceCandidate(ctx context.Context, member *corev1.Service, kubeClient kubernetes.Interface) error {
if _, ok := member.Labels[naming.ReplaceLabel]; !ok {
patched := member.DeepCopy()
patched.Labels[naming.ReplaceLabel] = ""
if err := PatchService(ctx, member, patched, kubeClient); err != nil {
return errors.Wrap(err, "patch service as replace")
}
}
return nil
}
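
PatchService, which MarkAsReplaceCandidate delegates to, is not part of this diff. As a rough sketch of what such a helper commonly looks like with client-go, the snippet below computes a strategic merge patch between the original and modified Service and applies it; patchService and the package name are illustrative, and the real helper in pkg/controllers/cluster/util may differ.

package clusterutil

import (
	"context"
	"encoding/json"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/client-go/kubernetes"
)

// patchService is a hypothetical stand-in for the PatchService helper used by
// MarkAsReplaceCandidate; it sends only the fields that changed, here the
// newly added replace label.
func patchService(ctx context.Context, original, modified *corev1.Service, kubeClient kubernetes.Interface) error {
	originalJSON, err := json.Marshal(original)
	if err != nil {
		return err
	}
	modifiedJSON, err := json.Marshal(modified)
	if err != nil {
		return err
	}
	// Compute a strategic merge patch containing only the differences.
	patch, err := strategicpatch.CreateTwoWayMergePatch(originalJSON, modifiedJSON, corev1.Service{})
	if err != nil {
		return err
	}
	_, err = kubeClient.CoreV1().Services(original.Namespace).Patch(
		ctx, original.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	return err
}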
