Skip to content

Commit

Permalink
ceph: improve upgrade procedure
Browse files Browse the repository at this point in the history
When a cluster is updated with a different image version, this triggers
a serialized restart of all the pods. Prior to this commit, no safety
checks were performed and rook was hoping for the best outcome.

Now, before restarting a daemon, we check that it can be restarted. Once
it's restarted we also check that we can proceed with the rest of the
platform. For instance, with monitors we check that they are in quorum,
for OSDs we check that PGs are clean and for MDS we make sure they are
active.

Fixes: rook#2889
Signed-off-by: Sébastien Han <seb@redhat.com>
  • Loading branch information
leseb committed Jun 4, 2019
1 parent cea5fd8 commit 896beef
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 11 deletions.
121 changes: 121 additions & 0 deletions pkg/daemon/ceph/client/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"encoding/json"
"fmt"
"strings"

"github.com/rook/rook/pkg/clusterd"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
Expand Down Expand Up @@ -52,6 +53,21 @@ func getCephVersionsString(context *clusterd.Context) (string, error) {
return output, nil
}

// getCephDaemonVersionString queries the Ceph version of a single daemon via
// "ceph tell <name>.<id> version". The daemon name and ID are derived from the
// Kubernetes deployment name: the last dash-separated segment is the daemon ID
// and the one before it is the daemon type (e.g. "rook-ceph-mds-a" -> "mds.a").
func getCephDaemonVersionString(context *clusterd.Context, deployment string) (string, error) {
	deploymentSplit := strings.Split(deployment, "-")
	// guard against a malformed deployment name: indexing len-2 below would
	// panic if there are fewer than two segments
	if len(deploymentSplit) < 2 {
		return "", fmt.Errorf("deployment name %q is too short to extract the daemon name and id", deployment)
	}
	daemonID := deploymentSplit[len(deploymentSplit)-1]
	daemonName := deploymentSplit[len(deploymentSplit)-2]
	daemon := daemonName + "." + daemonID

	output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "tell", daemon, "version")
	if err != nil {
		return "", fmt.Errorf("failed to run ceph tell: %+v", err)
	}
	logger.Debug(output)

	return output, nil
}

// GetCephMonVersion reports the Ceph version of all the monitors, or at least a majority with quorum
func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error) {
output, err := getCephMonVersionString(context)
Expand All @@ -68,6 +84,22 @@ func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error)
return v, nil
}

// GetCephDaemonVersion reports the Ceph version of a particular daemon,
// identified by its Kubernetes deployment name (e.g. "rook-ceph-mds-a").
func GetCephDaemonVersion(context *clusterd.Context, deployment string) (*cephver.CephVersion, error) {
	output, err := getCephDaemonVersionString(context, deployment)
	if err != nil {
		// the helper already wraps the "ceph tell" failure; describe this
		// layer's intent instead of repeating the helper's message
		return nil, fmt.Errorf("failed to get ceph daemon version for %s: %+v", deployment, err)
	}
	// note: the helper already debug-logs the raw output, no need to repeat it

	v, err := cephver.ExtractCephVersion(output)
	if err != nil {
		return nil, fmt.Errorf("failed to extract ceph version. %+v", err)
	}

	return v, nil
}

// GetCephVersions reports the Ceph version of each daemon in the cluster
func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
output, err := getCephVersionsString(context)
Expand Down Expand Up @@ -95,3 +127,92 @@ func EnableMessenger2(context *clusterd.Context) error {

return nil
}

