Skip to content

Commit

Permalink
osd: skip upgrade if migration is pending
Browse files Browse the repository at this point in the history
Currently, other OSDs can still be upgraded while the migration
of an OSD, due to a change in the backing store, is pending. This
will not work if the updated OSD Ceph version does not support
the currently applied backing store. So Rook will skip all
OSD upgrades while an OSD migration is pending.

Signed-off-by: sp98 <sapillai@redhat.com>
  • Loading branch information
sp98 committed May 21, 2024
1 parent 07768b1 commit 3d76974
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 206 deletions.
68 changes: 25 additions & 43 deletions pkg/operator/ceph/cluster/osd/osd.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,10 @@ func (c *Cluster) Start() error {
}
logger.Infof("wait timeout for healthy OSDs during upgrade or restart is %q", c.clusterInfo.OsdUpgradeTimeout)

// identify OSDs that need migration to a new backend store
err := c.replaceOSDForNewStore()
// replace OSDs for a new backing store
osdsToBeReplaced, err := c.replaceOSDForNewStore()
if err != nil {
return errors.Wrapf(err, "failed to replace an OSD that needs migration to new backend store in namespace %q", namespace)
return errors.Wrapf(err, "failed to replace OSD for new backing store %q in namespace %q", c.spec.Storage.Store.Type, namespace)
}

osdsToSkipReconcile, err := controller.GetDaemonsToSkipReconcile(c.clusterInfo.Context, c.context, c.clusterInfo.Namespace, OsdIdLabelKey, AppName)
Expand All @@ -233,6 +233,11 @@ func (c *Cluster) Start() error {
return errors.Wrapf(err, "failed to get information about currently-running OSD Deployments in namespace %q", namespace)
}

// OSDs that are to be replaced should not be upgraded. So remove them from the `updateQueue`
if len(osdsToBeReplaced) > 0 {
updateQueue.Remove(osdsToBeReplaced.getOSDIds())
}

logger.Debugf("%d of %d OSD Deployments need update", updateQueue.Len(), deployments.Len())
updateConfig := c.newUpdateConfig(config, updateQueue, deployments, osdsToSkipReconcile)

Expand Down Expand Up @@ -284,27 +289,23 @@ func (c *Cluster) Start() error {
}

if c.spec.Storage.Store.UpdateStore == OSDStoreUpdateConfirmation {
if c.replaceOSD != nil {
delOpts := &k8sutil.DeleteOptions{MustDelete: true, WaitOptions: k8sutil.WaitOptions{Wait: true}}
err := k8sutil.DeleteConfigMap(c.clusterInfo.Context, c.context.Clientset, OSDReplaceConfigName, namespace, delOpts)
if err != nil {
delOpts := &k8sutil.DeleteOptions{MustDelete: true, WaitOptions: k8sutil.WaitOptions{Wait: true}}
err := k8sutil.DeleteConfigMap(c.clusterInfo.Context, c.context.Clientset, OSDReplaceConfigName, namespace, delOpts)
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debugf("config map %q not found. Ignoring since object must be deleted.", OSDReplaceConfigName)
} else {
return errors.Wrapf(err, "failed to delete the %q configmap", OSDReplaceConfigName)
}
}

// wait for the pgs to be healthy before attempting to migrate the next OSD
_, err := c.waitForHealthyPGs()
if err != nil {
return errors.Wrapf(err, "failed to wait for pgs to be healhty")
}

// reconcile if migration of one or more OSD is pending.
osdsToReplace, err := c.getOSDWithNonMatchingStore()
if err != nil {
return errors.Wrapf(err, "failed to check if any OSD migration is pending")
}

if len(osdsToReplace) != 0 {
if len(osdsToBeReplaced) > 0 {
// wait for the pgs to be healthy before attempting to migrate the next OSD
_, err := c.waitForHealthyPGs()
if err != nil {
return errors.Wrapf(err, "failed to wait for pgs to be healhty")
}
// return with error to reconcile the operator since there are OSDs that are pending migration
return errors.New("reconcile operator to replace OSDs that are pending migration")
}
}
Expand Down Expand Up @@ -825,27 +826,10 @@ func (c *Cluster) applyUpgradeOSDFunctionality() {
}
}

