
Support scaling changes during ongoing scaling
Decommissioning label reconciliation was moved from the StatefulSet
controller into the Service controller.
The StatefulSet controller can now react dynamically to scaling events
occurring while scaling operations are still ongoing.

Fixes #1188
zimnx committed Jun 16, 2023
1 parent defb9ee commit cf5178c
Showing 10 changed files with 454 additions and 113 deletions.
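
The core of the change is in pkg/controller/scyllacluster/sync_services.go below: the Service controller now derives member Services from the current StatefulSet replica count, skips Services whose members already finished decommissioning, and marks the last replica for decommission whenever the desired member count drops below the replica count. The following stand-alone Go sketch illustrates that idea only; the types, label names, and helper are simplified placeholders, not the operator's actual API.

package main

import "fmt"

// Placeholder stand-ins for the operator's label constants; the real ones
// live in pkg/naming.
const (
	decommissionedLabel = "scylla/decommissioned"
	labelValueTrue      = "true"
	labelValueFalse     = "false"
)

type service struct {
	name   string
	labels map[string]string
}

// desiredServices sketches the reconciliation idea: build one Service per
// current StatefulSet replica, carry labels over from existing Services,
// drop Services whose members already finished decommissioning, and mark
// only the last replica for decommission when desiredMembers is below
// currentReplicas. A later scaling change only alters desiredMembers, so
// the next sync naturally picks up the new target.
func desiredServices(rackName string, desiredMembers, currentReplicas int32, old map[string]*service) []*service {
	var out []*service
	for ord := int32(0); ord < currentReplicas; ord++ {
		name := fmt.Sprintf("%s-%d", rackName, ord)

		labels := map[string]string{}
		if prev, ok := old[name]; ok {
			for k, v := range prev.labels {
				labels[k] = v
			}
		}

		// A fully decommissioned member no longer needs a Service.
		if labels[decommissionedLabel] == labelValueTrue {
			continue
		}

		// Scale down in progress: mark only the last member.
		if desiredMembers < currentReplicas && ord == currentReplicas-1 && labels[decommissionedLabel] == "" {
			labels[decommissionedLabel] = labelValueFalse
		}

		out = append(out, &service{name: name, labels: labels})
	}
	return out
}

func main() {
	// Rack scaled down from 3 to 2 members; the last Service is already
	// marked for decommission and keeps its label across syncs.
	old := map[string]*service{
		"rack-a-2": {name: "rack-a-2", labels: map[string]string{decommissionedLabel: labelValueFalse}},
	}
	for _, svc := range desiredServices("rack-a", 2, 3, old) {
		fmt.Println(svc.name, svc.labels)
	}
}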
1 change: 1 addition & 0 deletions pkg/api/scylla/v1/types_cluster.go
@@ -512,6 +512,7 @@ type RackCondition struct {
type RackConditionType string

const (
// Deprecated: in favor of RackConditionTypeMemberDecommissioning.
RackConditionTypeMemberLeaving RackConditionType = "MemberLeaving"
RackConditionTypeUpgrading RackConditionType = "RackUpgrading"
RackConditionTypeMemberReplacing RackConditionType = "MemberReplacing"
12 changes: 12 additions & 0 deletions pkg/cmd/operator/sidecar.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
@@ -227,7 +228,18 @@ func (o *SidecarOptions) Run(streams genericclioptions.IOStreams, cmd *cobra.Com
if err != nil {
return fmt.Errorf("can't wait for service %q: %w", naming.ManualRef(o.Namespace, o.ServiceName), err)
}

service := event.Object.(*corev1.Service)
service, err = o.kubeClient.CoreV1().Services(service.Namespace).Patch(
ctx,
service.Name,
types.MergePatchType,
[]byte(fmt.Sprintf(`{"metadata": {"annotations": {%q: %q} } }`, naming.NodeInitialized, naming.LabelValueTrue)),
metav1.PatchOptions{},
)
if err != nil {
return fmt.Errorf("can't add node initialized annotation in service: %w", err)
}

// Wait for this Pod to have ContainerID set.
podFieldSelector := fields.OneTermEqualSelector("metadata.name", o.ServiceName)
4 changes: 4 additions & 0 deletions pkg/controller/scyllacluster/resource.go
@@ -80,6 +80,10 @@ func MemberService(sc *scyllav1.ScyllaCluster, rackName, name string, oldService
if hasReplaceLabel {
labels[naming.ReplaceLabel] = replaceAddr
}

if decommissionLabel, ok := oldService.Labels[naming.DecommissionedLabel]; ok {
labels[naming.DecommissionedLabel] = decommissionLabel
}
}

// Only new service should get the replace address, old service keeps "" until deleted.
30 changes: 30 additions & 0 deletions pkg/controller/scyllacluster/resource_test.go
@@ -359,6 +359,36 @@ func TestMemberService(t *testing.T) {
},
},
},
{
name: "existing service with decommission label carries it over into required object",
scyllaCluster: basicSC,
rackName: basicRackName,
svcName: basicSVCName,
oldService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
naming.DecommissionedLabel: naming.LabelValueFalse,
},
},
},
expectedService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: basicSVCName,
Labels: func() map[string]string {
labels := basicSVCLabels()
labels[naming.DecommissionedLabel] = naming.LabelValueFalse
return labels
}(),
OwnerReferences: basicSCOwnerRefs,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: basicSVCSelector,
PublishNotReadyAddresses: true,
Ports: basicPorts,
},
},
},
}

for _, tc := range tt {
18 changes: 4 additions & 14 deletions pkg/controller/scyllacluster/status.go
@@ -96,22 +96,12 @@ func (scc *Controller) calculateRackStatus(sc *scyllav1.ScyllaCluster, rackName
controllerhelpers.SetRackCondition(status, scyllav1.RackConditionTypeUpgrading)
}

// Update Scaling Down condition
// Set decommissioning condition
for _, svc := range serviceMap {
// Check if there is a decommission in progress
if _, ok := svc.Labels[naming.DecommissionedLabel]; ok {
// Add MemberLeaving Condition to rack status
if svc.Labels[naming.RackNameLabel] == rackName && len(svc.Labels[naming.DecommissionedLabel]) != 0 {
// TODO: Deprecated condition can be removed in 1.11.
controllerhelpers.SetRackCondition(status, scyllav1.RackConditionTypeMemberLeaving)
// Sanity check. Only the last member should be decommissioning.
index, err := naming.IndexFromName(svc.Name)
if err != nil {
klog.ErrorS(err, "Can't determine service index from its name", "Service", klog.KObj(svc))
continue
}
if index != status.Members-1 {
klog.Errorf("only last member of each rack should be decommissioning, but %d-th member of %s found decommissioning while rack had %d members", index, rackName, status.Members)
continue
}
controllerhelpers.SetRackCondition(status, scyllav1.RackConditionTypeMemberDecommissioning)
}
}

90 changes: 41 additions & 49 deletions pkg/controller/scyllacluster/sync_services.go
@@ -3,8 +3,6 @@ package scyllacluster
import (
"context"
"fmt"
"regexp"
"strconv"
"time"

scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
@@ -19,32 +17,58 @@ import (
"k8s.io/klog/v2"
)

var serviceOrdinalRegex = regexp.MustCompile("^.*-([0-9]+)$")
func (scc *Controller) makeServices(sc *scyllav1.ScyllaCluster, oldServices map[string]*corev1.Service, statefulSets map[string]*appsv1.StatefulSet) ([]*corev1.Service, error) {
var errs []error

func (scc *Controller) makeServices(sc *scyllav1.ScyllaCluster, oldServices map[string]*corev1.Service) []*corev1.Service {
services := []*corev1.Service{
IdentityService(sc),
}

for _, rack := range sc.Spec.Datacenter.Racks {
stsName := naming.StatefulSetNameForRack(rack, sc)
sts, ok := statefulSets[stsName]
if !ok {
errs = append(errs, fmt.Errorf("statefulset %s/%s is missing", sc.Namespace, stsName))
continue
}

for ord := int32(0); ord < rack.Members; ord++ {
for ord := int32(0); ord < *sts.Spec.Replicas; ord++ {
svcName := fmt.Sprintf("%s-%d", stsName, ord)
oldSvc := oldServices[svcName]
services = append(services, MemberService(sc, rack.Name, svcName, oldSvc))

svc := MemberService(sc, rack.Name, svcName, oldSvc)

// Services for nodes which were successfully decommissioned are not required.
if decommissionLabel, ok := svc.Labels[naming.DecommissionedLabel]; ok {
if decommissionLabel == naming.LabelValueTrue {
continue
}
}

// Mark last node for decommission
if rack.Members < *sts.Spec.Replicas && ord == *sts.Spec.Replicas-1 {
if len(svc.Labels[naming.DecommissionedLabel]) == 0 {
svc.Labels[naming.DecommissionedLabel] = naming.LabelValueFalse
}
}

services = append(services, svc)
}
}

return services
err := utilerrors.NewAggregate(errs)
if err != nil {
return nil, err
}

return services, nil
}

func (scc *Controller) pruneServices(
ctx context.Context,
sc *scyllav1.ScyllaCluster,
requiredServices []*corev1.Service,
services map[string]*corev1.Service,
statefulSets map[string]*appsv1.StatefulSet,
) ([]metav1.Condition, error) {
var errs []error
var progressingConditions []metav1.Condition
@@ -63,43 +87,8 @@ func (scc *Controller) pruneServices(
continue
}

// Do not delete services for scale down.
rackName, ok := svc.Labels[naming.RackNameLabel]
if !ok {
errs = append(errs, fmt.Errorf("service %s/%s is missing %q label", svc.Namespace, svc.Name, naming.RackNameLabel))
continue
}
stsName := fmt.Sprintf("%s-%s-%s", sc.Name, sc.Spec.Datacenter.Name, rackName)
sts, ok := statefulSets[stsName]
if !ok {
errs = append(errs, fmt.Errorf("statefulset %s/%s is missing", sc.Namespace, stsName))
continue
}
// TODO: Label services with the ordinal instead of parsing.
// TODO: Move it to a function and unit test it.
svcOrdinalStrings := serviceOrdinalRegex.FindStringSubmatch(svc.Name)
if len(svcOrdinalStrings) != 2 {
errs = append(errs, fmt.Errorf("can't parse ordinal from service %s/%s", svc.Namespace, svc.Name))
continue
}
svcOrdinal, err := strconv.Atoi(svcOrdinalStrings[1])
if err != nil {
errs = append(errs, err)
continue
}
if int32(svcOrdinal) < *sts.Spec.Replicas {
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
Status: metav1.ConditionTrue,
Reason: "WaitingForAnotherService",
Message: fmt.Sprintf("Service %q is waiting for another service to be decommissioned first.", naming.ObjRef(svc)),
ObservedGeneration: sc.Generation,
})
continue
}

// Do not delete services that weren't properly decommissioned.
if svc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
// Do not delete services that were bootstrapped but weren't properly decommissioned.
if svc.Annotations[naming.NodeInitialized] != "" && svc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
klog.Warningf("Refusing to cleanup service %s/%s whose member wasn't decommissioned.", svc.Namespace, svc.Name)
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
@@ -132,7 +121,7 @@
continue
}

if freshSvc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
if freshSvc.Annotations[naming.NodeInitialized] != "" && freshSvc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
klog.V(2).InfoS("Stale caches, won't delete the pvc because the service is no longer decommissioned.", "Service", klog.KObj(svc))
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
@@ -149,7 +138,7 @@

controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, serviceControllerProgressingCondition, &corev1.PersistentVolumeClaim{}, "delete", sc.Generation)
pvcName := naming.PVCNameForService(svc.Name)
err = scc.kubeClient.CoreV1().PersistentVolumeClaims(svc.Namespace).Delete(ctx, pvcName, metav1.DeleteOptions{
err := scc.kubeClient.CoreV1().PersistentVolumeClaims(svc.Namespace).Delete(ctx, pvcName, metav1.DeleteOptions{
PropagationPolicy: &backgroundPropagationPolicy,
})
if err != nil && !apierrors.IsNotFound(err) {
@@ -182,11 +171,14 @@ func (scc *Controller) syncServices(
) ([]metav1.Condition, error) {
var err error

requiredServices := scc.makeServices(sc, services)
requiredServices, err := scc.makeServices(sc, services, statefulSets)
if err != nil {
return nil, fmt.Errorf("can't make Service(s): %w", err)
}

// Delete any excessive Services.
// Delete has to be the first action to avoid getting stuck on quota.
progressingConditions, err := scc.pruneServices(ctx, sc, requiredServices, services, statefulSets)
progressingConditions, err := scc.pruneServices(ctx, sc, requiredServices, services)
if err != nil {
return nil, fmt.Errorf("can't delete Service(s): %w", err)
}