Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apis/clusters/v1/nodepool_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
ct "github.com/kloudlite/operator/apis/common-types"
"github.com/kloudlite/operator/pkg/constants"
rApi "github.com/kloudlite/operator/pkg/operator"
corev1 "k8s.io/api/core/v1"
)

// +kubebuilder:validation:Enum=ec2;spot;
Expand Down Expand Up @@ -72,6 +73,9 @@ type NodePoolSpec struct {
// +kubebuilder:validation:Minimum=0
TargetCount int `json:"targetCount"`

NodeLabels map[string]string `json:"nodeLabels,omitempty"`
NodeTaints []corev1.Taint `json:"nodeTaints,omitempty"`

IAC InfrastuctureAsCode `json:"iac" graphql:"noinput"`

CloudProvider ct.CloudProvider `json:"cloudProvider"`
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ require (
github.com/jmoiron/sqlx v1.3.5 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/lib/pq v1.10.7 // indirect
Expand All @@ -125,6 +125,9 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -799,6 +801,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/nelsam/hel/v2 v2.3.2/go.mod h1:1ZTGfU2PFTOd5mx22i5O0Lc2GY933lQ2wb/ggy+rL3w=
github.com/nelsam/hel/v2 v2.3.3/go.mod h1:1ZTGfU2PFTOd5mx22i5O0Lc2GY933lQ2wb/ggy+rL3w=
Expand Down
76 changes: 75 additions & 1 deletion operators/clusters/controller/register.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,93 @@
package controller

import (
"context"
"encoding/json"
"fmt"
"time"

clustersv1 "github.com/kloudlite/operator/apis/clusters/v1"
"github.com/kloudlite/operator/operator"
account_s3_bucket "github.com/kloudlite/operator/operators/clusters/internal/controllers/account-s3-bucket"
"github.com/kloudlite/operator/operators/clusters/internal/controllers/target"
"github.com/kloudlite/operator/operators/clusters/internal/env"
"github.com/kloudlite/operator/operators/resource-watcher/types"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

func RegisterInto(mgr operator.Operator) {
ev := env.GetEnvOrDie()
mgr.AddToSchemes(clustersv1.AddToScheme)
// mgr.RegisterWebhooks(&clustersv1.Cluster{})

logger := mgr.Operator().Logger

nc, err := nats.Connect(ev.NatsURL, func(opts *nats.Options) error {
name := "clusters-operator"
*opts = nats.Options{
Name: name,
Servers: []string{ev.NatsURL},
AllowReconnect: true,
MaxReconnect: -1,
ReconnectWait: 3 * time.Second,
PingInterval: 3 * time.Second,
ClosedCB: func(*nats.Conn) {
logger.Infof("[%s] connection closed with nats server", name)
},
DisconnectedCB: func(*nats.Conn) {
logger.Infof("[%s] disconnected with nats server", opts.Name)
},
ConnectedCB: func(*nats.Conn) {
logger.Infof("[%s] connected to nats server", opts.Name)
},
ReconnectedCB: func(*nats.Conn) {
logger.Infof("[%s] reconnected to nats server", opts.Name)
},
RetryOnFailedConnect: true,
Compression: true,
}
return nil
})
if err != nil {
panic(err)
}

js, err := jetstream.New(nc)
if err != nil {
panic(err)
}

mgr.RegisterControllers(
&target.ClusterReconciler{Name: "clusters:target", Env: ev},
&target.ClusterReconciler{
Env: ev,
Name: "clusters:target",
NotifyOnClusterUpdate: func(ctx context.Context, obj *clustersv1.Cluster) error {
var m map[string]any
b, err := json.Marshal(obj)
if err != nil {
return err
}
if err := json.Unmarshal(b, &m); err != nil {
return err
}

accountName := obj.Spec.AccountName
clusterName := obj.Name

msg, err := json.Marshal(types.ResourceUpdate{AccountName: accountName, ClusterName: clusterName, Object: m})
if err != nil {
return err
}

_, err = js.Publish(ctx, fmt.Sprintf(ev.NatsClusterUpdateSubjectFormat, accountName, clusterName), msg)
if err != nil {
return err
}
logger.Infof("[%s] published cluster update to nats: %s", fmt.Sprintf("%s/%s", obj.Spec.AccountName, obj.Name))
return nil
},
},
&account_s3_bucket.Reconciler{Name: "clusters:account-s3-bucket", Env: ev},
)
}
38 changes: 21 additions & 17 deletions operators/clusters/internal/controllers/target/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type ClusterReconciler struct {

templateClusterJob []byte
templateRBACForClusterJob []byte

NotifyOnClusterUpdate func(ctx context.Context, obj *clustersv1.Cluster) error
}

func (r *ClusterReconciler) GetName() string {
Expand All @@ -51,7 +53,6 @@ func (r *ClusterReconciler) GetName() string {

func getBucketFilePath(baseDir string, accountName string, clusterName string) string {
return filepath.Join(baseDir, "account-"+accountName, "cluster-"+clusterName+".tfstate")
// return fmt.Sprintf(baseDir, "kloudlite/account-%s/cluster-%s.tfstate", accountName, clusterName)
}

const (
Expand Down Expand Up @@ -84,42 +85,48 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, request ctrl.Request)
req.LogPreReconcile()
defer req.LogPostReconcile()

if step := r.patchDefaults(req); !step.ShouldProceed() {
notifyAndExit := func(step stepResult.Result) (ctrl.Result, error) {
if err := r.NotifyOnClusterUpdate(ctx, req.Object); err != nil {
return ctrl.Result{}, err
}
return step.ReconcilerResponse()
}

if step := r.patchDefaults(req); !step.ShouldProceed() {
return notifyAndExit(step)
}

if req.Object.GetDeletionTimestamp() != nil {
if x := r.finalize(req); !x.ShouldProceed() {
return x.ReconcilerResponse()
return notifyAndExit(x)
}
return ctrl.Result{}, nil
}

if step := req.ClearStatusIfAnnotated(); !step.ShouldProceed() {
return step.ReconcilerResponse()
return notifyAndExit(step)
}

if step := req.EnsureLabelsAndAnnotations(); !step.ShouldProceed() {
return step.ReconcilerResponse()
return notifyAndExit(step)
}

if step := req.EnsureFinalizers(constants.CommonFinalizer); !step.ShouldProceed() {
return step.ReconcilerResponse()
return notifyAndExit(step)
}

if step := r.ensureJobRBAC(req); !step.ShouldProceed() {
return step.ReconcilerResponse()
return notifyAndExit(step)
}

// if step := r.ensureMessageQueueTopic(req); !step.ShouldProceed() {
// return step.ReconcilerResponse()
// }

if step := r.startClusterApplyJob(req); !step.ShouldProceed() {
return step.ReconcilerResponse()
return notifyAndExit(step)
}

req.Object.Status.IsReady = true
if err := r.NotifyOnClusterUpdate(ctx, req.Object); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -174,7 +181,7 @@ func (r *ClusterReconciler) patchDefaults(req *rApi.Request[*clustersv1.Cluster]

check.Status = true
if check != obj.Status.Checks[checkName] {
obj.Status.Checks[checkName] = check
fn.MapSet(obj.Status.Checks, checkName, check)
if sr := req.UpdateStatus(); !sr.ShouldProceed() {
return sr
}
Expand All @@ -196,10 +203,7 @@ func (r *ClusterReconciler) finalize(req *rApi.Request[*clustersv1.Cluster]) ste
check.Status = false
check.Message = "cluster job failed"
if check != obj.Status.Checks[checkName] {
if obj.Status.Checks == nil {
obj.Status.Checks = map[string]rApi.Check{}
}
obj.Status.Checks[checkName] = check
fn.MapSet(obj.Status.Checks, checkName, check)
if sr := req.UpdateStatus(); !sr.ShouldProceed() {
return sr
}
Expand Down
5 changes: 5 additions & 0 deletions operators/clusters/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type Env struct {
MessageOfficeGRPCAddr string `env:"MESSAGE_OFFICE_GRPC_ADDR" required:"true"`

IACJobImage string `env:"IAC_JOB_IMAGE" required:"true"`

NatsURL string `env:"NATS_URL" required:"true"`
NatsStream string `env:"NATS_STREAM" required:"true"`

NatsClusterUpdateSubjectFormat string `env:"NATS_CLUSTER_UPDATE_SUBJECT_FORMAT" required:"true"`
}

func GetEnvOrDie() *Env {
Expand Down