Merge pull request #13615 from riya-singhal31/node-name
csi: update network fence CR name
travisn committed Feb 5, 2024
2 parents f5fcf96 + c2e3cf3 commit 73427c9
Showing 2 changed files with 102 additions and 24 deletions.
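
For context, the core of this change is the CR naming: the NetworkFence object used to be named after the failed node alone, so an RBD fence and a CephFS fence for the same node would collide. The CSI driver name is now appended via a small helper. Below is a minimal sketch of that naming convention; the node name "worker-0" is illustrative and not from the commit.

```go
package main

import "fmt"

// fenceResourceName mirrors the helper added in watcher.go: the NetworkFence
// CR name is the node name suffixed with the CSI driver that owns the fence.
func fenceResourceName(nodeName, driver string) string {
	return fmt.Sprintf("%s-%s", nodeName, driver)
}

func main() {
	// Per-driver names let both fences coexist for one failed node.
	fmt.Println(fenceResourceName("worker-0", "rbd"))    // worker-0-rbd
	fmt.Println(fenceResourceName("worker-0", "cephfs")) // worker-0-cephfs
}
```

The diffs below add this helper, thread a driver argument through createNetworkFence and unfenceAndDeleteNetworkFence, and extend TestHandleNodeFailure and TestFenceResourceName to verify one fence per driver.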
pkg/operator/ceph/cluster/watcher.go: 39 changes (27 additions & 12 deletions)
@@ -53,6 +53,12 @@ type clientCluster struct {

var nodesCheckedForReconcile = sets.New[string]()

// drivers that support fencing, used in naming the NetworkFence object
const (
rbdDriver = "rbd"
cephfsDriver = "cephfs"
)

func newClientCluster(client client.Client, namespace string, context *clusterd.Context) *clientCluster {
return &clientCluster{
client: client,
@@ -185,9 +191,14 @@ func (c *clientCluster) handleNodeFailure(ctx context.Context, cluster *cephv1.C
return nil
}

err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster)
err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster, rbdDriver)
if err != nil {
return pkgerror.Wrapf(err, "failed to delete network fence for node %q.", node.Name)
return pkgerror.Wrapf(err, "failed to delete rbd network fence for node %q.", node.Name)
}

err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster, cephfsDriver)
if err != nil {
return pkgerror.Wrapf(err, "failed to delete cephFS network fence for node %q.", node.Name)
}

return nil
@@ -343,7 +354,7 @@ func listRWOCephFSPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephC
continue
}

if pv.Spec.CSI.VolumeAttributes["staticVolume"] == "true" || pv.Spec.CSI.VolumeAttributes["pool"] == "" {
if pv.Spec.CSI.VolumeAttributes["staticVolume"] == "true" {
logger.Debugf("skipping, static pv %q", pv.Name)
continue
}
@@ -388,7 +399,7 @@ func (c *clientCluster) fenceRbdImage(
return pkgerror.Wrapf(err, "failed to unmarshal rbd status output")
}
if len(ips) != 0 {
err = c.createNetworkFence(ctx, rbdPV, node, cluster, ips)
err = c.createNetworkFence(ctx, rbdPV, node, cluster, ips, rbdDriver)
if err != nil {
return pkgerror.Wrapf(err, "failed to create network fence for node %q", node.Name)
}
@@ -428,7 +439,7 @@ func (c *clientCluster) fenceCephFSVolume(
return fmt.Errorf("failed to unmarshal cephfs mds output. %v", err)
}

err = c.createNetworkFence(ctx, cephFSPV, node, cluster, ips)
err = c.createNetworkFence(ctx, cephFSPV, node, cluster, ips, cephfsDriver)
if err != nil {
return fmt.Errorf("failed to create network fence for node %q. %v", node.Name, err)
}
@@ -500,7 +511,7 @@ func concatenateWatcherIp(address string) string {
return watcherIP
}

func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.PersistentVolume, node *corev1.Node, cluster *cephv1.CephCluster, cidr []string) error {
func fenceResourceName(nodeName, driver string) string {
return fmt.Sprintf("%s-%s", nodeName, driver)
}

func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.PersistentVolume, node *corev1.Node, cluster *cephv1.CephCluster, cidr []string, driver string) error {
logger.Warningf("Blocking node IP %s", cidr)

secretName := pv.Annotations["volume.kubernetes.io/provisioner-deletion-secret-name"]
@@ -516,7 +531,7 @@ func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.Persis

networkFence := &addonsv1alpha1.NetworkFence{
ObjectMeta: metav1.ObjectMeta{
Name: node.Name,
Name: fenceResourceName(node.Name, driver),
Namespace: cluster.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cluster, cephv1.SchemeGroupVersion.WithKind("CephCluster")),
@@ -546,9 +561,9 @@ func (c *clientCluster) createNetworkFence(ctx context.Context, pv corev1.Persis
return nil
}

func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node corev1.Node, cluster *cephv1.CephCluster) error {
func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node corev1.Node, cluster *cephv1.CephCluster, driver string) error {
networkFence := &addonsv1alpha1.NetworkFence{}
err := c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cluster.Namespace}, networkFence)
err := c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, driver), Namespace: cluster.Namespace}, networkFence)
if err != nil && !errors.IsNotFound(err) {
return err
} else if errors.IsNotFound(err) {
@@ -565,7 +580,7 @@ func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node c
}

err = wait.PollUntilContextTimeout(ctx, 2*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
err = c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cluster.Namespace}, networkFence)
err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, driver), Namespace: cluster.Namespace}, networkFence)
if err != nil && !errors.IsNotFound(err) {
return false, err
}
@@ -575,7 +590,7 @@ func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node c
return false, err
}

logger.Infof("successfully unfenced network fence cr %q, proceeding with deletion", networkFence.Name)
logger.Infof("successfully unfenced %q network fence cr %q, proceeding with deletion", driver, networkFence.Name)

err = c.client.Delete(ctx, networkFence)
if err == nil || errors.IsNotFound(err) {
@@ -585,7 +600,7 @@ func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node c
return false, nil
})
if err != nil {
return pkgerror.Wrapf(err, "timeout out deleting the network fence CR %s", networkFence.Name)
return pkgerror.Wrapf(err, "timeout out deleting the %s network fence CR %s", driver, networkFence.Name)
}

return nil
pkg/operator/ceph/cluster/watcher_test.go: 87 changes (75 additions & 12 deletions)
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os"
"strings"
"testing"

"github.com/coreos/pkg/capnslog"
@@ -46,7 +47,7 @@ import (
func getFakeClient(obj ...runtime.Object) client.Client {
// Register operator types with the runtime scheme.
scheme := scheme.Scheme
scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &addonsv1alpha1.NetworkFence{})
scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &addonsv1alpha1.NetworkFence{}, &addonsv1alpha1.NetworkFenceList{})
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(obj...).Build()
return client
}
@@ -175,8 +176,10 @@ func TestHandleNodeFailure(t *testing.T) {
switch {
case command == "rbd" && args[0] == "status":
return `{"watchers":[{"address":"192.168.39.137:0/3762982934","client":4307,"cookie":18446462598732840961}]}`, nil
case command == "ceph" && args[0] == "status":
return `{"entity":[{"addr": [{"addr": "10.244.0.12:0", "nonce":3247243972}]}], "client_metadata":{"root":"/"}}`, nil
case command == "ceph" && args[0] == "tell":
return `{"watchers":[{"id":5201,"entity":[{"addr": [{"addr": "10.244.0.12:0", "nonce":3247243972}]}]]}`, nil
return `[{"entity":{"addr":{"addr":"10.244.0.12:0","nonce":3247243972}}, "client_metadata":{"root":"/"}}]`, nil

}
return "", errors.Errorf("unexpected rbd/ceph command %q", args)
@@ -208,7 +211,7 @@ func TestHandleNodeFailure(t *testing.T) {
},
}

pv := &corev1.PersistentVolume{
rbdPV := &corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-58469d41-f6c0-4720-b23a-0a0826b841ca",
Annotations: map[string]string{
@@ -231,6 +234,29 @@ func TestHandleNodeFailure(t *testing.T) {
},
}

cephfsPV := &corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-58469d41-f6c0-4720-b23a-0a0826b842ca",
Annotations: map[string]string{
"pv.kubernetes.io/provisioned-by": fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
"volume.kubernetes.io/provisioner-deletion-secret-name": "rook-csi-cephfs-provisioner",
"volume.kubernetes.io/provisioner-deletion-secret-namespace": ns,
},
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeSource: corev1.PersistentVolumeSource{
CSI: &corev1.CSIPersistentVolumeSource{
Driver: fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
VolumeHandle: "0001-0009-rook-ceph-0000000000000002-24862838-240d-4215-9183-abfc0e9e4001",
VolumeAttributes: map[string]string{
"fsName": "myfs",
"subvolumeName": "csi-vol-58469d41-f6c0-4720-b23a-0a0826b842ca",
},
},
},
},
}

staticRbdPV := &corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-58469d41-f6c0-4720-b23a-0a0826b841cb",
@@ -263,9 +289,11 @@ func TestHandleNodeFailure(t *testing.T) {
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeSource: corev1.PersistentVolumeSource{
CSI: &corev1.CSIPersistentVolumeSource{
Driver: fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
VolumeHandle: "0001-0009-rook-ceph-0000000000000002-24862838-240d-4215-9183-abfc0e9e4001",
VolumeAttributes: map[string]string{},
Driver: fmt.Sprintf("%s.cephfs.csi.ceph.com", ns),
VolumeHandle: "0001-0009-rook-ceph-0000000000000002-24862838-240d-4215-9183-abfc0e9e4001",
VolumeAttributes: map[string]string{
"staticVolume": "true",
},
},
},
},
@@ -311,7 +339,10 @@ func TestHandleNodeFailure(t *testing.T) {
_, err := c.context.Clientset.CoreV1().Secrets(ns).Create(ctx, secret, metav1.CreateOptions{})
assert.NoError(t, err)

_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, rbdPV, metav1.CreateOptions{})
assert.NoError(t, err)

_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, cephfsPV, metav1.CreateOptions{})
assert.NoError(t, err)

_, err = c.context.ApiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &v1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "networkfences.csiaddons.openshift.io"}}, metav1.CreateOptions{})
@@ -321,10 +352,33 @@ func TestHandleNodeFailure(t *testing.T) {
err = c.handleNodeFailure(ctx, cephCluster, node)
assert.NoError(t, err)

networkFence := &addonsv1alpha1.NetworkFence{}
err = c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cephCluster.Namespace}, networkFence)
networkFenceRbd := &addonsv1alpha1.NetworkFence{}
err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, rbdDriver), Namespace: cephCluster.Namespace}, networkFenceRbd)
assert.NoError(t, err)

networkFenceCephFs := &addonsv1alpha1.NetworkFence{}
err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, cephfsDriver), Namespace: cephCluster.Namespace}, networkFenceCephFs)
assert.NoError(t, err)

networkFences := &addonsv1alpha1.NetworkFenceList{}
err = c.client.List(ctx, networkFences)
assert.NoError(t, err)
var rbdCount, cephFsCount int

for _, fence := range networkFences.Items {
// Check if the resource is in the desired namespace
if fence.Namespace == cephCluster.Namespace {
if strings.Contains(fence.Name, rbdDriver) {
rbdCount++
} else if strings.Contains(fence.Name, cephfsDriver) {
cephFsCount++
}
}
}

assert.Equal(t, 1, rbdCount)
assert.Equal(t, 1, cephFsCount)

// For static rbd pv
_, err = c.context.Clientset.CoreV1().PersistentVolumes().Create(ctx, staticRbdPV, metav1.CreateOptions{})
assert.NoError(t, err)
@@ -334,7 +388,7 @@ func TestHandleNodeFailure(t *testing.T) {

rbdVolumesInUse, _ := getCephVolumesInUse(cephCluster, node.Status.VolumesInUse)
rbdPVList := listRBDPV(pvList, cephCluster, rbdVolumesInUse)
assert.Equal(t, len(rbdPVList), 1) // it will be equal to one since we have one pv provisioned by csi named `PV`
assert.Equal(t, len(rbdPVList), 1) // it will be equal to one since we have one pv provisioned by csi named `rbdPV`

err = c.handleNodeFailure(ctx, cephCluster, node)
assert.NoError(t, err)
@@ -352,7 +406,7 @@ func TestHandleNodeFailure(t *testing.T) {
cephFSVolumesInUseMap[vol] = struct{}{}
}
cephFSPVList := listRWOCephFSPV(pvList, cephCluster, cephFSVolumesInUseMap)
assert.Equal(t, len(cephFSPVList), 0)
assert.Equal(t, len(cephFSPVList), 1) // it will be equal to one since we have one pv provisioned by csi named `cephfsPV`

err = c.handleNodeFailure(ctx, cephCluster, node)
assert.NoError(t, err)
@@ -377,8 +431,12 @@ func TestHandleNodeFailure(t *testing.T) {
err = c.handleNodeFailure(ctx, cephCluster, node)
assert.NoError(t, err)

err = c.client.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: cephCluster.Namespace}, networkFence)
err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, rbdDriver), Namespace: cephCluster.Namespace}, networkFenceRbd)
assert.Error(t, err, kerrors.IsNotFound(err))

err = c.client.Get(ctx, types.NamespacedName{Name: fenceResourceName(node.Name, cephfsDriver), Namespace: cephCluster.Namespace}, networkFenceCephFs)
assert.Error(t, err, kerrors.IsNotFound(err))

}

func TestGetCephVolumesInUse(t *testing.T) {
@@ -428,6 +486,11 @@ func TestConcatenateWatcherIp(t *testing.T) {
assert.Equal(t, WatcherIP, "192.168.39.137/32")
}

func TestFenceResourceName(t *testing.T) {
FenceName := fenceResourceName("fakenode", "rbd")
assert.Equal(t, FenceName, "fakenode-rbd")
}

func TestOnDeviceCMUpdate(t *testing.T) {
// Set DEBUG logging
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
