refactoring
Signed-off-by: Mahmoud Gaballah <mahmoud.gaballah@zalando.de>
myaser committed Mar 10, 2023
1 parent ebc2a81 commit c3ff582
Showing 6 changed files with 239 additions and 152 deletions.
8 changes: 8 additions & 0 deletions api/cluster.go
@@ -174,3 +174,11 @@ func (cluster *Cluster) ChannelOverrides() (map[string]string, error) {
    }
    return result, nil
}

func (cluster *Cluster) ASGBackedPools() []*NodePool {
    var cp []*NodePool
    for _, n := range cluster.NodePools {
        cp = append(cp, n)
    }
    return cp
}
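
The new Cluster.ASGBackedPools helper simply copies every entry of cluster.NodePools into a fresh slice and applies no filtering by backing technology. A minimal, hypothetical usage sketch follows; the import path is an assumption and not part of this commit:

// Hypothetical usage of the ASGBackedPools helper added above.
// The module path below is assumed, not taken from this commit.
package main

import (
    "fmt"

    "github.com/zalando-incubator/cluster-lifecycle-manager/api"
)

func main() {
    cluster := &api.Cluster{
        NodePools: []*api.NodePool{
            {Profile: "master-default"},
            {Profile: "worker-default"},
        },
    }
    // Every node pool comes back: the method builds a new slice but does not filter.
    for _, pool := range cluster.ASGBackedPools() {
        fmt.Println(pool.Profile)
    }
}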
32 changes: 32 additions & 0 deletions api/node_pool.go
@@ -25,3 +25,35 @@ func (np NodePool) IsSpot() bool {
func (np NodePool) IsMaster() bool {
    return strings.Contains(np.Profile, "master")
}

type Taint struct {
    Key    string
    Value  string
    Effect string
}

func (np NodePool) Taints() []Taint {
    conf, exist := np.ConfigItems["taints"]
    if !exist {
        return nil
    }
    var taints []Taint
    for _, t := range strings.Split(conf, ",") {
        taintData := strings.FieldsFunc(t, func(r rune) bool {
            return r == '=' || r == ':'
        })
        if len(taintData) == 3 {
            taints = append(taints, Taint{
                Key:    taintData[0],
                Value:  taintData[1],
                Effect: taintData[2],
            })
        } else if len(taintData) == 2 {
            taints = append(taints, Taint{
                Key:    taintData[0],
                Effect: taintData[1],
            })
        }
    }
    return taints
}
17 changes: 17 additions & 0 deletions api/node_pool_test.go
@@ -0,0 +1,17 @@
package api

import (
    "testing"

    "github.com/stretchr/testify/require"
)

func TestTaints(t *testing.T) {
    pool := NodePool{ConfigItems: map[string]string{
        "taints": "dedicated=test:NoSchedule,example:NoSchedule",
    }}
    require.Equal(t, []Taint{
        {Key: "dedicated", Value: "test", Effect: "NoSchedule"},
        {Key: "example", Value: "", Effect: "NoSchedule"},
    }, pool.Taints())
}
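
For reference, the "taints" config item uses kubectl-style notation: comma-separated entries of the form key=value:Effect, where the value part is optional; an entry that splits into neither two nor three fields is silently dropped. A hedged sketch of that behavior, assuming the module path below (not part of this commit):

// Sketch of the parsing rules implemented by NodePool.Taints() above.
// The import path is an assumption.
package main

import (
    "fmt"

    "github.com/zalando-incubator/cluster-lifecycle-manager/api"
)

func main() {
    pool := api.NodePool{ConfigItems: map[string]string{
        // key=value:Effect, key:Effect, and one malformed entry.
        "taints": "dedicated=gpu:NoSchedule,spot:PreferNoSchedule,not-a-taint",
    }}
    for _, taint := range pool.Taints() {
        fmt.Printf("key=%s value=%s effect=%s\n", taint.Key, taint.Value, taint.Effect)
    }
    // Expected output ("not-a-taint" yields a single field and is skipped):
    // key=dedicated value=gpu effect=NoSchedule
    // key=spot value= effect=PreferNoSchedule
}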
4 changes: 2 additions & 2 deletions provisioner/aws.go
@@ -689,7 +689,7 @@ func (a *awsAdapter) GetSubnets(vpcID string) ([]*ec2.Subnet, error) {
    return subnetResp.Subnets, nil
}

// CreateTags adds or updates tags of a resource.
// CreateTags adds or updates tags of a kubernetes.Resource.
func (a *awsAdapter) CreateTags(resource string, tags []*ec2.Tag) error {
    params := &ec2.CreateTagsInput{
        Resources: []*string{aws.String(resource)},
@@ -700,7 +700,7 @@ func (a *awsAdapter) CreateTags(resource string, tags []*ec2.Tag) error {
    return err
}

// DeleteTags deletes tags from a resource.
// DeleteTags deletes tags from a kubernetes.Resource.
func (a *awsAdapter) DeleteTags(resource string, tags []*ec2.Tag) error {
    params := &ec2.DeleteTagsInput{
        Resources: []*string{aws.String(resource)},
140 changes: 81 additions & 59 deletions provisioner/clusterpy.go
@@ -176,7 +176,7 @@ func (p *clusterpyProvisioner) propagateConfigItemsToNodePools(cluster *api.Clus
// Provision provisions/updates a cluster on AWS. Provision is an idempotent
// operation for the same input.
func (p *clusterpyProvisioner) Provision(ctx context.Context, logger *log.Entry, cluster *api.Cluster, channelConfig channel.Config) error {
    awsAdapter, updater, nodePoolManager, err := p.prepareProvision(logger, cluster, channelConfig)
    awsAdapter, updater, err := p.prepareProvision(logger, cluster, channelConfig)
    if err != nil {
        return err
    }
@@ -321,56 +321,37 @@ func (p *clusterpyProvisioner) Provision(ctx context.Context, logger *log.Entry,

    // provision node pools
    nodePoolProvisioner := &AWSNodePoolProvisioner{
        awsAdapter: awsAdapter,
        instanceTypes: instanceTypes,
        nodePoolManager: nodePoolManager,
        bucketName: bucketName,
        config: channelConfig,
        cluster: cluster,
        azInfo: azInfo,
        logger: logger,
        NodePoolTemplateRenderer: NodePoolTemplateRenderer{
            awsAdapter: awsAdapter,
            config: channelConfig,
            cluster: cluster,
            bucketName: bucketName,
            logger: logger,
            encodeUserData: true,
        },
        instanceTypes: instanceTypes,
        azInfo: azInfo,
    }

    // group node pools based on their profile e.g. master
    nodePoolGroups := groupNodePools(
        logger,
        cluster,
        nodePoolProvisioner,
    )

    for _, g := range nodePoolGroups {
        err := nodePoolProvisioner.Provision(ctx, g.NodePools, values)
        if err != nil {
            return err
        }

        // custom function that checks if the node pools are "ready"
        err = g.ReadyFn()
        if err != nil {
            return err
        }

        if err = ctx.Err(); err != nil {
            return err
        }
    err = nodePoolGroups["masters"].provisionNodePoolGroup(ctx, values, updater, cluster, p.applyOnly)
    if err != nil {
        return err
    }

        if !p.applyOnly {
            switch cluster.LifecycleStatus {
            case models.ClusterLifecycleStatusRequested, models.ClusterUpdateLifecycleStatusCreating:
                log.Warnf("New cluster (%s), skipping node pool update", cluster.LifecycleStatus)
            default:
                // update nodes
                for _, nodePool := range g.NodePools {
                    err := updater.Update(ctx, nodePool)
                    if err != nil {
                        return err
                    }
                    if err = ctx.Err(); err != nil {
                        return err
                    }

                    if err = ctx.Err(); err != nil {
                        return err
                    }
                }
            }
        }
    err = nodePoolGroups["workers"].provisionNodePoolGroup(ctx, values, updater, cluster, p.applyOnly)
    if err != nil {
        return err
    }

    // clean up removed node pools
@@ -746,26 +727,26 @@ func (p *clusterpyProvisioner) setupAWSAdapter(logger *log.Entry, cluster *api.C
// prepares to provision a cluster by initializing the aws adapter.
// TODO: this is doing a lot of things to glue everything together, this should
// be refactored.
func (p *clusterpyProvisioner) prepareProvision(logger *log.Entry, cluster *api.Cluster, channelConfig channel.Config) (*awsAdapter, updatestrategy.UpdateStrategy, updatestrategy.NodePoolManager, error) {
func (p *clusterpyProvisioner) prepareProvision(logger *log.Entry, cluster *api.Cluster, channelConfig channel.Config) (*awsAdapter, updatestrategy.UpdateStrategy, error) {
    if cluster.Provider != providerID {
        return nil, nil, nil, ErrProviderNotSupported
        return nil, nil, ErrProviderNotSupported
    }

    logger.Infof("clusterpy: Prepare for provisioning cluster %s (%s)..", cluster.ID, cluster.LifecycleStatus)

    adapter, err := p.setupAWSAdapter(logger, cluster)
    if err != nil {
        return nil, nil, nil, fmt.Errorf("failed to setup AWS Adapter: %v", err)
        return nil, nil, fmt.Errorf("failed to setup AWS Adapter: %v", err)
    }

    err = p.updateDefaults(cluster, channelConfig, adapter)
    if err != nil {
        return nil, nil, nil, fmt.Errorf("unable to read configuration defaults: %v", err)
        return nil, nil, fmt.Errorf("unable to read configuration defaults: %v", err)
    }

    err = p.decryptConfigItems(cluster)
    if err != nil {
        return nil, nil, nil, fmt.Errorf("unable to decrypt config items: %v", err)
        return nil, nil, fmt.Errorf("unable to decrypt config items: %v", err)
    }

    p.propagateConfigItemsToNodePools(cluster)
@@ -788,7 +769,7 @@ func (p *clusterpyProvisioner) prepareProvision(logger *log.Entry, cluster *api.

    client, err := kubernetes.NewClient(cluster.APIServerURL, p.tokenSource)
    if err != nil {
        return nil, nil, nil, err
        return nil, nil, err
    }

    // setup updater
@@ -808,7 +789,7 @@ func (p *clusterpyProvisioner) prepareProvision(logger *log.Entry, cluster *api.
        if value, ok := cluster.ConfigItems[setting.key]; ok {
            parsed, err := time.ParseDuration(value)
            if err != nil {
                return nil, nil, nil, fmt.Errorf("invalid value for %s: %v", setting.key, err)
                return nil, nil, fmt.Errorf("invalid value for %s: %v", setting.key, err)
            }
            setting.fn(parsed)
        }
@@ -832,10 +813,10 @@ func (p *clusterpyProvisioner) prepareProvision(logger *log.Entry, cluster *api.
    case updateStrategyCLC:
        updater = updatestrategy.NewCLCUpdateStrategy(logger, poolManager, clcPollingInterval)
    default:
        return nil, nil, nil, fmt.Errorf("unknown update strategy: %s", p.updateStrategy)
        return nil, nil, fmt.Errorf("unknown update strategy: %s", p.updateStrategy)
    }

    return adapter, updater, poolManager, nil
    return adapter, updater, nil
}

// downscaleDeployments scales down all deployments of a cluster in the
@@ -1114,7 +1095,14 @@ func int32Value(v *int32) int32 {
    return 0
}

func groupNodePools(logger *log.Entry, cluster *api.Cluster) []nodePoolGroup {
type nodePoolGroup struct {
    NodePools []*api.NodePool
    Provisioner NodePoolProvisioner
    ReadyFn func() error
}

func groupNodePools(logger *log.Entry, cluster *api.Cluster, caProvisioner *AWSNodePoolProvisioner) map[string]*nodePoolGroup {

    var masters, workers []*api.NodePool
    for _, nodePool := range cluster.NodePools {
        if nodePool.IsMaster() {
@@ -1125,23 +1113,57 @@ func groupNodePools(logger *log.Entry, cluster *api.Cluster) []nodePoolGroup {
        workers = append(workers, nodePool)
    }

    return []nodePoolGroup{
        {
            NodePools: masters,
    return map[string]*nodePoolGroup{
        "masters": {
            NodePools: masters,
            Provisioner: caProvisioner,
            ReadyFn: func() error {
                return waitForAPIServer(logger, cluster.APIServerURL, 15*time.Minute)
            },
        },
        {
            NodePools: workers,
        "workers": {
            NodePools: workers,
            Provisioner: caProvisioner,
            ReadyFn: func() error {
                return nil
            },
        },
    }
}

type nodePoolGroup struct {
    NodePools []*api.NodePool
    ReadyFn func() error
func (npg *nodePoolGroup) provisionNodePoolGroup(ctx context.Context, values map[string]interface{}, updater updatestrategy.UpdateStrategy, cluster *api.Cluster, applyOnly bool) error {
    err := npg.Provisioner.Provision(ctx, npg.NodePools, values)
    if err != nil {
        return err
    }

    // custom function that checks if the node pools are "ready"
    err = npg.ReadyFn()
    if err != nil {
        return err
    }

    if err = ctx.Err(); err != nil {
        return err
    }

    if !applyOnly {
        switch cluster.LifecycleStatus {
        case models.ClusterLifecycleStatusRequested, models.ClusterUpdateLifecycleStatusCreating:
            log.Warnf("New cluster (%s), skipping node pool update", cluster.LifecycleStatus)
        default:
            // update nodes
            for _, nodePool := range npg.NodePools {
                err := updater.Update(ctx, nodePool)
                if err != nil {
                    return err
                }

                if err = ctx.Err(); err != nil {
                    return err
                }
            }
        }
    }
    return nil
}
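
In summary, the refactoring turns the inlined per-group loop in Provision into nodePoolGroup.provisionNodePoolGroup and has groupNodePools return a map keyed by "masters" and "workers", provisioned in that order. The following self-contained toy (not the project's real types) mirrors that shape under those assumptions:

// Toy illustration of the new structure: grouped pools, each group bundling
// provisioning with a readiness check, masters handled before workers.
package main

import "fmt"

type nodePoolGroup struct {
    NodePools []string
    ReadyFn   func() error
}

func (g *nodePoolGroup) provisionNodePoolGroup() error {
    for _, np := range g.NodePools {
        fmt.Println("provisioning", np)
    }
    // The readiness gate runs after provisioning; in the real "masters" group
    // this is where the API server wait happens.
    return g.ReadyFn()
}

func main() {
    groups := map[string]*nodePoolGroup{
        "masters": {
            NodePools: []string{"master-default"},
            ReadyFn:   func() error { fmt.Println("waiting for API server"); return nil },
        },
        "workers": {
            NodePools: []string{"worker-default"},
            ReadyFn:   func() error { return nil },
        },
    }
    // Masters first, then workers, matching the order used in Provision.
    for _, key := range []string{"masters", "workers"} {
        if err := groups[key].provisionNodePoolGroup(); err != nil {
            panic(err)
        }
    }
}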