// OkToStopOrContinue determines whether it's ok to stop a daemon before an
// upgrade restart (action "stop") or to move on to the next daemon after a
// restart completed (action "continue"). The daemon type is derived from the
// deployment name (e.g. "rook-ceph-mon-a" -> "mon").
func OkToStopOrContinue(context *clusterd.Context, namespace, deployment, action string, cephVersion cephver.CephVersion) error {
	deploymentSplit := strings.Split(deployment, "-")
	// guard against a malformed deployment name: indexing len-2 below would
	// panic if there are fewer than two segments
	if len(deploymentSplit) < 2 {
		return fmt.Errorf("deployment name %q is too short to extract the daemon name", deployment)
	}
	daemonName := deploymentSplit[len(deploymentSplit)-2]

	// The ok-to-stop command for mon and mds landed on 14.2.1
	// so we return nil if that Ceph version is not satisfied
	if !cephVersion.IsAtLeast(cephver.CephVersion{Major: 14, Minor: 2, Extra: 1}) {
		if action == "stop" && daemonName != "osd" {
			return nil
		}
	}

	// Trying to handle the case where a **single** mon is deployed and an upgrade is called
	if daemonName == "mon" {
		versions, err := GetCephVersions(context)
		if err != nil {
			return fmt.Errorf("failed to get ceph daemons versions. %+v", err)
		}
		// if len(versions.Mon) > 1, we have different Ceph versions for some monitor
		// this is fine, an upgrade is in progress and we can run the upgrade checks
		if len(versions.Mon) == 1 {
			// now trying to parse and find how many mons are present
			// if < 3 then we skip the check and do best-effort
			// versions.Mon looks like this map[ceph version 15.0.0-12-g6c8fb92 (6c8fb920cb1d862f36ee852ed849a15f9a50bd68) octopus (dev):1]
			// now looping over a single element since we can't address the key directly, we don't know its name
			for _, monCount := range versions.Mon {
				if monCount == 1 {
					logger.Infof("the cluster has a single monitor only, not performing upgrade check, running in best-effort")
					return nil
				}
			}
		}
	}

	if action == "stop" {
		if err := okToStopDaemon(context, deployment, daemonName); err != nil {
			// include the underlying error so the failure cause is not lost
			return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
		}
	}

	if action == "continue" {
		// the mon case is handled directly in the deployment where the mon checks for quorum
		switch daemonName {
		case "osd":
			if err := OkToContinueOSDDaemon(context, namespace); err != nil {
				return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
			}
		case "mds":
			if err := OkToContinueMDSDaemon(context, namespace, deployment, daemonName); err != nil {
				return fmt.Errorf("failed to check if %s was ok to %s. %+v", deployment, action, err)
			}
		}
	}

	return nil
}

// okToStopDaemon asks Ceph whether the given daemon may be safely stopped by
// running "ceph <daemon> ok-to-stop <id>", where the ID is the trailing
// dash-separated segment of the deployment name ("rook-ceph-mon-a" -> "a").
func okToStopDaemon(context *clusterd.Context, deployment, daemon string) error {
	parts := strings.Split(deployment, "-")
	daemonID := parts[len(parts)-1]

	output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", daemon, "ok-to-stop", daemonID)
	if err != nil {
		return fmt.Errorf("deployment %s cannot be stopped: %+v", deployment, err)
	}
	logger.Infof("deployment %s is ok to be updated. %s", deployment, output)

	return nil
}

// OkToContinueOSDDaemon determines whether it's fine to go to the next osd during an upgrade
// This basically makes sure all the PGs have settled
func OkToContinueOSDDaemon(context *clusterd.Context, namespace string) error {
	// a clean cluster means the PGs have settled, so the next OSD may restart
	return IsClusterClean(context, namespace)
}

// OkToContinueMDSDaemon determines whether it's fine to go to the next mds during an upgrade
// NOTE(review): currently a no-op placeholder — it always allows continuing.
// The commit message says MDS should be checked for being "active", but that
// check is not implemented here; the parameters are accepted for future use.
func OkToContinueMDSDaemon(context *clusterd.Context, namespace, deployment, daemon string) error {
	return nil
}
33 changes: 33 additions & 0 deletions pkg/daemon/ceph/client/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ func TestGetCephMonVersionsString(t *testing.T) {
assert.Nil(t, err)
}

// TestGetCephDaemonVersionString verifies that the deployment name
// "rook-ceph-mds-a" is translated into the command "ceph tell mds.a version".
func TestGetCephDaemonVersionString(t *testing.T) {
	executor := &exectest.MockExecutor{}
	executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
		expectedArgs := []string{"tell", "mds.a", "version"}
		assert.Equal(t, len(expectedArgs), len(args))
		for i := range expectedArgs {
			assert.Equal(t, expectedArgs[i], args[i])
		}
		return "", nil
	}
	context := &clusterd.Context{Executor: executor}

	_, err := getCephDaemonVersionString(context, "rook-ceph-mds-a")
	assert.Nil(t, err)
}

