Skip to content

Commit

Permalink
Merge pull request #13806 from subhamkrai/remove-cephfs-fencing-code
Browse files Browse the repository at this point in the history
csi: remove cephFs networkFence code temporarily
  • Loading branch information
travisn committed Feb 22, 2024
2 parents d36f7b5 + 28182e9 commit febef52
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 268 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/banzaicloud/k8s-objectmatcher v1.8.0
github.com/ceph/go-ceph v0.24.0
github.com/coreos/pkg v0.0.0-20230601102743-20bbbf26f4d8
github.com/csi-addons/kubernetes-csi-addons v0.7.0
github.com/csi-addons/kubernetes-csi-addons v0.8.0
github.com/gemalto/kmip-go v0.0.10
github.com/go-ini/ini v1.67.0
github.com/google/go-cmp v0.6.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/csi-addons/kubernetes-csi-addons v0.7.0 h1:Cqon7e9Wnil8gq4/M80uilMJcTv7LypdKJoOVhEb4oM=
github.com/csi-addons/kubernetes-csi-addons v0.7.0/go.mod h1:7thZgf/0YB59yWWv8jb0l6mv1bnPEFh5Kxj4k66pnPs=
github.com/csi-addons/kubernetes-csi-addons v0.8.0 h1:zvYGp4DM6KdQzEX3dQSYKykqJdLZlxpVBJjtpbaqFjs=
github.com/csi-addons/kubernetes-csi-addons v0.8.0/go.mod h1:dvinzoiXlqdOGDpKkYx8Jxl507BzVEEEO+SI0OmBaRI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -613,7 +613,7 @@ github.com/onsi/ginkgo/v2 v2.3.0/go.mod h1:Eew0uilEqZmIEZr8JrvYlvOM7Rr6xzTmMV8Ay
github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo=
github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw=
github.com/onsi/ginkgo/v2 v2.6.0/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
Expand All @@ -627,7 +627,7 @@ github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ
github.com/onsi/gomega v1.23.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
github.com/openshift/api v0.0.0-20231204192004-bfea29e5e6c4 h1:5RyeLvTSZEn/fDQA6e6+qIvFPssWjreY8pbwfg4/EEQ=
github.com/openshift/api v0.0.0-20231204192004-bfea29e5e6c4/go.mod h1:qNtV0315F+f8ld52TLtPvrfivZpdimOzTi3kn9IVbtU=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/onsi/gomega v1.27.10 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/onsi/gomega v1.30.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ github.com/onsi/ginkgo/v2 v2.3.0/go.mod h1:Eew0uilEqZmIEZr8JrvYlvOM7Rr6xzTmMV8Ay
github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo=
github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw=
github.com/onsi/ginkgo/v2 v2.6.0/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
Expand All @@ -420,8 +420,8 @@ github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ
github.com/onsi/gomega v1.23.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/openshift/api v0.0.0-20231204192004-bfea29e5e6c4 h1:5RyeLvTSZEn/fDQA6e6+qIvFPssWjreY8pbwfg4/EEQ=
github.com/openshift/api v0.0.0-20231204192004-bfea29e5e6c4/go.mod h1:qNtV0315F+f8ld52TLtPvrfivZpdimOzTi3kn9IVbtU=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
Expand Down
175 changes: 10 additions & 165 deletions pkg/operator/ceph/cluster/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ var (

// drivers that support fencing, used in naming the networkFence object
const (
rbdDriver = "rbd"
cephfsDriver = "cephfs"
rbdDriver = "rbd"
)

func newClientCluster(client client.Client, namespace string, context *clusterd.Context) *clientCluster {
Expand Down Expand Up @@ -201,11 +200,6 @@ func (c *clientCluster) handleNodeFailure(ctx context.Context, cluster *cephv1.C
return pkgerror.Wrapf(err, "failed to delete rbd network fence for node %q.", node.Name)
}

err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster, cephfsDriver)
if err != nil {
return pkgerror.Wrapf(err, "failed to delete cephFS network fence for node %q.", node.Name)
}

return nil
}