// replaceOSDForNewStore deletes an existing OSD deployment that does not match the expected OSD backend store provided in the storage spec
func (c *Cluster) replaceOSDForNewStore() error {
if c.spec.Storage.Store.UpdateStore != OSDStoreUpdateConfirmation {
logger.Debugf("no OSD migration to a new backend store is requested")
return nil
}

osdToReplace, err := c.getOSDReplaceInfo()
if err != nil {
return errors.Wrapf(err, "failed to get OSD replace info")
}

if osdToReplace == nil {
logger.Info("no osd to replace")
return nil
}

logger.Infof("starting migration of the OSD.%d", osdToReplace.ID)

// deleteOSDDeployment deletes an existing OSD deployment and saves the information in the configmap
func (c *Cluster) deleteOSDDeployment(osdID int) error {
// Delete the OSD deployment
deploymentName := fmt.Sprintf("rook-ceph-osd-%d", osdToReplace.ID)
deploymentName := fmt.Sprintf("rook-ceph-osd-%d", osdID)
logger.Infof("removing the OSD deployment %q", deploymentName)
if err := k8sutil.DeleteDeployment(c.clusterInfo.Context, c.context.Clientset, c.clusterInfo.Namespace, deploymentName); err != nil {
if err != nil {
Expand All @@ -857,9 +841,7 @@ func (c *Cluster) replaceOSDForNewStore() error {
}
}

c.replaceOSD = osdToReplace

return osdToReplace.saveAsConfig(c.context, c.clusterInfo)
return c.replaceOSD.saveAsConfig(c.context, c.clusterInfo)
}

func (c *Cluster) waitForHealthyPGs() (bool, error) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/operator/ceph/cluster/osd/osd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package osd
import (
"context"
"encoding/json"
"fmt"
"testing"

"github.com/pkg/errors"
Expand Down Expand Up @@ -728,7 +727,7 @@ func TestReplaceOSDForNewStore(t *testing.T) {
},
}
c := New(context, clusterInfo, spec, "myversion")
err := c.replaceOSDForNewStore()
_, err := c.replaceOSDForNewStore()
assert.NoError(t, err)
assert.Nil(t, c.replaceOSD)
})
Expand All @@ -743,7 +742,7 @@ func TestReplaceOSDForNewStore(t *testing.T) {
},
}
c := New(context, clusterInfo, spec, "myversion")
err := c.replaceOSDForNewStore()
_, err := c.replaceOSDForNewStore()
assert.NoError(t, err)
assert.Nil(t, c.replaceOSD)
})
Expand All @@ -761,8 +760,9 @@ func TestReplaceOSDForNewStore(t *testing.T) {
d := getDummyDeploymentOnNode(clientset, c, "node2", 0)
d.Labels[osdStore] = "bluestore-rdr"
createDeploymentOrPanic(clientset, d)
err := c.replaceOSDForNewStore()
osdsToBeReplaced, err := c.replaceOSDForNewStore()
assert.NoError(t, err)
assert.Equal(t, 0, len(osdsToBeReplaced))
assert.Nil(t, c.replaceOSD)
})

Expand All @@ -779,7 +779,7 @@ func TestReplaceOSDForNewStore(t *testing.T) {
// create osd deployment with `bluestore` backend store
d := getDummyDeploymentOnNode(clientset, c, "node2", 1)
createDeploymentOrPanic(clientset, d)
err := c.replaceOSDForNewStore()
_, err := c.replaceOSDForNewStore()
assert.NoError(t, err)
assert.NotNil(t, c.replaceOSD)
assert.Equal(t, 1, c.replaceOSD.ID)
Expand Down Expand Up @@ -816,9 +816,9 @@ func TestReplaceOSDForNewStore(t *testing.T) {
c := New(context, clusterInfo, spec, "myversion")
d := getDummyDeploymentOnPVC(clientset, c, "pvc1", 2)
createDeploymentOrPanic(clientset, d)
err := c.replaceOSDForNewStore()
osdsToBeReplaced, err := c.replaceOSDForNewStore()
assert.NoError(t, err)
fmt.Printf("%+v", c.replaceOSD)
assert.Equal(t, 1, len(osdsToBeReplaced))
assert.NotNil(t, c.replaceOSD)
assert.Equal(t, 2, c.replaceOSD.ID)
assert.Equal(t, "pvc1", c.replaceOSD.Path)
Expand Down Expand Up @@ -862,8 +862,9 @@ func TestReplaceOSDForNewStore(t *testing.T) {
}
context.Executor = executor
c := New(context, clusterInfo, spec, "myversion")
err := c.replaceOSDForNewStore()
osdsToBeReplaced, err := c.replaceOSDForNewStore()
assert.NoError(t, err)
assert.Equal(t, 0, len(osdsToBeReplaced))
assert.Nil(t, c.replaceOSD)
})
}
Expand Down
74 changes: 46 additions & 28 deletions pkg/operator/ceph/cluster/osd/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package osd

