update karpenter manifest management #744

Merged 4 commits on Feb 7, 2024

12 changes: 12 additions & 0 deletions pkg/kubernetes/kubernetes.go
@@ -203,6 +203,18 @@ func (c *ClientsCollection) Get(ctx context.Context, kind, namespace, name strin
return client.Get(ctx, name, options, subresources...)
}

func (c *ClientsCollection) Exists(ctx context.Context, kind, namespace, name string, options metav1.GetOptions, subresources ...string) (bool, error) {
_, err := c.Get(ctx, kind, namespace, name, options, subresources...)

if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}

func (c *ClientsCollection) List(ctx context.Context, kind, namespace string, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
client, err := c.getResourceClient(kind, namespace)
if err != nil {
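
The new Exists helper wraps Get and maps a NotFound error to a plain "does not exist" result instead of an error. A minimal usage sketch (not part of this diff), assuming an initialized *ClientsCollection named clients and a context ctx:

exists, err := clients.Exists(ctx, "CustomResourceDefinition", "", "nodepools.karpenter.sh", metav1.GetOptions{})
if err != nil {
	// a genuine API error, not a 404
	return err
}
if exists {
	// the v1beta1 NodePool CRD is installed in the cluster
}
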
144 changes: 115 additions & 29 deletions pkg/updatestrategy/aws_ec2.go
@@ -2,19 +2,33 @@ package updatestrategy

import (
"context"
"encoding/base64"
"fmt"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/pkg/errors"
"github.com/zalando-incubator/cluster-lifecycle-manager/api"
"github.com/zalando-incubator/cluster-lifecycle-manager/pkg/kubernetes"
"github.com/zalando-incubator/cluster-lifecycle-manager/pkg/util"
)

const karpenterProvisionerTag = "karpenter.sh/provisioner-name"
const (
karpenterProvisionerTag = "karpenter.sh/provisioner-name"
karpenterNodePoolTag = "karpenter.sh/nodepool"
karpenterProvisionerResource = "provisioners.karpenter.sh"
karpenterAWSNodeTemplateResource = "awsnodetemplates.karpenter.k8s.aws"
karpenterNodePoolResource = "nodepools.karpenter.sh"
KarpenterEC2NodeClassResource = "ec2nodeclasses.karpenter.k8s.aws"
crd = "CustomResourceDefinition"
)

type InstanceConfig struct {
UserData string
@@ -45,40 +59,22 @@ func InstanceConfigUpToDate(instanceConfig, poolConfig *InstanceConfig) bool {
return true
}

type Option func(b *EC2NodePoolBackend)

type NodePoolConfigGetter func(ctx context.Context, nodePool *api.NodePool) (*InstanceConfig, error)

func NoopNodePoolConfigGetter(context.Context, *api.NodePool) (*InstanceConfig, error) {
return nil, nil
}

func WithConfigGetter(g NodePoolConfigGetter) Option {
return func(b *EC2NodePoolBackend) {
b.configGetter = g
}
}

// EC2NodePoolBackend defines a node pool consisting of EC2 instances
// managed externally by some component e.g. Karpenter.
type EC2NodePoolBackend struct {
ec2Client ec2iface.EC2API
clusterID string
configGetter NodePoolConfigGetter
crdResolver *util.LazyOf[*KarpenterCRDNameResolver]
ec2Client ec2iface.EC2API
clusterID string
}

// NewEC2NodePoolBackend initializes a new EC2NodePoolBackend for
// the given clusterID and AWS session.
func NewEC2NodePoolBackend(clusterID string, sess *session.Session, opts ...Option) *EC2NodePoolBackend {
b := &EC2NodePoolBackend{
ec2Client: ec2.New(sess),
clusterID: clusterID,
configGetter: NoopNodePoolConfigGetter,
func NewEC2NodePoolBackend(clusterID string, sess *session.Session, crdResolverInitializer func() (*KarpenterCRDNameResolver, error)) *EC2NodePoolBackend {
return &EC2NodePoolBackend{
ec2Client: ec2.New(sess),
clusterID: clusterID,
crdResolver: util.NewLazyOf[*KarpenterCRDNameResolver](crdResolverInitializer),
}
for _, opt := range opts {
opt(b)
}
return b
}

// Get gets the EC2 instances matching the node pool by looking at node pool
@@ -92,8 +88,13 @@ func (n *EC2NodePoolBackend) Get(ctx context.Context, nodePool *api.NodePool) (*
return nil, fmt.Errorf("failed to list EC2 instances of the node pool: %w", err)
}

crdResolver, err := n.crdResolver.Value()
if err != nil {
return nil, err
}

nodes := make([]*Node, 0)
nodePoolConfig, err := n.configGetter(ctx, nodePool)
nodePoolConfig, err := crdResolver.NodePoolConfigGetter(ctx, nodePool) // in case of decommission nodePoolConfig is nil, and all nodes are deleted anyway
if err != nil {
return nil, err
}
@@ -212,6 +213,10 @@ func (n *EC2NodePoolBackend) DecommissionNodePool(ctx context.Context, nodePool
}

func (n *EC2NodePoolBackend) DecommissionKarpenterNodes(ctx context.Context) error {
crdResolver, err := n.crdResolver.Value()
if err != nil {
return err
}
return n.decommission(ctx, []*ec2.Filter{
{
Name: aws.String("tag:" + clusterIDTagPrefix + n.clusterID),
@@ -222,7 +227,7 @@ func (n *EC2NodePoolBackend) DecommissionKarpenterNodes(ctx context.Context) err
{
Name: aws.String("tag-key"),
Values: []*string{
aws.String(karpenterProvisionerTag),
aws.String(crdResolver.getInstanceTag()),
},
},
})
@@ -269,3 +274,84 @@ func (n *EC2NodePoolBackend) decommission(ctx context.Context, filters []*ec2.Fi
}
}
}

type KarpenterCRDNameResolver struct {
NodePoolCRDName string
k8sClients *kubernetes.ClientsCollection
}

func NewKarpenterCRDResolver(ctx context.Context, k8sClients *kubernetes.ClientsCollection) (*KarpenterCRDNameResolver, error) {
exists, err := k8sClients.Exists(ctx, crd, "", karpenterNodePoolResource, v1.GetOptions{})
if err != nil {
return nil, err
}
if exists {
return &KarpenterCRDNameResolver{
NodePoolCRDName: karpenterNodePoolResource,
k8sClients: k8sClients,
}, nil
}
return &KarpenterCRDNameResolver{
NodePoolCRDName: karpenterProvisionerResource,
k8sClients: k8sClients,
}, nil
}

func (r *KarpenterCRDNameResolver) NodeTemplateCRDName() string {
switch r.NodePoolCRDName {
case karpenterNodePoolResource:
return KarpenterEC2NodeClassResource
default:
return karpenterAWSNodeTemplateResource
}
}

func (r *KarpenterCRDNameResolver) getAMIsFromSpec(spec interface{}) string {
switch r.NodePoolCRDName {
case karpenterNodePoolResource:
amiSelectorTerms := spec.(map[string]interface{})["amiSelectorTerms"].([]interface{})
var amis []string
for _, amiSelectorTerm := range amiSelectorTerms {
if amiSelectorTerm.(map[string]interface{})["id"] != nil {
amis = append(amis, amiSelectorTerm.(map[string]interface{})["id"].(string))
}
}
return strings.Join(amis, ",")
default:
return spec.(map[string]interface{})["amiSelector"].(map[string]interface{})["aws-ids"].(string)
}
}

func (r *KarpenterCRDNameResolver) getInstanceTag() string {
switch r.NodePoolCRDName {
case karpenterNodePoolResource:
return karpenterNodePoolTag
default:
return karpenterProvisionerTag
}
}

func (r *KarpenterCRDNameResolver) NodePoolConfigGetter(ctx context.Context, nodePool *api.NodePool) (*InstanceConfig, error) {
// CLM assumes that the node pool name is used for both the node-pool and the node-template that it references
nodeTemplate, err := r.k8sClients.Get(ctx, r.NodeTemplateCRDName(), "", nodePool.Name, v1.GetOptions{})
if apierrors.IsNotFound(err) {
// the node pool has been deleted, so returning a nil nodePoolConfig results in labeling all nodes for decommissioning
return nil, nil
}
if err != nil {
return nil, err
}
spec, ok := nodeTemplate.Object["spec"]
if !ok {
return nil, errors.Errorf("could not find spec in the %s object", r.NodeTemplateCRDName())
}
tags := make(map[string]string)
for k, v := range spec.(map[string]interface{})["tags"].(map[string]interface{}) {
tags[k] = v.(string)
}
return &InstanceConfig{
UserData: base64.StdEncoding.EncodeToString([]byte(spec.(map[string]interface{})["userData"].(string))),
ImageID: r.getAMIsFromSpec(spec),
Tags: tags,
}, nil
}
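
For reference, a sketch (not part of this diff) of the two spec shapes getAMIsFromSpec handles. It would only compile inside the updatestrategy package, since the method is unexported, and the AMI IDs are made-up examples:

// newer API: EC2NodeClass (v1beta1) lists AMIs under spec.amiSelectorTerms[].id
r := &KarpenterCRDNameResolver{NodePoolCRDName: karpenterNodePoolResource}
spec := map[string]interface{}{
	"amiSelectorTerms": []interface{}{
		map[string]interface{}{"id": "ami-0123456789abcdef0"},
		map[string]interface{}{"id": "ami-0fedcba9876543210"},
	},
}
fmt.Println(r.getAMIsFromSpec(spec)) // "ami-0123456789abcdef0,ami-0fedcba9876543210"

// older API: AWSNodeTemplate (v1alpha5) stores the AMI IDs as a single string under spec.amiSelector["aws-ids"]
legacy := &KarpenterCRDNameResolver{NodePoolCRDName: karpenterProvisionerResource}
legacySpec := map[string]interface{}{
	"amiSelector": map[string]interface{}{"aws-ids": "ami-0123456789abcdef0"},
}
fmt.Println(legacy.getAMIsFromSpec(legacySpec)) // "ami-0123456789abcdef0"
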
23 changes: 23 additions & 0 deletions pkg/util/generics.go
@@ -1,5 +1,7 @@
package util

import "sync"

func Contains[T comparable](s []T, e T) bool {
for _, v := range s {
if v == e {
@@ -8,3 +10,24 @@ func Contains[T comparable](s []T, e T) bool {
}
return false
}

type LazyOf[T any] struct {
New func() (T, error)
once sync.Once
value T
}

func (l *LazyOf[T]) Value() (T, error) {
var err error
if l.New != nil {
l.once.Do(func() {
l.value, err = l.New()
l.New = nil
})
}
return l.value, err
}

func NewLazyOf[T any](newfunc func() (T, error)) *LazyOf[T] {
return &LazyOf[T]{New: newfunc}
}
Contributor

It's not completely clear to me why this lazy initialization is needed. Can't we just initialize when initializing the NodePool backend, or is it because the resources are not applied at that time?

The way to solve that would be to do the resolution whenever there is a need to look up the CRD resources, and not during the setup of the client. Since this is gonna be short-lived I think we can keep it like this for now. But I would suggest removing it long term to simplify the code.

Member Author

yes, it is because the CRDs are not there during the initialization of the backend.

I tried to delay the initialization of the resolver but it was a bit difficult to pass all the parameters across the functions all the way to where it is needed. I thought this was "simpler" 😄

But I would suggest removing it long term to simplify the code.

yea, sure, we can do this
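
A self-contained sketch (not part of this diff) of the LazyOf behavior discussed in this thread: the initializer runs at most once, on the first Value() call, and the result is cached for later calls.

lazy := util.NewLazyOf[int](func() (int, error) {
	fmt.Println("initializing") // printed only once
	return 42, nil
})
v1, _ := lazy.Value() // runs the initializer, prints "initializing"
v2, _ := lazy.Value() // cached; the initializer does not run again
fmt.Println(v1, v2)   // 42 42

Note that, as written, an initialization error is only returned by the call that actually ran New; later calls return the zero value with a nil error.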

17 changes: 13 additions & 4 deletions provisioner/clusterpy.go
@@ -27,7 +27,7 @@ import (
"github.com/zalando-incubator/cluster-lifecycle-manager/pkg/util/command"
"github.com/zalando-incubator/kube-ingress-aws-controller/certs"
"golang.org/x/oauth2"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -628,7 +628,13 @@ func (p *clusterpyProvisioner) Decommission(ctx context.Context, logger *log.Ent
}

// decommission karpenter node pools: since the karpenter controller is decommissioned, we need to clean up the EC2 resources
ec2Backend := updatestrategy.NewEC2NodePoolBackend(cluster.ID, awsAdapter.session)
ec2Backend := updatestrategy.NewEC2NodePoolBackend(cluster.ID, awsAdapter.session, func() (*updatestrategy.KarpenterCRDNameResolver, error) {
k8sClients, err := kubernetes.NewClientsCollection(cluster.APIServerURL, p.tokenSource)
if err != nil {
return nil, err
}
return updatestrategy.NewKarpenterCRDResolver(ctx, k8sClients)
})
err = ec2Backend.DecommissionKarpenterNodes(ctx)
if err != nil {
return err
@@ -849,8 +855,11 @@ func (p *clusterpyProvisioner) prepareProvision(logger *log.Entry, cluster *api.
if err != nil {
return nil, nil, err
}

additionalBackends := map[string]updatestrategy.ProviderNodePoolsBackend{
karpenterNodePoolProfile: updatestrategy.NewEC2NodePoolBackend(cluster.ID, adapter.session, updatestrategy.WithConfigGetter(KarpenterNodePoolConfigGetter(k8sClients))),
karpenterNodePoolProfile: updatestrategy.NewEC2NodePoolBackend(cluster.ID, adapter.session, func() (*updatestrategy.KarpenterCRDNameResolver, error) {
return updatestrategy.NewKarpenterCRDResolver(context.Background(), k8sClients)
}),
}

asgBackend := updatestrategy.NewASGNodePoolsBackend(cluster.ID, adapter.session)
@@ -1077,7 +1086,7 @@ func renderManifests(config channel.Config, cluster *api.Cluster, values map[str

// If there's no content we skip the manifest
if remarshaled == "" {
log.Debugf("Skipping empty manifest: %s\n%s\n", manifest.Path, rendered)
log.Debugf("Skipping empty file: %s", manifest.Path)
} else {
renderedManifests = append(renderedManifests, remarshaled)
}
56 changes: 8 additions & 48 deletions provisioner/node_pools.go
@@ -41,10 +41,6 @@ const (
userDataValuesKey = "UserData"
s3GeneratedFilesPathValuesKey = "S3GeneratedFilesPath"
instanceInfoKey = "InstanceInfo"

karpenterProvisionerResource = "provisioners.karpenter.sh"
karpenterAWSNodeTemplateResource = "awsnodetemplates.karpenter.k8s.aws"
crd = "CustomResourceDefinition"
)

// NodePoolProvisioner is able to provision node pools for a cluster.
@@ -230,27 +226,31 @@ func (p *KarpenterNodePoolProvisioner) isKarpenterEnabled() bool {
func (p *KarpenterNodePoolProvisioner) Reconcile(ctx context.Context, _ updatestrategy.UpdateStrategy) error {
karpenterPools := p.cluster.KarpenterPools()

existingProvisioners, err := p.k8sClients.List(ctx, karpenterProvisionerResource, "", metav1.ListOptions{})
crdResolver, err := updatestrategy.NewKarpenterCRDResolver(ctx, p.k8sClients)
if err != nil {
return err
}
existingProvisioners, err := p.k8sClients.List(ctx, crdResolver.NodePoolCRDName, "", metav1.ListOptions{})
if err != nil {
return err
}
for _, pr := range existingProvisioners.Items {
if !inNodePoolList(&api.NodePool{Name: pr.GetName()}, karpenterPools) {
err := p.k8sClients.Delete(ctx, karpenterProvisionerResource, "", pr.GetName(), metav1.DeleteOptions{})
err := p.k8sClients.Delete(ctx, crdResolver.NodePoolCRDName, "", pr.GetName(), metav1.DeleteOptions{})
if err != nil {
return err
}
}
}

existingNodeTemplates, err := p.k8sClients.List(ctx, karpenterAWSNodeTemplateResource, "", metav1.ListOptions{})
existingNodeTemplates, err := p.k8sClients.List(ctx, crdResolver.NodeTemplateCRDName(), "", metav1.ListOptions{})
if err != nil {
return err
}

for _, pr := range existingNodeTemplates.Items {
if !inNodePoolList(&api.NodePool{Name: pr.GetName()}, karpenterPools) {
err = p.k8sClients.Delete(ctx, karpenterAWSNodeTemplateResource, "", pr.GetName(), metav1.DeleteOptions{})
err = p.k8sClients.Delete(ctx, crdResolver.NodeTemplateCRDName(), "", pr.GetName(), metav1.DeleteOptions{})
if err != nil {
return err
}
@@ -456,43 +456,3 @@ func nodePoolStackToNodePool(stack *cloudformation.Stack) *api.NodePool {
}
return nodePool
}

func KarpenterNodePoolConfigGetter(kubeClient *kubernetes.ClientsCollection) updatestrategy.NodePoolConfigGetter {
return func(ctx context.Context, nodePool *api.NodePool) (*updatestrategy.InstanceConfig, error) {
provisionerResource, err := kubeClient.Get(ctx, karpenterProvisionerResource, "", nodePool.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

spec, ok := provisionerResource.Object["spec"]
if !ok {
return nil, errors.New("")
}
providerRefSpec := spec.(map[string]interface{})["providerRef"]
if providerRefSpec == nil {
return nil, nil
}
providerRef := providerRefSpec.(map[string]interface{})["name"]
if providerRefSpec == nil {
return nil, nil
}

nodeTemplateResource, err := kubeClient.Get(ctx, karpenterAWSNodeTemplateResource, "", providerRef.(string), metav1.GetOptions{})
if err != nil {
return nil, err
}
spec, ok = nodeTemplateResource.Object["spec"]
if !ok {
return nil, errors.New("")
}
tags := make(map[string]string)
for k, v := range spec.(map[string]interface{})["tags"].(map[string]interface{}) {
tags[k] = v.(string)
}
return &updatestrategy.InstanceConfig{
UserData: base64.StdEncoding.EncodeToString([]byte(spec.(map[string]interface{})["userData"].(string))),
ImageID: spec.(map[string]interface{})["amiSelector"].(map[string]interface{})["aws-ids"].(string),
Tags: tags,
}, nil
}
}