Expand All @@ -217,9 +211,9 @@ func (c *clientCluster) fenceNode(ctx context.Context, node *corev1.Node, cluste
}
logger.Debugf("volumesInuse %s", volumesInuse)

rbdVolumesInUse, cephFSVolumeInUse := getCephVolumesInUse(cluster, volumesInuse)
if len(rbdVolumesInUse) == 0 && len(cephFSVolumeInUse) == 0 {
logger.Debugf("no rbd or cephFS volumes in use for out of service node %q", node.Name)
rbdVolumesInUse := getCephVolumesInUse(cluster, volumesInuse)
if len(rbdVolumesInUse) == 0 {
logger.Debugf("no rbd volumes in use for out of service node %q", node.Name)
return nil
}

Expand Down Expand Up @@ -258,46 +252,11 @@ func (c *clientCluster) fenceNode(ctx context.Context, node *corev1.Node, cluste
}
}

if len(cephFSVolumeInUse) != 0 {
cephFSVolumeInUseMap := make(map[string]struct{})
for _, vol := range cephFSVolumeInUse {
cephFSVolumeInUseMap[vol] = struct{}{}
}
cephFSPVList := listRWOCephFSPV(listPVs, cluster, cephFSVolumeInUseMap)
if len(cephFSPVList) == 0 {
logger.Debug("No cephFS PVs found on the node")
return nil
}
logger.Infof("node %q require fencing, found cephFS volumes in use", node.Name)
clusterInfo, _, _, err := opcontroller.LoadClusterInfo(c.context, ctx, cluster.Namespace, &cluster.Spec)
if err != nil {
return pkgerror.Wrapf(err, "Failed to load cluster info.")
}

for i := range cephFSPVList {
err = c.fenceCephFSVolume(ctx, node, cluster, clusterInfo, cephFSPVList[i])
// We only need to create the network fence for any one of cephFS pv.
if err == nil {
break
}

// continue to the next cephFS volume if no active client was found
if stderrors.Is(err, errActiveClientNotFound) {
continue
}
if i == len(cephFSPVList)-1 {
return pkgerror.Wrapf(err, "failed to fence cephFS volumes")
}
logger.Errorf("failed to fence cephFS volumes %q, trying next cephFS volume", cephFSPVList[i].Name)
}

}

return nil
}

