Skip to content

Commit

Permalink
ceph: improve upgrade procedure
Browse files Browse the repository at this point in the history
When a cluster is updated with a different image version, this triggers
a serialized restart of all the pods. Prior to this commit, no safety
check were performed and rook was hoping for the best outcome.

Now before doing restarting a daemon we check it can be restarted. Once
it's restarted we also check we can pursue with the rest of the
platform. For instance, with monitors we check that they are in quorum,
for OSD we check that PGs are clean and for MDS we make sure they are
 all active.

Fixes: rook#2889
Signed-off-by: Sébastien Han <seb@redhat.com>
  • Loading branch information
leseb committed Jun 18, 2019
1 parent 1d35f26 commit aa2d6fe
Show file tree
Hide file tree
Showing 13 changed files with 468 additions and 28 deletions.
65 changes: 65 additions & 0 deletions pkg/daemon/ceph/client/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type CephStatus struct {
} `json:"osdmap"`
PgMap PgMap `json:"pgmap"`
MgrMap MgrMap `json:"mgrmap"`
Fsmap Fsmap `json:"fsmap"`
}

type HealthStatus struct {
Expand Down Expand Up @@ -123,6 +124,23 @@ type PgStateEntry struct {
Count int `json:"count"`
}

// Fsmap is a struct representing the filesystem map
type Fsmap struct {
Epoch int `json:"epoch"`
ID int `json:"id"`
Up int `json:"up"`
In int `json:"in"`
Max int `json:"max"`
ByRank []struct {
FilesystemID int `json:"filesystem_id"`
Rank int `json:"rank"`
Name string `json:"name"`
Status string `json:"status"`
Gid int `json:"gid"`
} `json:"by_rank"`
UpStandby int `json:"up:standby"`
}

func Status(context *clusterd.Context, clusterName string, debug bool) (CephStatus, error) {
args := []string{"status"}
cmd := NewCephCommand(context, clusterName, args)
Expand Down Expand Up @@ -171,3 +189,50 @@ func isClusterClean(status CephStatus) error {

return fmt.Errorf("cluster is not fully clean. PGs: %+v", status.PgMap.PgsByState)
}

// getMDSRank returns the rank of a given MDS
func getMDSRank(status CephStatus, clusterName, mdsName string) (int, error) {
// dummy rank
mdsRank := -1000
for r := range status.Fsmap.ByRank {
if status.Fsmap.ByRank[r].Name == mdsName {
mdsRank = r
}
}
// if the mds is not shown in the map one reason might be because it's in standby
// if not in standby there is something else going wron
if mdsRank < 0 && status.Fsmap.UpStandby < 1 {
// it might seem strange to log an error since this could be a warning too
// it is a warning until we reach the timeout, this should give enough time to the mds to transtion its state
// after the timeout we consider that the mds might be gone or the timeout was not long enough...
return mdsRank, fmt.Errorf("mds %s not found in fsmap, this likely means mdss are transitioning between active and standby states", mdsName)
}

return mdsRank, nil
}

// MdsActiveOrStandbyReplay returns wether a given MDS is active or in standby
func MdsActiveOrStandbyReplay(context *clusterd.Context, clusterName, mdsName string) error {
status, err := Status(context, clusterName, false)
if err != nil {
return err
}

mdsRank, err := getMDSRank(status, clusterName, mdsName)
if err != nil {
return fmt.Errorf("%+v", err)
}

// this MDS is in standby so let's return immediatly
if mdsRank < 0 {
logger.Infof("mds %s is in standby, nothing to check", mdsName)
return nil
}

if status.Fsmap.ByRank[mdsRank].Status == "up:active" || status.Fsmap.ByRank[mdsRank].Status == "up:standby-replay" || status.Fsmap.ByRank[mdsRank].Status == "up:standby" {
logger.Infof("mds %s is %s", mdsName, status.Fsmap.ByRank[mdsRank].Status)
return nil
}

return fmt.Errorf("mds %s is %s, bad state", mdsName, status.Fsmap.ByRank[mdsRank].Status)
}
14 changes: 14 additions & 0 deletions pkg/daemon/ceph/client/status_test.go

Large diffs are not rendered by default.

242 changes: 236 additions & 6 deletions pkg/daemon/ceph/client/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,35 @@ package client

import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/rook/rook/pkg/clusterd"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
"github.com/rook/rook/pkg/util"
)

// CephDaemonsVersions is a structure that can be used to parsed the output of the 'ceph versions' command
type CephDaemonsVersions struct {
Mon map[string]int `json:"mon,omitempty"`
Osd map[string]int `json:"osd,omitempty"`
Mgr map[string]int `json:"mgr,omitempty"`
Osd map[string]int `json:"osd,omitempty"`
Mds map[string]int `json:"mds,omitempty"`
Overall map[string]int `json:"overall,omitempty"`
}

var (
// we don't perform any checks on these daemons
// they don't have any "ok-to-stop" command implemented
daemonNoCheck = []string{"mgr", "rgw", "rbdmirror"}
)

func getCephMonVersionString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph version: %+v", err)
return "", fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

Expand All @@ -46,7 +56,24 @@ func getCephMonVersionString(context *clusterd.Context) (string, error) {
func getCephVersionsString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "versions")
if err != nil {
return "", fmt.Errorf("failed to run ceph versions: %+v", err)
return "", fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

return output, nil
}

func getCephDaemonVersionString(context *clusterd.Context, deployment string) (string, error) {
daemonName, err := findDaemonName(deployment)
if err != nil {
return "", fmt.Errorf("%+v", err)
}
daemonID := findDaemonID(deployment)
daemon := daemonName + "." + daemonID

output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "tell", daemon, "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

Expand All @@ -57,7 +84,23 @@ func getCephVersionsString(context *clusterd.Context) (string, error) {
func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error) {
output, err := getCephMonVersionString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph version: %+v", err)
return nil, fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

v, err := cephver.ExtractCephVersion(output)
if err != nil {
return nil, fmt.Errorf("failed to extract ceph version. %+v", err)
}

return v, nil
}

// GetCephDaemonVersion reports the Ceph version of a particular daemon
func GetCephDaemonVersion(context *clusterd.Context, deployment string) (*cephver.CephVersion, error) {
output, err := getCephDaemonVersionString(context, deployment)
if err != nil {
return nil, fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

Expand All @@ -73,7 +116,7 @@ func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error)
func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
output, err := getCephVersionsString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph versions: %+v", err)
return nil, fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

Expand All @@ -90,7 +133,7 @@ func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
func EnableMessenger2(context *clusterd.Context) error {
_, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "mon", "enable-msgr2")
if err != nil {
return fmt.Errorf("failed to enable msgr2 protocol: %+v", err)
return fmt.Errorf("failed to enable msgr2 protocol. %+v", err)
}
logger.Infof("successfully enabled msgr2 protocol")

Expand All @@ -104,6 +147,193 @@ func EnableNautilusOSD(context *clusterd.Context) error {
return fmt.Errorf("failed to disallow pre-nautilus osds and enable all new nautilus-only functionality: %+v", err)
}
logger.Infof("successfully disallowed pre-nautilus osds and enabled all new nautilus-only functionality")
return nil
}

// OkToStop determines if it's ok to stop an upgrade
func OkToStop(context *clusterd.Context, namespace, deployment string, cephVersion cephver.CephVersion) error {
daemonName, err := findDaemonName(deployment)
if err != nil {
logger.Warningf("%+v", err)
return nil
}

// The ok-to-stop command for mon and mds landed on 14.2.1
// so we return nil if that Ceph version is not satisfied
if !cephVersion.IsAtLeast(cephver.CephVersion{Major: 14, Minor: 2, Extra: 1}) {
if daemonName != "osd" {
return nil
}
}

versions, err := GetCephVersions(context)
if err != nil {
return fmt.Errorf("failed to get ceph daemons versions. %+v", err)
}

switch daemonName {
// Trying to handle the case where a **single** mon is deployed and an upgrade is called
case "mon":
// if len(versions.Mon) > 1, we have different Ceph versions for some monitor(s)
// this is fine running the upgrade we can run the upgrade checks
if len(versions.Mon) == 1 {
// now trying to parse and find how many mons are presents
// if we have less than 3 mons we skip the check and do best-effort
// we do less than 3 because during the initial bootstrap the mon sequence is updated too
// so running running the check on 2/3 mon fails
// versions.Mon looks like this map[ceph version 15.0.0-12-g6c8fb92 (6c8fb920cb1d862f36ee852ed849a15f9a50bd68) octopus (dev):1]
// now looping over a single element since we can't address the key directly (we don't know its name)
for _, monCount := range versions.Mon {
if monCount < 3 {
logger.Infof("the cluster has less than 3 monitors, not performing upgrade check, running in best-effort")
return nil
}
}
}
// Trying to handle the case where a **single** osd is deployed and an upgrade is called
case "osd":
// if len(versions.Osd) > 1, we have different Ceph versions for some osd(s)
// this is fine running the upgrade we can run the upgrade checks
if len(versions.Osd) == 1 {
// now trying to parse and find how many osds are presents
// if we have less than 3 osds we skip the check and do best-effort
for _, osdCount := range versions.Osd {
if osdCount < 3 {
logger.Infof("the cluster has less than 3 OSDs, not performing upgrade check, running in best-effort")
return nil
}
}
}
}
// we don't implement any checks for mon, rgw and rbdmirror since:
// - mon: the is done in the monitor code since it ensures all the mons are always in quorum before continuing
// - rgw: the pod spec has a liveness probe so if the pod successfully start
// - rbdmirror: you can chain as many as you want like mdss but there is no ok-to-stop logic yet
err = okToStopDaemon(context, deployment)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to stop. %+v", deployment, err)
}

return nil
}

// OkToContinue determines if it's ok to continue an upgrade
func OkToContinue(context *clusterd.Context, namespace, deployment string, cephVersion cephver.CephVersion) error {
daemonName, err := findDaemonName(deployment)
if err != nil {
logger.Warningf("%+v", err)
return nil
}

// the mon case is handled directly in the deployment where the mon checks for quorum
switch daemonName {
case "osd":
err := okToContinueOSDDaemon(context, namespace)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to continue. %+v", deployment, err)
}
case "mds":
err := okToContinueMDSDaemon(context, namespace, deployment)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to continue. %+v", deployment, err)
}
}

