Support scaling changes during ongoing scaling #1283

Draft · wants to merge 1 commit into base: master

1 change: 1 addition & 0 deletions pkg/api/scylla/v1/types_cluster.go
@@ -514,6 +514,7 @@ type RackCondition struct {
type RackConditionType string

const (
// Deprecated: in favor of RackConditionTypeMemberDecommissioning.
RackConditionTypeMemberLeaving RackConditionType = "MemberLeaving"
RackConditionTypeUpgrading RackConditionType = "RackUpgrading"
RackConditionTypeMemberReplacing RackConditionType = "MemberReplacing"
12 changes: 12 additions & 0 deletions pkg/cmd/operator/sidecar.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
@@ -229,7 +230,18 @@ func (o *SidecarOptions) Run(streams genericclioptions.IOStreams, cmd *cobra.Com
if err != nil {
return fmt.Errorf("can't wait for service %q: %w", naming.ManualRef(o.Namespace, o.ServiceName), err)
}

service := event.Object.(*corev1.Service)
service, err = o.kubeClient.CoreV1().Services(service.Namespace).Patch(
ctx,
service.Name,
types.MergePatchType,
[]byte(fmt.Sprintf(`{"metadata": {"annotations": {%q: %q} } }`, naming.NodeInitialized, naming.LabelValueTrue)),
Review comment (Member; a sketch of the suggested ordering follows this file's diff):
I don't understand the semantics behind this annotation. The comment for naming.NodeInitialized says "NodeInitialized means given node was started.", while the Scylla process is run at a later time. Should this be moved to after the process is started?

metav1.PatchOptions{},
)
if err != nil {
return fmt.Errorf("can't add node initialized annotation in service: %w", err)
}

// Wait for this Pod to have ContainerID set.
podFieldSelector := fields.OneTermEqualSelector("metadata.name", o.ServiceName)
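
The review comment above asks whether the node-initialized annotation should only be applied once the Scylla process is actually running. Below is a minimal sketch of that ordering, assuming a standalone helper and placeholder constants (the real keys live in the project's pkg/naming package and are assumptions here); the Patch call itself mirrors the one in this hunk.

package sketch // illustrative only; in the PR this logic lives in pkg/cmd/operator/sidecar.go

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// Assumed stand-ins for the project's naming constants; the real values live in pkg/naming.
const (
	nodeInitializedAnnotation = "internal.scylla-operator.scylladb.com/node-initialized" // assumption for naming.NodeInitialized
	labelValueTrue            = "true"                                                   // stand-in for naming.LabelValueTrue
)

// markNodeInitialized merge-patches the node-initialized annotation onto the member
// Service. Per the review comment, the caller would invoke this after the Scylla
// process has been started, rather than right after waiting for the Service.
func markNodeInitialized(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	patch := []byte(fmt.Sprintf(
		`{"metadata": {"annotations": {%q: %q}}}`,
		nodeInitializedAnnotation, labelValueTrue,
	))
	_, err := client.CoreV1().Services(namespace).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		return fmt.Errorf("can't add node initialized annotation in service %s/%s: %w", namespace, name, err)
	}
	return nil
}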
4 changes: 4 additions & 0 deletions pkg/controller/scyllacluster/resource.go
@@ -81,6 +81,10 @@ func MemberService(sc *scyllav1.ScyllaCluster, rackName, name string, oldService
if hasReplaceLabel {
svcLabels[naming.ReplaceLabel] = replaceAddr
}

if decommissionLabel, ok := oldService.Labels[naming.DecommissionedLabel]; ok {
svcLabels[naming.DecommissionedLabel] = decommissionLabel
}
}

// Only new service should get the replace address, old service keeps "" until deleted.
26 changes: 25 additions & 1 deletion pkg/controller/scyllacluster/resource_test.go
@@ -380,7 +380,8 @@ func TestMemberService(t *testing.T) {
"internal.scylla-operator.scylladb.com/current-token-ring-hash": "abc",
},
},
}, jobs: nil,
},
jobs: nil,
expectedService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: basicSVCName,
@@ -433,6 +434,29 @@
},
},
},
{
name: "existing service with decommission label carries it over into required object",
scyllaCluster: basicSC,
rackName: basicRackName,
svcName: basicSVCName,
oldService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
naming.DecommissionedLabel: naming.LabelValueFalse,
},
},
},
expectedService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: basicSVCName,
Labels: func() map[string]string {
labels := basicSVCLabels()
labels[naming.DecommissionedLabel] = naming.LabelValueFalse
return labels
}(),
},
},
},
}

for _, tc := range tt {
18 changes: 4 additions & 14 deletions pkg/controller/scyllacluster/status.go
@@ -96,22 +96,12 @@ func (scc *Controller) calculateRackStatus(sc *scyllav1.ScyllaCluster, rackName
controllerhelpers.SetRackCondition(status, scyllav1.RackConditionTypeUpgrading)
}

// Update Scaling Down condition
// Set decommissioning condition
for _, svc := range serviceMap {
// Check if there is a decommission in progress
if _, ok := svc.Labels[naming.DecommissionedLabel]; ok {
// Add MemberLeaving Condition to rack status
if svc.Labels[naming.RackNameLabel] == rackName && len(svc.Labels[naming.DecommissionedLabel]) != 0 {
// TODO: Deprecated condition can be removed in 1.11.
controllerhelpers.SetRackCondition(status, scyllav1.RackConditionTypeMemberLeaving)
// Sanity check. Only the last member should be decommissioning.
index, err := naming.IndexFromName(svc.Name)
if err != nil {
klog.ErrorS(err, "Can't determine service index from its name", "Service", klog.KObj(svc))
continue
}
if index != status.Members-1 {
klog.Errorf("only last member of each rack should be decommissioning, but %d-th member of %s found decommissioning while rack had %d members", index, rackName, status.Members)
continue
}
controllerhelpers.SetRackCondition(status, scyllav1.RackConditionTypeMemberDecommissioning)
}
}

90 changes: 40 additions & 50 deletions pkg/controller/scyllacluster/sync_services.go
@@ -3,8 +3,6 @@ package scyllacluster
import (
"context"
"fmt"
"regexp"
"strconv"
"time"

scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
@@ -22,32 +20,56 @@ import (
"k8s.io/klog/v2"
)

var serviceOrdinalRegex = regexp.MustCompile("^.*-([0-9]+)$")

func (scc *Controller) makeServices(sc *scyllav1.ScyllaCluster, oldServices map[string]*corev1.Service, jobs map[string]*batchv1.Job) []*corev1.Service {
func (scc *Controller) makeServices(sc *scyllav1.ScyllaCluster, oldServices map[string]*corev1.Service, statefulSets map[string]*appsv1.StatefulSet, jobs map[string]*batchv1.Job) ([]*corev1.Service, error) {
var errs []error
services := []*corev1.Service{
IdentityService(sc),
}

for _, rack := range sc.Spec.Datacenter.Racks {
stsName := naming.StatefulSetNameForRack(rack, sc)
sts, ok := statefulSets[stsName]
if !ok {
errs = append(errs, fmt.Errorf("statefulset %s/%s is missing", sc.Namespace, stsName))
continue
}

for ord := int32(0); ord < rack.Members; ord++ {
for ord := int32(0); ord < *sts.Spec.Replicas; ord++ {
svcName := fmt.Sprintf("%s-%d", stsName, ord)
oldSvc := oldServices[svcName]
services = append(services, MemberService(sc, rack.Name, svcName, oldSvc, jobs))
svc := MemberService(sc, rack.Name, svcName, oldSvc, jobs)

// Services for nodes which were successfully decommissioned are not required.
if decommissionLabel, ok := svc.Labels[naming.DecommissionedLabel]; ok {
if decommissionLabel == naming.LabelValueTrue {
continue
}
}

// Mark last node for decommission
if rack.Members < *sts.Spec.Replicas && ord == *sts.Spec.Replicas-1 {
if len(svc.Labels[naming.DecommissionedLabel]) == 0 {
svc.Labels[naming.DecommissionedLabel] = naming.LabelValueFalse
}
}

services = append(services, svc)
}
}

return services
err := utilerrors.NewAggregate(errs)
if err != nil {
return nil, err
}

return services, nil
}

func (scc *Controller) pruneServices(
ctx context.Context,
sc *scyllav1.ScyllaCluster,
requiredServices []*corev1.Service,
services map[string]*corev1.Service,
statefulSets map[string]*appsv1.StatefulSet,
) ([]metav1.Condition, error) {
var errs []error
var progressingConditions []metav1.Condition
@@ -66,43 +88,8 @@
continue
}

// Do not delete services for scale down.
rackName, ok := svc.Labels[naming.RackNameLabel]
if !ok {
errs = append(errs, fmt.Errorf("service %s/%s is missing %q label", svc.Namespace, svc.Name, naming.RackNameLabel))
continue
}
stsName := fmt.Sprintf("%s-%s-%s", sc.Name, sc.Spec.Datacenter.Name, rackName)
sts, ok := statefulSets[stsName]
if !ok {
errs = append(errs, fmt.Errorf("statefulset %s/%s is missing", sc.Namespace, stsName))
continue
}
// TODO: Label services with the ordinal instead of parsing.
// TODO: Move it to a function and unit test it.
svcOrdinalStrings := serviceOrdinalRegex.FindStringSubmatch(svc.Name)
if len(svcOrdinalStrings) != 2 {
errs = append(errs, fmt.Errorf("can't parse ordinal from service %s/%s", svc.Namespace, svc.Name))
continue
}
svcOrdinal, err := strconv.Atoi(svcOrdinalStrings[1])
if err != nil {
errs = append(errs, err)
continue
}
if int32(svcOrdinal) < *sts.Spec.Replicas {
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
Status: metav1.ConditionTrue,
Reason: "WaitingForAnotherService",
Message: fmt.Sprintf("Service %q is waiting for another service to be decommissioned first.", naming.ObjRef(svc)),
ObservedGeneration: sc.Generation,
})
continue
}

// Do not delete services that weren't properly decommissioned.
if svc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
// Do not delete services that were bootstrapped but weren't properly decommissioned.
if svc.Annotations[naming.NodeInitialized] != "" && svc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
klog.Warningf("Refusing to cleanup service %s/%s whose member wasn't decommissioned.", svc.Namespace, svc.Name)
Review comment (Member; a sketch of the suggested change follows this file's diff):
nit: Can you also switch from using "%s/%s" to KObj or similar while we're at it?

progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
Expand Down Expand Up @@ -135,7 +122,7 @@ func (scc *Controller) pruneServices(
continue
}

if freshSvc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
if freshSvc.Annotations[naming.NodeInitialized] != "" && freshSvc.Labels[naming.DecommissionedLabel] != naming.LabelValueTrue {
klog.V(2).InfoS("Stale caches, won't delete the pvc because the service is no longer decommissioned.", "Service", klog.KObj(svc))
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
@@ -152,7 +139,7 @@

controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, serviceControllerProgressingCondition, &corev1.PersistentVolumeClaim{}, "delete", sc.Generation)
pvcName := naming.PVCNameForService(svc.Name)
err = scc.kubeClient.CoreV1().PersistentVolumeClaims(svc.Namespace).Delete(ctx, pvcName, metav1.DeleteOptions{
err := scc.kubeClient.CoreV1().PersistentVolumeClaims(svc.Namespace).Delete(ctx, pvcName, metav1.DeleteOptions{
PropagationPolicy: &backgroundPropagationPolicy,
})
if err != nil && !apierrors.IsNotFound(err) {
@@ -186,11 +173,14 @@ func (scc *Controller) syncServices(
) ([]metav1.Condition, error) {
var err error

requiredServices := scc.makeServices(sc, services, jobs)
requiredServices, err := scc.makeServices(sc, services, statefulSets, jobs)
if err != nil {
return nil, fmt.Errorf("can't make Service(s): %w", err)
}

// Delete any excessive Services.
// Delete has to be the first action to avoid getting stuck on quota.
progressingConditions, err := scc.pruneServices(ctx, sc, requiredServices, services, statefulSets)
progressingConditions, err := scc.pruneServices(ctx, sc, requiredServices, services)
if err != nil {
return nil, fmt.Errorf("can't delete Service(s): %w", err)
}
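
Picking up the KObj nit above: a minimal sketch of the structured-logging variant, assuming only the klog/v2 and corev1 packages this file already imports. Note that klog/v2's structured API offers InfoS and ErrorS but no WarningS, so InfoS stands in for the warning here.

package sketch // illustrative only

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
)

// logRefusedCleanup shows the suggested switch from positional "%s/%s" formatting
// to structured logging; klog.KObj renders the Service as "namespace/name".
func logRefusedCleanup(svc *corev1.Service) {
	// Before (as in the diff):
	//   klog.Warningf("Refusing to cleanup service %s/%s whose member wasn't decommissioned.", svc.Namespace, svc.Name)
	// After:
	klog.InfoS("Refusing to cleanup service whose member wasn't decommissioned", "Service", klog.KObj(svc))
}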