ceph: improve upgrade procedure

When a cluster is updated with a different image version, this triggers
a serialized restart of all the pods. Prior to this commit, no safety
checks were performed and rook was hoping for the best outcome.

Now, before restarting a daemon, we check that it can be safely
restarted. Once it has restarted, we also check that we can proceed
with the rest of the platform. For instance, for monitors we check that
they are in quorum, for OSDs we check that PGs are clean, and for MDS
we make sure they are active.

Fixes: rook#2889
Signed-off-by: Sébastien Han <seb@redhat.com>
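A rough sketch of the intended flow (illustrative only, not the exact wiring in this commit): the new upgrade package exposes OkToStopOrContinue, which a caller could invoke around each deployment update. The restartDaemonSafely function and its call site below are hypothetical.

package example // hypothetical caller, for illustration only

import (
	"fmt"

	"github.com/rook/rook/pkg/clusterd"
	"github.com/rook/rook/pkg/operator/ceph/upgrade"
	cephver "github.com/rook/rook/pkg/operator/ceph/version"
)

// Gate a daemon restart on the new safety checks before and after the
// deployment update.
func restartDaemonSafely(ctx *clusterd.Context, namespace, deployment string, v cephver.CephVersion) error {
	// refuse to restart the daemon if Ceph reports it is not safe to stop
	if err := upgrade.OkToStopOrContinue(ctx, namespace, deployment, "stop", v); err != nil {
		return fmt.Errorf("not safe to restart %s. %+v", deployment, err)
	}

	// ... update the deployment here (e.g. k8sutil.UpdateDeploymentAndWait) ...

	// only move on to the next daemon once the cluster has settled again
	// (mons back in quorum, PGs clean, MDS active)
	return upgrade.OkToStopOrContinue(ctx, namespace, deployment, "continue", v)
}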
leseb committed May 19, 2019
1 parent 097f5f1 commit ec02f74
Showing 11 changed files with 405 additions and 16 deletions.
6 changes: 3 additions & 3 deletions pkg/daemon/ceph/client/command_test.go
@@ -30,7 +30,7 @@ func TestFinalizeCephCommandArgs(t *testing.T) {
args := []string{"mon_status"}
expectedArgs := []string{
"mon_status",
"--connect-timeout=15",
"--connect-timeout=60",
"--cluster=rook",
"--conf=/var/lib/rook/rook-ceph/rook/rook.config",
"--keyring=/var/lib/rook/rook-ceph/rook/client.admin.keyring",
@@ -85,7 +85,7 @@ func TestFinalizeCephCommandArgsToolBox(t *testing.T) {
"--",
"ceph",
"health",
"--connect-timeout=15",
"--connect-timeout=60",
}

cmd, args := FinalizeCephCommandArgs(expectedCommand, args, configDir, clusterName)
@@ -101,7 +101,7 @@ func TestFinalizeCephCommandArgsClusterDefaultName(t *testing.T) {
args := []string{"mon_status"}
expectedArgs := []string{
"mon_status",
"--connect-timeout=15",
"--connect-timeout=60",
}

cmd, args := FinalizeCephCommandArgs(expectedCommand, args, configDir, clusterName)
4 changes: 2 additions & 2 deletions pkg/daemon/ceph/client/mon_test.go
@@ -29,7 +29,7 @@ func TestCephArgs(t *testing.T) {
assert.Equal(t, CephTool, command)
assert.Equal(t, 4, len(args))
assert.Equal(t, 4, len(args))
assert.Equal(t, "--connect-timeout=15", args[0])
assert.Equal(t, "--connect-timeout=60", args[0])
assert.Equal(t, "--cluster=a", args[1])
assert.Equal(t, "--conf=/etc/a/a.config", args[2])
assert.Equal(t, "--keyring=/etc/a/client.admin.keyring", args[3])
@@ -46,7 +46,7 @@ func TestCephArgs(t *testing.T) {
assert.Equal(t, "a", args[4])
assert.Equal(t, "--", args[5])
assert.Equal(t, CephTool, args[6])
assert.Equal(t, "--connect-timeout=15", args[7])
assert.Equal(t, "--connect-timeout=60", args[7])
RunAllCephCommandsInToolbox = false

// cluster under /var/lib/rook
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/mgr.go
@@ -141,7 +141,7 @@ func (c *Cluster) Start() error {
return fmt.Errorf("failed to create mgr deployment %s. %+v", resourceName, err)
}
logger.Infof("deployment for mgr %s already exists. updating if needed", resourceName)
- if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
+ if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mgr deployment %s. %+v", resourceName, err)
}
}
15 changes: 13 additions & 2 deletions pkg/operator/ceph/cluster/mon/mon.go
@@ -423,9 +423,20 @@ func (c *Cluster) startDeployments(mons []*monConfig, requireAllInQuorum bool) e
if err != nil {
return fmt.Errorf("failed to create mon %s. %+v", mons[i].DaemonName, err)
}
// For the initial deployment (first creation) it's expected that not all the monitors will be in quorum yet
// However, in the event of an update, it's crucial to proceed monitor by monitor
// At the end of the method we perform one last check where all the monitors must be in quorum
requireAllInQuorum := false
err = c.waitForMonsToJoin(mons, requireAllInQuorum)
if err != nil {
return fmt.Errorf("failed to check mon quorum %s. %+v", mons[i].DaemonName, err)
}
}

logger.Infof("mons created: %d", len(mons))
// Final verification that **all** mons are in quorum
// Do not proceed if one monitor is still syncing
requireAllInQuorum = true
return c.waitForMonsToJoin(mons, requireAllInQuorum)
}

@@ -505,7 +516,7 @@ func (c *Cluster) startMon(m *monConfig, hostname string) error {
return fmt.Errorf("failed to create mon deployment %s. %+v", m.ResourceName, err)
}
logger.Infof("deployment for mon %s already exists. updating if needed", m.ResourceName)
- if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil {
+ if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mon deployment %s. %+v", m.ResourceName, err)
}
}
@@ -518,7 +529,7 @@ func waitForQuorumWithMons(context *clusterd.Context, clusterName string, mons [

// wait for monitors to establish quorum
retryCount := 0
- retryMax := 20
+ retryMax := 30
for {
retryCount++
if retryCount > retryMax {
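The mon changes above enforce quorum differently depending on the phase: during the very first deployment not all monitors exist yet, so full quorum is only required by the final check, whereas during an update every monitor must rejoin quorum before the next one is restarted. Below is a minimal sketch of such a membership check, assuming the JSON produced by 'ceph quorum_status --format json'; the monInQuorum helper is hypothetical and not part of this commit.

package example

import (
	"encoding/json"
	"fmt"
)

// quorumStatus holds the only field we need from 'ceph quorum_status'.
type quorumStatus struct {
	QuorumNames []string `json:"quorum_names"`
}

// monInQuorum reports whether the named monitor is currently in quorum.
func monInQuorum(rawJSON []byte, monName string) (bool, error) {
	var status quorumStatus
	if err := json.Unmarshal(rawJSON, &status); err != nil {
		return false, fmt.Errorf("failed to parse quorum_status. %+v", err)
	}
	for _, name := range status.QuorumNames {
		if name == monName {
			return true, nil
		}
	}
	return false, nil
}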
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/osd.go
@@ -290,7 +290,7 @@ func (c *Cluster) startOSDDaemonsOnNode(nodeName string, config *provisionConfig
continue
}
logger.Infof("deployment for osd %d already exists. updating if needed", osd.ID)
- if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace); err != nil {
+ if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace, c.clusterInfo); err != nil {
config.addError(fmt.Sprintf("failed to update osd deployment %d. %+v", osd.ID, err))
}
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/mirror.go
@@ -110,7 +110,7 @@ func (m *Mirroring) Start() error {
return fmt.Errorf("failed to create %s deployment. %+v", resourceName, err)
}
logger.Infof("deployment for rbd-mirror %s already exists. updating if needed", resourceName)
- if _, err := updateDeploymentAndWait(m.context, d, m.Namespace); err != nil {
+ if _, err := updateDeploymentAndWait(m.context, d, m.Namespace, m.ClusterInfo); err != nil {
// fail could be an issue updating label selector (immutable), so try del and recreate
logger.Debugf("updateDeploymentAndWait failed for rbd-mirror %s. Attempting del-and-recreate. %+v", resourceName, err)
err = m.context.Clientset.AppsV1().Deployments(m.Namespace).Delete(d.Name, &metav1.DeleteOptions{})
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/mds/mds.go
@@ -155,7 +155,7 @@ func (c *Cluster) Start() error {
// keyring must be generated before update-and-wait since no keyring will prevent the
// deployment from reaching ready state
if createErr != nil && errors.IsAlreadyExists(createErr) {
- if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace); err != nil {
+ if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace, c.clusterInfo); err != nil {
return fmt.Errorf("failed to update mds deployment %s. %+v", d.Name, err)
}
}
204 changes: 204 additions & 0 deletions pkg/operator/ceph/upgrade/upgrade.go
@@ -0,0 +1,204 @@
/*
Copyright 2019 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package upgrade

import (
"encoding/json"
"fmt"
"strings"

"github.com/coreos/pkg/capnslog"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/daemon/ceph/client"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
)

// CephDaemonsVersions is a structure that can be used to parse the output of the 'ceph versions' command
type CephDaemonsVersions struct {
Mon map[string]int `json:"mon,omitempty"`
Mgr map[string]int `json:"mgr,omitempty"`
Mds map[string]int `json:"mds,omitempty"`
Overall map[string]int `json:"overall,omitempty"`
}

var (
logger = capnslog.NewPackageLogger("github.com/rook/rook", "upgrade")
)

func getCephMonVersionString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph version: %+v", err)
}
logger.Debug(output)

return output, nil
}

func getCephVersionsString(context *clusterd.Context) (string, error) {
output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "versions")
if err != nil {
return "", fmt.Errorf("failed to run ceph versions: %+v", err)
}
logger.Debug(output)

return output, nil
}

func getCephDaemonVersionString(context *clusterd.Context, deployment string) (string, error) {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]
daemonName := deploymentSplit[len(deploymentSplit)-2]
daemon := daemonName + "." + daemonID

output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "tell", daemon, "version")
if err != nil {
return "", fmt.Errorf("failed to run ceph tell: %+v", err)
}
logger.Debug(output)

return output, nil
}

// GetCephMonVersion reports the Ceph version of all the monitors, or at least a majority with quorum
func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error) {
output, err := getCephMonVersionString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph version: %+v", err)
}
logger.Debug(output)

v, err := cephver.ExtractCephVersion(output)
if err != nil {
return nil, fmt.Errorf("failed to extract ceph version. %+v", err)
}

return v, nil
}

// GetCephDaemonVersion reports the Ceph version of a particular daemon
func GetCephDaemonVersion(context *clusterd.Context, deployment string) (*cephver.CephVersion, error) {
output, err := getCephDaemonVersionString(context, deployment)
if err != nil {
return nil, fmt.Errorf("failed to run ceph tell: %+v", err)
}
logger.Debug(output)

v, err := cephver.ExtractCephVersion(output)
if err != nil {
return nil, fmt.Errorf("failed to extract ceph version. %+v", err)
}

return v, nil
}

// GetCephVersions reports the Ceph version of each daemon in the cluster
func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) {
output, err := getCephVersionsString(context)
if err != nil {
return nil, fmt.Errorf("failed to run ceph versions: %+v", err)
}
logger.Debug(output)

var cephVersionsResult CephDaemonsVersions
err = json.Unmarshal([]byte(output), &cephVersionsResult)
if err != nil {
return nil, fmt.Errorf("failed to retrieve ceph versions results. %+v", err)
}

return &cephVersionsResult, nil
}

// EnableMessenger2 enables the messenger v2 protocol on Nautilus clusters
func EnableMessenger2(context *clusterd.Context, namespace string) error {
args := []string{"mon", "enable-msgr2"}
_, err := client.ExecuteCephCommand(context, namespace, args)
if err != nil {
return fmt.Errorf("failed to enable msgr2 protocol: %+v", err)
}
logger.Infof("successfully enabled msgr2 protocol")

return nil
}

// OkToStopOrContinue determines whether it's ok to stop or continue an upgrade
func OkToStopOrContinue(context *clusterd.Context, namespace, deployment, action string, cephVersion cephver.CephVersion) error {
deploymentSplit := strings.Split(deployment, "-")
daemonName := deploymentSplit[len(deploymentSplit)-2]

// The ok-to-stop command for mon and mds landed in 14.2.1
// so we return nil if that Ceph version is not satisfied
if !cephVersion.IsAtLeast(cephver.CephVersion{Major: 14, Minor: 2, Extra: 1}) {
if action == "stop" && daemonName != "osd" {
return nil
}
}

if action == "stop" {
err := OkToStopDaemon(context, deployment, daemonName)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s", deployment, action)
}
}

if action == "continue" {
// the mon case is handled directly in the deployment where the mon checks for quorum
switch daemonName {
case "osd":
err := OkToContinueOSDDaemon(context, namespace)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s", deployment, action)
}
case "mds":
err := OkToContinueMDSDaemon(context, namespace, deployment, daemonName)
if err != nil {
return fmt.Errorf("failed to check if %s was ok to %s", deployment, action)
}
}
}

return nil
}

// OkToStopDaemon determines whether it's fine to stop a Ceph daemon
func OkToStopDaemon(context *clusterd.Context, deployment, daemon string) error {
deploymentSplit := strings.Split(deployment, "-")
daemonID := deploymentSplit[len(deploymentSplit)-1]

output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", daemon, "ok-to-stop", daemonID)
if err != nil {
return fmt.Errorf("deployment %s cannot be stopped: %+v", deployment, err)
}
logger.Infof("deployment %s is ok to be updated. %s", deployment, output)

return nil
}

// OkToContinueOSDDaemon determines whether it's fine to go to the next osd during an upgrade
// This basically makes sure all the PGs have settled
func OkToContinueOSDDaemon(context *clusterd.Context, namespace string) error {
if err := client.IsClusterClean(context, namespace); err != nil {
return err
}

return nil
}

// OkToContinueMDSDaemon determines whether it's fine to go to the next mds during an upgrade
func OkToContinueMDSDaemon(context *clusterd.Context, namespace, deployment, daemon string) error {
return nil
}
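For reference, GetCephVersions above expects 'ceph versions' to report, per daemon type plus an "overall" summary, a map from version banner to the number of daemons running that version. The following standalone sketch shows that shape; the sample values are made up and the lowercase struct simply mirrors CephDaemonsVersions from the file above.

package main

import (
	"encoding/json"
	"fmt"
)

type cephDaemonsVersions struct {
	Mon     map[string]int `json:"mon,omitempty"`
	Mds     map[string]int `json:"mds,omitempty"`
	Overall map[string]int `json:"overall,omitempty"`
}

func main() {
	// 'ceph versions' maps a version banner to the number of daemons running it
	sample := `{
		"mon":     {"ceph version 14.2.1 nautilus (stable)": 3},
		"mds":     {"ceph version 14.2.1 nautilus (stable)": 1},
		"overall": {"ceph version 14.2.1 nautilus (stable)": 4}
	}`

	var v cephDaemonsVersions
	if err := json.Unmarshal([]byte(sample), &v); err != nil {
		panic(err)
	}
	// A single key under "overall" means every reporting daemon runs the same
	// version; more than one key indicates a mixed-version cluster mid-upgrade.
	fmt.Println(len(v.Overall)) // prints 1
}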