import (
"encoding/json"
"fmt"

"github.com/pkg/errors"
"github.com/rook/rook/pkg/clusterd"
Expand All @@ -43,6 +42,16 @@ type OSDReplaceInfo struct {
Node string `json:"node"`
}

// OSDReplaceInfoList is a list of OSDReplaceInfo entries.
type OSDReplaceInfoList []OSDReplaceInfo

// getOSDIds returns the IDs of all OSDs in the list, in list order.
// The returned slice is always non-nil, even when the list is empty or nil.
func (o *OSDReplaceInfoList) getOSDIds() []int {
	// The final length is known up front, so allocate the result once instead
	// of growing it through repeated appends.
	osdIDs := make([]int, 0, len(*o))
	// Index into the list rather than ranging by value to avoid copying each
	// OSDReplaceInfo struct just to read its ID.
	for i := range *o {
		osdIDs = append(osdIDs, (*o)[i].ID)
	}
	return osdIDs
}

func (o *OSDReplaceInfo) saveAsConfig(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo) error {
configStr, err := o.string()
if err != nil {
Expand Down Expand Up @@ -81,58 +90,67 @@ func (o *OSDReplaceInfo) string() (string, error) {
return string(configInBytes), nil
}

// getOSDReplaceInfo returns an existing OSD that needs to be replaced for a new backend store
func (c *Cluster) getOSDReplaceInfo() (*OSDReplaceInfo, error) {
osdReplaceInfo, err := GetOSDReplaceConfigMap(c.context, c.clusterInfo)
func (c *Cluster) replaceOSDForNewStore() (OSDReplaceInfoList, error) {
if c.spec.Storage.Store.UpdateStore != OSDStoreUpdateConfirmation {
logger.Info("no replacement of osds is requested")
return nil, nil
}

osdsToBeReplaced, err := c.getOSDWithNonMatchingStore()
if err != nil {
return nil, errors.Wrap(err, "failed to get any existing OSD in replace configmap")
return nil, errors.Wrapf(err, "failed to get information about the OSDs where backing store does not match the spec in namespace %q", c.clusterInfo.Namespace)
}

if osdReplaceInfo != nil {
return osdReplaceInfo, nil
if len(osdsToBeReplaced) == 0 {
logger.Info("all OSDs are using the desired backing store. No replacement is required.")
return osdsToBeReplaced, nil
}

// replace an OSD only if Pgs are healthy
pgHealthMsg, pgClean, err := cephclient.IsClusterClean(c.context, c.clusterInfo, c.spec.DisruptionManagement.PGHealthyRegex)
if err != nil {
return nil, errors.Wrapf(err, "failed to check if the pgs are clean before replacing OSDs")
}

if !pgClean {
logger.Warningf("skipping OSD replacement because pgs are not healthy. PG status: %q", pgHealthMsg)
return nil, nil
return osdsToBeReplaced, nil
}

logger.Infof("placement group status: %q", pgHealthMsg)

osdsToReplace, err := c.getOSDWithNonMatchingStore()
// Check for an existing OSD in the configmap
osdToBeReplaced, err := GetOSDReplaceConfigMap(c.context, c.clusterInfo)
if err != nil {
return nil, errors.Wrap(err, "failed to list out OSDs with non matching backend store")
return nil, errors.Wrap(err, "failed to get any existing OSD in replace configmap")
}

if len(osdsToReplace) == 0 {
logger.Infof("all osds have already been migrated to backend store %q", c.spec.Storage.Store.Type)
return nil, nil
if osdToBeReplaced != nil {
c.replaceOSD = osdToBeReplaced
} else {
c.replaceOSD = &osdsToBeReplaced[0]
}

logger.Infof("%d osd(s) require migration to new backend store %q.", len(osdsToReplace), c.spec.Storage.Store.Type)
logger.Infof("replacing OSD.%d to the new backing store %q", c.replaceOSD.ID, c.spec.Storage.Store.Type)
err = c.deleteOSDDeployment(c.replaceOSD.ID)
if err != nil {
return nil, errors.Wrapf(err, "failed to delete OSD deployment that needs migration to new backend store in namespace %q", c.clusterInfo.Namespace)
}

return &osdsToReplace[0], nil
return osdsToBeReplaced, nil
}

// getOSDWithNonMatchingStore returns OSDs with osd-store label different from expected store in cephCluster spec
func (c *Cluster) getOSDWithNonMatchingStore() ([]OSDReplaceInfo, error) {
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", k8sutil.AppAttr, AppName)}
deployments, err := c.context.Clientset.AppsV1().Deployments(c.clusterInfo.Namespace).List(c.clusterInfo.Context, listOpts)
func (c *Cluster) getOSDWithNonMatchingStore() (OSDReplaceInfoList, error) {
osdReplaceList := []OSDReplaceInfo{}
// get existing OSD deployments
osdDeployments, err := c.getOSDDeployments()
if err != nil {
return nil, errors.Wrap(err, "failed to query OSDs to skip reconcile")
return osdReplaceList, errors.Wrapf(err, "failed to get existing OSD deployments in namespace %q", c.clusterInfo.Namespace)
}

osdReplaceList := []OSDReplaceInfo{}
for i := range deployments.Items {
if osdStore, ok := deployments.Items[i].Labels[osdStore]; ok {
for i := range osdDeployments.Items {
if osdStore, ok := osdDeployments.Items[i].Labels[osdStore]; ok {
if osdStore != string(c.spec.Storage.Store.Type) {
osdInfo, err := c.getOSDInfo(&deployments.Items[i])
osdInfo, err := c.getOSDInfo(&osdDeployments.Items[i])
if err != nil {
return nil, errors.Wrapf(err, "failed to details about the OSD %q", deployments.Items[i].Name)
return nil, errors.Wrapf(err, "failed to details about the OSD %q", osdDeployments.Items[i].Name)
}
var path string
if osdInfo.PVCName != "" {
Expand Down
Loading

0 comments on commit 3d76974

Please sign in to comment.