Skip to content

Commit

Permalink
Merge pull request #13905 from sp98/remove-stale-cluster-peers
Browse files Browse the repository at this point in the history
core: disable mirroring on pool with "image" mode
  • Loading branch information
sp98 authored Mar 19, 2024
2 parents a536b36 + a37c476 commit 20d2461
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 31 deletions.
37 changes: 35 additions & 2 deletions pkg/daemon/ceph/client/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ type PeerToken struct {
Namespace string `json:"namespace"`
}

type MirroredImages struct {
// Images is the list of mirrored images on a pool
Images *[]Images
}

type Images struct {
// Name of the pool image
Name string
}

var (
rbdMirrorPeerCaps = []string{"mon", "profile rbd-mirror-peer", "osd", "profile rbd"}
rbdMirrorPeerKeyringID = "rbd-mirror-peer"
Expand Down Expand Up @@ -120,7 +130,7 @@ func enablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, po
}

// disablePoolMirroring turns off mirroring on a pool
func disablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) error {
func DisablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) error {
logger.Infof("disabling mirroring for pool %q", poolName)

// Build command
Expand All @@ -136,7 +146,7 @@ func disablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, p
return nil
}

func removeClusterPeer(context *clusterd.Context, clusterInfo *ClusterInfo, poolName, peerUUID string) error {
func RemoveClusterPeer(context *clusterd.Context, clusterInfo *ClusterInfo, poolName, peerUUID string) error {
logger.Infof("removing cluster peer with UUID %q for the pool %q", peerUUID, poolName)

// Build command
Expand Down Expand Up @@ -175,6 +185,29 @@ func GetPoolMirroringStatus(context *clusterd.Context, clusterInfo *ClusterInfo,
return &poolMirroringStatus, nil
}

// GetMirroredPoolImages returns a list of mirrored images for a given pool
func GetMirroredPoolImages(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) (*MirroredImages, error) {
logger.Debugf("retrieving mirrored images for pool %q", poolName)

// Build command
args := []string{"mirror", "pool", "status", "--verbose", poolName}
cmd := NewRBDCommand(context, clusterInfo, args)
cmd.JsonOutput = true

// Run command
buf, err := cmd.Run()
if err != nil {
return nil, errors.Wrapf(err, "failed to retrieve mirroring pool %q status", poolName)
}

var mirroredImages MirroredImages
if err := json.Unmarshal([]byte(buf), &mirroredImages); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal mirror pool status response")
}

return &mirroredImages, nil
}

// GetPoolMirroringInfo prints the pool mirroring information
func GetPoolMirroringInfo(context *clusterd.Context, clusterInfo *ClusterInfo, poolName string) (*cephv1.PoolMirroringInfo, error) {
logger.Debugf("retrieving mirroring pool %q info", poolName)
Expand Down
25 changes: 23 additions & 2 deletions pkg/daemon/ceph/client/mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
var (
bootstrapPeerToken = `eyJmc2lkIjoiYzZiMDg3ZjItNzgyOS00ZGJiLWJjZmMtNTNkYzM0ZTBiMzVkIiwiY2xpZW50X2lkIjoicmJkLW1pcnJvci1wZWVyIiwia2V5IjoiQVFBV1lsWmZVQ1Q2RGhBQVBtVnAwbGtubDA5YVZWS3lyRVV1NEE9PSIsIm1vbl9ob3N0IjoiW3YyOjE5Mi4xNjguMTExLjEwOjMzMDAsdjE6MTkyLjE2OC4xMTEuMTA6Njc4OV0sW3YyOjE5Mi4xNjguMTExLjEyOjMzMDAsdjE6MTkyLjE2OC4xMTEuMTI6Njc4OV0sW3YyOjE5Mi4xNjguMTExLjExOjMzMDAsdjE6MTkyLjE2OC4xMTEuMTE6Njc4OV0ifQ==` //nolint:gosec // This is just a var name, not a real token
mirrorStatus = `{"summary":{"health":"WARNING","daemon_health":"OK","image_health":"WARNING","states":{"starting_replay":1,"replaying":1}}}`
mirrorStatusVerbose = `{"summary":{"health":"WARNING","daemon_health":"OK","image_health":"WARNING","states":{"starting_replay":1,"replaying":1}}, "images":[{"name":"test","global_id":"ebad309d-4d8f-4c6f-afe0-46e02ace26ac","state":"up+stopped","description":"local image is primary","daemon_service":{"service_id":"4339","instance_id":"4644","daemon_id":"a","hostname":"fv-az1195-222"},"last_update":"2024-03-18 04:16:54","peer_sites":[{"site_name":"4c3e1cb8-8129-43ab-8d75-abc20154fd4a","mirror_uuids":"a71727e9-63a4-4386-a44b-4cf48dc77fa8","state":"up+replaying","description":"replaying, {\"bytes_per_second\":0.0,\"bytes_per_snapshot\":0.0,\"last_snapshot_bytes\":0,\"last_snapshot_sync_seconds\":0,\"remote_snapshot_timestamp\":1710734899,\"replay_state\":\"idle\"}","last_update":"2024-03-18 04:16:36"}]}]}`
mirrorInfo = `{"mode":"image","site_name":"39074576-5884-4ef3-8a4d-8a0c5ed33031","peers":[{"uuid":"4a6983c0-3c9d-40f5-b2a9-2334a4659827","direction":"rx-tx","site_name":"ocs","mirror_uuid":"","client_name":"client.rbd-mirror-peer"}]}`
snapshotScheduleStatus = `[{"schedule_time": "14:00:00-05:00", "image": "foo"}, {"schedule_time": "08:00:00-05:00", "image": "bar"}]`
snapshotScheduleList = `[{"interval":"3d","start_time":""},{"interval":"1d","start_time":"14:00:00-05:00"}]`
Expand Down Expand Up @@ -101,6 +102,26 @@ func TestGetPoolMirroringStatus(t *testing.T) {
assert.Equal(t, "OK", poolMirrorStatus.Summary.DaemonHealth)
}

func TestGetMirroredPoolImages(t *testing.T) {
pool := "pool-test"
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(command string, args ...string) (string, error) {
if args[0] == "mirror" {
assert.Equal(t, "pool", args[1])
assert.Equal(t, "status", args[2])
assert.Equal(t, "--verbose", args[3])
assert.Equal(t, pool, args[4])
return mirrorStatusVerbose, nil
}
return "", errors.New("unknown command")
}
context := &clusterd.Context{Executor: executor}

mirroredImages, err := GetMirroredPoolImages(context, AdminTestClusterInfo("mycluster"), pool)
assert.NoError(t, err)
assert.Equal(t, 1, len(*mirroredImages.Images))
}

func TestImportRBDMirrorBootstrapPeer(t *testing.T) {
pool := "pool-test"
executor := &exectest.MockExecutor{}
Expand Down Expand Up @@ -328,7 +349,7 @@ func TestDisableMirroring(t *testing.T) {
}
context := &clusterd.Context{Executor: executor}

err := disablePoolMirroring(context, AdminTestClusterInfo("mycluster"), pool)
err := DisablePoolMirroring(context, AdminTestClusterInfo("mycluster"), pool)
assert.NoError(t, err)
}

Expand All @@ -349,6 +370,6 @@ func TestRemoveClusterPeer(t *testing.T) {
}
context := &clusterd.Context{Executor: executor}

err := removeClusterPeer(context, AdminTestClusterInfo("mycluster"), pool, peerUUID)
err := RemoveClusterPeer(context, AdminTestClusterInfo("mycluster"), pool, peerUUID)
assert.NoError(t, err)
}
25 changes: 0 additions & 25 deletions pkg/daemon/ceph/client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,32 +332,7 @@ func setCommonPoolProperties(context *clusterd.Context, clusterInfo *ClusterInfo
return errors.Wrapf(err, "failed to enable snapshot scheduling for pool %q", pool.Name)
}
}
} else {
if pool.Mirroring.Mode == "pool" {
// Remove storage cluster peers
mirrorInfo, err := GetPoolMirroringInfo(context, clusterInfo, pool.Name)
if err != nil {
return errors.Wrapf(err, "failed to get mirroring info for the pool %q", pool.Name)
}
for _, peer := range mirrorInfo.Peers {
if peer.UUID != "" {
err := removeClusterPeer(context, clusterInfo, pool.Name, peer.UUID)
if err != nil {
return errors.Wrapf(err, "failed to remove cluster peer with UUID %q for the pool %q", peer.UUID, pool.Name)
}
}
}

// Disable mirroring
err = disablePoolMirroring(context, clusterInfo, pool.Name)
if err != nil {
return errors.Wrapf(err, "failed to disable mirroring for pool %q", pool.Name)
}
} else if pool.Mirroring.Mode == "image" {
logger.Warningf("manually disable mirroring on images in the pool %q", pool.Name)
}
}

