fix(platform): enhance cluster controller (#937)
1. decouple cluster update into housekeeping, upgrade, and master scale up/down
2. implement master scale down: remove the etcd member and the k8s node
3. add validation: only HA mode supports master scale
huxiaoliang committed Nov 25, 2020
1 parent bc469d8 commit 8a393c8
Showing 7 changed files with 143 additions and 48 deletions.
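At a high level, the change splits the single cluster-update path into per-phase handler groups: Running/Failed clusters get a housekeeping pass, Upgrading clusters run the upgrade handlers, and Upscaling/Downscaling clusters run the scale handlers. The sketch below condenses that dispatch; it is illustrative only (the handlersFor helper does not exist in the code) and simplifies the actual logic in pkg/platform/provider/cluster/interface.go shown further down.

// Sketch: phase-to-handler-group dispatch introduced by this commit (simplified).
func handlersFor(p *DelegateProvider, phase platformv1.ClusterPhase) []Handler {
	switch phase {
	case platformv1.ClusterRunning, platformv1.ClusterFailed:
		return p.UpdateHandlers // housekeeping: certs, credentials, keepalived/third-party HA
	case platformv1.ClusterUpgrading:
		return p.UpgradeHandlers // pre-hook, upgrade control plane nodes, post-hook
	case platformv1.ClusterUpscaling:
		return p.CreateHandlers // master scale up reuses the create flow
	case platformv1.ClusterDownscaling:
		return p.ScaleDownHandlers // remove etcd member, then delete the k8s node
	default:
		return nil
	}
}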
2 changes: 1 addition & 1 deletion api/platform/v1/types.go
@@ -130,7 +130,7 @@ type ClusterSpec struct {
// +optional
NetworkArgs map[string]string `json:"networkArgs,omitempty" protobuf:"bytes,24,name=networkArgs"`
// +optional
ScalingMachines []ClusterMachine `json:"scalingMachines,omitempty" protobuf:"bytes,25,rep,name=scalingMachines"`
ScalingMachines []ClusterMachine `json:"scalingMachines,omitempty" protobuf:"bytes,25,opt,name=scalingMachines"`
}

// ClusterStatus represents information about the status of a cluster.
11 changes: 10 additions & 1 deletion api/platform/validation/cluster.go
@@ -63,15 +63,24 @@ func ValidateClusterUpdate(cluster *types.Cluster, oldCluster *types.Cluster) fi
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.ControllerManagerExtraArgs, oldCluster.Spec.ControllerManagerExtraArgs, fldPath.Child("controllerManagerExtraArgs"))...)
allErrs = append(allErrs, apimachineryvalidation.ValidateImmutableField(cluster.Spec.SchedulerExtraArgs, oldCluster.Spec.SchedulerExtraArgs, fldPath.Child("schedulerExtraArgs"))...)

allErrs = append(allErrs, ValidateClusterScale(cluster.Cluster, oldCluster.Cluster, fldPath.Child("machines"))...)
allErrs = append(allErrs, ValidateCluster(cluster)...)
allErrs = append(allErrs, ValidateClusterScale(cluster.Cluster, oldCluster.Cluster, fldPath.Child("machines"))...)

return allErrs
}

// ValidateClusterScale tests if master scale up/down to a cluster is valid.
func ValidateClusterScale(cluster *platform.Cluster, oldCluster *platform.Cluster, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
ha := cluster.Spec.Features.HA
if ha == nil {
allErrs = append(allErrs, field.Invalid(fldPath, cluster.Spec.Machines, "tkestack HA should enabled for master scale"))
return allErrs
}
if ha.TKEHA == nil && ha.ThirdPartyHA == nil {
allErrs = append(allErrs, field.Invalid(fldPath, cluster.Spec.Machines, "tkestack HA should enabled for master scale"))
return allErrs
}
_, err := clusterutil.PrepareClusterScale(cluster, oldCluster)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, cluster.Spec.Machines, err.Error()))
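The new check above only admits master scale when the cluster has an HA frontend configured. Below is a minimal sketch of a spec fragment that would pass ValidateClusterScale; the Features/HA field paths come from the diff, while the concrete TKEHA contents (the VIP field) are assumed for illustration.

// Illustration: either TKEHA or ThirdPartyHA must be non-nil for master scale.
cluster.Spec.Features.HA = &platform.HA{
	TKEHA: &platform.TKEHA{VIP: "10.0.0.100"}, // assumed field; any non-nil TKEHA or ThirdPartyHA passes
}
// With HA unset (or both TKEHA and ThirdPartyHA nil), adding or removing master
// machines is rejected with "tkestack HA should enabled for master scale".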
64 changes: 41 additions & 23 deletions pkg/platform/controller/cluster/cluster_controller.go
@@ -137,9 +137,6 @@ func (c *Controller) needsUpdate(old *platformv1.Cluster, new *platformv1.Cluste
if !reflect.DeepEqual(old.Spec, new.Spec) {
return true
}
if !reflect.DeepEqual(old.ResourceVersion, new.ResourceVersion) {
return true
}

if old.Status.Phase == platformv1.ClusterRunning && new.Status.Phase == platformv1.ClusterTerminating {
return true
@@ -263,11 +260,11 @@ func (c *Controller) reconcile(ctx context.Context, key string, cluster *platfor
switch cluster.Status.Phase {
case platformv1.ClusterInitializing:
err = c.onCreate(ctx, cluster)
case platformv1.ClusterRunning, platformv1.ClusterFailed, platformv1.ClusterUpgrading:
case platformv1.ClusterRunning, platformv1.ClusterFailed:
err = c.onUpdate(ctx, cluster)
case platformv1.ClusterUpscaling:
case platformv1.ClusterUpgrading:
err = c.onUpdate(ctx, cluster)
case platformv1.ClusterDownscaling:
case platformv1.ClusterUpscaling, platformv1.ClusterDownscaling:
err = c.onUpdate(ctx, cluster)
case platformv1.ClusterTerminating:
log.FromContext(ctx).Info("Cluster has been terminated. Attempting to cleanup resources")
@@ -329,31 +326,52 @@ func (c *Controller) onUpdate(ctx context.Context, cluster *platformv1.Cluster)
if err != nil {
return err
}
if clusterWrapper.Status.Phase == platformv1.ClusterRunning || clusterWrapper.Status.Phase == platformv1.ClusterFailed {
err = provider.OnUpdate(ctx, clusterWrapper)
clusterWrapper = c.checkHealth(ctx, clusterWrapper)
if err != nil {
// Update status, ignore failure
if clusterWrapper.IsCredentialChanged {
_, _ = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
}

err = provider.OnUpdate(ctx, clusterWrapper)
clusterWrapper = c.checkHealth(ctx, clusterWrapper)
if err != nil {
// Update status, ignore failure
_, _ = c.platformClient.Clusters().UpdateStatus(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
return err
}
if clusterWrapper.IsCredentialChanged {
_, _ = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
if err != nil {
return err
}
}

_, _ = c.platformClient.Clusters().UpdateStatus(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
return err
}

if clusterWrapper.IsCredentialChanged {
clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
clusterWrapper.Cluster, err = c.platformClient.Clusters().UpdateStatus(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
if err != nil {
return err
}
}
} else {
for clusterWrapper.Status.Phase != platformv1.ClusterRunning {
err = provider.OnUpdate(ctx, clusterWrapper)
if err != nil {
// Update status, ignore failure
if clusterWrapper.IsCredentialChanged {
_, _ = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
}

clusterWrapper.Cluster, err = c.platformClient.Clusters().UpdateStatus(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
if err != nil {
return err
_, _ = c.platformClient.Clusters().UpdateStatus(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
return err
}
if clusterWrapper.IsCredentialChanged {
clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{})
if err != nil {
return err
}
}
clusterWrapper.Cluster, err = c.platformClient.Clusters().UpdateStatus(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{})
if err != nil {
return err
}
}
}

return nil
}

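Net effect of the onUpdate rewrite above: a Running or Failed cluster gets exactly one housekeeping pass (provider.OnUpdate followed by a health check), while an upgrading or scaling cluster is driven in a loop until it reports Running, with credentials and status persisted after every pass. A condensed sketch follows, with error handling trimmed and the credential/status update calls folded into a hypothetical persist helper.

// Simplified control flow; persist() stands in for the credential/status update calls.
if phase == platformv1.ClusterRunning || phase == platformv1.ClusterFailed {
	err := provider.OnUpdate(ctx, clusterWrapper) // single housekeeping pass
	clusterWrapper = c.checkHealth(ctx, clusterWrapper)
	persist(clusterWrapper)
	return err
}
for clusterWrapper.Status.Phase != platformv1.ClusterRunning { // upgrade / scale up / scale down
	if err := provider.OnUpdate(ctx, clusterWrapper); err != nil {
		persist(clusterWrapper)
		return err
	}
	persist(clusterWrapper)
}
return nil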
37 changes: 33 additions & 4 deletions pkg/platform/provider/baremetal/cluster/delete.go
@@ -21,10 +21,14 @@ package cluster
import (
"context"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"tkestack.io/tke/pkg/platform/provider/baremetal/phases/kubeadm"
"tkestack.io/tke/pkg/platform/provider/util/mark"
typesv1 "tkestack.io/tke/pkg/platform/types/v1"
v1 "tkestack.io/tke/pkg/platform/types/v1"
"tkestack.io/tke/pkg/util/apiclient"
"tkestack.io/tke/pkg/util/log"
)

func (p *Provider) EnsureCleanClusterMark(ctx context.Context, c *typesv1.Cluster) error {
@@ -34,16 +38,41 @@ func (p *Provider) EnsureCleanClusterMark(ctx context.Context, c *typesv1.Cluste
return nil
}

func (p *Provider) EnsureDownScaling(ctx context.Context, c *v1.Cluster) error {
func (p *Provider) EnsureRemoveETCDMember(ctx context.Context, c *v1.Cluster) error {
for _, machine := range c.Spec.ScalingMachines {
machineSSH, err := machine.SSH()
if err != nil {
return err
}
_, err = machineSSH.CombinedOutput(`kubeadm reset -f`)
err = kubeadm.Reset(machineSSH, "remove-etcd-member")
if err != nil {
return errors.Wrap(err, machine.IP)
return err
}
}
return nil
}

func (p *Provider) EnsureRemoveNode(ctx context.Context, c *v1.Cluster) error {
client, err := c.Clientset()
if err != nil {
return err
}
for _, machine := range c.Spec.ScalingMachines {
node, err := apiclient.GetNodeByMachineIP(ctx, client, machine.IP)
if err != nil {
if !errors.IsNotFound(err) {
return err
}
log.FromContext(ctx).Info("deleteNode done")
return nil
}
err = client.CoreV1().Nodes().Delete(context.Background(), node.Name, metav1.DeleteOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return err
}
}
log.FromContext(ctx).Info("deleteNode done")
}
return nil
}
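Together with the handler registration in provider.go below, scale-down processes each entry in Spec.ScalingMachines in two steps: the etcd member is removed on the machine itself over SSH, then the matching Node object is deleted through the cluster's clientset. A compressed sketch of that per-machine sequence, with wiring assumed and error handling trimmed (the real code lives in the two handlers above):

// Per-machine scale-down sequence (sketch only).
machineSSH, _ := machine.SSH()
_ = kubeadm.Reset(machineSSH, "remove-etcd-member") // runs "kubeadm reset phase remove-etcd-member" on the node
if node, err := apiclient.GetNodeByMachineIP(ctx, client, machine.IP); err == nil {
	_ = client.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{})
}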
12 changes: 7 additions & 5 deletions pkg/platform/provider/baremetal/cluster/provider.go
@@ -133,24 +133,26 @@ func NewProvider() (*Provider, error) {
p.EnsurePostClusterInstallHook,
},
UpdateHandlers: []clusterprovider.Handler{
p.EnsurePreClusterUpgradeHook,
p.EnsureUpgradeControlPlaneNode,

p.EnsureAPIServerCert,
p.EnsureRenewCerts,
p.EnsureStoreCredential,
p.EnsureKeepalivedWithLBOption,
p.EnsureThirdPartyHA,
},
UpgradeHandlers: []clusterprovider.Handler{
p.EnsurePreClusterUpgradeHook,
p.EnsureUpgradeControlPlaneNode,
p.EnsurePostClusterUpgradeHook,
},
ScaleUpHandlers: []clusterprovider.Handler{},
ScaleDownHandlers: []clusterprovider.Handler{
p.EnsureDownScaling,
p.EnsureRemoveETCDMember,
p.EnsureRemoveNode,
},
DeleteHandlers: []clusterprovider.Handler{
p.EnsureCleanClusterMark,
},
}
p.ScaleUpHandlers = p.CreateHandlers

cfg, err := config.New(constants.ConfigFile)
if err != nil {
22 changes: 20 additions & 2 deletions pkg/platform/provider/baremetal/phases/kubeadm/kubeadm.go
@@ -54,8 +54,9 @@ import (
const (
kubeadmKubeletConf = "/usr/lib/systemd/system/kubelet.service.d/10-kubeadm.conf"

initCmd = `kubeadm init phase {{.Phase}} --config={{.Config}}`
joinCmd = `kubeadm join phase {{.Phase}} --config={{.Config}}`
initCmd = `kubeadm init phase {{.Phase}} --config={{.Config}}`
joinCmd = `kubeadm join phase {{.Phase}} --config={{.Config}}`
resetCmd = `kubeadm reset phase {{.Phase}}`
)

var (
@@ -148,6 +149,23 @@ func Join(s ssh.Interface, config *kubeadmv1beta2.JoinConfiguration, phase strin
return nil
}

func Reset(s ssh.Interface, phase string) error {

cmd, err := template.ParseString(resetCmd, map[string]interface{}{
"Phase": phase,
})
if err != nil {
return errors.Wrap(err, "parse resetCmd error")
}
out, err := s.CombinedOutput(string(cmd))
if err != nil {
return fmt.Errorf("kubeadm.Reset error: %w", err)
}
log.Debug(string(out))

return nil
}

func RenewCerts(s ssh.Interface) error {
err := fixKubeadmBug1753(s)
if err != nil {
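The Reset helper added here mirrors Init and Join: it renders the requested phase into resetCmd and runs the result over SSH, so for the scale-down path the executed command is kubeadm reset phase remove-etcd-member. A minimal usage sketch, where s is assumed to be the ssh.Interface obtained from machine.SSH():

// Hypothetical call site on the master being removed.
if err := kubeadm.Reset(s, "remove-etcd-member"); err != nil {
	return err // wrapped as "kubeadm.Reset error: ..."
}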
43 changes: 31 additions & 12 deletions pkg/platform/provider/cluster/interface.go
@@ -100,6 +100,7 @@ type DelegateProvider struct {
CreateHandlers []Handler
DeleteHandlers []Handler
UpdateHandlers []Handler
UpgradeHandlers []Handler
ScaleUpHandlers []Handler
ScaleDownHandlers []Handler
}
@@ -221,20 +222,20 @@ func (p *DelegateProvider) OnCreate(ctx context.Context, cluster *v1.Cluster) er
func (p *DelegateProvider) OnUpdate(ctx context.Context, cluster *v1.Cluster) error {
handlers := []Handler{}
phase := cluster.Status.Phase
var condition *platformv1.ClusterCondition
var err error
if phase == platformv1.ClusterUpgrading {
if phase == platformv1.ClusterRunning || phase == platformv1.ClusterFailed {
handlers = p.UpdateHandlers
condition, err = p.getCurrentCondition(cluster, phase, handlers)
return p.houseKeeping(ctx, cluster, handlers)
}
if phase == platformv1.ClusterUpgrading {
handlers = p.UpgradeHandlers
}
if phase == platformv1.ClusterUpscaling {
handlers = p.CreateHandlers
condition, err = p.getCurrentCondition(cluster, phase, handlers)
}
if phase == platformv1.ClusterDownscaling {
handlers = p.ScaleDownHandlers
condition, err = p.getCurrentCondition(cluster, phase, handlers)
}
condition, err := p.getCurrentCondition(cluster, phase, handlers)
if err != nil {
return err
}
@@ -268,7 +269,6 @@ func (p *DelegateProvider) OnUpdate(ctx context.Context, cluster *v1.Cluster) er
}, true)
return nil
}

cluster.SetCondition(platformv1.ClusterCondition{
Type: condition.Type,
Status: platformv1.ConditionTrue,
@@ -280,6 +280,8 @@ func (p *DelegateProvider) OnUpdate(ctx context.Context, cluster *v1.Cluster) er
if nextConditionType == ConditionTypeDone {
cluster.Status.Phase = platformv1.ClusterRunning
if err := p.OnRunning(ctx, cluster); err != nil {
cluster.Spec.ScalingMachines = nil
log.FromContext(ctx).Info("zzzzzzzzzzzzzzzzzzzzzzzz")
return fmt.Errorf("%s.OnRunning error: %w", p.Name(), err)
}
} else {
@@ -314,7 +316,6 @@ func (p *DelegateProvider) OnDelete(ctx context.Context, cluster *v1.Cluster) er
}

func (p *DelegateProvider) OnRunning(ctx context.Context, cluster *v1.Cluster) error {
cluster.Spec.ScalingMachines = nil
return nil
}

@@ -346,6 +347,24 @@ func (p *DelegateProvider) getHandler(conditionType string, handlers []Handler)
return nil
}

func (p *DelegateProvider) houseKeeping(ctx context.Context, cluster *v1.Cluster, handlers []Handler) error {
for _, handler := range p.UpdateHandlers {
ctx := log.FromContext(ctx).WithName("ClusterProvider.OnUpdate").WithName(handler.Name()).WithContext(ctx)
log.FromContext(ctx).Info("Doing")
startTime := time.Now()
err := handler(ctx, cluster)
log.FromContext(ctx).Info("Done", "error", err, "cost", time.Since(startTime).String())
if err != nil {
cluster.Status.Reason = ReasonFailedUpdate
cluster.Status.Message = fmt.Sprintf("%s error: %v", handler.Name(), err)
return err
}
}
cluster.Status.Reason = ""
cluster.Status.Message = ""
return nil
}

func (p *DelegateProvider) getCurrentCondition(c *v1.Cluster, phase platformv1.ClusterPhase, handlers []Handler) (*platformv1.ClusterCondition, error) {
if c.Status.Phase != phase {
return nil, fmt.Errorf("cluster phase is %s now", phase)
@@ -362,15 +381,15 @@ func (p *DelegateProvider) getCurrentCondition(c *v1.Cluster, phase platformv1.C
Reason: ReasonWaiting,
}, nil
}

for _, condition := range c.Status.Conditions {
if condition.Status == platformv1.ConditionFalse || condition.Status == platformv1.ConditionUnknown {
return &condition, nil
}
}

if c.Status.Phase == platformv1.ClusterUpgrading || c.Status.Phase == platformv1.ClusterUpscaling ||
c.Status.Phase == platformv1.ClusterDownscaling {
if c.Status.Phase == platformv1.ClusterUpgrading ||
c.Status.Phase == platformv1.ClusterUpscaling ||
c.Status.Phase == platformv1.ClusterDownscaling ||
c.Status.Phase == platformv1.ClusterRunning {
return &platformv1.ClusterCondition{
Type: handlers[0].Name(),
Status: platformv1.ConditionUnknown,
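A note on the condition bookkeeping above: condition types are derived from handler names, so each handler group defines its own ordered checklist, and when every recorded condition is already True and the cluster is in one of the listed phases, the walk restarts at the group's first handler. The snippet below is illustrative only; the Reason for the truncated branch is assumed to match the ReasonWaiting case shown earlier.

// First condition emitted when a cluster enters Downscaling (sketch).
cond := platformv1.ClusterCondition{
	Type:   "EnsureRemoveETCDMember", // handlers[0].Name() for ScaleDownHandlers
	Status: platformv1.ConditionUnknown,
	Reason: ReasonWaiting, // assumed, by analogy with the empty-conditions branch
}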
