Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/apis/enterprise/v1beta1/indexercluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type IndexerClusterStatus struct {
// Indicates if we need to recheck the revision update on pods
SkipRecheckUpdate bool `json:"skip_recheck_update"`

// Indicates whether a scale up or scale down operation is in progress
IsScaling bool `json:"is_scaling"`

// status of each indexer cluster peer
Peers []IndexerClusterMemberStatus `json:"peers"`
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/enterprise/v1beta1/searchheadcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type SearchHeadClusterStatus struct {
// Indicates if we need to recheck the revision update on pods
SkipRecheckUpdate bool `json:"skip_recheck_update"`

// Indicates whether a scale up or scale down operation is in progress
IsScaling bool `json:"is_scaling"`

// status of each search head cluster member
Members []SearchHeadClusterMemberStatus `json:"members"`
}
Expand Down
24 changes: 15 additions & 9 deletions pkg/splunk/controller/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type DefaultStatefulSetPodManager struct{}
func (mgr *DefaultStatefulSetPodManager) Update(client splcommon.ControllerClient, statefulSet *appsv1.StatefulSet, desiredReplicas int32) (splcommon.Phase, error) {
phase, err := ApplyStatefulSet(client, statefulSet)
skipRecheckUpdate := false
isScaling := false
if err == nil && phase == splcommon.PhaseReady {
phase, err = UpdateStatefulSetPods(client, statefulSet, mgr, desiredReplicas, &skipRecheckUpdate)
phase, err = UpdateStatefulSetPods(client, statefulSet, mgr, desiredReplicas, &skipRecheckUpdate, &isScaling)
}
return phase, err
}
Expand Down Expand Up @@ -161,14 +162,15 @@ func isRevisionUpdateSuccessful(c splcommon.ControllerClient, statefulSet *appsv
}

// checkAndUpdatePodRevision updates the pod revision hash labels on pods if statefulset update was successful
func checkAndUpdatePodRevision(c splcommon.ControllerClient, statefulSet *appsv1.StatefulSet, readyReplicas int32, skipRecheckUpdate *bool) error {
func checkAndUpdatePodRevision(c splcommon.ControllerClient, statefulSet *appsv1.StatefulSet, readyReplicas int32, skipRecheckUpdate *bool, isScaling *bool) error {
scopedLog := log.WithName("checkAndUpdatePodRevision").WithValues(
"name", statefulSet.GetObjectMeta().GetName(),
"namespace", statefulSet.GetObjectMeta().GetNamespace())
var err error
if !isRevisionUpdateSuccessful(c, statefulSet) {
scopedLog.Error(err, "Statefulset not updated yet")
*skipRecheckUpdate = false
*isScaling = true
return err
}
// update the controller-revision-hash label on pods to
Expand All @@ -177,14 +179,16 @@ func checkAndUpdatePodRevision(c splcommon.ControllerClient, statefulSet *appsv1
if err != nil {
scopedLog.Error(err, "Unable to update pod-revision-hash for the pods")
*skipRecheckUpdate = false
*isScaling = true
return err
}
*skipRecheckUpdate = true
*isScaling = false
return nil
}

// UpdateStatefulSetPods manages scaling and config updates for StatefulSets
func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.StatefulSet, mgr splcommon.StatefulSetPodManager, desiredReplicas int32, skipRecheckUpdate *bool) (splcommon.Phase, error) {
func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.StatefulSet, mgr splcommon.StatefulSetPodManager, desiredReplicas int32, skipRecheckUpdate *bool, isScaling *bool) (splcommon.Phase, error) {
scopedLog := log.WithName("UpdateStatefulSetPods").WithValues(
"name", statefulSet.GetObjectMeta().GetName(),
"namespace", statefulSet.GetObjectMeta().GetNamespace())
Expand All @@ -195,8 +199,8 @@ func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.Sta
if readyReplicas < replicas {
scopedLog.Info("Waiting for pods to become ready")
if readyReplicas > 0 {
if !*skipRecheckUpdate {
err := checkAndUpdatePodRevision(c, statefulSet, readyReplicas, skipRecheckUpdate)
if !*skipRecheckUpdate && *isScaling {
err := checkAndUpdatePodRevision(c, statefulSet, readyReplicas, skipRecheckUpdate, isScaling)
if !*skipRecheckUpdate || err != nil {
scopedLog.Error(err, "Unable to update pod-revision-hash for the pods")
return splcommon.PhaseError, err
Expand All @@ -207,8 +211,8 @@ func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.Sta
return splcommon.PhasePending, nil
} else if readyReplicas > replicas {
scopedLog.Info("Waiting for scale down to complete")
if !*skipRecheckUpdate {
err := checkAndUpdatePodRevision(c, statefulSet, readyReplicas-1, skipRecheckUpdate)
if !*skipRecheckUpdate && *isScaling {
err := checkAndUpdatePodRevision(c, statefulSet, readyReplicas-1, skipRecheckUpdate, isScaling)
if !*skipRecheckUpdate || err != nil {
scopedLog.Error(err, "Unable to update pod-revision-hash for the pods")
return splcommon.PhaseError, err
Expand All @@ -224,6 +228,7 @@ func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.Sta
// scale up StatefulSet to match desiredReplicas
scopedLog.Info("Scaling replicas up", "replicas", desiredReplicas)
*statefulSet.Spec.Replicas = desiredReplicas
*isScaling = true
err := splutil.UpdateResource(c, statefulSet)
if err != nil {
scopedLog.Error(err, "Unable to update statefulset")
Expand All @@ -234,7 +239,7 @@ func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.Sta
// It can take a few seconds for the update to be reflected in the
// resource. In that case, just return from here and check the status
// again in the next reconcile loop.
err = checkAndUpdatePodRevision(c, statefulSet, readyReplicas, skipRecheckUpdate)
err = checkAndUpdatePodRevision(c, statefulSet, readyReplicas, skipRecheckUpdate, isScaling)
if !*skipRecheckUpdate || err != nil {
scopedLog.Error(err, "Unable to update pod-revision-hash for the pods")
return splcommon.PhaseError, err
Expand All @@ -260,6 +265,7 @@ func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.Sta
// scale down statefulset to terminate pod
scopedLog.Info("Scaling replicas down", "replicas", n)
*statefulSet.Spec.Replicas = n
*isScaling = true
err = splutil.UpdateResource(c, statefulSet)
if err != nil {
scopedLog.Error(err, "Scale down update failed for StatefulSet")
Expand All @@ -270,7 +276,7 @@ func UpdateStatefulSetPods(c splcommon.ControllerClient, statefulSet *appsv1.Sta
// It can take a few seconds for the update to be reflected in the
// resource. In that case, just return from here and check the status
// again in the next reconcile loop.
err = checkAndUpdatePodRevision(c, statefulSet, readyReplicas-1, skipRecheckUpdate)
err = checkAndUpdatePodRevision(c, statefulSet, readyReplicas-1, skipRecheckUpdate, isScaling)
if !*skipRecheckUpdate || err != nil {
scopedLog.Error(err, "Unable to update pod-revision-hash for the pods")
return splcommon.PhaseError, err
Expand Down
20 changes: 10 additions & 10 deletions pkg/splunk/controller/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func TestDefaultStatefulSetPodManager(t *testing.T) {
spltest.PodManagerTester(t, method, &mgr)
}

func updateStatefulSetPodsTester(t *testing.T, mgr splcommon.StatefulSetPodManager, statefulSet *appsv1.StatefulSet, desiredReplicas int32, initObjects ...runtime.Object) (splcommon.Phase, error) {
func updateStatefulSetPodsTester(t *testing.T, mgr splcommon.StatefulSetPodManager, statefulSet *appsv1.StatefulSet, desiredReplicas int32, isScaling bool, initObjects ...runtime.Object) (splcommon.Phase, error) {
// initialize client
c := spltest.NewMockClient()
c.AddObjects(initObjects)
skipRecheckUpdate := false
phase, err := UpdateStatefulSetPods(c, statefulSet, mgr, desiredReplicas, &skipRecheckUpdate)
phase, err := UpdateStatefulSetPods(c, statefulSet, mgr, desiredReplicas, &skipRecheckUpdate, &isScaling)
return phase, err
}

Expand Down Expand Up @@ -108,13 +108,13 @@ func TestUpdateStatefulSetPods(t *testing.T) {
}

var phase splcommon.Phase
phase, err := updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, statefulSet, pod)
phase, err := updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, false /*isScaling*/, statefulSet, pod)
if err != nil && phase != splcommon.PhaseUpdating {
t.Errorf("UpdateStatefulSetPods should not have returned error=%s with phase=%s", err, phase)
}

// Check the scenario where UpdatePodRevisionHash should return error when Pod is not added to client.
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 2 /*desiredReplicas*/, statefulSet)
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 2 /*desiredReplicas*/, false /*isScaling*/, statefulSet)
if err == nil && phase != splcommon.PhaseError {
t.Errorf("UpdateStatefulSetPods should have returned error or phase should have been PhaseError, but we got phase=%s", phase)
}
Expand All @@ -123,7 +123,7 @@ func TestUpdateStatefulSetPods(t *testing.T) {
statefulSet.Status.ReadyReplicas = 3
statefulSet.Spec.Replicas = &replicas
// Check the scenario where UpdatePodRevisionHash should return error when Pod is not added to client.
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, statefulSet)
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, false /*isScaling*/, statefulSet)
if err == nil && phase != splcommon.PhaseError {
t.Errorf("UpdateStatefulSetPods should have returned error or phase should have been PhaseError, but we got phase=%s", phase)
}
Expand All @@ -133,14 +133,14 @@ func TestUpdateStatefulSetPods(t *testing.T) {
statefulSet.Status.ReadyReplicas = 2
statefulSet.Spec.Replicas = &replicas
// Check the scenario where UpdatePodRevisionHash should return error when readyReplicas < replicas and Pod is not found
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, statefulSet, pod)
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, true /*isScaling*/, statefulSet, pod)
if err == nil && phase != splcommon.PhaseError {
t.Errorf("UpdateStatefulSetPods should have returned error or phase should have been PhaseError, but we got phase=%s", phase)
}

// CurrentRevision = UpdateRevision
statefulSet.Status.CurrentRevision = "v1"
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, statefulSet, pod)
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, true /*isScaling*/, statefulSet, pod)
if err == nil && phase != splcommon.PhaseError {
t.Errorf("UpdateStatefulSetPods should have returned error or phase should have been PhaseError, but we got phase=%s", phase)
}
Expand All @@ -151,20 +151,20 @@ func TestUpdateStatefulSetPods(t *testing.T) {
statefulSet.Spec.Replicas = &replicas
statefulSet.Status.CurrentRevision = ""
// Check the scenario where UpdatePodRevisionHash should return error when readyReplicas > replicas and Pod is not found
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, statefulSet, pod)
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, true /*isScaling*/, statefulSet, pod)
if err == nil && phase != splcommon.PhaseError {
t.Errorf("UpdateStatefulSetPods should have returned error or phase should have been PhaseError, but we got phase=%s", phase)
}

// CurrentRevision = UpdateRevision
statefulSet.Status.CurrentRevision = "v1"
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, statefulSet, pod)
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 1 /*desiredReplicas*/, true /*isScaling*/, statefulSet, pod)
if err == nil && phase != splcommon.PhaseError {
t.Errorf("UpdateStatefulSetPods should have returned error or phase should have been PhaseError, but we got phase=%s", phase)
}

// Check the scenario where UpdatePodRevisionHash should return error when statefulset is not added to client.
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 2 /*desiredReplicas*/, pod)
phase, err = updateStatefulSetPodsTester(t, &mgr, statefulSet, 2 /*desiredReplicas*/, true /*isScaling*/, pod)
if err == nil && phase != splcommon.PhaseError {
t.Errorf("UpdateStatefulSetPods should have returned error or phase should have been PhaseError, but we got phase=%s", phase)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/splunk/enterprise/indexercluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (mgr *indexerClusterPodManager) Update(c splcommon.ControllerClient, statef
}

// manage scaling and updates
return splctrl.UpdateStatefulSetPods(c, statefulSet, mgr, desiredReplicas, &mgr.cr.Status.SkipRecheckUpdate)
return splctrl.UpdateStatefulSetPods(c, statefulSet, mgr, desiredReplicas, &mgr.cr.Status.SkipRecheckUpdate, &mgr.cr.Status.IsScaling)
}

// PrepareScaleDown for indexerClusterPodManager prepares indexer pod to be removed via scale down event; it returns true when ready
Expand Down
2 changes: 1 addition & 1 deletion pkg/splunk/enterprise/searchheadcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (mgr *searchHeadClusterPodManager) Update(c splcommon.ControllerClient, sta
}

// manage scaling and updates
return splctrl.UpdateStatefulSetPods(mgr.c, statefulSet, mgr, desiredReplicas, &mgr.cr.Status.SkipRecheckUpdate)
return splctrl.UpdateStatefulSetPods(mgr.c, statefulSet, mgr, desiredReplicas, &mgr.cr.Status.SkipRecheckUpdate, &mgr.cr.Status.IsScaling)
}

// PrepareScaleDown for searchHeadClusterPodManager prepares search head pod to be removed via scale down event; it returns true when ready
Expand Down
3 changes: 1 addition & 2 deletions pkg/splunk/test/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,6 @@ func PodManagerTester(t *testing.T, method string, mgr splcommon.StatefulSetPodM
replicas = 2
current.Status.Replicas = 2
current.Status.ReadyReplicas = 1
scaleUpCalls["Get"] = append(scaleUpCalls["Get"], funcCalls[0], funcCalls[0], funcCalls[1])
scaleUpCalls["Update"] = append(scaleUpCalls["Update"], funcCalls[1])
methodPlus = fmt.Sprintf("%s(%s)", method, "ScalingUp, 1/2 ready")
PodManagerUpdateTester(t, methodPlus, mgr, 2, splcommon.PhaseScalingUp, revised, scaleUpCalls, nil, current, pod)

Expand All @@ -522,6 +520,7 @@ func PodManagerTester(t *testing.T, method string, mgr splcommon.StatefulSetPodM
current.Status.Replicas = 1
current.Status.ReadyReplicas = 1
updateCalls["Get"] = append(updateCalls["Get"], funcCalls[0], funcCalls[0], funcCalls[1])
updateCalls["Update"] = append(updateCalls["Update"], funcCalls[1])
methodPlus = fmt.Sprintf("%s(%s)", method, "ScalingUp, Update Replicas 1=>2")
PodManagerUpdateTester(t, methodPlus, mgr, 2, splcommon.PhaseScalingUp, revised, updateCalls, nil, current, pod)

Expand Down