func getCephVolumesInUse(cluster *cephv1.CephCluster, volumesInUse []corev1.UniqueVolumeName) ([]string, []string) {
var rbdVolumesInUse, cephFSVolumeInUse []string
func getCephVolumesInUse(cluster *cephv1.CephCluster, volumesInUse []corev1.UniqueVolumeName) []string {
var rbdVolumesInUse []string

for _, volume := range volumesInUse {
splitVolumeInUseBased := trimeVolumeInUse(volume)
Expand All @@ -306,13 +265,9 @@ func getCephVolumesInUse(cluster *cephv1.CephCluster, volumesInUse []corev1.Uniq
if len(splitVolumeInUseBased) == 2 && splitVolumeInUseBased[0] == fmt.Sprintf("%s.rbd.csi.ceph.com", cluster.Namespace) {
rbdVolumesInUse = append(rbdVolumesInUse, splitVolumeInUseBased[1])
}

if len(splitVolumeInUseBased) == 2 && splitVolumeInUseBased[0] == fmt.Sprintf("%s.cephfs.csi.ceph.com", cluster.Namespace) {
cephFSVolumeInUse = append(cephFSVolumeInUse, splitVolumeInUseBased[1])
}
}

return rbdVolumesInUse, cephFSVolumeInUse
return rbdVolumesInUse
}

func trimeVolumeInUse(volume corev1.UniqueVolumeName) []string {
Expand Down Expand Up @@ -351,36 +306,6 @@ func listRBDPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephCluster
return listRbdPV
}

// listRWOCephFSPV filters listPVs down to the CephFS CSI volumes that are
// currently in use, where cephFSVolumesInUse is a set keyed by CSI volume handle.
//
// A PV is returned only when all of the following hold:
//   - it has a CSI source,
//   - its CSI driver is "<cluster.Namespace>.cephfs.csi.ceph.com",
//   - pvSupportsMultiNodeAccess reports false for its access modes,
//   - its "staticVolume" volume attribute is not "true",
//   - its volume handle is a key of cephFSVolumesInUse.
//
// The result is nil when no PV qualifies.
func listRWOCephFSPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephCluster, cephFSVolumesInUse map[string]struct{}) []corev1.PersistentVolume {
	var inUsePVs []corev1.PersistentVolume

	for idx := range listPVs.Items {
		candidate := listPVs.Items[idx]
		csiSource := candidate.Spec.CSI

		// Volumes without a CSI source cannot belong to the CephFS CSI driver.
		if csiSource == nil {
			logger.Debugf("pv %q is not provisioned by CSI", candidate.Name)
			continue
		}

		// Only volumes served by this cluster's CephFS CSI driver are of interest.
		if csiSource.Driver != fmt.Sprintf("%s.cephfs.csi.ceph.com", cluster.Namespace) {
			continue
		}

		// Multi-node volumes (RWX, ROX) can be mounted on several nodes, so they are ignored.
		if pvSupportsMultiNodeAccess(candidate.Spec.AccessModes) {
			continue
		}

		if csiSource.VolumeAttributes["staticVolume"] == "true" {
			logger.Debugf("skipping, static pv %q", candidate.Name)
			continue
		}

		// Keep the volume only when it is reported as in use.
		if _, inUse := cephFSVolumesInUse[csiSource.VolumeHandle]; inUse {
			inUsePVs = append(inUsePVs, candidate)
		}
	}

	return inUsePVs
}

// pvSupportsMultiNodeAccess returns true if the PV access modes contain ReadWriteMany or ReadOnlyMany.
func pvSupportsMultiNodeAccess(accessModes []corev1.PersistentVolumeAccessMode) bool {
for _, accessMode := range accessModes {
Expand Down Expand Up @@ -423,85 +348,6 @@ func (c *clientCluster) fenceRbdImage(
return nil
}

// fenceCephFSVolume creates a network fence for the clients of the given CephFS PV.
//
// It reads the Ceph status to find an MDS whose rank status is "up:active",
// asks that MDS for its client list ("tell mds.<name> client ls"), extracts the
// client IPs belonging to cephFSPV via cephFSMDSClientMarshal, and hands them
// to createNetworkFence with the cephfs driver.
//
// Errors:
//   - a wrapped error when the Ceph status cannot be read,
//   - an error when no MDS rank is "up:active" (there is no daemon to query),
//   - an error when the client list command fails or its output cannot be decoded,
//   - errActiveClientNotFound when no client IPs are found for the PV,
//   - a wrapped error when the network fence cannot be created.
func (c *clientCluster) fenceCephFSVolume(
	ctx context.Context, node *corev1.Node, cluster *cephv1.CephCluster,
	clusterInfo *cephclient.ClusterInfo, cephFSPV corev1.PersistentVolume) error {

	logger.Infof("fencing cephfs volume %q on node %q", cephFSPV.Name, node.Name)

	status, err := cephclient.StatusWithUser(c.context, clusterInfo)
	if err != nil {
		return pkgerror.Wrapf(err, "failed to get ceph status for check active mds")
	}

	// When several ranks are active, the last one listed is the one queried.
	var activeMDS string
	for _, fsRank := range status.Fsmap.ByRank {
		if fsRank.Status == "up:active" {
			activeMDS = fsRank.Name
		}
	}

	// Without an active MDS the command below would target the invalid daemon
	// name "mds." and fail with a misleading error; report the real cause instead.
	if activeMDS == "" {
		return fmt.Errorf("no active mds found, cannot list clients for cephfs volume %q", cephFSPV.Name)
	}

	args := []string{"tell", fmt.Sprintf("mds.%s", activeMDS), "client", "ls", "--format", "json"}
	cmd := cephclient.NewCephCommand(c.context, clusterInfo, args)
	cmd.JsonOutput = true

	buf, err := cmd.Run()
	if err != nil {
		return fmt.Errorf("failed to list watchers for cephfs pool/subvoumeName %s/%s. %v", cephFSPV.Spec.CSI.VolumeAttributes["pool"], cephFSPV.Spec.CSI.VolumeAttributes["subvolumeName"], err)
	}

	ips, err := cephFSMDSClientMarshal(buf, cephFSPV)
	if err != nil {
		return pkgerror.Wrapf(err, "failed to unmarshal cephfs mds output")
	}

	// No matching client means there is nothing to fence for this volume; the
	// sentinel lets the caller distinguish this from a real failure.
	if len(ips) == 0 {
		logger.Infof("no active mds clients found for cephfs volume %q", cephFSPV.Name)
		return errActiveClientNotFound
	}

	err = c.createNetworkFence(ctx, cephFSPV, node, cluster, ips, cephfsDriver)
	if err != nil {
		return pkgerror.Wrapf(err, "failed to create network fence for node %q", node.Name)
	}

	return nil
}

// cephFSMDSClientMarshal decodes output as a JSON array of MDS client entries
// and returns one address per client whose "client_metadata.root" equals the
// PV's "subvolumePath" volume attribute. Each address is the client's
// "entity.addr.addr" value passed through concatenateWatcherIp.
//
// Despite its name, the function unmarshals. It returns an empty, non-nil
// slice together with a wrapped error when output is not valid JSON for the
// expected shape, and an empty, non-nil slice when no client matches.
func cephFSMDSClientMarshal(output []byte, cephFSPV corev1.PersistentVolume) ([]string, error) {
	// mdsClient mirrors only the fields that are read from each client entry.
	type mdsClient struct {
		Entity struct {
			Addr struct {
				Addr  string `json:"addr"`
				Nonce int    `json:"nonce"`
			} `json:"addr"`
		} `json:"entity"`
		ClientMetadata struct {
			Root string `json:"root"`
		} `json:"client_metadata"`
	}

	var clients []mdsClient
	if err := json.Unmarshal(output, &clients); err != nil {
		return []string{}, pkgerror.Wrapf(err, "failed to unmarshal cephFS data output")
	}

	ipsToFence := make([]string, 0)
	for _, client := range clients {
		// Skip clients that are mounted on a different subvolume than this PV.
		if cephFSPV.Spec.CSI.VolumeAttributes["subvolumePath"] != client.ClientMetadata.Root {
			continue
		}

		clientAddr := client.Entity.Addr
		logger.Infof("cephfs mds client ips to fence %v", clientAddr)
		ipsToFence = append(ipsToFence, concatenateWatcherIp(clientAddr.Addr))
	}

	return ipsToFence, nil
}

func rbdStatusUnMarshal(output []byte) ([]string, error) {
type rbdStatus struct {
Watchers []struct {
Expand All @@ -524,8 +370,7 @@ func rbdStatusUnMarshal(output []byte) ([]string, error) {
}

func concatenateWatcherIp(address string) string {
// address is in format `10.63.0.5:0/1254753579` for rbd and
// in the format '10.244.0.12:0' for cephfs
// address is in format `10.63.0.5:0/1254753579` for rbd
// split on the separator ':0' to remove the nonce, then append `/32` to define a network with only one IP address
watcherIP := strings.Split(address, ":0")[0] + "/32"
return watcherIP
Expand Down Expand Up @@ -604,8 +449,8 @@ func (c *clientCluster) unfenceAndDeleteNetworkFence(ctx context.Context, node c
return false, err
}

if networkFence.Spec.FenceState != addonsv1alpha1.Unfenced {
logger.Infof("waiting for network fence CR %s to get in %s state before deletion", networkFence.Name, addonsv1alpha1.Unfenced)
if networkFence.Status.Message != addonsv1alpha1.UnFenceOperationSuccessfulMessage {
logger.Infof("waiting for network fence CR %q status to get result %q", networkFence.Name, addonsv1alpha1.UnFenceOperationSuccessfulMessage)
return false, err
}

Expand Down

0 comments on commit febef52

Please sign in to comment.