Run node cleanup after scaling the ScyllaCluster
Running node cleanup after scaling a cluster avoids the accumulation of
unnecessary data on node disks. When nodes are added to or removed from the
cluster, token ownership shifts between nodes, so files stored on a node's
disks may still contain data for tokens the node no longer owns. Over time
this can build up and cause disk space issues. Running node cleanup after
scaling clears these files and frees the disk space.
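
For illustration, a minimal sketch of what a per-keyspace cleanup amounts to
at the node level — a single POST against Scylla's REST API. The port and
endpoint path are assumptions about the target Scylla version; the commit
itself goes through pkg/scyllaclient with the Scylla Manager agent auth token
instead:

// Illustrative sketch only — not part of this commit.
package main

import (
	"context"
	"fmt"
	"net/http"
)

// cleanupKeyspace triggers a cleanup of one keyspace on one node via Scylla's
// REST API. Port 10000 and the endpoint path are assumptions about the target
// Scylla version; verify against your deployment.
func cleanupKeyspace(ctx context.Context, nodeAddr, keyspace string) error {
	url := fmt.Sprintf("http://%s:10000/storage_service/keyspace_cleanup/%s", nodeAddr, keyspace)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("cleanup of keyspace %q on node %q failed: %s", keyspace, nodeAddr, resp.Status)
	}
	return nil
}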

Scylla Operator was extended with controllers responsible for executing a
cleanup on nodes after horizontal scaling.
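
As a rough sketch of the mechanism (not the commit's exact implementation —
the helper name, Job naming scheme, image, and mount path below are all
illustrative): after a scaling operation, the controller can create a
Kubernetes Job per node that runs the new cleanup-job subcommand against that
node's address:

// Illustrative sketch only — placeholders throughout.
package main

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// makeCleanupJob sketches the Job a controller could create to run the new
// "cleanup-job" subcommand against a single node after scaling.
func makeCleanupJob(namespace, nodeName, nodeAddress, operatorImage string) *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "cleanup-" + nodeName, // hypothetical naming scheme
			Namespace: namespace,
		},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyOnFailure,
					Containers: []corev1.Container{{
						Name:  "cleanup",
						Image: operatorImage,
						Command: []string{
							"scylla-operator",
							"cleanup-job",
							"--manager-auth-config-path=/mnt/agent-config/auth-token.yaml", // placeholder mount path
							"--node-address=" + nodeAddress,
						},
					}},
				},
			},
		},
	}
}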

Fixes #1207
zimnx committed Jul 31, 2023
1 parent 62b6119 commit f0cdcd8
Showing 25 changed files with 1,305 additions and 46 deletions.
154 changes: 154 additions & 0 deletions pkg/cmd/operator/cleanupjob.go
@@ -0,0 +1,154 @@
// Copyright (c) 2023 ScyllaDB.

package operator

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
	"github.com/scylladb/scylla-operator/pkg/genericclioptions"
	"github.com/scylladb/scylla-operator/pkg/helpers"
	"github.com/scylladb/scylla-operator/pkg/scyllaclient"
	"github.com/scylladb/scylla-operator/pkg/signals"
	"github.com/scylladb/scylla-operator/pkg/version"
	"github.com/spf13/cobra"
	apierrors "k8s.io/apimachinery/pkg/util/errors"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/klog/v2"
)

type CleanupJobOptions struct {
	ManagerAuthConfigPath string
	NodeAddress           string

	scyllaClient *scyllaclient.Client
}

func NewCleanupJobOptions(streams genericclioptions.IOStreams) *CleanupJobOptions {
	return &CleanupJobOptions{}
}

func NewCleanupJobCmd(streams genericclioptions.IOStreams) *cobra.Command {
	o := NewCleanupJobOptions(streams)

	cmd := &cobra.Command{
		Use:   "cleanup-job",
		Short: "Runs a cleanup procedure against a node.",
		Long:  "Runs a cleanup procedure against a node.",
		RunE: func(cmd *cobra.Command, args []string) error {
			err := o.Validate()
			if err != nil {
				return err
			}

			err = o.Complete()
			if err != nil {
				return err
			}

			err = o.Run(streams, cmd)
			if err != nil {
				return err
			}

			return nil
		},

		SilenceErrors: true,
		SilenceUsage:  true,
	}

	cmd.Flags().StringVarP(&o.ManagerAuthConfigPath, "manager-auth-config-path", "", o.ManagerAuthConfigPath, "Path to a file containing the Scylla Manager config with the auth token.")
	cmd.Flags().StringVarP(&o.NodeAddress, "node-address", "", o.NodeAddress, "Address of the node on which the cleanup will be performed.")

	return cmd
}

func (o *CleanupJobOptions) Validate() error {
	var errs []error

	if len(o.ManagerAuthConfigPath) == 0 {
		errs = append(errs, fmt.Errorf("manager-auth-config-path cannot be empty"))
	}

	if len(o.NodeAddress) == 0 {
		errs = append(errs, fmt.Errorf("node-address cannot be empty"))
	}

	return apierrors.NewAggregate(errs)
}

func (o *CleanupJobOptions) Complete() error {
	var err error

	buf, err := os.ReadFile(o.ManagerAuthConfigPath)
	if err != nil {
		return fmt.Errorf("can't read auth token file at %q: %w", o.ManagerAuthConfigPath, err)
	}

	authToken, err := helpers.ParseTokenFromConfig(buf)
	if err != nil {
		return fmt.Errorf("can't parse auth token file at %q: %w", o.ManagerAuthConfigPath, err)
	}

	if len(authToken) == 0 {
		return fmt.Errorf("manager agent auth token cannot be empty")
	}

	o.scyllaClient, err = controllerhelpers.NewScyllaClientFromToken([]string{o.NodeAddress}, authToken)
	if err != nil {
		return fmt.Errorf("can't create scylla client: %w", err)
	}

	return nil
}

func (o *CleanupJobOptions) Run(streams genericclioptions.IOStreams, cmd *cobra.Command) error {
	klog.InfoS("Starting the node cleanup", "version", version.Get())

	defer func(startTime time.Time) {
		klog.InfoS("Node cleanup completed", "duration", time.Since(startTime))
	}(time.Now())

	cliflag.PrintFlags(cmd.Flags())

	stopCh := signals.StopChannel()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-stopCh
		cancel()
	}()

	keyspaces, err := o.scyllaClient.Keyspaces(ctx)
	if err != nil {
		return fmt.Errorf("can't get list of keyspaces: %w", err)
	}

	klog.InfoS("Discovered keyspaces for cleanup", "keyspaces", keyspaces)

	var errs []error
	for _, keyspace := range keyspaces {
		klog.InfoS("Starting a keyspace cleanup", "keyspace", keyspace)
		startTime := time.Now()

		err = o.scyllaClient.Cleanup(ctx, o.NodeAddress, keyspace)
		if err != nil {
			klog.Warningf("Can't cleanup keyspace %q: %s", keyspace, err)
			errs = append(errs, fmt.Errorf("can't cleanup keyspace %q: %w", keyspace, err))
			continue
		}

		klog.InfoS("Finished keyspace cleanup", "keyspace", keyspace, "duration", time.Since(startTime))
	}

	err = apierrors.NewAggregate(errs)
	if err != nil {
		return err
	}

	return nil
}
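
For reference, a minimal sketch of invoking the new subcommand (as if from
within this package), assuming the IOStreams struct mirrors kubectl's
In/Out/ErrOut fields; flag values are placeholders:

// Assumes the surrounding operator package context (os, klog, and
// genericclioptions imported as in the file above); values are placeholders.
streams := genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr}
cmd := NewCleanupJobCmd(streams)
cmd.SetArgs([]string{
	"--manager-auth-config-path=/mnt/agent-config/scylla-manager-agent.yaml",
	"--node-address=10.0.0.42",
})
if err := cmd.Execute(); err != nil {
	klog.Fatal(err)
}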
1 change: 1 addition & 0 deletions pkg/cmd/operator/cmd.go
@@ -20,6 +20,7 @@ func NewOperatorCommand(streams genericclioptions.IOStreams) *cobra.Command {
	cmd.AddCommand(NewSidecarCmd(streams))
	cmd.AddCommand(NewManagerControllerCmd(streams))
	cmd.AddCommand(NewNodeSetupCmd(streams))
	cmd.AddCommand(NewCleanupJobCmd(streams))

	// TODO: wrap help func for the root command and every subcommand to add a line about automatic env vars and the prefix.
1 change: 1 addition & 0 deletions pkg/cmd/operator/operator.go
@@ -258,6 +258,7 @@ func (o *OperatorOptions) run(ctx context.Context, streams genericclioptions.IOStreams) error {
		kubeInformers.Apps().V1().StatefulSets(),
		kubeInformers.Policy().V1().PodDisruptionBudgets(),
		kubeInformers.Networking().V1().Ingresses(),
		kubeInformers.Batch().V1().Jobs(),
		scyllaInformers.Scylla().V1().ScyllaClusters(),
		o.OperatorImage,
		o.CQLSIngressPort,
4 changes: 4 additions & 0 deletions pkg/controller/nodeconfigpod/sync_configmaps.go
@@ -24,6 +24,10 @@ func (ncpc *Controller) makeConfigMap(ctx context.Context, pod *corev1.Pod) (*corev1.ConfigMap, error) {
		return nil, nil
	}

	if !controllerhelpers.IsScyllaPod(pod) {
		return nil, nil
	}

	node, err := ncpc.nodeLister.Get(pod.Spec.NodeName)
	if err != nil {
		return nil, fmt.Errorf("can't get node: %w", err)
2 changes: 2 additions & 0 deletions pkg/controller/scyllacluster/conditions.go
@@ -18,4 +18,6 @@ const (
	pdbControllerDegradedCondition        = "PDBControllerDegraded"
	ingressControllerProgressingCondition = "IngressControllerProgressing"
	ingressControllerDegradedCondition    = "IngressControllerDegraded"
	jobControllerProgressingCondition     = "JobControllerProgressing"
	jobControllerDegradedCondition        = "JobControllerDegraded"
)
36 changes: 36 additions & 0 deletions pkg/controller/scyllacluster/controller.go
@@ -15,6 +15,7 @@ import (
	"github.com/scylladb/scylla-operator/pkg/kubeinterfaces"
	"github.com/scylladb/scylla-operator/pkg/scheme"
	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	policyv1 "k8s.io/api/policy/v1"
@@ -26,13 +27,15 @@ import (
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsv1informers "k8s.io/client-go/informers/apps/v1"
	batchv1informers "k8s.io/client-go/informers/batch/v1"
	corev1informers "k8s.io/client-go/informers/core/v1"
	networkingv1informers "k8s.io/client-go/informers/networking/v1"
	policyv1informers "k8s.io/client-go/informers/policy/v1"
	rbacv1informers "k8s.io/client-go/informers/rbac/v1"
	"k8s.io/client-go/kubernetes"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	batchv1listers "k8s.io/client-go/listers/batch/v1"
	corev1listers "k8s.io/client-go/listers/core/v1"
	networkingv1listers "k8s.io/client-go/listers/networking/v1"
	policyv1listers "k8s.io/client-go/listers/policy/v1"
@@ -72,6 +75,7 @@ type Controller struct {
	pdbLister     policyv1listers.PodDisruptionBudgetLister
	ingressLister networkingv1listers.IngressLister
	scyllaLister  scyllav1listers.ScyllaClusterLister
	jobLister     batchv1listers.JobLister

	cachesToSync []cache.InformerSynced

@@ -95,6 +99,7 @@ func NewController(
	statefulSetInformer appsv1informers.StatefulSetInformer,
	pdbInformer policyv1informers.PodDisruptionBudgetInformer,
	ingressInformer networkingv1informers.IngressInformer,
	jobInformer batchv1informers.JobInformer,
	scyllaClusterInformer scyllav1informers.ScyllaClusterInformer,
	operatorImage string,
	cqlsIngressPort int,
@@ -121,6 +126,7 @@
		pdbLister:     pdbInformer.Lister(),
		ingressLister: ingressInformer.Lister(),
		scyllaLister:  scyllaClusterInformer.Lister(),
		jobLister:     jobInformer.Lister(),

		cachesToSync: []cache.InformerSynced{
			podInformer.Informer().HasSynced,
@@ -133,6 +139,7 @@
			pdbInformer.Informer().HasSynced,
			ingressInformer.Informer().HasSynced,
			scyllaClusterInformer.Informer().HasSynced,
			jobInformer.Informer().HasSynced,
		},

		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "scyllacluster-controller"}),
@@ -222,6 +229,12 @@ func NewController(
		DeleteFunc: scc.deleteScyllaCluster,
	})

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    scc.addJob,
		UpdateFunc: scc.updateJob,
		DeleteFunc: scc.deleteJob,
	})

	return scc, nil
}

@@ -589,3 +602,26 @@ func (scc *Controller) deleteScyllaCluster(obj interface{}) {
		scc.handlers.Enqueue,
	)
}

func (scc *Controller) addJob(obj interface{}) {
	scc.handlers.HandleAdd(
		obj.(*batchv1.Job),
		scc.handlers.EnqueueOwner,
	)
}

func (scc *Controller) updateJob(old, cur interface{}) {
	scc.handlers.HandleUpdate(
		old.(*batchv1.Job),
		cur.(*batchv1.Job),
		scc.handlers.EnqueueOwner,
		scc.deleteJob,
	)
}

func (scc *Controller) deleteJob(obj interface{}) {
	scc.handlers.HandleDelete(
		obj,
		scc.handlers.EnqueueOwner,
	)
}
