Skip to content

Commit

Permalink
ceph: improve upgrade procedure
Browse files Browse the repository at this point in the history
When a cluster is updated with a different image version, this triggers
a serialized restart of all the pods. Prior to this commit, no safety
check were performed and rook was hoping for the best outcome.

Now before doing restarting a daemon we check it can be restarted. Once
it's restarted we also check we can pursue with the rest of the
platform. For instance, with monitors we check that they are in quorum,
for OSD we check that PGs are clean and for MDS we make sure they are
 active.

Fixes: rook#2889
Signed-off-by: Sébastien Han <seb@redhat.com>
  • Loading branch information
leseb committed Jun 5, 2019
1 parent 298dc10 commit 8b96320
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 17 deletions.
147 changes: 142 additions & 5 deletions pkg/daemon/ceph/client/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"encoding/json"
"fmt"
"strings"

"github.com/rook/rook/pkg/clusterd"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
Expand All @@ -35,7 +36,7 @@ type CephDaemonsVersions struct {
func getCephMonVersionString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph version: %+v", err)
return "", fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

Expand All @@ -45,7 +46,22 @@ func getCephMonVersionString(context *clusterd.Context) (string, error) {
func getCephVersionsString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "versions")
if err != nil {
return "", fmt.Errorf("failed to run ceph versions: %+v", err)
return "", fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

return output, nil
}

func getCephDaemonVersionString(context *clusterd.Context, deployment string) (string, error) {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]
daemonName := deploymentSplit[len(deploymentSplit)-2]
daemon := daemonName + "." + daemonID

output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "tell", daemon, "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

Expand All @@ -56,7 +72,23 @@ func getCephVersionsString(context *clusterd.Context) (string, error) {
func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error) {
output, err := getCephMonVersionString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph version: %+v", err)
return nil, fmt.Errorf("failed to run ceph version. %+v", err)
}
logger.Debug(output)

v, err := cephver.ExtractCephVersion(output)
if err != nil {
return nil, fmt.Errorf("failed to extract ceph version. %+v", err)
}

return v, nil
}

// GetCephDaemonVersion reports the Ceph version of a particular daemon
func GetCephDaemonVersion(context *clusterd.Context, deployment string) (*cephver.CephVersion, error) {
output, err := getCephDaemonVersionString(context, deployment)
if err != nil {
return nil, fmt.Errorf("failed to run ceph tell. %+v", err)
}
logger.Debug(output)

Expand All @@ -72,7 +104,7 @@ func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error)
func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
output, err := getCephVersionsString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph versions: %+v", err)
return nil, fmt.Errorf("failed to run ceph versions. %+v", err)
}
logger.Debug(output)

Expand All @@ -89,9 +121,114 @@ func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
func EnableMessenger2(context *clusterd.Context) error {
_, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "mon", "enable-msgr2")
if err != nil {
return fmt.Errorf("failed to enable msgr2 protocol: %+v", err)
return fmt.Errorf("failed to enable msgr2 protocol. %+v", err)
}
logger.Infof("successfully enabled msgr2 protocol")

return nil
}

// OkToStopOrContinue determines wether it's ok to stop or continue an upgrade
func OkToStopOrContinue(context *clusterd.Context, namespace, deployment, action string, cephVersion cephver.CephVersion) error {
deploymentSplit := strings.Split(deployment, "-")
daemonName := deploymentSplit[len(deploymentSplit)-2]

// The ok-to-stop command for mon and mds landed on 14.2.1
// so we return nil if that Ceph version is not satisfied
if !cephVersion.IsAtLeast(cephver.CephVersion{Major: 14, Minor: 2, Extra: 1}) {
if action == "stop" && daemonName != "osd" {
return nil
}
}

// Trying to handle the case where a **single** mon is deployed and an upgrade is called
if daemonName == "mon" {
versions, err := GetCephVersions(context)
if err != nil {
return fmt.Errorf("failed to get ceph daemons versions. %+v", err)
}
// if len(versions.Mon) > 1, we have different Ceph versions for some monitor
// this is fine running the upgrade we can run the upgrade checks
if len(versions.Mon) == 1 {
// now trying to parse and find how many mons are presents
// if we have less than 3 mons we skip the check and do best-effort
// we do less than 3 because during the initial bootstrap the mon sequence is updated too
// so running running the check on 2/3 mon fails
// versions.Mon looks like this map[ceph version 15.0.0-12-g6c8fb92 (6c8fb920cb1d862f36ee852ed849a15f9a50bd68) octopus (dev):1]
// now looping over a single element since we can't address the key directly (we don't know its name)
for _, monCount := range versions.Mon {
if monCount < 3 {
logger.Infof("the cluster has less than 3 monitors, not performing upgrade check, running in best-effort")
return nil
}
}
}
}

if action == "stop" {
err := okToStopDaemon(context, deployment, daemonName)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
}

if action == "continue" {
// the mon case is handled directly in the deployment where the mon checks for quorum
switch daemonName {
case "osd":
err := okToContinueOSDDaemon(context, namespace)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
case "mds":
err := okToContinueMDSDaemon(context, namespace, deployment, daemonName)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
case "rgw":
err := okToContinueRGWDaemon(context, namespace, deployment, daemonName)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
}
}
}

