From ec02f74b523cb5dd1fb17a2d95bcb3ee136a2538 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Han?= Date: Wed, 27 Mar 2019 12:34:57 +0100 Subject: [PATCH] ceph: improve upgrade procedure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a cluster is updated with a different image version, this triggers a serialized restart of all the pods. Prior to this commit, no safety checks were performed and rook was hoping for the best outcome. Now before restarting a daemon we check that it can be restarted. Once it's restarted we also check that we can proceed with the rest of the platform. For instance, with monitors we check that they are in quorum, for OSD we check that PGs are clean and for MDS we make sure they are active. Fixes: https://github.com/rook/rook/issues/2889 Signed-off-by: Sébastien Han --- pkg/daemon/ceph/client/command_test.go | 6 +- pkg/daemon/ceph/client/mon_test.go | 4 +- pkg/operator/ceph/cluster/mgr/mgr.go | 2 +- pkg/operator/ceph/cluster/mon/mon.go | 15 +- pkg/operator/ceph/cluster/osd/osd.go | 2 +- pkg/operator/ceph/cluster/rbd/mirror.go | 2 +- pkg/operator/ceph/file/mds/mds.go | 2 +- pkg/operator/ceph/upgrade/upgrade.go | 204 ++++++++++++++++++++++ pkg/operator/ceph/upgrade/upgrade_test.go | 99 +++++++++++ pkg/operator/k8sutil/deployment.go | 80 ++++++++- pkg/operator/k8sutil/test/deployment.go | 5 +- 11 files changed, 405 insertions(+), 16 deletions(-) create mode 100644 pkg/operator/ceph/upgrade/upgrade.go create mode 100644 pkg/operator/ceph/upgrade/upgrade_test.go diff --git a/pkg/daemon/ceph/client/command_test.go b/pkg/daemon/ceph/client/command_test.go index 122bf0c98a00..9a42a9deab4a 100644 --- a/pkg/daemon/ceph/client/command_test.go +++ b/pkg/daemon/ceph/client/command_test.go @@ -30,7 +30,7 @@ func TestFinalizeCephCommandArgs(t *testing.T) { args := []string{"mon_status"} expectedArgs := []string{ "mon_status", - "--connect-timeout=15", + "--connect-timeout=60", "--cluster=rook", 
"--conf=/var/lib/rook/rook-ceph/rook/rook.config", "--keyring=/var/lib/rook/rook-ceph/rook/client.admin.keyring", @@ -85,7 +85,7 @@ func TestFinalizeCephCommandArgsToolBox(t *testing.T) { "--", "ceph", "health", - "--connect-timeout=15", + "--connect-timeout=60", } cmd, args := FinalizeCephCommandArgs(expectedCommand, args, configDir, clusterName) @@ -101,7 +101,7 @@ func TestFinalizeCephCommandArgsClusterDefaultName(t *testing.T) { args := []string{"mon_status"} expectedArgs := []string{ "mon_status", - "--connect-timeout=15", + "--connect-timeout=60", } cmd, args := FinalizeCephCommandArgs(expectedCommand, args, configDir, clusterName) diff --git a/pkg/daemon/ceph/client/mon_test.go b/pkg/daemon/ceph/client/mon_test.go index 3a5b7a48736b..369de2385cc2 100644 --- a/pkg/daemon/ceph/client/mon_test.go +++ b/pkg/daemon/ceph/client/mon_test.go @@ -29,7 +29,7 @@ func TestCephArgs(t *testing.T) { assert.Equal(t, CephTool, command) assert.Equal(t, 4, len(args)) assert.Equal(t, 4, len(args)) - assert.Equal(t, "--connect-timeout=15", args[0]) + assert.Equal(t, "--connect-timeout=60", args[0]) assert.Equal(t, "--cluster=a", args[1]) assert.Equal(t, "--conf=/etc/a/a.config", args[2]) assert.Equal(t, "--keyring=/etc/a/client.admin.keyring", args[3]) @@ -46,7 +46,7 @@ func TestCephArgs(t *testing.T) { assert.Equal(t, "a", args[4]) assert.Equal(t, "--", args[5]) assert.Equal(t, CephTool, args[6]) - assert.Equal(t, "--connect-timeout=15", args[7]) + assert.Equal(t, "--connect-timeout=60", args[7]) RunAllCephCommandsInToolbox = false // cluster under /var/lib/rook diff --git a/pkg/operator/ceph/cluster/mgr/mgr.go b/pkg/operator/ceph/cluster/mgr/mgr.go index 720dafa59765..3907c9b17eb8 100644 --- a/pkg/operator/ceph/cluster/mgr/mgr.go +++ b/pkg/operator/ceph/cluster/mgr/mgr.go @@ -141,7 +141,7 @@ func (c *Cluster) Start() error { return fmt.Errorf("failed to create mgr deployment %s. %+v", resourceName, err) } logger.Infof("deployment for mgr %s already exists. 
updating if needed", resourceName) - if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil { + if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil { return fmt.Errorf("failed to update mgr deployment %s. %+v", resourceName, err) } } diff --git a/pkg/operator/ceph/cluster/mon/mon.go b/pkg/operator/ceph/cluster/mon/mon.go index 47c8b8110676..c96f1b403d97 100644 --- a/pkg/operator/ceph/cluster/mon/mon.go +++ b/pkg/operator/ceph/cluster/mon/mon.go @@ -423,9 +423,20 @@ func (c *Cluster) startDeployments(mons []*monConfig, requireAllInQuorum bool) e if err != nil { return fmt.Errorf("failed to create mon %s. %+v", mons[i].DaemonName, err) } + // For the initial deployment (first creation) it's expected to not have all the monitors in quorum + // However, in an event of an update, it's crucial to proceed monitors by monitors + // At the end of the method we perform one last check where all the monitors must be in quorum + requireAllInQuorum := false + err = c.waitForMonsToJoin(mons, requireAllInQuorum) + if err != nil { + return fmt.Errorf("failed to check mon quorum %s. %+v", mons[i].DaemonName, err) + } } logger.Infof("mons created: %d", len(mons)) + // Final verification that **all** mons are in quorum + // Do not proceed if one monitor is still syncing + requireAllInQuorum = true return c.waitForMonsToJoin(mons, requireAllInQuorum) } @@ -505,7 +516,7 @@ func (c *Cluster) startMon(m *monConfig, hostname string) error { return fmt.Errorf("failed to create mon deployment %s. %+v", m.ResourceName, err) } logger.Infof("deployment for mon %s already exists. updating if needed", m.ResourceName) - if _, err := updateDeploymentAndWait(c.context, d, c.Namespace); err != nil { + if _, err := updateDeploymentAndWait(c.context, d, c.Namespace, c.clusterInfo); err != nil { return fmt.Errorf("failed to update mon deployment %s. 
%+v", m.ResourceName, err) } } @@ -518,7 +529,7 @@ func waitForQuorumWithMons(context *clusterd.Context, clusterName string, mons [ // wait for monitors to establish quorum retryCount := 0 - retryMax := 20 + retryMax := 30 for { retryCount++ if retryCount > retryMax { diff --git a/pkg/operator/ceph/cluster/osd/osd.go b/pkg/operator/ceph/cluster/osd/osd.go index 403c2f9b8f9a..9ed337db1389 100644 --- a/pkg/operator/ceph/cluster/osd/osd.go +++ b/pkg/operator/ceph/cluster/osd/osd.go @@ -290,7 +290,7 @@ func (c *Cluster) startOSDDaemonsOnNode(nodeName string, config *provisionConfig continue } logger.Infof("deployment for osd %d already exists. updating if needed", osd.ID) - if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace); err != nil { + if _, err = k8sutil.UpdateDeploymentAndWait(c.context, dp, c.Namespace, c.clusterInfo); err != nil { config.addError(fmt.Sprintf("failed to update osd deployment %d. %+v", osd.ID, err)) } } diff --git a/pkg/operator/ceph/cluster/rbd/mirror.go b/pkg/operator/ceph/cluster/rbd/mirror.go index 88948ad4810b..cd34ef63da1f 100644 --- a/pkg/operator/ceph/cluster/rbd/mirror.go +++ b/pkg/operator/ceph/cluster/rbd/mirror.go @@ -110,7 +110,7 @@ func (m *Mirroring) Start() error { return fmt.Errorf("failed to create %s deployment. %+v", resourceName, err) } logger.Infof("deployment for rbd-mirror %s already exists. updating if needed", resourceName) - if _, err := updateDeploymentAndWait(m.context, d, m.Namespace); err != nil { + if _, err := updateDeploymentAndWait(m.context, d, m.Namespace, m.ClusterInfo); err != nil { // fail could be an issue updating label selector (immutable), so try del and recreate logger.Debugf("updateDeploymentAndWait failed for rbd-mirror %s. Attempting del-and-recreate. 
%+v", resourceName, err) err = m.context.Clientset.AppsV1().Deployments(m.Namespace).Delete(d.Name, &metav1.DeleteOptions{}) diff --git a/pkg/operator/ceph/file/mds/mds.go b/pkg/operator/ceph/file/mds/mds.go index 35e2ffe02994..077998b86ab7 100644 --- a/pkg/operator/ceph/file/mds/mds.go +++ b/pkg/operator/ceph/file/mds/mds.go @@ -155,7 +155,7 @@ func (c *Cluster) Start() error { // keyring must be generated before update-and-wait since no keyring will prevent the // deployment from reaching ready state if createErr != nil && errors.IsAlreadyExists(createErr) { - if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace); err != nil { + if _, err = UpdateDeploymentAndWait(c.context, d, c.fs.Namespace, c.clusterInfo); err != nil { return fmt.Errorf("failed to update mds deployment %s. %+v", d.Name, err) } } diff --git a/pkg/operator/ceph/upgrade/upgrade.go b/pkg/operator/ceph/upgrade/upgrade.go new file mode 100644 index 000000000000..c334986ff6bc --- /dev/null +++ b/pkg/operator/ceph/upgrade/upgrade.go @@ -0,0 +1,204 @@ +/* +Copyright 2019 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package upgrade + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/coreos/pkg/capnslog" + "github.com/rook/rook/pkg/clusterd" + "github.com/rook/rook/pkg/daemon/ceph/client" + cephver "github.com/rook/rook/pkg/operator/ceph/version" +) + +// CephDaemonsVersions is a structure that can be used to parse the output of the 'ceph versions' command +type CephDaemonsVersions struct { + Mon map[string]int `json:"mon,omitempty"` + Mgr map[string]int `json:"mgr,omitempty"` + Mds map[string]int `json:"mds,omitempty"` + Overall map[string]int `json:"overall,omitempty"` +} + +var ( + logger = capnslog.NewPackageLogger("github.com/rook/rook", "upgrade") +) + +func getCephMonVersionString(context *clusterd.Context) (string, error) { + output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "version") + if err != nil { + return "", fmt.Errorf("failed to run ceph version: %+v", err) + } + logger.Debug(output) + + return output, nil +} + +func getCephVersionsString(context *clusterd.Context) (string, error) { + output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "versions") + if err != nil { + return "", fmt.Errorf("failed to run ceph versions: %+v", err) + } + logger.Debug(output) + + return output, nil +} + +func getCephDaemonVersionString(context *clusterd.Context, deployment string) (string, error) { + deploymentSplit := strings.Split(deployment, "-") + daemonID := deploymentSplit[len(deploymentSplit)-1] + daemonName := deploymentSplit[len(deploymentSplit)-2] + daemon := daemonName + "." 
+ daemonID + + output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", "tell", daemon, "version") + if err != nil { + return "", fmt.Errorf("failed to run ceph tell: %+v", err) + } + logger.Debug(output) + + return output, nil +} + +// GetCephMonVersion reports the Ceph version of all the monitors, or at least a majority with quorum +func GetCephMonVersion(context *clusterd.Context) (*cephver.CephVersion, error) { + output, err := getCephMonVersionString(context) + if err != nil { + return nil, fmt.Errorf("failed to run ceph version: %+v", err) + } + logger.Debug(output) + + v, err := cephver.ExtractCephVersion(output) + if err != nil { + return nil, fmt.Errorf("failed to extract ceph version. %+v", err) + } + + return v, nil +} + +// GetCephDaemonVersion reports the Ceph version of a particular daemon +func GetCephDaemonVersion(context *clusterd.Context, deployment string) (*cephver.CephVersion, error) { + output, err := getCephDaemonVersionString(context, deployment) + if err != nil { + return nil, fmt.Errorf("failed to run ceph tell: %+v", err) + } + logger.Debug(output) + + v, err := cephver.ExtractCephVersion(output) + if err != nil { + return nil, fmt.Errorf("failed to extract ceph version. %+v", err) + } + + return v, nil +} + +// GetCephVersions reports the Ceph version of each daemon in the cluster +func GetCephVersions(context *clusterd.Context) (*CephDaemonsVersions, error) { + output, err := getCephVersionsString(context) + if err != nil { + return nil, fmt.Errorf("failed to run ceph versions: %+v", err) + } + logger.Debug(output) + + var cephVersionsResult CephDaemonsVersions + err = json.Unmarshal([]byte(output), &cephVersionsResult) + if err != nil { + return nil, fmt.Errorf("failed to retrieve ceph versions results. 
%+v", err) + } + + return &cephVersionsResult, nil +} + +// EnableMessenger2 enables the messenger 2 protocol on Nautilus clusters +func EnableMessenger2(context *clusterd.Context, namespace string) error { + args := []string{"mon", "enable-msgr2"} + _, err := client.ExecuteCephCommand(context, namespace, args) + if err != nil { + return fmt.Errorf("failed to enable msgr2 protocol: %+v", err) + } + logger.Infof("successfully enabled msgr2 protocol") + + return nil +} + +// OkToStopOrContinue determines whether it's ok to stop or continue an upgrade +func OkToStopOrContinue(context *clusterd.Context, namespace, deployment, action string, cephVersion cephver.CephVersion) error { + deploymentSplit := strings.Split(deployment, "-") + daemonName := deploymentSplit[len(deploymentSplit)-2] + + // The ok-to-stop command for mon and mds landed on 14.2.1 + // so we return nil if that Ceph version is not satisfied + if !cephVersion.IsAtLeast(cephver.CephVersion{Major: 14, Minor: 2, Extra: 1}) { + if action == "stop" && daemonName != "osd" { + return nil + } + } + + if action == "stop" { + err := OkToStopDaemon(context, deployment, daemonName) + if err != nil { + return fmt.Errorf("failed to check if %s was ok to %s", deployment, action) + } + } + + if action == "continue" { + // the mon case is handled directly in the deployment where the mon checks for quorum + switch daemonName { + case "osd": + err := OkToContinueOSDDaemon(context, namespace) + if err != nil { + return fmt.Errorf("failed to check if %s was ok to %s", deployment, action) + } + case "mds": + err := OkToContinueMDSDaemon(context, namespace, deployment, daemonName) + if err != nil { + return fmt.Errorf("failed to check if %s was ok to %s", deployment, action) + } + } + } + + return nil +} + +// OkToStopDaemon determines whether it's fine to stop a Ceph daemon +func OkToStopDaemon(context *clusterd.Context, deployment, daemon string) error { + deploymentSplit := strings.Split(deployment, "-") + daemonID := 
deploymentSplit[len(deploymentSplit)-1] + + output, err := context.Executor.ExecuteCommandWithOutput(false, "", "ceph", daemon, "ok-to-stop", daemonID) + if err != nil { + return fmt.Errorf("deployment %s cannot be stopped: %+v", deployment, err) + } + logger.Infof("deployment %s is ok to be updated. %s", deployment, output) + + return nil +} + +// OkToContinueOSDDaemon determines whether it's fine to go to the next osd during an upgrade +// This basically makes sure all the PGs have settled +func OkToContinueOSDDaemon(context *clusterd.Context, namespace string) error { + if err := client.IsClusterClean(context, namespace); err != nil { + return err + } + + return nil +} + +// OkToContinueMDSDaemon determines whether it's fine to go to the next mds during an upgrade +func OkToContinueMDSDaemon(context *clusterd.Context, namespace, deployment, daemon string) error { + return nil +} diff --git a/pkg/operator/ceph/upgrade/upgrade_test.go b/pkg/operator/ceph/upgrade/upgrade_test.go new file mode 100644 index 000000000000..92e670efc84a --- /dev/null +++ b/pkg/operator/ceph/upgrade/upgrade_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2019 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package upgrade + +import ( + "testing" + + "github.com/rook/rook/pkg/clusterd" + exectest "github.com/rook/rook/pkg/util/exec/test" + "github.com/stretchr/testify/assert" +) + +func TestGetCephMonVersionString(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) { + assert.Equal(t, "version", args[0]) + assert.Equal(t, 1, len(args)) + return "", nil + } + context := &clusterd.Context{Executor: executor} + + _, err := getCephMonVersionString(context) + assert.Nil(t, err) +} + +func TestGetCephMonVersionsString(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) { + assert.Equal(t, "versions", args[0]) + assert.Equal(t, 1, len(args)) + return "", nil + } + context := &clusterd.Context{Executor: executor} + + _, err := getCephVersionsString(context) + assert.Nil(t, err) +} + +func TestGetCephDaemonVersionString(t *testing.T) { + executor := &exectest.MockExecutor{} + deployment := "rook-ceph-mds-a" + executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) { + assert.Equal(t, "tell", args[0]) + assert.Equal(t, "mds.a", args[1]) + assert.Equal(t, "version", args[2]) + assert.Equal(t, 3, len(args)) + return "", nil + } + context := &clusterd.Context{Executor: executor} + + _, err := getCephDaemonVersionString(context, deployment) + assert.Nil(t, err) +} + +func TestEnableMessenger2(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) { + assert.Equal(t, "mon", args[0]) + assert.Equal(t, "enable-msgr2", args[1]) + assert.Equal(t, 2, len(args)) + return "", nil + } + context := &clusterd.Context{Executor: executor} + + ns := "rook-ceph" + err 
:= EnableMessenger2(context, ns) + assert.Nil(t, err) +} + +func TestOkToStopDaemon(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithOutput = func(debug bool, name string, command string, args ...string) (string, error) { + assert.Equal(t, "mon", args[0]) + assert.Equal(t, "ok-to-stop", args[1]) + assert.Equal(t, "a", args[2]) + assert.Equal(t, 3, len(args)) + return "", nil + } + context := &clusterd.Context{Executor: executor} + + deployment := "rook-ceph-mon-a" + daemon := "mon" + err := OkToStopDaemon(context, deployment, daemon) + assert.Nil(t, err) +} diff --git a/pkg/operator/k8sutil/deployment.go b/pkg/operator/k8sutil/deployment.go index c5244f52ccc1..6f94ebf85bd0 100644 --- a/pkg/operator/k8sutil/deployment.go +++ b/pkg/operator/k8sutil/deployment.go @@ -18,11 +18,14 @@ package k8sutil import ( "fmt" + "strings" "time" "github.com/rook/rook/pkg/clusterd" - "k8s.io/api/apps/v1" + cephconfig "github.com/rook/rook/pkg/daemon/ceph/config" + "github.com/rook/rook/pkg/operator/ceph/upgrade" apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -36,6 +39,7 @@ func GetDeploymentImage(clientset kubernetes.Interface, namespace, name, contain return GetDeploymentSpecImage(clientset, *d, container, false) } +// GetDeploymentSpecImage returns the image name from the spec func GetDeploymentSpecImage(clientset kubernetes.Interface, d apps.Deployment, container string, initContainer bool) (string, error) { image, err := GetSpecContainerImage(d.Spec.Template.Spec, container, initContainer) if err != nil { @@ -47,12 +51,54 @@ func GetDeploymentSpecImage(clientset kubernetes.Interface, d apps.Deployment, c // UpdateDeploymentAndWait updates a deployment and waits until it is running to return. It will // error if the deployment does not exist to be updated or if it takes too long. 
-func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deployment, namespace string) (*v1.Deployment, error) { +func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deployment, namespace string, clusterInfo *cephconfig.ClusterInfo) (*v1.Deployment, error) { original, err := context.Clientset.AppsV1().Deployments(namespace).Get(deployment.Name, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("failed to get deployment %s. %+v", deployment.Name, err) } + actionToCheck := "stop" + if strings.Contains(deployment.Name, "mon") { + if clusterInfo.CephVersion.IsAtLeastNautilus() { + v, err := upgrade.GetCephMonVersion(context) + if err != nil { + return nil, fmt.Errorf("failed to get ceph mon version. %+v", err) + } + if v.IsAtLeastNautilus() { + versions, err := upgrade.GetCephVersions(context) + if err != nil { + return nil, fmt.Errorf("failed to get ceph daemons versions. %+v", err) + } + if len(versions.Mon) == 1 { + // If len is one, this clearly indicates that all the mons are running the same version + // We are doing this because 'ceph version' might return the Ceph version that a majority of mons have but not all of them + // so instead of trying to active msgr2 when mons are not ready, we activate it when we believe that's the right time + + // run ok-to-stop + + } + } + } + } else if strings.Contains(deployment.Name, "mds") { + if clusterInfo.CephVersion.IsAtLeastNautilus() { + MDSversion, err := upgrade.GetCephDaemonVersion(context, deployment.Name) + if err != nil { + return nil, fmt.Errorf("failed to get ceph daemons versions. 
%+v", err) + } + if MDSversion.IsAtLeastNautilus() { + // run ok-to-stop + } + } + } else if strings.Contains(deployment.Name, "osd") { + // run ok-to-stop + } + + logger.Infof("checking if deployment %s can be updated", deployment.Name) + err = upgrade.OkToStopOrContinue(context, namespace, deployment.Name, actionToCheck, clusterInfo.CephVersion) + if err != nil { + return nil, fmt.Errorf("failed to check if deployment %s can be updated: %+v", deployment.Name, err) + } + logger.Infof("updating deployment %s", deployment.Name) if _, err := context.Clientset.AppsV1().Deployments(namespace).Update(deployment); err != nil { return nil, fmt.Errorf("failed to update deployment %s. %+v", deployment.Name, err) @@ -73,9 +119,37 @@ func UpdateDeploymentAndWait(context *clusterd.Context, deployment *apps.Deploym } if d.Status.ObservedGeneration != original.Status.ObservedGeneration && d.Status.UpdatedReplicas > 0 && d.Status.ReadyReplicas > 0 { logger.Infof("finished waiting for updated deployment %s", d.Name) + + // Now we check if we can go to the next daemon + actionToCheck := "continue" + if strings.Contains(deployment.Name, "mon") || strings.Contains(deployment.Name, "mds") { + if clusterInfo.CephVersion.IsAtLeastNautilus() { + v, err := upgrade.GetCephMonVersion(context) + if err != nil { + return nil, fmt.Errorf("failed to get ceph mon version. %+v", err) + } + if v.IsAtLeastNautilus() { + versions, err := upgrade.GetCephVersions(context) + if err != nil { + return nil, fmt.Errorf("failed to get ceph daemons versions. 
%+v", err) + } + if len(versions.Mon) == 1 { + // If len is one, this clearly indicates that all the mons are running the same version + // We are doing this because 'ceph version' might return the Ceph version that a majority of mons have but not all of them + // so instead of trying to active msgr2 when mons are not ready, we activate it when we believe that's the right time + upgrade.EnableMessenger2(context, namespace) + } + } + } + } else if strings.Contains(deployment.Name, "osd") { + logger.Infof("checking if deployment %s update can continue", deployment.Name) + err := upgrade.OkToStopOrContinue(context, namespace, deployment.Name, actionToCheck, clusterInfo.CephVersion) + if err != nil { + return nil, fmt.Errorf("failed to check if deployment %s can be updated. %+v", deployment.Name, err) + } + } return d, nil } - logger.Debugf("deployment %s status=%+v", d.Name, d.Status) time.Sleep(time.Duration(sleepTime) * time.Second) } diff --git a/pkg/operator/k8sutil/test/deployment.go b/pkg/operator/k8sutil/test/deployment.go index e4d5c5b8ef7d..500795749c88 100644 --- a/pkg/operator/k8sutil/test/deployment.go +++ b/pkg/operator/k8sutil/test/deployment.go @@ -2,6 +2,7 @@ package test import ( "github.com/rook/rook/pkg/clusterd" + cephconfig "github.com/rook/rook/pkg/daemon/ceph/config" apps "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -15,11 +16,11 @@ import ( // returns a pointer to this slice which the calling func may use to verify the expected contents of // deploymentsUpdated based on expected behavior. 
func UpdateDeploymentAndWaitStub() ( - stubFunc func(context *clusterd.Context, deployment *apps.Deployment, namespace string) (*apps.Deployment, error), + stubFunc func(context *clusterd.Context, deployment *apps.Deployment, namespace string, clusterInfo *cephconfig.ClusterInfo) (*apps.Deployment, error), deploymentsUpdated *[]*apps.Deployment, ) { deploymentsUpdated = &[]*apps.Deployment{} - stubFunc = func(context *clusterd.Context, deployment *apps.Deployment, namespace string) (*apps.Deployment, error) { + stubFunc = func(context *clusterd.Context, deployment *apps.Deployment, namespace string, clusterInfo *cephconfig.ClusterInfo) (*apps.Deployment, error) { *deploymentsUpdated = append(*deploymentsUpdated, deployment) return &apps.Deployment{ObjectMeta: metav1.ObjectMeta{UID: "stub-deployment-uid"}}, nil }