retrying decommission instead of assuming it works (#356)
* retrying decommission instead of assuming it works

* added tests and fixed state bug in previous fix

* node count filters agents

* remove extraneous logging

* adding back in tests
rsds committed Jan 15, 2021
1 parent 56d6ea6 commit 4b8163c
Showing 5 changed files with 207 additions and 11 deletions.
22 changes: 20 additions & 2 deletions operator/pkg/psp/emm.go
@@ -23,6 +23,8 @@
package psp

import (
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"

@@ -270,8 +272,22 @@ func (impl *EMMServiceImpl) getNodeNameSet() (utils.StringSet, error) {
if err != nil {
return nil, err
}
totalNodes := len(nodes)

return utils.GetNodeNameSet(nodes), nil
agentNodesIndex := []int{}
for i := 0; i < totalNodes; i++ {
if nodes[i].Labels["kubernetes.io/role"] == "agent" {
agentNodesIndex = append(agentNodesIndex, i)
}
}

agents := make([]*corev1.Node, len(agentNodesIndex))
for i, agentIndex := range agentNodesIndex {
agents[i] = nodes[agentIndex]
}

nameSet := utils.GetNodeNameSet(agents)
return nameSet, nil
}

func (impl *EMMServiceImpl) getPodNameSet() utils.StringSet {
@@ -523,8 +539,10 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
// pods
unavailableNodes := utils.UnionStringSet(plannedDownNodeNameSet, evacuateDataNodeNameSet)
availableNodes := utils.SubtractStringSet(allNodes, unavailableNodes)

if len(provider.getPodNameSet()) > len(availableNodes) {
logger.Info(fmt.Sprintf("Not enough space to do EMM. Total number of nodes: %v, unavailable: %v, available: %v, pods: %v",
len(allNodes), len(unavailableNodes), len(availableNodes), len(provider.getPodNameSet())))

anyUpdated := false
updated := false
for node := range unavailableNodes {
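The getNodeNameSet change in this file collects the agent indexes in one pass and copies the matching nodes in a second. The same filter can be written as a single pass; the sketch below is illustrative only, assuming the kubernetes.io/role=agent label convention and the utils.GetNodeNameSet helper used in the diff, and filterAgentNodes is a hypothetical name that is not part of the commit:

package psp

import (
	corev1 "k8s.io/api/core/v1"
)

// filterAgentNodes keeps only the nodes labeled as agents.
// Sketch only: the label check mirrors the diff above.
func filterAgentNodes(nodes []*corev1.Node) []*corev1.Node {
	agents := make([]*corev1.Node, 0, len(nodes))
	for _, node := range nodes {
		if node.Labels["kubernetes.io/role"] == "agent" {
			agents = append(agents, node)
		}
	}
	return agents
}

With a helper like this, getNodeNameSet would reduce to return utils.GetNodeNameSet(filterAgentNodes(nodes)), nil.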
7 changes: 4 additions & 3 deletions operator/pkg/reconciliation/check_nodes.go
@@ -137,9 +137,10 @@ func (rc *ReconciliationContext) GetAllNodes() ([]*corev1.Node, error) {
if err := rc.Client.List(rc.Ctx, nodeList, listOptions); err != nil {
return nil, err
}
var nodes []*corev1.Node
for _, node := range nodeList.Items {
nodes = append(nodes, &node)
nodeCount := len(nodeList.Items)
nodes := make([]*corev1.Node, nodeCount)
for i := 0; i < nodeCount; i++ {
nodes[i] = &nodeList.Items[i]
}
return nodes, nil
}
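The rewrite above replaces appending &node inside a range loop. Under Go loop-variable semantics before Go 1.22, &node is the address of a single reused variable, so every element of the returned slice ended up pointing at the same Node; indexing nodeList.Items directly takes the address of each backing-array element instead. A minimal standalone sketch of the difference, using a plain struct as a stand-in for corev1.Node:

package main

import "fmt"

type node struct{ Name string }

func main() {
	items := []node{{"node-a"}, {"node-b"}, {"node-c"}}

	// Old pattern: &n is the address of the loop variable, which is reused
	// each iteration (before Go 1.22), so all three pointers alias one value.
	var bad []*node
	for _, n := range items {
		bad = append(bad, &n)
	}

	// New pattern: take the address of each slice element.
	good := make([]*node, len(items))
	for i := 0; i < len(items); i++ {
		good[i] = &items[i]
	}

	fmt.Println(bad[0].Name, bad[1].Name, bad[2].Name)    // node-c node-c node-c (pre-Go 1.22)
	fmt.Println(good[0].Name, good[1].Name, good[2].Name) // node-a node-b node-c
}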
26 changes: 20 additions & 6 deletions operator/pkg/reconciliation/decommission_node.go
@@ -127,9 +127,8 @@ func (rc *ReconciliationContext) DecommissionNodeOnRack(rackName string, epData
}

if err := rc.NodeMgmtClient.CallDecommissionNodeEndpoint(pod); err != nil {
// TODO this returns a 500 when it works
// We are waiting for a new version of mgmt api with a fix for this
// return err
rc.ReqLogger.Info(fmt.Sprintf("Error from decommission attempt. This is only an attempt and can"+
" fail it will be retried later if decomission has not started. Error: %v", err))
}

rc.ReqLogger.Info("Marking node as decommissioning")
@@ -159,14 +158,20 @@ func (rc *ReconciliationContext) CheckDecommissioningNodes(epData httphelper.Cas
for _, pod := range rc.dcPods {
if pod.Labels[api.CassNodeState] == stateDecommissioning {
if !IsDoneDecommissioning(pod, epData) {
rc.ReqLogger.Info("Node decommissioning, reconciling again soon")
if !HasStartedDecommissioning(pod, epData) {
rc.ReqLogger.Info("Decommission has not started trying again")
if err := rc.NodeMgmtClient.CallDecommissionNodeEndpoint(pod); err != nil {
rc.ReqLogger.Info(fmt.Sprintf("Error from decomimssion attempt. This is only an attempt and can fail. Error: %v", err))
}
} else {
rc.ReqLogger.Info("Node decommissioning, reconciling again soon")
}
} else {
rc.ReqLogger.Info("Node finished decommissioning")
if res := rc.cleanUpAfterDecommissionedPod(pod); res != nil {
return res
}
}

return result.RequeueSoon(5)
}
}
@@ -195,7 +200,6 @@ func (rc *ReconciliationContext) cleanUpAfterDecommissionedPod(pod *corev1.Pod)
if err != nil {
return result.Error(err)
}

rc.ReqLogger.Info("Deleting pod PVCs")
err = rc.DeletePodPvcs(pod)
if err != nil {
@@ -214,6 +218,16 @@ func (rc *ReconciliationContext) cleanUpAfterDecommissionedPod(pod *corev1.Pod)
return nil
}

func HasStartedDecommissioning(pod *v1.Pod, epData httphelper.CassMetadataEndpoints) bool {
for idx := range epData.Entity {
ep := &epData.Entity[idx]
if ep.GetRpcAddress() == pod.Status.PodIP {
return strings.HasPrefix(ep.Status, "LEAVING")
}
}
return false
}

func IsDoneDecommissioning(pod *v1.Pod, epData httphelper.CassMetadataEndpoints) bool {
for idx := range epData.Entity {
ep := &epData.Entity[idx]
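HasStartedDecommissioning looks up the pod's endpoint by RPC address and reports whether its gossip status has a LEAVING prefix; IsDoneDecommissioning (truncated above) drives the cleanup branch. The sketch below shows how the check behaves for the "UP" and "LEAVING" statuses exercised by the tests that follow; it is illustrative only, and exampleDecommissionStates is not part of the commit:

package reconciliation

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/datastax/cass-operator/operator/pkg/httphelper"
)

// Sketch only: the RpcAddress and Status field names match the literals used
// in decommission_node_test.go below.
func exampleDecommissionStates() {
	pod := &v1.Pod{Status: v1.PodStatus{PodIP: "192.168.101.11"}}

	notStarted := httphelper.CassMetadataEndpoints{
		Entity: []httphelper.EndpointState{{RpcAddress: "192.168.101.11", Status: "UP"}},
	}
	started := httphelper.CassMetadataEndpoints{
		Entity: []httphelper.EndpointState{{RpcAddress: "192.168.101.11", Status: "LEAVING"}},
	}

	fmt.Println(HasStartedDecommissioning(pod, notStarted)) // false -> CallDecommissionNodeEndpoint is retried
	fmt.Println(HasStartedDecommissioning(pod, started))    // true  -> reconcile just requeues and waits
}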
157 changes: 157 additions & 0 deletions operator/pkg/reconciliation/decommission_node_test.go
@@ -0,0 +1,157 @@
// Copyright DataStax, Inc.
// Please see the included license file for details.

package reconciliation

import (
"context"
"io/ioutil"
"net/http"
"strings"
"testing"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/datastax/cass-operator/operator/internal/result"
api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1"
"github.com/datastax/cass-operator/operator/pkg/httphelper"
"github.com/datastax/cass-operator/operator/pkg/mocks"
"github.com/stretchr/testify/mock"
)

func TestRetryDecommissionNode(t *testing.T) {
rc, _, cleanupMockScr := setupTest()
defer cleanupMockScr()
state := "UP"
podIP := "192.168.101.11"

mockClient := &mocks.Client{}
rc.Client = mockClient

rc.Datacenter.SetCondition(api.DatacenterCondition{
Status: v1.ConditionTrue,
Type: api.DatacenterScalingDown,
})
res := &http.Response{
StatusCode: http.StatusBadRequest,
Body: ioutil.NopCloser(strings.NewReader("OK")),
}
mockHttpClient := &mocks.HttpClient{}
mockHttpClient.On("Do",
mock.MatchedBy(
func(req *http.Request) bool {
return req.URL.Path == "/api/v0/ops/node/decommission"
})).
Return(res, nil).
Once()

rc.NodeMgmtClient = httphelper.NodeMgmtClient{
Client: mockHttpClient,
Log: rc.ReqLogger,
Protocol: "http",
}

labels := make(map[string]string)
labels[api.CassNodeState] = stateDecommissioning

rc.dcPods = []*v1.Pod{{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Labels: labels,
},
Status: v1.PodStatus{
PodIP: podIP,
},
}}

epData := httphelper.CassMetadataEndpoints{
Entity: []httphelper.EndpointState{
{
RpcAddress: podIP,
Status: state,
},
},
}
r := rc.CheckDecommissioningNodes(epData)
if r != result.RequeueSoon(5) {
t.Fatalf("expected result of result.RequeueSoon(5) but got %s", r)
}
}

func TestRemoveResourcesWhenDone(t *testing.T) {
rc, _, cleanupMockScr := setupTest()
defer cleanupMockScr()
podIP := "192.168.101.11"
state := "LEFT"

mockClient := &mocks.Client{}
rc.Client = mockClient
rc.Datacenter.SetCondition(api.DatacenterCondition{
Status: v1.ConditionTrue,
Type: api.DatacenterScalingDown,
})
mockStatus := &statusMock{}
k8sMockClientStatus(mockClient, mockStatus)

labels := make(map[string]string)
labels[api.CassNodeState] = stateDecommissioning

rc.dcPods = []*v1.Pod{{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Labels: labels,
},
Status: v1.PodStatus{
PodIP: podIP,
},
}}

makeInt := func(i int32) *int32 {
return &i
}
ssLabels := make(map[string]string)
rc.statefulSets = []*appsv1.StatefulSet{{
ObjectMeta: metav1.ObjectMeta{
Name: "ss-1",
Labels: ssLabels,
},
Spec: appsv1.StatefulSetSpec{
Replicas: makeInt(1),
},
}}

epData := httphelper.CassMetadataEndpoints{
Entity: []httphelper.EndpointState{
{
RpcAddress: podIP,
Status: state,
},
},
}

r := rc.CheckDecommissioningNodes(epData)
if r != result.RequeueSoon(5) {
t.Fatalf("expected result of blah but got %s", r)
}
if mockStatus.called != 1 {
t.Fatalf("expected 1 call to mockStatus but had %v", mockStatus.called)
}
}

type statusMock struct {
called int
}

func (s *statusMock) Update(ctx context.Context, obj runtime.Object, opts ...client.UpdateOption) error {
return nil
}

func (s *statusMock) Patch(ctx context.Context, obj runtime.Object, patch client.Patch, opts ...client.PatchOption) error {
s.called = s.called + 1
return nil
}
6 changes: 6 additions & 0 deletions operator/pkg/reconciliation/testing.go
@@ -184,6 +184,12 @@ func k8sMockClientGet(mockClient *mocks.Client, returnArg interface{}) *mock.Cal
Once()
}

func k8sMockClientStatus(mockClient *mocks.Client, returnArg interface{}) *mock.Call {
return mockClient.On("Status").
Return(returnArg).
Once()
}

func k8sMockClientUpdate(mockClient *mocks.Client, returnArg interface{}) *mock.Call {
return mockClient.On("Update",
mock.MatchedBy(