func TestEnableMessenger2(t *testing.T) {
executor := &exectest.MockExecutor{}
executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
Expand All @@ -63,3 +79,20 @@ func TestEnableMessenger2(t *testing.T) {
err := EnableMessenger2(context)
assert.Nil(t, err)
}

// TestOkToStopDaemon verifies that deployment "rook-ceph-mon-a" with daemon
// "mon" results in the command "ceph mon ok-to-stop a".
func TestOkToStopDaemon(t *testing.T) {
	executor := &exectest.MockExecutor{}
	executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) {
		expectedArgs := []string{"mon", "ok-to-stop", "a"}
		assert.Equal(t, len(expectedArgs), len(args))
		for i := range expectedArgs {
			assert.Equal(t, expectedArgs[i], args[i])
		}
		return "", nil
	}
	context := &clusterd.Context{Executor: executor}

	err := okToStopDaemon(context, "rook-ceph-mon-a", "mon")
	assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Cluster) Start() error {
return fmt.Errorf("failed to create mgr deployment %s. %+v", resourceName, err)
}
logger.Infof("deployment for mgr %s already exists. updating if needed", resourceName)
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mgr deployment %s. %+v", resourceName, err)
}
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/operator/ceph/cluster/mon/mon.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,20 @@ func (c *Cluster) startDeployments(mons []*monConfig, requireAllInQuorum bool) e
if err != nil {
return fmt.Errorf("failed to create mon %s. %+v", mons[i].DaemonName, err)
}
// For the initial deployment (first creation) it's expected to not have all the monitors in quorum
// However, in an event of an update, it's crucial to proceed monitors by monitors
// At the end of the method we perform one last check where all the monitors must be in quorum
requireAllInQuorum := false
err = c.waitForMonsToJoin(mons, requireAllInQuorum)
if err != nil {
return fmt.Errorf("failed to check mon quorum %s. %+v", mons[i].DaemonName, err)
}
}

logger.Infof("mons created: %d", len(mons))
// Final verification that **all** mons are in quorum
// Do not proceed if one monitor is still syncing
requireAllInQuorum = true
return c.waitForMonsToJoin(mons, requireAllInQuorum)
}