// set maxSize quota
if pool.Quotas.MaxSize != nil {
// check for format errors
Expand Down
7 changes: 7 additions & 0 deletions pkg/operator/ceph/object/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ func TestCephObjectStoreController(t *testing.T) {
{"poolnum":9,"poolname":"my-store.rgw.buckets.data"}
]`, nil
}
if args[0] == "mirror" && args[2] == "info" {
return "{}", nil
}
if args[0] == "mirror" && args[2] == "disable" {
return "", nil
}

return "", nil
},
MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) {
Expand Down
45 changes: 45 additions & 0 deletions pkg/operator/ceph/pool/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// If not mirrored there is no Status Info field to fulfil
} else {
// disable mirroring
err := r.disableMirroring(cephBlockPool)
if err != nil {
return reconcile.Result{}, *cephBlockPool, errors.Wrapf(err, "failed to disable mirroring on pool %q", cephBlockPool.Name)
}
// update ObservedGeneration in status at the end of reconcile
// Set Ready status, we are done reconciling
updateStatus(r.opManagerContext, r.client, request.NamespacedName, cephv1.ConditionReady, nil, observedGeneration)
Expand Down Expand Up @@ -497,3 +502,43 @@ func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.Ce
delete(r.blockPoolContexts, channelKey)
}
}

func (r *ReconcileCephBlockPool) disableMirroring(pool *cephv1.CephBlockPool) error {
mirrorInfo, err := cephclient.GetPoolMirroringInfo(r.context, r.clusterInfo, pool.Name)
if err != nil {
return errors.Wrapf(err, "failed to get mirroring info for the pool %q", pool.Name)
}

if mirrorInfo.Mode == "image" {
mirroredPools, err := cephclient.GetMirroredPoolImages(r.context, r.clusterInfo, pool.Name)
if err != nil {
return errors.Wrapf(err, "failed to list mirrored images for pool %q", pool.Name)
}

if len(*mirroredPools.Images) > 0 {
msg := fmt.Sprintf("there are images in the pool %q. Please manually disable mirroring for each image", pool.Name)
logger.Errorf(msg)
return errors.New(msg)
}
}

// Remove storage cluster peers
for _, peer := range mirrorInfo.Peers {
if peer.UUID != "" {
err := cephclient.RemoveClusterPeer(r.context, r.clusterInfo, pool.Name, peer.UUID)
if err != nil {
return errors.Wrapf(err, "failed to remove cluster peer with UUID %q for the pool %q", peer.UUID, pool.Name)
}
logger.Infof("successfully removed peer site %q for the pool %q", peer.UUID, pool.Name)
}
}

// Disable mirroring on pool
err = cephclient.DisablePoolMirroring(r.context, r.clusterInfo, pool.Name)
if err != nil {
return errors.Wrapf(err, "failed to disable mirroring for pool %q", pool.Name)
}
logger.Infof("successfully disabled mirroring on the pool %q", pool.Name)

return nil
}
17 changes: 15 additions & 2 deletions pkg/operator/ceph/pool/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ func TestCreatePool(t *testing.T) {
}
}
if command == "rbd" {
assert.Equal(t, []string{"pool", "init", p.Name}, args[0:3])
if args[0] == "mirror" && args[2] == "info" {
return "{}", nil
} else if args[0] == "mirror" && args[2] == "disable" {
return "", nil
} else {
assert.Equal(t, []string{"pool", "init", p.Name}, args[0:3])
}

}
return "", nil
},
Expand Down Expand Up @@ -335,8 +342,11 @@ func TestCephBlockPoolController(t *testing.T) {
if args[0] == "config" && args[2] == "mgr" && args[3] == "mgr/prometheus/rbd_stats_pools" {
return "", nil
}

if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
return "{}", nil
}
return "", nil

},
}
c.Executor = executor
Expand Down Expand Up @@ -426,6 +436,9 @@ func TestCephBlockPoolController(t *testing.T) {
if args[0] == "mirror" && args[1] == "pool" && args[2] == "peer" && args[3] == "bootstrap" && args[4] == "create" {
return `eyJmc2lkIjoiYzZiMDg3ZjItNzgyOS00ZGJiLWJjZmMtNTNkYzM0ZTBiMzVkIiwiY2xpZW50X2lkIjoicmJkLW1pcnJvci1wZWVyIiwia2V5IjoiQVFBV1lsWmZVQ1Q2RGhBQVBtVnAwbGtubDA5YVZWS3lyRVV1NEE9PSIsIm1vbl9ob3N0IjoiW3YyOjE5Mi4xNjguMTExLjEwOjMzMDAsdjE6MTkyLjE2OC4xMTEuMTA6Njc4OV0sW3YyOjE5Mi4xNjguMTExLjEyOjMzMDAsdjE6MTkyLjE2OC4xMTEuMTI6Njc4OV0sW3YyOjE5Mi4xNjguMTExLjExOjMzMDAsdjE6MTkyLjE2OC4xMTEuMTE6Njc4OV0ifQ==`, nil
}
if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
return "{}", nil
}
return "", nil
},
}
Expand Down

0 comments on commit 20d2461

Please sign in to comment.