return nil
}

func okToStopDaemon(context *clusterd.Context, deployment, daemon string) error {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]

// mgr and rbdmirror do not implement that check
if daemon != "mgr" && daemon != "rbdmirror" {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", daemon, "ok-to-stop", daemonID)
if err != nil {
return fmt.Errorf("deployment %s cannot be stopped. %+v", deployment, err)
}
logger.Infof("deployment %s is ok to be updated. %s", deployment, output)
}
logger.Infof("deployment %s is ok to be updated.", deployment)

return nil
}

// okToContinueOSDDaemon determines whether it's fine to go to the next osd during an upgrade
// This basically makes sure all the PGs have settled
func okToContinueOSDDaemon(context *clusterd.Context, namespace string) error {
if err := IsClusterClean(context, namespace); err != nil {
return err
}

return nil
}

// okToContinueMDSDaemon determines whether it's fine to go to the next mds during an upgrade
func okToContinueMDSDaemon(context *clusterd.Context, namespace, deployment, daemon string) error {
return nil
}

// okToContinueRGWDaemon determines whether it's fine to go to the next rgw during an upgrade
func okToContinueRGWDaemon(context *clusterd.Context, namespace, deployment, daemon string) error {
return nil
}
33 changes: 33 additions & 0 deletions pkg/daemon/ceph/client/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ func TestGetCephMonVersionsString(t *testing.T) {
assert.Nil(t, err)
}

func TestGetCephDaemonVersionString(t *testing.T) {
executor := &exectest.MockExecutor{}
deployment := "rook-ceph-mds-a"
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
assert.Equal(t, "tell", args[0])
assert.Equal(t, "mds.a", args[1])
assert.Equal(t, "version", args[2])
assert.Equal(t, 3, len(args))
return "", nil
}
context := &clusterd.Context{Executor: executor}

_, err := getCephDaemonVersionString(context, deployment)
assert.Nil(t, err)
}

func TestEnableMessenger2(t *testing.T) {
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
Expand All @@ -63,3 +79,20 @@ func TestEnableMessenger2(t *testing.T) {
err := EnableMessenger2(context)
assert.Nil(t, err)
}

func TestOkToStopDaemon(t *testing.T) {
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
assert.Equal(t, "mon", args[0])
assert.Equal(t, "ok-to-stop", args[1])
assert.Equal(t, "a", args[2])
assert.Equal(t, 3, len(args))
return "", nil
}
context := &clusterd.Context{Executor: executor}

deployment := "rook-ceph-mon-a"
daemon := "mon"
err := okToStopDaemon(context, deployment, daemon)
assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Cluster) Start() error {
return fmt.Errorf("failed to create mgr deployment %s. %+v", resourceName, err)
}
logger.Infof("deployment for mgr %s already exists. updating if needed", resourceName)
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mgr deployment %s. %+v", resourceName, err)
}
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/operator/ceph/cluster/mon/mon.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,20 @@ func (c *Cluster) startDeployments(mons []*monConfig, requireAllInQuorum bool) e
if err != nil {
return fmt.Errorf("failed to create mon %s. %+v", mons[i].DaemonName, err)
}
// For the initial deployment (first creation) it's expected to not have all the monitors in quorum
// However, in an event of an update, it's crucial to proceed monitors by monitors
// At the end of the method we perform one last check where all the monitors must be in quorum
requireAllInQuorum := false
err = c.waitForMonsToJoin(mons, requireAllInQuorum)
if err != nil {
return fmt.Errorf("failed to check mon quorum %s. %+v", mons[i].DaemonName, err)
}
}

logger.Infof("mons created: %d", len(mons))
// Final verification that **all** mons are in quorum
// Do not proceed if one monitor is still syncing
requireAllInQuorum = true
return c.waitForMonsToJoin(mons, requireAllInQuorum)
}

