Skip to content

Commit

Permalink
fix: delete orphaned azdrivernode
Browse files Browse the repository at this point in the history
  • Loading branch information
hccheng72 committed Nov 30, 2022
1 parent ecbbf65 commit d19baf1
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/azuredisk/azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (d *DriverV2) StartControllersAndDieOnExit(ctx context.Context) {
<-mgr.Elected()
var errors []error
ctx, w := workflow.New(ctx)
defer func() { w.Finish(err) }()
defer func() { w.Finish(errors...) }()
// recover lost states if necessary
w.Logger().Infof("Elected as leader; initiating CRI deperecation / recovery...")
if err := nodeReconciler.Recover(ctx); err != nil {
Expand Down
19 changes: 18 additions & 1 deletion pkg/azureutils/azure_disk_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,15 @@ func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.Sha
} else {
originalObj, err = azDiskClient.DiskV1beta2().AzVolumeAttachments(target.Namespace).Get(ctx, objName, metav1.GetOptions{})
}
case *azdiskv1beta2.AzDriverNode:
if informerFactory != nil {
originalObj, err = informerFactory.Disk().V1beta2().AzDriverNodes().Lister().AzDriverNodes(target.Namespace).Get(objName)
} else if cachedClient != nil {
originalObj = &azdiskv1beta2.AzDriverNode{}
err = cachedClient.Get(ctx, types.NamespacedName{Namespace: target.Namespace, Name: objName}, originalObj)
} else {
originalObj, err = azDiskClient.DiskV1beta2().AzDriverNodes(target.Namespace).Get(ctx, objName, metav1.GetOptions{})
}
case *storagev1.VolumeAttachment:
if cachedClient != nil {
originalObj = &storagev1.VolumeAttachment{}
Expand Down Expand Up @@ -1155,7 +1164,15 @@ func UpdateCRIWithRetry(ctx context.Context, informerFactory azdiskinformers.Sha
if (updateMode & UpdateCRI) != 0 {
updatedObj, err = azDiskClient.DiskV1beta2().AzVolumeAttachments(target.Namespace).Update(ctx, target, metav1.UpdateOptions{})
}

case *azdiskv1beta2.AzDriverNode:
if (updateMode&UpdateCRIStatus) != 0 && !reflect.DeepEqual(originalObj.(*azdiskv1beta2.AzDriverNode).Status, target.Status) {
if updatedObj, err = azDiskClient.DiskV1beta2().AzDriverNodes(target.Namespace).UpdateStatus(ctx, target, metav1.UpdateOptions{}); err != nil {
return err
}
}
if (updateMode & UpdateCRI) != 0 {
updatedObj, err = azDiskClient.DiskV1beta2().AzDriverNodes(target.Namespace).Update(ctx, target, metav1.UpdateOptions{})
}
case *storagev1.VolumeAttachment:
if (updateMode&UpdateCRIStatus) != 0 && !reflect.DeepEqual(originalObj.(*storagev1.VolumeAttachment).Status, target.Status) {
if err = cachedClient.Status().Update(ctx, target); err != nil {
Expand Down
44 changes: 32 additions & 12 deletions pkg/controller/azdrivernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -29,6 +30,7 @@ import (
"sigs.k8s.io/azuredisk-csi-driver/pkg/workflow"

"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -92,23 +94,41 @@ func (r *ReconcileAzDriverNode) Recover(ctx context.Context) error {
ctx, w := workflow.New(ctx)
defer func() { w.Finish(err) }()

var nodes *azdiskv1beta2.AzDriverNodeList
if nodes, err = r.azClient.DiskV1beta2().AzDriverNodes(r.objectNamespace).List(ctx, metav1.ListOptions{}); err != nil {
if errors.IsNotFound(err) {
return nil
}
var azNodes *azdiskv1beta2.AzDriverNodeList
if azNodes, err = r.azClient.DiskV1beta2().AzDriverNodes(r.objectNamespace).List(ctx, metav1.ListOptions{}); err != nil {
return err
}

for _, node := range nodes.Items {
updated := node.DeepCopy()
updated.Annotations = azureutils.AddToMap(updated.Annotations, consts.RecoverAnnotation, "azDriverNode")
if _, err = r.azClient.DiskV1beta2().AzDriverNodes(r.objectNamespace).Update(ctx, updated, metav1.UpdateOptions{}); err != nil {
return err
for _, azNode := range azNodes.Items {
// if the corresponding node has been deleted, delete the azdrivernode object
_, err := r.kubeClient.CoreV1().Nodes().Get(ctx, azNode.Spec.NodeName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
n := r.azClient.DiskV1beta2().AzDriverNodes(r.objectNamespace)
if err = n.Delete(ctx, azNode.Name, metav1.DeleteOptions{}); err != nil {
w.Logger().Errorf(err, "failed to delete azDriverNode (%s)", azNode.Name)
}
} else {
w.Logger().Errorf(err, "failed to find node (%s)", azNode.Spec.NodeName)
}
} else {
updateFunc := func(obj client.Object) error {
azNode := obj.(*azdiskv1beta2.AzDriverNode)
azNode.Annotations = azureutils.AddToMap(azNode.Annotations, consts.RecoverAnnotation, "azDriverNode")
return nil
}
if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, &azNode, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRI); err != nil {
w.Logger().Errorf(err, "failed to recover azDriverNode (%s) with annotation", azNode.Name)
continue
}
r.addNodeToAvailableAttachmentsMap(ctx, azNode.Name, azNode.GetLabels())
}
r.addNodeToAvailableAttachmentsMap(ctx, node.Name, node.GetLabels())
}
return nil

if err != nil {
err = fmt.Errorf("azDriverNode recovery failed")
}
return err
}

// NewAzDriverNodeController initializes azdrivernode-controller
Expand Down
60 changes: 33 additions & 27 deletions pkg/controller/azdrivernode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
fakev1 "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/klogr"
azdiskfakes "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned/fake"
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
Expand All @@ -40,8 +41,8 @@ import (
)

func NewTestAzDriverNodeController(controller *gomock.Controller, namespace string, objects ...runtime.Object) *ReconcileAzDriverNode {
azDiskObjs, _ := splitObjects(objects...)
controllerSharedState := initState(mockclient.NewMockClient(controller), azdiskfakes.NewSimpleClientset(azDiskObjs...), nil, objects...)
azDiskObjs, kubeObjs := splitObjects(objects...)
controllerSharedState := initState(mockclient.NewMockClient(controller), azdiskfakes.NewSimpleClientset(azDiskObjs...), fakev1.NewSimpleClientset(kubeObjs...), objects...)

return &ReconcileAzDriverNode{
SharedState: controllerSharedState,
Expand Down Expand Up @@ -233,41 +234,50 @@ func TestAzDriverNodeRecover(t *testing.T) {
verifyFunc func(*testing.T, *ReconcileAzDriverNode, error)
}{
{
description: "[Success] Should update annotations of all AzDriverNodes.",
description: "[Success] Should update AzDriverNode annotation and add it to availableAttachmentsMap.",
setupFunc: func(t *testing.T, mockCtl *gomock.Controller) *ReconcileAzDriverNode {
controller := NewTestAzDriverNodeController(
mockCtl,
testNamespace,
&testNode0,
&testAzDriverNode0)

mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)

return controller
},
verifyFunc: func(t *testing.T, controller *ReconcileAzDriverNode, err error) {
require.NoError(t, err)
_, node0Exists := controller.availableAttachmentsMap.Load(testAzDriverNode0.Name)
require.True(t, node0Exists)

azDriverNodes, err := controller.azClient.DiskV1beta2().AzDriverNodes(testNamespace).List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, azDriverNodes.Items, 1)
require.Equal(t, testNode0Name, azDriverNodes.Items[0].Name)
require.Contains(t, azDriverNodes.Items[0].Annotations, consts.RecoverAnnotation)
},
},
{
description: "[Success] Should delete orphaned AzDriverNodes whose corresponding nodes have been deleted.",
setupFunc: func(t *testing.T, mockCtl *gomock.Controller) *ReconcileAzDriverNode {
controller := NewTestAzDriverNodeController(
mockCtl,
testNamespace,
&testNode0,
&testAzDriverNode0,
&testAzDriverNode1)

cachedObj := testNode1.DeepCopy()
controller.cachedClient.(*mockclient.MockClient).EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, key types.NamespacedName, obj runtime.Object) error {
if objToUpdate, ok := obj.(*v1.Node); ok {
cachedObj.DeepCopyInto(objToUpdate)
return nil
}
return fmt.Errorf("unexpected object type: %s", reflect.TypeOf(obj).Name())
}).
AnyTimes()

controller.cachedClient.(*mockclient.MockClient).EXPECT().
List(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()
mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)

return controller
},
verifyFunc: func(t *testing.T, controller *ReconcileAzDriverNode, err error) {
require.NoError(t, err)
azDriverNodes, err := controller.azClient.DiskV1beta2().AzDriverNodes(testNamespace).List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, azDriverNodes.Items, 2)
for _, azDriverNode := range azDriverNodes.Items {
require.Equal(t, azDriverNode.Annotations, map[string]string{consts.RecoverAnnotation: "azDriverNode", "key": "value"})
}
require.Len(t, azDriverNodes.Items, 1)
require.Equal(t, testNode0Name, azDriverNodes.Items[0].Name)
},
},
}
Expand All @@ -279,10 +289,6 @@ func TestAzDriverNodeRecover(t *testing.T) {
defer mockCtl.Finish()
controller := tt.setupFunc(t, mockCtl)
err := controller.Recover(context.TODO())
_, node0Exists := controller.availableAttachmentsMap.Load(testAzDriverNode0.Name)
require.True(t, node0Exists)
_, node1Exists := controller.availableAttachmentsMap.Load(testAzDriverNode1.Name)
require.True(t, node1Exists)
tt.verifyFunc(t, controller, err)
})
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/controller/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ var (

testAzDriverNode0 = azdiskv1beta2.AzDriverNode{
ObjectMeta: metav1.ObjectMeta{
Name: testNode0Name,
Namespace: testNamespace,
Annotations: map[string]string{"key": "value"},
Name: testNode0Name,
Namespace: testNamespace,
},
Spec: azdiskv1beta2.AzDriverNodeSpec{NodeName: testNode0Name},
}

testAzDriverNode1 = azdiskv1beta2.AzDriverNode{
ObjectMeta: metav1.ObjectMeta{
Name: testNode1Name,
Namespace: testNamespace,
Annotations: map[string]string{"key": "value"},
Name: testNode1Name,
Namespace: testNamespace,
},
Spec: azdiskv1beta2.AzDriverNodeSpec{NodeName: testNode1Name},
}

testNode1Request = createReconcileRequest(testNamespace, testNode1Name)
Expand Down Expand Up @@ -582,6 +582,14 @@ func mockClients(mockClient *mockclient.MockClient, azVolumeClient azdisk.Interf

azVolumeAttachment.DeepCopyInto(obj.(*azdiskv1beta1.AzVolumeAttachment))

case *azdiskv1beta2.AzDriverNode:
azDriverNode, err := azVolumeClient.DiskV1beta2().AzDriverNodes(key.Namespace).Get(ctx, key.Name, metav1.GetOptions{})
if err != nil {
return err
}

azDriverNode.DeepCopyInto(obj.(*azdiskv1beta2.AzDriverNode))

case *v1.PersistentVolume:
pv, err := kubeClient.CoreV1().PersistentVolumes().Get(ctx, key.Name, metav1.GetOptions{})
if err != nil {
Expand Down

0 comments on commit d19baf1

Please sign in to comment.