ceph: improve upgrade procedure
When a cluster is updated with a different image version, this triggers
a serialized restart of all the pods. Prior to this commit, no safety
checks were performed and rook simply hoped for the best outcome.

Now, before restarting a daemon, we check that it can be restarted. Once
it has restarted, we also verify that it is safe to proceed with the
rest of the platform. For instance, for monitors we check that they are
in quorum, for OSDs we check that PGs are clean, and for MDSes we make
sure they are all active.

Fixes: rook#2889
Signed-off-by: Sébastien Han <seb@redhat.com>
leseb committed Jun 13, 2019
1 parent 7d38783 commit 8505238
Showing 12 changed files with 391 additions and 17 deletions.
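
A minimal sketch of how the new checks are intended to wrap a single daemon restart; `safeRestart` and `restartDaemon` are hypothetical names, only `client.OkToStopOrContinue` is introduced by this commit:

```go
package example

import (
	"fmt"

	"github.com/rook/rook/pkg/clusterd"
	"github.com/rook/rook/pkg/daemon/ceph/client"
	cephver "github.com/rook/rook/pkg/operator/ceph/version"
)

// safeRestart sketches the intended flow around a single daemon restart.
// restartDaemon is a hypothetical stand-in for the operator's deployment update.
func safeRestart(ctx *clusterd.Context, namespace, deployment string, v cephver.CephVersion) error {
	// ask Ceph whether the daemon can be taken down (e.g. `ceph mon ok-to-stop a`)
	if err := client.OkToStopOrContinue(ctx, namespace, deployment, "stop", v); err != nil {
		return fmt.Errorf("%s is not safe to stop. %+v", deployment, err)
	}

	// restart the daemon (in Rook this is the serialized deployment update)
	if err := restartDaemon(ctx, namespace, deployment); err != nil {
		return err
	}

	// verify the cluster has settled (mon quorum, clean PGs, active MDS) before the next daemon
	return client.OkToStopOrContinue(ctx, namespace, deployment, "continue", v)
}

// restartDaemon is a placeholder; the real restart happens via k8sutil.UpdateDeploymentAndWait.
func restartDaemon(ctx *clusterd.Context, namespace, deployment string) error { return nil }
```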
65 changes: 65 additions & 0 deletions pkg/daemon/ceph/client/status.go
@@ -52,6 +52,7 @@ type CephStatus struct {
} `json:"osdmap"`
PgMap PgMap `json:"pgmap"`
MgrMap MgrMap `json:"mgrmap"`
Fsmap Fsmap `json:"fsmap"`
}

type HealthStatus struct {
@@ -123,6 +124,23 @@ type PgStateEntry struct {
Count int `json:"count"`
}

// Fsmap is a struct representing the Ceph filesystem map (fsmap)
type Fsmap struct {
Epoch int `json:"epoch"`
ID int `json:"id"`
Up int `json:"up"`
In int `json:"in"`
Max int `json:"max"`
ByRank []struct {
FilesystemID int `json:"filesystem_id"`
Rank int `json:"rank"`
Name string `json:"name"`
Status string `json:"status"`
Gid int `json:"gid"`
} `json:"by_rank"`
UpStandby int `json:"up:standby"`
}

func Status(context *clusterd.Context, clusterName string, debug bool) (CephStatus, error) {
args := []string{"status"}
cmd := NewCephCommand(context, clusterName, args)
@@ -171,3 +189,50 @@ func isClusterClean(status CephStatus) error {

return fmt.Errorf("cluster is not fully clean. PGs: %+v", status.PgMap.PgsByState)
}

// getMDSRank returns the rank of a given MDS
func getMDSRank(status CephStatus, clusterName, mdsName string) (int, error) {
// dummy rank
mdsRank := -1000
for r := range status.Fsmap.ByRank {
if status.Fsmap.ByRank[r].Name == mdsName {
mdsRank = r
}
}
// if the mds is not shown in the map, one reason might be that it is in standby
// if it is not in standby either, something else is going wrong
if mdsRank < 0 && status.Fsmap.UpStandby < 1 {
// it might seem strange to log an error since this could be a warning too
// it is a warning until we reach the timeout; this should give the mds enough time to transition its state
// after the timeout we consider that the mds might be gone or the timeout was not long enough...
return mdsRank, fmt.Errorf("mds %s not found in fsmap, this likely means mdss are transitioning between active and standby states", mdsName)
}

return mdsRank, nil
}

// MdsActiveOrStandbyReplay checks whether a given MDS is active, in standby or in standby-replay
func MdsActiveOrStandbyReplay(context *clusterd.Context, clusterName, mdsName string) error {
status, err := Status(context, clusterName, false)
if err != nil {
return err
}

mdsRank, err := getMDSRank(status, clusterName, mdsName)
if err != nil {
return fmt.Errorf("%+v", err)
}

// this MDS is in standby so let's return immediately
if mdsRank < 0 {
logger.Infof("mds %s is in standby, nothing to check", mdsName)
return nil
}

if status.Fsmap.ByRank[mdsRank].Status == "up:active" || status.Fsmap.ByRank[mdsRank].Status == "up:standby-replay" || status.Fsmap.ByRank[mdsRank].Status == "up:standby" {
logger.Infof("mds %s is %s", mdsName, status.Fsmap.ByRank[mdsRank].Status)
return nil
}

return fmt.Errorf("mds %s is %s, bad state", mdsName, status.Fsmap.ByRank[mdsRank].Status)
}
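
To make the new Fsmap struct concrete, here is a small illustrative program that unmarshals a hand-written fsmap fragment (shaped after the struct tags above, not captured from a real cluster) and walks it the way getMDSRank does:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/rook/rook/pkg/daemon/ceph/client"
)

// Illustrative only: a hand-written fsmap fragment matching the json tags of client.Fsmap.
const sampleFsmap = `{
  "epoch": 12,
  "id": 1,
  "up": 1,
  "in": 1,
  "max": 1,
  "by_rank": [
    {"filesystem_id": 1, "rank": 0, "name": "myfs-a", "status": "up:active", "gid": 14716}
  ],
  "up:standby": 1
}`

func main() {
	var fsmap client.Fsmap
	if err := json.Unmarshal([]byte(sampleFsmap), &fsmap); err != nil {
		panic(err)
	}
	// mirror the lookup done by getMDSRank: the rank used is the index in by_rank
	for r, mds := range fsmap.ByRank {
		fmt.Printf("mds %s has rank %d and is %s\n", mds.Name, r, mds.Status)
	}
	fmt.Printf("standby daemons: %d\n", fsmap.UpStandby)
}
```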
14 changes: 14 additions & 0 deletions pkg/daemon/ceph/client/status_test.go

Large diffs are not rendered by default.

179 changes: 174 additions & 5 deletions pkg/daemon/ceph/client/upgrade.go
@@ -19,23 +19,27 @@ package client
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/rook/rook/pkg/clusterd"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
"github.com/rook/rook/pkg/util"
)

// CephDaemonsVersions is a structure that can be used to parse the output of the 'ceph versions' command
type CephDaemonsVersions struct {
Mon map[string]int `json:"mon,omitempty"`
Mgr map[string]int `json:"mgr,omitempty"`
Osd map[string]int `json:"osd,omitempty"`
Mds map[string]int `json:"mds,omitempty"`
Overall map[string]int `json:"overall,omitempty"`
}

func getCephMonVersionString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph version: %+v", err)
return "", fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

@@ -45,7 +49,22 @@ func getCephMonVersionString(context *clusterd.Context) (string, error) {
func getCephVersionsString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "versions")
if err != nil {
return "", fmt.Errorf("failed to run ceph versions: %+v", err)
return "", fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

return output, nil
}

func getCephDaemonVersionString(context *clusterd.Context, deployment string) (string, error) {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]
daemonName := deploymentSplit[len(deploymentSplit)-2]
daemon := daemonName + "." + daemonID

output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "tell", daemon, "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

@@ -56,7 +75,23 @@ func getCephVersionsString(context *clusterd.Context) (string, error) {
func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error) {
output, err := getCephMonVersionString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph version: %+v", err)
return nil, fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

v, err := cephver.ExtractCephVersion(output)
if err != nil {
return nil, fmt.Errorf("failed to extract ceph version. %+v", err)
}

return v, nil
}

// GetCephDaemonVersion reports the Ceph version of a particular daemon
func GetCephDaemonVersion(context *clusterd.Context, deployment string) (*cephver.CephVersion, error) {
output, err := getCephDaemonVersionString(context, deployment)
if err != nil {
return nil, fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

@@ -72,7 +107,7 @@ func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error)
func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
output, err := getCephVersionsString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph versions: %+v", err)
return nil, fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

@@ -89,9 +124,143 @@ func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
func EnableMessenger2(context *clusterd.Context) error {
_, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "mon", "enable-msgr2")
if err != nil {
return fmt.Errorf("failed to enable msgr2 protocol: %+v", err)
return fmt.Errorf("failed to enable msgr2 protocol. %+v", err)
}
logger.Infof("successfully enabled msgr2 protocol")

return nil
}

// OkToStopOrContinue determines whether it is ok to stop or continue an upgrade
func OkToStopOrContinue(context *clusterd.Context, namespace, deployment, action string, cephVersion cephver.CephVersion) error {
deploymentSplit := strings.Split(deployment, "-")
daemonName := deploymentSplit[len(deploymentSplit)-2]

// special case for MDS since it has the fs name in the deployment
if strings.Contains(deployment, "mds") {
daemonName = deploymentSplit[len(deploymentSplit)-3]
}

// The ok-to-stop command for mon and mds landed in 14.2.1
// so we return nil if the running Ceph version is older than that
if !cephVersion.IsAtLeast(cephver.CephVersion{Major: 14, Minor: 2, Extra: 1}) {
if action == "stop" && daemonName != "osd" {
return nil
}
}

switch action {
case "stop":
versions, err := GetCephVersions(context)
if err != nil {
return fmt.Errorf("failed to get ceph daemons versions. %+v", err)
}

switch daemonName {
// Trying to handle the case where a **single** mon is deployed and an upgrade is called
case "mon":
// if len(versions.Mon) > 1, some monitors are running a different Ceph version
// this means an upgrade is in progress and we can simply run the upgrade checks
if len(versions.Mon) == 1 {
// now trying to parse and find how many mons are present
// if we have less than 3 mons we skip the check and do best-effort
// we use less than 3 because during the initial bootstrap the mon sequence is updated too
// so running the check on 2/3 mons fails
// versions.Mon looks like this map[ceph version 15.0.0-12-g6c8fb92 (6c8fb920cb1d862f36ee852ed849a15f9a50bd68) octopus (dev):1]
// now looping over a single element since we can't address the key directly (we don't know its name)
for _, monCount := range versions.Mon {
if monCount < 3 {
logger.Infof("the cluster has less than 3 monitors, not performing upgrade check, running in best-effort")
return nil
}
}
}
// Trying to handle the case where a **single** osd is deployed and an upgrade is called
case "osd":
// if len(versions.Osd) > 1, some OSDs are running a different Ceph version
// this means an upgrade is in progress and we can simply run the upgrade checks
if len(versions.Osd) == 1 {
// now trying to parse and find how many osds are present
// if we have less than 3 osds we skip the check and do best-effort
for _, osdCount := range versions.Osd {
if osdCount < 3 {
logger.Infof("the cluster has less than 3 OSDs, not performing upgrade check, running in best-effort")
return nil
}
}
}
}
// we don't implement any checks for mon, rgw and rbdmirror since:
// - mon: this is done in the monitor code, which ensures all the mons are in quorum before continuing
// - rgw: the pod spec has a liveness probe, so a pod that starts successfully is considered healthy
// - rbdmirror: you can chain as many as you want, like mdss, but there is no ok-to-stop logic yet
err = okToStopDaemon(context, deployment, daemonName)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}

case "continue":
// the mon case is handled directly in the deployment where the mon checks for quorum
switch daemonName {
case "osd":
err := okToContinueOSDDaemon(context, namespace)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
case "mds":
err := okToContinueMDSDaemon(context, namespace, deployment)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
}
}

return nil
}

func okToStopDaemon(context *clusterd.Context, deployment, daemon string) error {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]

// mgr and rbdmirror do not implement the ok-to-stop check
if daemon != "mgr" && daemon != "rbdmirror" {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", daemon, "ok-to-stop", daemonID)
if err != nil {
return fmt.Errorf("deployment %s cannot be stopped. %+v", deployment, err)
}
logger.Infof("deployment %s is ok to be updated. %s", deployment, output)
}
logger.Infof("deployment %s is ok to be updated.", deployment)

return nil
}

// okToContinueOSDDaemon determines whether it's fine to go to the next osd during an upgrade
// This basically makes sure all the PGs have settled
func okToContinueOSDDaemon(context *clusterd.Context, namespace string) error {
err := util.Retry(3000, 15*time.Second, func() error {
return IsClusterClean(context, namespace)
})
if err != nil {
return err
}

return nil
}

// okToContinueMDSDaemon determines whether it's fine to go to the next mds during an upgrade
// mostly a placeholder function for the future; since we have standby MDSes this should rarely be needed
func okToContinueMDSDaemon(context *clusterd.Context, namespace, deployment string) error {
deploymentSplit := strings.Split(deployment, "-")
daemonName := deploymentSplit[len(deploymentSplit)-2] + "-" + deploymentSplit[len(deploymentSplit)-1]

// wait for the MDS to be active again or in standby-replay
err := util.Retry(10, 15*time.Second, func() error {
return MdsActiveOrStandbyReplay(context, namespace, daemonName)
})
if err != nil {
return err
}

return nil
}
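
Both getCephDaemonVersionString and OkToStopOrContinue derive the daemon type and id by splitting the Kubernetes deployment name on "-", with MDS deployments shifted by one field because they embed the filesystem name. A small standalone illustration of that convention (the sample deployment names are assumptions):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// sample deployment names, following the rook-ceph-<daemon>-<id> convention
	for _, deployment := range []string{"rook-ceph-mon-a", "rook-ceph-osd-3", "rook-ceph-mds-myfs-a"} {
		parts := strings.Split(deployment, "-")
		daemonID := parts[len(parts)-1]
		daemonName := parts[len(parts)-2]
		// MDS deployments embed the filesystem name, so the daemon type sits one field earlier
		if strings.Contains(deployment, "mds") {
			daemonName = parts[len(parts)-3]
		}
		fmt.Printf("%s -> daemon type %q, daemon id %q\n", deployment, daemonName, daemonID)
	}
}
```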
33 changes: 33 additions & 0 deletions pkg/daemon/ceph/client/upgrade_test.go
@@ -50,6 +50,22 @@ func TestGetCephMonVersionsString(t *testing.T) {
assert.Nil(t, err)
}

func TestGetCephDaemonVersionString(t *testing.T) {
executor := &exectest.MockExecutor{}
deployment := "rook-ceph-mds-a"
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
assert.Equal(t, "tell", args[0])
assert.Equal(t, "mds.a", args[1])
assert.Equal(t, "version", args[2])
assert.Equal(t, 3, len(args))
return "", nil
}
context := &clusterd.Context{Executor: executor}

_, err := getCephDaemonVersionString(context, deployment)
assert.Nil(t, err)
}

func TestEnableMessenger2(t *testing.T) {
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
@@ -63,3 +79,20 @@ func TestEnableMessenger2(t *testing.T) {
err := EnableMessenger2(context)
assert.Nil(t, err)
}

func TestOkToStopDaemon(t *testing.T) {
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
assert.Equal(t, "mon", args[0])
assert.Equal(t, "ok-to-stop", args[1])
assert.Equal(t, "a", args[2])
assert.Equal(t, 3, len(args))
return "", nil
}
context := &clusterd.Context{Executor: executor}

deployment := "rook-ceph-mon-a"
daemon := "mon"
err := okToStopDaemon(context, deployment, daemon)
assert.Nil(t, err)
}
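
One behaviour worth a test of its own is the version gate at the top of OkToStopOrContinue: for non-OSD daemons the "stop" check is skipped entirely on Ceph older than 14.2.1. A possible additional test, not part of this commit (package and imports assumed to match upgrade_test.go):

```go
package client

import (
	"testing"

	"github.com/rook/rook/pkg/clusterd"
	cephver "github.com/rook/rook/pkg/operator/ceph/version"
	"github.com/stretchr/testify/assert"
)

// Illustrative only: before 14.2.1 the mon/mds ok-to-stop commands do not exist,
// so the "stop" check short-circuits for non-OSD daemons and never calls ceph.
func TestOkToStopOrContinuePreNautilus(t *testing.T) {
	context := &clusterd.Context{} // no executor needed: the version gate returns early
	v := cephver.CephVersion{Major: 13, Minor: 2, Extra: 4}

	err := OkToStopOrContinue(context, "rook-ceph", "rook-ceph-mon-a", "stop", v)
	assert.Nil(t, err)
}
```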
11 changes: 10 additions & 1 deletion pkg/operator/ceph/cluster/mgr/mgr.go
@@ -100,6 +100,15 @@ var updateDeploymentAndWait = k8sutil.UpdateDeploymentAndWait

// Start begins the process of running a cluster of Ceph mgrs.
func (c *Cluster) Start() error {
callback := func(actionToCheck, deploymentName string) error {
logger.Infof("checking if deployment %s can be %sed", deploymentName, actionToCheck)
err := client.OkToStopOrContinue(c.context, c.Namespace, deploymentName, actionToCheck, c.clusterInfo.CephVersion)
if err != nil {
return fmt.Errorf("failed to check if deployment %s can be updated: %+v", deploymentName, err)
}
return nil
}

// Validate pod's memory if specified
err := opspec.CheckPodMemory(c.resources, cephMgrPodMinimumMemory)
if err != nil {
@@ -141,7 +150,7 @@ func (c *Cluster) Start() error {
return fmt.Errorf("failed to create mgr deployment %s. %+v", resourceName, err)
}
logger.Infof("deployment for mgr %s already exists. updating if needed", resourceName)
- if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
+ if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, callback); err != nil {
return fmt.Errorf("failed to update mgr deployment %s. %+v", resourceName, err)
}
}
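
The fourth argument passed to updateDeploymentAndWait above is the new verification callback; the k8sutil change that consumes it is among the files not rendered on this page. As a rough, assumed sketch of the contract (not the actual k8sutil code), the callback is presumably invoked with "stop" before the pods are replaced and with "continue" once they are back up:

```go
// hypotheticalUpdateAndWait sketches only the callback contract; the deployment types,
// update call and readiness wait are omitted because the real k8sutil diff is not shown here.
func hypotheticalUpdateAndWait(deploymentName string, applyAndWait func() error,
	verify func(action, deploymentName string) error) error {
	// check the daemon is safe to take down before touching the deployment
	if err := verify("stop", deploymentName); err != nil {
		return err
	}
	// roll the deployment and wait for the new pod(s) to become ready
	if err := applyAndWait(); err != nil {
		return err
	}
	// check the cluster has settled before moving on to the next daemon
	return verify("continue", deploymentName)
}
```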