Skip to content

Commit

Permalink
cluster: major upgrade procedure (#51)
Browse files Browse the repository at this point in the history
Current patch version upgrade procedure is ran when cluster is being
upgraded to next patch version.
New major upgrade procedure handles all others upgrades.

Procedure:
* Check if the cluster has schema agreement (using API call)
* Take `system` and `system_schema` tables snapshot on all nodes in
parallel.

For each node:

* Drain node
* Backup the data - snapshot of all data keyspaces
* Update Scylla image by restarting Pod
* Validate if node is up and version is updated via API call
* Clear data snapshot

After last node:

* Delete `system` and `system_schema` table snapshots on all nodes in
parallel

Fixes #51
  • Loading branch information
zimnx committed Dec 18, 2020
1 parent 941a02c commit d3d1707
Show file tree
Hide file tree
Showing 17 changed files with 1,714 additions and 151 deletions.
56 changes: 52 additions & 4 deletions pkg/api/v1alpha1/cluster_types.go
Expand Up @@ -47,6 +47,8 @@ type ClusterSpec struct {
CpuSet bool `json:"cpuset,omitempty"`
// AutomaticOrphanedNodeCleanup controls if automatic orphan node cleanup should be performed.
AutomaticOrphanedNodeCleanup bool `json:"automaticOrphanedNodeCleanup,omitempty"`
// GenericUpgrade allows to configure behavior of generic upgrade logic.
GenericUpgrade *GenericUpgradeSpec `json:"genericUpgrade,omitempty"`
// Datacenter that will make up this cluster.
Datacenter DatacenterSpec `json:"datacenter"`
// Sysctl properties to be applied during initialization
Expand All @@ -64,6 +66,28 @@ type ClusterSpec struct {
Backups []BackupTaskSpec `json:"backups,omitempty"`
}

// GenericUpgradeFailureStrategy allows to specify how upgrade logic should handle failures.
type GenericUpgradeFailureStrategy string

const (
// GenericUpgradeFailureStrategyRetry infinitely retry until node becomes ready.
GenericUpgradeFailureStrategyRetry GenericUpgradeFailureStrategy = "Retry"
)

// GenericUpgradeSpec hold generic upgrade procedure parameters.
type GenericUpgradeSpec struct {
// ValidationTimeout specifies how long it can take for Scylla to boot and enter ready state
// after image upgrade until FailureStrategy is executed.
ValidationTimeout *metav1.Duration `json:"validationTimeout,omitempty"`
// FailureStrategy specifies which logic is executed when upgrade failure happens.
// Currently only Retry is supported.
FailureStrategy GenericUpgradeFailureStrategy `json:"failureStrategy,omitempty"`
// PollInterval specifies how often upgrade logic polls on state updates.
// Increasing this value should lower number of requests sent to apiserver, but it may affect
// overall time spent during upgrade.
PollInterval *metav1.Duration `json:"pollInterval,omitempty"`
}

type SchedulerTaskSpec struct {
// Name of a task, it must be unique across all tasks.
Name string `json:"name"`
Expand Down Expand Up @@ -231,10 +255,34 @@ type BackupTaskStatus struct {

// ClusterStatus defines the observed state of ScyllaCluster
type ClusterStatus struct {
Racks map[string]RackStatus `json:"racks,omitempty"`
ManagerID *string `json:"managerId,omitempty"`
Repairs []RepairTaskStatus `json:"repairs,omitempty"`
Backups []BackupTaskStatus `json:"backups,omitempty"`
// Racks reflect status of cluster racks.
Racks map[string]RackStatus `json:"racks,omitempty"`
// ManagerID contains ID under which cluster was registered in Scylla Manager.
ManagerID *string `json:"managerId,omitempty"`
// Repairs reflects status of repair tasks.
Repairs []RepairTaskStatus `json:"repairs,omitempty"`
// Backups reflects status of backup tasks.
Backups []BackupTaskStatus `json:"backups,omitempty"`
// Upgrade reflects state of ongoing upgrade procedure.
Upgrade *UpgradeStatus `json:"upgrade,omitempty"`
}

// UpgradeStatus contains state of ongoing upgrade procedure.
type UpgradeStatus struct {
// State reflects current upgrade state.
State string `json:"state"`
// CurrentNode node under upgrade.
CurrentNode string `json:"currentNode,omitempty"`
// CurrentRack rack under upgrade.
CurrentRack string `json:"currentRack,omitempty"`
// FromVersion reflects from which version ScyllaCluster is being upgraded.
FromVersion string `json:"fromVersion"`
// ToVersion reflects to which version ScyllaCluster is being upgraded.
ToVersion string `json:"toVersion"`
// SystemSnapshotTag snapshot tag of system keyspaces.
SystemSnapshotTag string `json:"systemSnapshotTag,omitempty"`
// DataSnapshotTag snapshot tag of data keyspaces.
DataSnapshotTag string `json:"dataSnapshotTag,omitempty"`
}

// RackStatus is the status of a Scylla Rack
Expand Down
19 changes: 6 additions & 13 deletions pkg/api/v1alpha1/cluster_validation.go
Expand Up @@ -108,23 +108,16 @@ func checkValues(c *ScyllaCluster) error {
managerTaskNames.Add(b.Name)
}

if c.Spec.GenericUpgrade != nil {
if c.Spec.GenericUpgrade.FailureStrategy != GenericUpgradeFailureStrategyRetry {
return errors.Errorf("unsupported generic upgrade failure strategy %q", GenericUpgradeFailureStrategyRetry)
}
}

return nil
}

func checkTransitions(old, new *ScyllaCluster) error {
oldVersion, err := semver.Parse(old.Spec.Version)
if err != nil {
return errors.Errorf("invalid old semantic version, err=%s", err)
}
newVersion, err := semver.Parse(new.Spec.Version)
if err != nil {
return errors.Errorf("invalid new semantic version, err=%s", err)
}
// Check that version remained the same
if newVersion.Major != oldVersion.Major || newVersion.Minor != oldVersion.Minor {
return errors.Errorf("only upgrading of patch versions are supported")
}

// Check that repository remained the same
if !reflect.DeepEqual(old.Spec.Repository, new.Spec.Repository) {
return errors.Errorf("repository change is currently not supported, old=%v, new=%v", *old.Spec.Repository, *new.Spec.Repository)
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/v1alpha1/cluster_validation_test.go
Expand Up @@ -97,13 +97,13 @@ func TestCheckTransitions(t *testing.T) {
name: "major version changed",
old: unit.NewSingleRackCluster(3),
new: unit.NewDetailedSingleRackCluster("test-cluster", "test-ns", "repo", "3.3.1", "test-dc", "test-rack", 3),
allowed: false,
allowed: true,
},
{
name: "minor version changed",
old: unit.NewSingleRackCluster(3),
new: unit.NewDetailedSingleRackCluster("test-cluster", "test-ns", "repo", "2.4.2", "test-dc", "test-rack", 3),
allowed: false,
allowed: true,
},
{
name: "patch version changed",
Expand Down
18 changes: 18 additions & 0 deletions pkg/api/v1alpha1/cluster_webhook.go
Expand Up @@ -18,9 +18,11 @@ package v1alpha1

import (
"reflect"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -95,6 +97,22 @@ func (c *ScyllaCluster) Default() {
c.Spec.Backups[i].Retention = pointer.Int64Ptr(3)
}
}

if c.Spec.GenericUpgrade == nil {
c.Spec.GenericUpgrade = &GenericUpgradeSpec{}
}

if c.Spec.GenericUpgrade.FailureStrategy == "" {
c.Spec.GenericUpgrade.FailureStrategy = GenericUpgradeFailureStrategyRetry
}

if c.Spec.GenericUpgrade.ValidationTimeout == nil {
c.Spec.GenericUpgrade.ValidationTimeout = &metav1.Duration{Duration: 30 * time.Minute}
}

if c.Spec.GenericUpgrade.PollInterval == nil {
c.Spec.GenericUpgrade.PollInterval = &metav1.Duration{Duration: time.Second}
}
}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/cluster/actions/export_test.go
@@ -0,0 +1,5 @@
// Copyright (C) 2017 ScyllaDB

package actions

type CQLSession = cqlSession
99 changes: 99 additions & 0 deletions pkg/controllers/cluster/actions/main_integration_test.go
@@ -0,0 +1,99 @@
// +build integration

// Copyright (C) 2017 ScyllaDB

package actions_test

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/go-logr/zapr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"github.com/scylladb/go-log"
"github.com/scylladb/scylla-operator/pkg/cmd/options"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster"
"github.com/scylladb/scylla-operator/pkg/test/integration"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
)

var (
testEnv *integration.TestEnvironment
ctx = context.Background()
)

const (
retryInterval = 200 * time.Millisecond
timeout = 30 * time.Second
)

func TestMain(m *testing.M) {
ctx := log.WithNewTraceID(context.Background())
atom := zap.NewAtomicLevelAt(zapcore.DebugLevel)
logger, _ := log.NewProduction(log.Config{
Level: atom,
})
zlogger, _ := zap.NewDevelopment()
ctrl.SetLogger(zapr.NewLogger(zlogger))
klog.InitFlags(nil)
klog.SetOutput(os.Stdout)

logger.Info(ctx, "Creating test environment")
var err error
testEnv, err = integration.NewTestEnvironment(logger.Named("env"),
integration.WithPollRetryInterval(retryInterval),
integration.WithPollTimeout(timeout),
)
if err != nil {
panic(err)
}

logger.Info(ctx, "Starting test manager")
go func() {
if err := testEnv.StartManager(ctx); err != nil {
panic(fmt.Sprintf("Failed to start the envtest manager: %v", err))
}
}()

options.GetOperatorOptions().Image = "scylladb/scylla-operator"
defer func() {
options.GetOperatorOptions().Image = ""
}()

reconciler, err := cluster.New(ctx, testEnv.Manager, logger)
if err != nil {
panic(errors.Wrap(err, "create cluster reconciler"))
}
logger.Info(ctx, "Reconciler setup")
if err := reconciler.SetupWithManager(testEnv.Manager); err != nil {
panic(errors.Wrap(err, "setup cluster reconciler"))
}

logger.Info(ctx, "Starting tests")
// Run tests
code := m.Run()
logger.Info(ctx, "Tests done")
// Tearing down the test environment
if err := testEnv.Stop(); err != nil {
panic(fmt.Sprintf("Failed to stop the envtest: %v", err))
}

os.Exit(code)
}

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)

RunSpecsWithDefaultAndCustomReporters(t,
"Controller Suite",
[]Reporter{printer.NewlineReporter{}})
}

0 comments on commit d3d1707

Please sign in to comment.