Fix canary upgrade hash calculation by including canary information in the hash calculation. Move generation update to the end of the reconcile, ignoring state change
burmanm committed May 17, 2024
1 parent a1171e3 commit a92de20
Showing 4 changed files with 68 additions and 46 deletions.
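Background for the first change: the operator decides whether a StatefulSet must be pushed to the cluster by hashing the object it just built and comparing that hash annotation with the one on the live object. Before this commit the canary RollingUpdate partition was applied only later, in CheckRackPodTemplate, after the hash had already been computed, so enabling spec.canaryUpgrade did not change the hash and no rollout was triggered. The sketch below is a hypothetical, simplified version of that comparison (the helper name needsUpdate and the whole-map annotation comparison are made up for illustration; utils.AddHashAnnotation is the real helper used in the diff):

package example

import (
	"reflect"

	appsv1 "k8s.io/api/apps/v1"

	"github.com/k8ssandra/cass-operator/pkg/utils"
)

// needsUpdate is a made-up illustration: it hashes the freshly built
// StatefulSet and compares annotations with the live object. If the canary
// UpdateStrategy were applied only after hashing (the old behaviour),
// toggling canaryUpgrade would never change the hash and the operator
// would never start the rollout.
func needsUpdate(desired, current *appsv1.StatefulSet) bool {
	utils.AddHashAnnotation(desired)
	return !reflect.DeepEqual(desired.Annotations, current.Annotations)
}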
17 changes: 17 additions & 0 deletions pkg/reconciliation/construct_statefulset.go
@@ -152,6 +152,23 @@ func newStatefulSetForCassandraDatacenter(
result.Spec.ServiceName = sts.Spec.ServiceName
}

if dc.Spec.CanaryUpgrade {
var partition int32
if dc.Spec.CanaryUpgradeCount == 0 || dc.Spec.CanaryUpgradeCount > replicaCountInt32 {
partition = replicaCountInt32
} else {
partition = replicaCountInt32 - dc.Spec.CanaryUpgradeCount
}

strategy := appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
},
}
result.Spec.UpdateStrategy = strategy
}

// add a hash here to facilitate checking if updates are needed
utils.AddHashAnnotation(result)

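The partition arithmetic now included in the hashed object is small but easy to misread. A minimal standalone sketch of the same calculation, with an illustrative helper name (canaryPartition) and sample values that are not part of the change:

package main

import "fmt"

// canaryPartition mirrors the calculation added above: pods with an ordinal
// greater than or equal to the partition are moved to the new revision, so a
// CanaryUpgradeCount of n upgrades only the n highest-ordinal pods. A count
// of 0, or one larger than the rack, leaves the partition at the replica
// count and upgrades nothing yet.
func canaryPartition(replicas, canaryUpgradeCount int32) int32 {
	if canaryUpgradeCount == 0 || canaryUpgradeCount > replicas {
		return replicas
	}
	return replicas - canaryUpgradeCount
}

func main() {
	fmt.Println(canaryPartition(3, 1)) // 2: only pod ordinal 2 is upgraded
	fmt.Println(canaryPartition(3, 0)) // 3: no pods are upgraded
	fmt.Println(canaryPartition(3, 5)) // 3: count exceeds the rack size, nothing is upgraded
}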
9 changes: 4 additions & 5 deletions pkg/reconciliation/constructor.go
@@ -6,11 +6,12 @@ package reconciliation
// This file defines constructors for k8s objects

import (
"fmt"

api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
"github.com/k8ssandra/cass-operator/pkg/oplabels"
"github.com/k8ssandra/cass-operator/pkg/utils"

corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -48,7 +49,7 @@ func newPodDisruptionBudgetForDatacenter(dc *api.CassandraDatacenter) *policyv1.
}

func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressState) error {
rc.ReqLogger.Info("reconcile_racks::setOperatorProgressStatus")
rc.ReqLogger.Info(fmt.Sprintf("reconcile_racks::setOperatorProgressStatus::%v", newState))
currentState := rc.Datacenter.Status.CassandraOperatorProgress
if currentState == newState {
// early return, no need to ping k8s
@@ -57,13 +58,11 @@ func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressS

patch := client.MergeFrom(rc.Datacenter.DeepCopy())
rc.Datacenter.Status.CassandraOperatorProgress = newState
// TODO there may be a better place to push status.observedGeneration in the reconcile loop

if newState == api.ProgressReady {
rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
if rc.Datacenter.Status.DatacenterName == nil {
rc.Datacenter.Status.DatacenterName = &rc.Datacenter.Spec.DatacenterName
}
rc.setCondition(api.NewDatacenterCondition(api.DatacenterRequiresUpdate, corev1.ConditionFalse))
}
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil {
rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state")
25 changes: 23 additions & 2 deletions pkg/reconciliation/handler.go
@@ -7,9 +7,11 @@ import (
"fmt"
"sync"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
@@ -65,9 +67,28 @@ func (rc *ReconciliationContext) CalculateReconciliationActions() (reconcile.Res
return result.Error(err).Output()
}

result, err := rc.ReconcileAllRacks()
res, err := rc.ReconcileAllRacks()
if err != nil {
return result.Error(err).Output()
}

if err := rc.updateStatus(); err != nil {
return result.Error(err).Output()
}

return result, err
return res, nil
}

func (rc *ReconciliationContext) updateStatus() error {
patch := client.MergeFrom(rc.Datacenter.DeepCopy())
rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
rc.setCondition(api.NewDatacenterCondition(api.DatacenterRequiresUpdate, corev1.ConditionFalse))
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil {
rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state")
return err
}

return nil
}

// This file contains various definitions and plumbing setup used for reconciliation.
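Note that updateStatus runs after every successful ReconcileAllRacks pass, regardless of whether the operator progress state changed, which is what lets status.observedGeneration catch up even when the datacenter was already Ready. A minimal sketch of the observer-side check this makes reliable, assuming the hypothetical helper name isReconciled and the generated CassandraDatacenter API types:

package example

import (
	api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
)

// isReconciled is a hypothetical helper, not part of this commit. With the
// generation update moved to the end of the reconcile, an observer can rely
// on status.observedGeneration reaching metadata.generation once the latest
// spec has been fully processed.
func isReconciled(dc *api.CassandraDatacenter) bool {
	return dc.Status.ObservedGeneration == dc.Generation &&
		dc.Status.CassandraOperatorProgress == api.ProgressReady
}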
63 changes: 24 additions & 39 deletions pkg/reconciliation/reconcile_racks.go
@@ -186,6 +186,30 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
}
statefulSet := rc.statefulSets[idx]

status := statefulSet.Status

updatedReplicas := status.UpdatedReplicas
if status.CurrentRevision != status.UpdateRevision {
// Previous update was a canary upgrade, so we have pods in different versions
updatedReplicas = status.CurrentReplicas + status.UpdatedReplicas
}

if statefulSet.Generation != status.ObservedGeneration ||
status.Replicas != status.ReadyReplicas ||
status.Replicas != updatedReplicas {

logger.Info(
"waiting for upgrade to finish on statefulset",
"statefulset", statefulSet.Name,
"replicas", status.Replicas,
"readyReplicas", status.ReadyReplicas,
"currentReplicas", status.CurrentReplicas,
"updatedReplicas", status.UpdatedReplicas,
)

return result.RequeueSoon(10)
}

desiredSts, err := newStatefulSetForCassandraDatacenter(statefulSet, rackName, dc, int(*statefulSet.Spec.Replicas))

if err != nil {
@@ -232,22 +256,6 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
// selector must match podTemplate.Labels, those can't be updated either
desiredSts.Spec.Selector = statefulSet.Spec.Selector

if dc.Spec.CanaryUpgrade {
var partition int32
if dc.Spec.CanaryUpgradeCount == 0 || dc.Spec.CanaryUpgradeCount > int32(rc.desiredRackInformation[idx].NodeCount) {
partition = int32(rc.desiredRackInformation[idx].NodeCount)
} else {
partition = int32(rc.desiredRackInformation[idx].NodeCount) - dc.Spec.CanaryUpgradeCount
}

strategy := appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
},
}
desiredSts.Spec.UpdateStrategy = strategy
}
stateMeta, err := meta.Accessor(statefulSet)
resVersion := stateMeta.GetResourceVersion()
if err != nil {
@@ -300,29 +308,6 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
// we just updated k8s and pods will be knocked out of ready state, so let k8s
// call us back when these changes are done and the new pods are back to ready
return result.Done()
} else {

// the pod template is right, but if any pods don't match it,
// or are missing, we should not move onto the next rack,
// because there's an upgrade in progress

status := statefulSet.Status
if statefulSet.Generation != status.ObservedGeneration ||
status.Replicas != status.ReadyReplicas ||
status.Replicas != status.CurrentReplicas ||
status.Replicas != status.UpdatedReplicas {

logger.Info(
"waiting for upgrade to finish on statefulset",
"statefulset", statefulSet.Name,
"replicas", status.Replicas,
"readyReplicas", status.ReadyReplicas,
"currentReplicas", status.CurrentReplicas,
"updatedReplicas", status.UpdatedReplicas,
)

return result.RequeueSoon(10)
}
}
}

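The relocated readiness check above also has to tolerate a rack that was intentionally left on two revisions by a canary rollout. A minimal sketch with illustrative values for a 3-pod rack where one pod has been canary-upgraded (the literal status values are made up; the real check additionally compares the StatefulSet generation against status.observedGeneration):

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
)

func main() {
	// Illustrative status after a one-pod canary upgrade has finished:
	// two pods still run the current revision, one runs the update revision.
	status := appsv1.StatefulSetStatus{
		Replicas:        3,
		ReadyReplicas:   3,
		CurrentReplicas: 2,
		UpdatedReplicas: 1,
		CurrentRevision: "demo-sts-aaaa",
		UpdateRevision:  "demo-sts-bbbb",
	}

	updatedReplicas := status.UpdatedReplicas
	if status.CurrentRevision != status.UpdateRevision {
		// A partitioned (canary) rollout leaves pods on two revisions;
		// counting both groups lets the reconciler treat the rack as settled.
		updatedReplicas = status.CurrentReplicas + status.UpdatedReplicas
	}

	settled := status.Replicas == status.ReadyReplicas && status.Replicas == updatedReplicas
	fmt.Println("rack settled:", settled) // true
}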