return nil
}

func okToStopDaemon(context *clusterd.Context, deployment string) error {
daemonID := findDaemonID(deployment)
daemonName, err := findDaemonName(deployment)
if err != nil {
logger.Warningf("%+v", err)
return nil
}

if !stringInSlice(daemonName, daemonNoCheck) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", daemonName, "ok-to-stop", daemonID)
if err != nil {
return fmt.Errorf("deployment %s cannot be stopped. %+v", deployment, err)
}
logger.Infof("deployment %s is ok to be updated. %s", deployment, output)
}

logger.Infof("deployment %s is ok to be updated.", deployment)

return nil
}

// okToContinueOSDDaemon determines whether it's fine to go to the next osd during an upgrade
// This basically makes sure all the PGs have settled
func okToContinueOSDDaemon(context *clusterd.Context, namespace string) error {
err := util.Retry(3000, 15*time.Second, func() error {
return IsClusterClean(context, namespace)
})
if err != nil {
return err
}

return nil
}

// okToContinueMDSDaemon determines whether it's fine to go to the next mds during an upgrade
// mostly a placeholder function for the future but since we have standby mds this shouldn't be needed
func okToContinueMDSDaemon(context *clusterd.Context, namespace, deployment string) error {
daemonName, err := findDaemonName(deployment)
if err != nil {
logger.Warningf("%+v", err)
return nil
}

// wait for the MDS to be active again or in standby-replay
err = util.Retry(10, 15*time.Second, func() error {
return MdsActiveOrStandbyReplay(context, namespace, daemonName)
})
if err != nil {
return err
}

return nil
}

func findDaemonID(deployment string) string {
daemonTrimPrefixSplit := strings.Split(deployment, "-")
return daemonTrimPrefixSplit[len(daemonTrimPrefixSplit)-1]
}

func findDaemonName(deployment string) (string, error) {
err := errors.New("could not find daemon name, is this a new daemon?")

if strings.Contains(deployment, "mds") {
return "mds", nil
}
if strings.Contains(deployment, "rgw") {
return "rgw", nil
}
if strings.Contains(deployment, "mon") {
return "mon", nil
}
if strings.Contains(deployment, "osd") {
return "osd", nil
}
if strings.Contains(deployment, "mgr") {
return "mgr", nil
}
if strings.Contains(deployment, "nfs") {
return "nfs", nil
}
if strings.Contains(deployment, "rbdmirror") {
return "rbdmirror", nil
}

return "", fmt.Errorf("%+v from deployment %s", err, deployment)
}

func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
Loading

0 comments on commit aa2d6fe

Please sign in to comment.