ceph: improve upgrade procedure
When a cluster is updated with a different image version, this triggers
a serialized restart of all the pods. Prior to this commit, no safety
checks were performed and Rook was hoping for the best outcome.

Now, before restarting a daemon, we check that it can be restarted. Once
it has restarted, we also check that we can proceed with the rest of the
platform. For instance, for monitors we check that they are in quorum,
for OSDs we check that PGs are clean, and for MDS daemons we make sure
they are active.

Fixes: rook#2889
Signed-off-by: Sébastien Han <seb@redhat.com>
leseb committed Jun 6, 2019
1 parent 298dc10 commit fca7ceb
Showing 12 changed files with 312 additions and 17 deletions.
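
For orientation, here is a minimal sketch (not part of this commit) of how the new ok-to-stop / ok-to-continue checks are intended to bracket a daemon restart. The names safeUpdateDaemon and applyUpdate are hypothetical; the actual wiring presumably lives in k8sutil.UpdateDeploymentAndWait, whose signature change is visible in the mgr/mon/osd hunks below but whose body is not rendered on this page.

package example

import (
    "fmt"

    "github.com/rook/rook/pkg/clusterd"
    "github.com/rook/rook/pkg/daemon/ceph/client"
    cephver "github.com/rook/rook/pkg/operator/ceph/version"
)

// safeUpdateDaemon is a hypothetical helper showing how the checks added by
// this commit are meant to bracket a deployment update: ok-to-stop before the
// restart, ok-to-continue once the daemon is back.
func safeUpdateDaemon(ctx *clusterd.Context, namespace, deployment string, v cephver.CephVersion, applyUpdate func() error) error {
    // Ask Ceph whether the daemon can be taken down safely (e.g. "ceph osd ok-to-stop <id>").
    if err := client.OkToStopOrContinue(ctx, namespace, deployment, "stop", v); err != nil {
        return fmt.Errorf("%s is not safe to restart. %+v", deployment, err)
    }

    // Roll out the new image and wait for the pod to come back; applyUpdate is a
    // stand-in for the Kubernetes deployment update performed by the operator.
    if err := applyUpdate(); err != nil {
        return fmt.Errorf("failed to update %s. %+v", deployment, err)
    }

    // Verify the cluster has settled before moving on to the next daemon:
    // PGs clean for OSDs, MDS active or in standby-replay, mons in quorum.
    return client.OkToStopOrContinue(ctx, namespace, deployment, "continue", v)
}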
65 changes: 65 additions & 0 deletions pkg/daemon/ceph/client/status.go
@@ -52,6 +52,7 @@ type CephStatus struct {
} `json:"osdmap"`
PgMap PgMap `json:"pgmap"`
MgrMap MgrMap `json:"mgrmap"`
Fsmap Fsmap `json:"fsmap"`
}

type HealthStatus struct {
@@ -123,6 +124,23 @@ type PgStateEntry struct {
Count int `json:"count"`
}

// Fsmap is a struct representing the filesystem map
type Fsmap struct {
Epoch int `json:"epoch"`
ID int `json:"id"`
Up int `json:"up"`
In int `json:"in"`
Max int `json:"max"`
ByRank []struct {
FilesystemID int `json:"filesystem_id"`
Rank int `json:"rank"`
Name string `json:"name"`
Status string `json:"status"`
Gid int `json:"gid"`
} `json:"by_rank"`
UpStandby int `json:"up:standby"`
}

func Status(context *clusterd.Context, clusterName string, debug bool) (CephStatus, error) {
args := []string{"status"}
cmd := NewCephCommand(context, clusterName, args)
@@ -171,3 +189,50 @@ func isClusterClean(status CephStatus) error {

return fmt.Errorf("cluster is not fully clean. PGs: %+v", status.PgMap.PgsByState)
}

// getMDSRank returns the rank of a given MDS
func getMDSRank(status CephStatus, clusterName, mdsName string) (int, error) {
// dummy rank
mdsRank := -1000
for r := range status.Fsmap.ByRank {
if status.Fsmap.ByRank[r].Name == mdsName {
mdsRank = r
}
}
// if the mds is not shown in the map, one reason might be that it's in standby
// if it's not in standby, something else is going wrong
if mdsRank < 0 && status.Fsmap.UpStandby < 1 {
// it might seem strange to log an error since this could be a warning too
// it is a warning until we reach the timeout, this should give the mds enough time to transition its state
// after the timeout we consider that the mds might be gone or the timeout was not long enough...
return mdsRank, fmt.Errorf("mds %s not found in fsmap, this likely means mdss are transitioning between active and standby states", mdsName)
}

return mdsRank, nil
}

// MdsActiveOrStandbyReplay returns whether a given MDS is active or in standby
func MdsActiveOrStandbyReplay(context *clusterd.Context, clusterName, mdsName string) error {
status, err := Status(context, clusterName, false)
if err != nil {
return err
}

mdsRank, err := getMDSRank(status, clusterName, mdsName)
if err != nil {
return fmt.Errorf("%+v", err)
}

// this MDS is in standby so let's return immediately
if mdsRank < 0 {
logger.Infof("mds %s is in standby, nothing to check", mdsName)
return nil
}

if status.Fsmap.ByRank[mdsRank].Status == "up:active" || status.Fsmap.ByRank[mdsRank].Status == "up:standby-replay" || status.Fsmap.ByRank[mdsRank].Status == "up:standby" {
logger.Infof("mds %s is %s", mdsName, status.Fsmap.ByRank[mdsRank].Status)
return nil
}

return fmt.Errorf("mds %s is %s, bad state", mdsName, status.Fsmap.ByRank[mdsRank].Status)
}
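
To make the new Fsmap fields concrete, here is a small standalone sketch (not part of this commit) that unmarshals an illustrative fsmap fragment of `ceph status --format json` into the same struct shape; all field values, including the filesystem name myfs-a, are made up.

package example

import (
    "encoding/json"
    "fmt"
)

// Fsmap mirrors the struct added in status.go so this snippet stands alone.
type Fsmap struct {
    Epoch  int `json:"epoch"`
    ID     int `json:"id"`
    Up     int `json:"up"`
    In     int `json:"in"`
    Max    int `json:"max"`
    ByRank []struct {
        FilesystemID int    `json:"filesystem_id"`
        Rank         int    `json:"rank"`
        Name         string `json:"name"`
        Status       string `json:"status"`
        Gid          int    `json:"gid"`
    } `json:"by_rank"`
    UpStandby int `json:"up:standby"`
}

func Example() {
    // Illustrative fsmap fragment; the values are made up.
    raw := `{"epoch":12,"id":1,"up":1,"in":1,"max":1,
             "by_rank":[{"filesystem_id":1,"rank":0,"name":"myfs-a","status":"up:active","gid":4234}],
             "up:standby":1}`

    var fsmap Fsmap
    if err := json.Unmarshal([]byte(raw), &fsmap); err != nil {
        panic(err)
    }
    // Prints: mds myfs-a is up:active, 1 standby
    fmt.Printf("mds %s is %s, %d standby\n", fsmap.ByRank[0].Name, fsmap.ByRank[0].Status, fsmap.UpStandby)
}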
14 changes: 14 additions & 0 deletions pkg/daemon/ceph/client/status_test.go

Large diffs are not rendered by default.

163 changes: 158 additions & 5 deletions pkg/daemon/ceph/client/upgrade.go
@@ -19,9 +19,12 @@ package client
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/rook/rook/pkg/clusterd"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
"github.com/rook/rook/pkg/util"
)

// CephDaemonsVersions is a structure that can be used to parse the output of the 'ceph versions' command
@@ -35,7 +38,7 @@ type CephDaemonsVersions struct {
func getCephMonVersionString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph version: %+v", err)
return "", fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

@@ -45,7 +48,22 @@ func getCephMonVersionString(context *clusterd.Context) (string, error) {
func getCephVersionsString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "versions")
if err != nil {
return "", fmt.Errorf("failed to run ceph versions: %+v", err)
return "", fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

return output, nil
}

func getCephDaemonVersionString(context *clusterd.Context, deployment string) (string, error) {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]
daemonName := deploymentSplit[len(deploymentSplit)-2]
daemon := daemonName + "." + daemonID

output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "tell", daemon, "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

@@ -56,7 +74,23 @@ func getCephVersionsString(context *clusterd.Context) (string, error) {
func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error) {
output, err := getCephMonVersionString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph version: %+v", err)
return nil, fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

v, err := cephver.ExtractCephVersion(output)
if err != nil {
return nil, fmt.Errorf("failed to extract ceph version. %+v", err)
}

return v, nil
}

// GetCephDaemonVersion reports the Ceph version of a particular daemon
func GetCephDaemonVersion(context *clusterd.Context, deployment string) (*cephver.CephVersion, error) {
output, err := getCephDaemonVersionString(context, deployment)
if err != nil {
return nil, fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

@@ -72,7 +106,7 @@ func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error)
func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
output, err := getCephVersionsString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph versions: %+v", err)
return nil, fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

@@ -89,9 +123,128 @@ func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
func EnableMessenger2(context *clusterd.Context) error {
_, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "mon", "enable-msgr2")
if err != nil {
return fmt.Errorf("failed to enable msgr2 protocol: %+v", err)
return fmt.Errorf("failed to enable msgr2 protocol. %+v", err)
}
logger.Infof("successfully enabled msgr2 protocol")

return nil
}

// OkToStopOrContinue determines whether it's ok to stop or continue an upgrade
func OkToStopOrContinue(context *clusterd.Context, namespace, deployment, action string, cephVersion cephver.CephVersion) error {
deploymentSplit := strings.Split(deployment, "-")
daemonName := deploymentSplit[len(deploymentSplit)-2]

// special case for MDS since it has the fs name in the deployment
if strings.Contains(deployment, "mds") {
daemonName = deploymentSplit[len(deploymentSplit)-3]
}

// The ok-to-stop command for mon and mds landed in 14.2.1
// so we return nil if that Ceph version requirement is not satisfied
if !cephVersion.IsAtLeast(cephver.CephVersion{Major: 14, Minor: 2, Extra: 1}) {
if action == "stop" && daemonName != "osd" {
return nil
}
}

if action == "stop" {
// Trying to handle the case where a **single** mon is deployed and an upgrade is called
if daemonName == "mon" {
versions, err := GetCephVersions(context)
if err != nil {
return fmt.Errorf("failed to get ceph daemons versions. %+v", err)
}
// if len(versions.Mon) > 1, the monitors are running different Ceph versions
// this means an upgrade is already in progress and we can run the upgrade checks
if len(versions.Mon) == 1 {
// now trying to parse and find how many mons are present
// if we have fewer than 3 mons we skip the check and do best-effort
// we use fewer than 3 because during the initial bootstrap the mon sequence is updated too
// so running the check on 2/3 mons fails
// versions.Mon looks like this map[ceph version 15.0.0-12-g6c8fb92 (6c8fb920cb1d862f36ee852ed849a15f9a50bd68) octopus (dev):1]
// now looping over a single element since we can't address the key directly (we don't know its name)
for _, monCount := range versions.Mon {
if monCount < 3 {
logger.Infof("the cluster has less than 3 monitors, not performing upgrade check, running in best-effort")
return nil
}
}
}
}

err := okToStopDaemon(context, deployment, daemonName)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
}

// we don't implement any checks for mon, rgw and rbdmirror since:
// - mon: this is done in the monitor code, which ensures all the mons are always in quorum before continuing
// - rgw: the pod spec has a liveness probe, so we know whether the pod started successfully
// - rbdmirror: you can chain as many as you want, like MDSes, but there is no ok-to-stop logic yet
if action == "continue" {
// the mon case is handled directly in the deployment where the mon checks for quorum
switch daemonName {
case "osd":
err := okToContinueOSDDaemon(context, namespace)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
case "mds":
err := okToContinueMDSDaemon(context, namespace, deployment)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
}
}

return nil
}

func okToStopDaemon(context *clusterd.Context, deployment, daemon string) error {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]

// mgr and rbdmirror do not implement the ok-to-stop check
if daemon != "mgr" && daemon != "rbdmirror" {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", daemon, "ok-to-stop", daemonID)
if err != nil {
return fmt.Errorf("deployment %s cannot be stopped. %+v", deployment, err)
}
logger.Infof("deployment %s is ok to be updated. %s", deployment, output)
}
logger.Infof("deployment %s is ok to be updated.", deployment)

return nil
}

// okToContinueOSDDaemon determines whether it's fine to go to the next osd during an upgrade
// This basically makes sure all the PGs have settled
func okToContinueOSDDaemon(context *clusterd.Context, namespace string) error {
err := util.Retry(3000, 15*time.Second, func() error {
return IsClusterClean(context, namespace)
})
if err != nil {
return err
}

return nil
}

// okToContinueMDSDaemon determines whether it's fine to go to the next mds during an upgrade
// mostly a placeholder function for the future; since we have standby MDSes this shouldn't be needed
func okToContinueMDSDaemon(context *clusterd.Context, namespace, deployment string) error {
deploymentSplit := strings.Split(deployment, "-")
daemonName := deploymentSplit[len(deploymentSplit)-2] + "-" + deploymentSplit[len(deploymentSplit)-1]

// wait for the MDS to be active again or in standby-replay
err := util.Retry(10, 15*time.Second, func() error {
return MdsActiveOrStandbyReplay(context, namespace, daemonName)
})
if err != nil {
return err
}

return nil
}
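
As a worked example of the deployment-name parsing used by getCephDaemonVersionString, OkToStopOrContinue and okToStopDaemon above, the standalone snippet below (not part of this commit) splits a few deployment names into daemon type and ID; the filesystem name myfs is hypothetical.

package main

import (
    "fmt"
    "strings"
)

// The daemon ID is the last dash-separated token and the daemon type the
// second-to-last, except for MDS deployments where the filesystem name
// (here the hypothetical "myfs") sits between the type and the ID.
func main() {
    for _, deployment := range []string{"rook-ceph-mon-a", "rook-ceph-osd-3", "rook-ceph-mds-myfs-a"} {
        parts := strings.Split(deployment, "-")
        daemonID := parts[len(parts)-1]
        daemonName := parts[len(parts)-2]
        if strings.Contains(deployment, "mds") {
            daemonName = parts[len(parts)-3]
        }
        // e.g. rook-ceph-mds-myfs-a -> daemon "mds", id "a"
        fmt.Printf("%s -> daemon %q, id %q\n", deployment, daemonName, daemonID)
    }
}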
33 changes: 33 additions & 0 deletions pkg/daemon/ceph/client/upgrade_test.go
@@ -50,6 +50,22 @@ func TestGetCephMonVersionsString(t *testing.T) {
assert.Nil(t, err)
}

func TestGetCephDaemonVersionString(t *testing.T) {
executor := &exectest.MockExecutor{}
deployment := "rook-ceph-mds-a"
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
assert.Equal(t, "tell", args[0])
assert.Equal(t, "mds.a", args[1])
assert.Equal(t, "version", args[2])
assert.Equal(t, 3, len(args))
return "", nil
}
context := &clusterd.Context{Executor: executor}

_, err := getCephDaemonVersionString(context, deployment)
assert.Nil(t, err)
}

func TestEnableMessenger2(t *testing.T) {
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
@@ -63,3 +79,20 @@ func TestEnableMessenger2(t *testing.T) {
err := EnableMessenger2(context)
assert.Nil(t, err)
}

func TestOkToStopDaemon(t *testing.T) {
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
assert.Equal(t, "mon", args[0])
assert.Equal(t, "ok-to-stop", args[1])
assert.Equal(t, "a", args[2])
assert.Equal(t, 3, len(args))
return "", nil
}
context := &clusterd.Context{Executor: executor}

deployment := "rook-ceph-mon-a"
daemon := "mon"
err := okToStopDaemon(context, deployment, daemon)
assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/mgr.go
@@ -141,7 +141,7 @@ func (c *Cluster) Start() error {
return fmt.Errorf("failed to create mgr deployment %s. %+v", resourceName, err)
}
logger.Infof("deployment for mgr %s already exists. updating if needed", resourceName)
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mgr deployment %s. %+v", resourceName, err)
}
}
15 changes: 13 additions & 2 deletions pkg/operator/ceph/cluster/mon/mon.go
@@ -423,9 +423,20 @@ func (c *Cluster) startDeployments(mons []*monConfig, requireAllInQuorum bool) e
if err != nil {
return fmt.Errorf("failed to create mon %s. %+v", mons[i].DaemonName, err)
}
// For the initial deployment (first creation) it's expected not to have all the monitors in quorum
// However, in the event of an update, it's crucial to proceed monitor by monitor
// At the end of the method we perform one last check where all the monitors must be in quorum
requireAllInQuorum := false
err = c.waitForMonsToJoin(mons, requireAllInQuorum)
if err != nil {
return fmt.Errorf("failed to check mon quorum %s. %+v", mons[i].DaemonName, err)
}
}

logger.Infof("mons created: %d", len(mons))
// Final verification that **all** mons are in quorum
// Do not proceed if one monitor is still syncing
requireAllInQuorum = true
return c.waitForMonsToJoin(mons, requireAllInQuorum)
}

@@ -505,7 +516,7 @@ func (c *Cluster) startMon(m *monConfig, hostname string) error {
return fmt.Errorf("failed to create mon deployment %s. %+v", m.ResourceName, err)
}
logger.Infof("deployment for mon %s already exists. updating if needed", m.ResourceName)
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mon deployment %s. %+v", m.ResourceName, err)
}
}
@@ -518,7 +529,7 @@ func waitForQuorumWithMons(context *clusterd.Context, clusterName string, mons [

// wait for monitors to establish quorum
retryCount := 0
retryMax := 20
retryMax := 30
for {
retryCount++
if retryCount > retryMax {
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/osd.go
@@ -290,7 +290,7 @@ func (c *Cluster) startOSDDaemonsOnNode(nodeName string, config *provisionConfig
continue
}
logger.Infof("deployment for osd %d already exists. updating if needed", osd.ID)
if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace); err != nil {
if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace, c.clusterInfo); err != nil {
config.addError(fmt.Sprintf("failed to update osd deployment %d. %+v", osd.ID, err))
}
}