Expand Down Expand Up @@ -505,7 +516,7 @@ func (c *Cluster) startMon(m *monConfig, hostname string) error {
return fmt.Errorf("failed to create mon deployment %s. %+v", m.ResourceName, err)
}
logger.Infof("deployment for mon %s already exists. updating if needed", m.ResourceName)
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mon deployment %s. %+v", m.ResourceName, err)
}
}
Expand All @@ -518,7 +529,7 @@ func waitForQuorumWithMons(context *clusterd.Context, clusterName string, mons [

// wait for monitors to establish quorum
retryCount := 0
retryMax := 20
retryMax := 30
for {
retryCount++
if retryCount > retryMax {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/osd.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (c *Cluster) startOSDDaemonsOnNode(nodeName string, config *provisionConfig
continue
}
logger.Infof("deployment for osd %d already exists. updating if needed", osd.ID)
if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace); err != nil {
if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace, c.clusterInfo); err != nil {
config.addError(fmt.Sprintf("failed to update osd deployment %d. %+v", osd.ID, err))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (m *Mirroring) Start() error {
return fmt.Errorf("failed to create %s deployment. %+v", resourceName, err)
}
logger.Infof("deployment for rbd-mirror %s already exists. updating if needed", resourceName)
if _, err := updateDeploymentAndWait(m.context, d, m.Namespace); err != nil {
if _, err := updateDeploymentAndWait(m.context, d, m.Namespace, m.ClusterInfo); err != nil {
// fail could be an issue updating label selector (immutable), so try del and recreate
logger.Debugf("updateDeploymentAndWait failed for rbd-mirror %s. Attempting del-and-recreate. %+v", resourceName, err)
err = m.context.Clientset.AppsV1().Deployments(m.Namespace).Delete(d.Name, &metav1.DeleteOptions{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/mds/mds.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (c *Cluster) Start() error {
// keyring must be generated before update-and-wait since no keyring will prevent the
// deployment from reaching ready state
if createErr != nil && errors.IsAlreadyExists(createErr) {
if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace); err != nil {
if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mds deployment %s. %+v", d.Name, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/nfs/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *CephNFSController) upCephNFS(n cephv1.CephNFS, oldActive int) error {
return fmt.Errorf("failed to create ganesha deployment. %+v", err)
}
logger.Infof("ganesha deployment %s already exists. updating if needed", deployment.Name)
if _, err := updateDeploymentAndWait(c.context, deployment, n.Namespace); err != nil {
if _, err := updateDeploymentAndWait(c.context, deployment, n.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update ganesha deployment %s. %+v", deployment.Name, err)
}
} else {
Expand Down
24 changes: 21 additions & 3 deletions pkg/operator/k8sutil/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"time"

"github.com/rook/rook/pkg/clusterd"
"k8s.io/api/apps/v1"
"github.com/rook/rook/pkg/daemon/ceph/client"
cephconfig "github.com/rook/rook/pkg/daemon/ceph/config"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
Expand All @@ -36,6 +38,7 @@ func GetDeploymentImage(clientset kubernetes.Interface, namespace, name, contain
return GetDeploymentSpecImage(clientset, *d, container, false)
}

// GetDeploymentSpecImage returns the image name from the spec
func GetDeploymentSpecImage(clientset kubernetes.Interface, d apps.Deployment, container string, initContainer bool) (string, error) {
image, err := GetSpecContainerImage(d.Spec.Template.Spec, container, initContainer)
if err != nil {
Expand All @@ -47,12 +50,19 @@ func GetDeploymentSpecImage(clientset kubernetes.Interface, d apps.Deployment, c

// UpdateDeploymentAndWait updates a deployment and waits until it is running to return. It will
// error if the deployment does not exist to be updated or if it takes too long.
func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deployment, namespace string) (*v1.Deployment, error) {
func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deployment, namespace string, clusterInfo *cephconfig.ClusterInfo) (*v1.Deployment, error) {
original, err := context.Clientset.AppsV1().Deployments(namespace).Get(deployment.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get deployment %s. %+v", deployment.Name, err)
}

actionToCheck := "stop"
logger.Infof("checking if deployment %s can be updated", deployment.Name)
err = client.OkToStopOrContinue(context, namespace, deployment.Name, actionToCheck, clusterInfo.CephVersion)
if err != nil {
return nil, fmt.Errorf("failed to check if deployment %s can be updated: %+v", deployment.Name, err)
}

logger.Infof("updating deployment %s", deployment.Name)
if _, err := context.Clientset.AppsV1().Deployments(namespace).Update(deployment); err != nil {
return nil, fmt.Errorf("failed to update deployment %s. %+v", deployment.Name, err)
Expand All @@ -73,9 +83,17 @@ func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deploym
}
if d.Status.ObservedGeneration != original.Status.ObservedGeneration && d.Status.UpdatedReplicas > 0 && d.Status.ReadyReplicas > 0 {
logger.Infof("finished waiting for updated deployment %s", d.Name)

// Now we check if we can go to the next daemon
actionToCheck := "continue"
logger.Infof("checking if deployment %s update can continue", deployment.Name)
err := client.OkToStopOrContinue(context, namespace, deployment.Name, actionToCheck, clusterInfo.CephVersion)
if err != nil {
return nil, fmt.Errorf("failed to check if deployment %s can be updated. %+v", deployment.Name, err)
}

return d, nil
}

logger.Debugf("deployment %s status=%+v", d.Name, d.Status)
time.Sleep(time.Duration(sleepTime) * time.Second)
}
Expand Down
Loading

0 comments on commit 8b96320

Please sign in to comment.