Expand Down Expand Up @@ -505,7 +516,7 @@ func (c *Cluster) startMon(m *monConfig, hostname string) error {
return fmt.Errorf("failed to create mon deployment %s. %+v", m.ResourceName, err)
}
logger.Infof("deployment for mon %s already exists. updating if needed", m.ResourceName)
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mon deployment %s. %+v", m.ResourceName, err)
}
}
Expand All @@ -518,7 +529,7 @@ func waitForQuorumWithMons(context *clusterd.Context, clusterName string, mons [

// wait for monitors to establish quorum
retryCount := 0
retryMax := 20
retryMax := 30
for {
retryCount++
if retryCount > retryMax {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/osd.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (c *Cluster) startOSDDaemonsOnNode(nodeName string, config *provisionConfig
continue
}
logger.Infof("deployment for osd %d already exists. updating if needed", osd.ID)
if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace); err != nil {
if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace, c.clusterInfo); err != nil {
config.addError(fmt.Sprintf("failed to update osd deployment %d. %+v", osd.ID, err))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (m *Mirroring) Start() error {
return fmt.Errorf("failed to create %s deployment. %+v", resourceName, err)
}
logger.Infof("deployment for rbd-mirror %s already exists. updating if needed", resourceName)
if _, err := updateDeploymentAndWait(m.context, d, m.Namespace); err != nil {
if _, err := updateDeploymentAndWait(m.context, d, m.Namespace, m.ClusterInfo); err != nil {
// fail could be an issue updating label selector (immutable), so try del and recreate
logger.Debugf("updateDeploymentAndWait failed for rbd-mirror %s. Attempting del-and-recreate. %+v", resourceName, err)
err = m.context.Clientset.AppsV1().Deployments(m.Namespace).Delete(d.Name, &metav1.DeleteOptions{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/mds/mds.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (c *Cluster) Start() error {
// keyring must be generated before update-and-wait since no keyring will prevent the
// deployment from reaching ready state
if createErr != nil && errors.IsAlreadyExists(createErr) {
if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace); err != nil {
if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mds deployment %s. %+v", d.Name, err)
}
}
Expand Down
24 changes: 21 additions & 3 deletions pkg/operator/k8sutil/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"time"

"github.com/rook/rook/pkg/clusterd"
"k8s.io/api/apps/v1"
"github.com/rook/rook/pkg/daemon/ceph/client"
cephconfig "github.com/rook/rook/pkg/daemon/ceph/config"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
Expand All @@ -36,6 +38,7 @@ func GetDeploymentImage(clientset kubernetes.Interface, namespace, name, contain
return GetDeploymentSpecImage(clientset, *d, container, false)
}

// GetDeploymentSpecImage returns the image name from the spec
func GetDeploymentSpecImage(clientset kubernetes.Interface, d apps.Deployment, container string, initContainer bool) (string, error) {
image, err := GetSpecContainerImage(d.Spec.Template.Spec, container, initContainer)
if err != nil {
Expand All @@ -47,12 +50,19 @@ func GetDeploymentSpecImage(clientset kubernetes.Interface, d apps.Deployment, c

// UpdateDeploymentAndWait updates a deployment and waits until it is running to return. It will
// error if the deployment does not exist to be updated or if it takes too long.
func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deployment, namespace string) (*v1.Deployment, error) {
func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deployment, namespace string, clusterInfo *cephconfig.ClusterInfo) (*v1.Deployment, error) {
original, err := context.Clientset.AppsV1().Deployments(namespace).Get(deployment.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get deployment %s. %+v", deployment.Name, err)
}

actionToCheck := "stop"
logger.Infof("checking if deployment %s can be updated", deployment.Name)
err = client.OkToStopOrContinue(context, namespace, deployment.Name, actionToCheck, clusterInfo.CephVersion)
if err != nil {
return nil, fmt.Errorf("failed to check if deployment %s can be updated: %+v", deployment.Name, err)
}

logger.Infof("updating deployment %s", deployment.Name)
if _, err := context.Clientset.AppsV1().Deployments(namespace).Update(deployment); err != nil {
return nil, fmt.Errorf("failed to update deployment %s. %+v", deployment.Name, err)
Expand All @@ -73,9 +83,17 @@ func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deploym
}
if d.Status.ObservedGeneration != original.Status.ObservedGeneration && d.Status.UpdatedReplicas > 0 && d.Status.ReadyReplicas > 0 {
logger.Infof("finished waiting for updated deployment %s", d.Name)

// Now we check if we can go to the next daemon
actionToCheck := "continue"
logger.Infof("checking if deployment %s update can continue", deployment.Name)
err := client.OkToStopOrContinue(context, namespace, deployment.Name, actionToCheck, clusterInfo.CephVersion)
if err != nil {
return nil, fmt.Errorf("failed to check if deployment %s can be updated. %+v", deployment.Name, err)
}

return d, nil
}

logger.Debugf("deployment %s status=%+v", d.Name, d.Status)
time.Sleep(time.Duration(sleepTime) * time.Second)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/operator/k8sutil/test/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"github.com/rook/rook/pkg/clusterd"
cephconfig "github.com/rook/rook/pkg/daemon/ceph/config"
apps "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -15,11 +16,11 @@ import (
// returns a pointer to this slice which the calling func may use to verify the expected contents of
// deploymentsUpdated based on expected behavior.
func UpdateDeploymentAndWaitStub() (
stubFunc func(context *clusterd.Context, deployment *apps.Deployment, namespace string) (*apps.Deployment, error),
stubFunc func(context *clusterd.Context, deployment *apps.Deployment, namespace string, clusterInfo *cephconfig.ClusterInfo) (*apps.Deployment, error),
deploymentsUpdated *[]*apps.Deployment,
) {
deploymentsUpdated = &[]*apps.Deployment{}
stubFunc = func(context *clusterd.Context, deployment *apps.Deployment, namespace string) (*apps.Deployment, error) {
stubFunc = func(context *clusterd.Context, deployment *apps.Deployment, namespace string, clusterInfo *cephconfig.ClusterInfo) (*apps.Deployment, error) {
*deploymentsUpdated = append(*deploymentsUpdated, deployment)
return &apps.Deployment{ObjectMeta: metav1.ObjectMeta{UID: "stub-deployment-uid"}}, nil
}
Expand Down

0 comments on commit 896beef

Please sign in to comment.