Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions operators/clusters/controller/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
func RegisterInto(mgr operator.Operator) {
ev := env.GetEnvOrDie()
mgr.AddToSchemes(clustersv1.AddToScheme)
// mgr.RegisterWebhooks(&clustersv1.Cluster{})

logger := mgr.Operator().Logger

Expand Down Expand Up @@ -75,7 +74,24 @@ func RegisterInto(mgr operator.Operator) {
accountName := obj.Spec.AccountName
clusterName := obj.Name

msg, err := json.Marshal(types.ResourceUpdate{AccountName: accountName, ClusterName: clusterName, Object: m})
if obj.GetDeletionTimestamp() == nil {
m[types.ResourceStatusKey] = types.ResourceStatusUpdated
}

if obj.GetDeletionTimestamp() != nil {
m[types.ResourceStatusKey] = func() types.ResourceStatus {
if types.HasOtherKloudliteFinalizers(obj) {
return types.ResourceStatusDeleting
}
return types.ResourceStatusDeleted
}()
}

msg, err := json.Marshal(types.ResourceUpdate{
AccountName: accountName,
ClusterName: clusterName,
Object: m,
})
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions operators/resource-watcher/controller/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func RegisterInto(mgr operator.Operator) {
panic(err)
}

ev.IsDev = mgr.Operator().IsDev

mgr.AddToSchemes(
crdsv1.AddToScheme,
mongodbMsvcv1.AddToScheme, mysqlMsvcv1.AddToScheme, redisMsvcv1.AddToScheme,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
networkingv1 "k8s.io/api/networking/v1"
"strings"
"time"

Expand All @@ -20,10 +19,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clustersv1 "github.com/kloudlite/operator/apis/clusters/v1"
crdsv1 "github.com/kloudlite/operator/apis/crds/v1"
serverlessv1 "github.com/kloudlite/operator/apis/serverless/v1"
wireguardv1 "github.com/kloudlite/operator/apis/wireguard/v1"
"github.com/kloudlite/operator/operators/resource-watcher/internal/env"
"github.com/kloudlite/operator/operators/resource-watcher/internal/types"
t "github.com/kloudlite/operator/operators/resource-watcher/types"
Expand All @@ -37,63 +32,53 @@ import (
// Reconciler reconciles a StatusWatcher object
type Reconciler struct {
client.Client
Scheme *runtime.Scheme
logger logging.Logger
Name string
Env *env.Env
accessToken string
MsgSender MessageSender
Scheme *runtime.Scheme
logger logging.Logger
Name string
Env *env.Env
accessToken string
MsgSender MessageSender
}

func (r *Reconciler) GetName() string {
return r.Name
}

func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.Unstructured, logger logging.Logger) (ctrl.Result, error) {
obj.SetManagedFields(nil)

// mctx, cf := context.WithTimeout(ctx, 100*time.Second)
_ = time.Second
mctx, cf := context.WithCancel(ctx)
func (r *Reconciler) dispatchEvent(ctx context.Context, obj *unstructured.Unstructured) (ctrl.Result, error) {
mctx, cf := func() (context.Context, context.CancelFunc) {
if r.Env.IsDev {
return context.WithCancel(context.TODO())
}
return context.WithTimeout(ctx, 2*time.Second)
}()
defer cf()

belongsTo := func(group string) bool {
return strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, group)
}

switch {
case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "infra.kloudlite.io"):
case belongsTo("infra.kloudlite.io"):
{
if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{
if err := r.MsgSender.DispatchInfraResourceUpdates(mctx, t.ResourceUpdate{
ClusterName: r.Env.ClusterName,
AccountName: r.Env.AccountName,
Object: obj.Object,
}); err != nil {
return ctrl.Result{}, err
}
}
case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "clusters.kloudlite.io"):
case belongsTo("clusters.kloudlite.io"):
{
switch obj.GetObjectKind().GroupVersionKind().Kind {
case "Cluster":
{
if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{
ClusterName: obj.GetName(),
AccountName: obj.Object["spec"].(map[string]any)["accountName"].(string),
Object: obj.Object,
}); err != nil {
return ctrl.Result{}, err
}
}
default:
{
if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{
ClusterName: r.Env.ClusterName,
AccountName: r.Env.AccountName,
Object: obj.Object,
}); err != nil {
return ctrl.Result{}, err
}
}
if err := r.MsgSender.DispatchInfraResourceUpdates(mctx, t.ResourceUpdate{
ClusterName: r.Env.ClusterName,
AccountName: r.Env.AccountName,
Object: obj.Object,
}); err != nil {
return ctrl.Result{}, err
}
}
case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "wireguard.kloudlite.io"):
case belongsTo("wireguard.kloudlite.io"):
{
switch obj.GetObjectKind().GroupVersionKind().Kind {
case "Device":
Expand All @@ -111,7 +96,7 @@ func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.U
}
}

if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{
if err := r.MsgSender.DispatchInfraResourceUpdates(mctx, t.ResourceUpdate{
ClusterName: r.Env.ClusterName,
AccountName: r.Env.AccountName,
Object: obj.Object,
Expand All @@ -125,40 +110,58 @@ func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.U
}
}
}
case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "kloudlite.io"):
case belongsTo("kloudlite.io"):
{
if err := r.MsgSender.DispatchResourceUpdates(mctx, t.ResourceUpdate{
if err := r.MsgSender.DispatchConsoleResourceUpdates(mctx, t.ResourceUpdate{
ClusterName: r.Env.ClusterName,
AccountName: r.Env.AccountName,
Object: obj.Object,
}); err != nil {
return ctrl.Result{}, err
}
}
default:
{
if err := r.MsgSender.DispatchResourceUpdates(mctx, t.ResourceUpdate{
ClusterName: r.Env.ClusterName,
AccountName: r.Env.AccountName,
Object: obj.Object,
}); err != nil {
return ctrl.Result{}, err
}
case belongsTo(""):
if err := r.MsgSender.DispatchConsoleResourceUpdates(mctx, t.ResourceUpdate{
ClusterName: r.Env.ClusterName,
AccountName: r.Env.AccountName,
Object: obj.Object,
}); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.Unstructured, logger logging.Logger) (ctrl.Result, error) {
obj.SetManagedFields(nil)

if obj.GetDeletionTimestamp() != nil {
// resource is about to be deleted
if t.HasOtherKloudliteFinalizers(obj) {
// 1. send deleting event
obj.Object[t.ResourceStatusKey] = t.ResourceStatusDeleting
return r.dispatchEvent(ctx, obj)
}

// 2. send deleted event
obj.Object[t.ResourceStatusKey] = t.ResourceStatusDeleted
if controllerutil.ContainsFinalizer(obj, constants.StatusWatcherFinalizer) {
return r.RemoveWatcherFinalizer(mctx, obj)
if rr, err := r.RemoveWatcherFinalizer(ctx, obj); err != nil {
return rr, err
}
}
return ctrl.Result{}, nil
return r.dispatchEvent(ctx, obj)
}

if !controllerutil.ContainsFinalizer(obj, constants.StatusWatcherFinalizer) {
return r.AddWatcherFinalizer(mctx, obj)
if rr, err := r.AddWatcherFinalizer(ctx, obj); err != nil {
return rr, err
}
}

return ctrl.Result{}, nil
obj.Object[t.ResourceStatusKey] = t.ResourceStatusUpdated
return r.dispatchEvent(ctx, obj)
}

// +kubebuilder:rbac:groups=watcher.kloudlite.io,resources=statuswatchers,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -213,62 +216,56 @@ func (r *Reconciler) RemoveWatcherFinalizer(ctx context.Context, obj client.Obje
return ctrl.Result{}, r.Update(ctx, obj)
}

type WatchResource struct {
metav1.TypeMeta `json:",inline"`
}

func NewWatchResource(apiVersion string, kind string) WatchResource {
return WatchResource{
TypeMeta: metav1.TypeMeta{
Kind: kind,
APIVersion: apiVersion,
},
}
}

// SetupWithManager sets up the controllers with the Manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, logger logging.Logger) error {
r.Client = mgr.GetClient()
r.Scheme = mgr.GetScheme()
r.logger = logger.WithName(r.Name).WithKV("accountName", r.Env.AccountName).WithKV("clusterName", r.Env.ClusterName)

builder := ctrl.NewControllerManagedBy(mgr)
builder.For(&crdsv1.Project{})
builder.For(&corev1.Node{})

nativeWatchList := []client.Object{
&corev1.PersistentVolumeClaim{},
&corev1.PersistentVolume{},
&networkingv1.Ingress{},
}
watchList := []WatchResource{
NewWatchResource("crds.kloudlite.io/v1", "Project"),
NewWatchResource("crds.kloudlite.io/v1", "App"),
NewWatchResource("crds.kloudlite.io/v1", "ManagedService"),
NewWatchResource("crds.kloudlite.io/v1", "ManagedResource"),
NewWatchResource("crds.kloudlite.io/v1", "Workspace"),
NewWatchResource("crds.kloudlite.io/v1", "Router"),
NewWatchResource("clusters.kloudlite.io/v1", "NodePool"),
NewWatchResource("wireguard.kloudlite.io/v1", "Device"),

watchList := []client.Object{
&crdsv1.Project{},
&crdsv1.App{},
&serverlessv1.Lambda{},
&crdsv1.ManagedService{},
&crdsv1.ManagedResource{},
&crdsv1.Router{},
&crdsv1.Workspace{},
&crdsv1.Config{},
&crdsv1.Secret{},

&clustersv1.Cluster{},
&clustersv1.NodePool{},

&wireguardv1.Device{},
}
for _, object := range nativeWatchList {
builder.Watches(
object,
handler.EnqueueRequestsFromMapFunc(
func(ctx context.Context, obj client.Object) []reconcile.Request {
return []reconcile.Request{
{NamespacedName: fn.NN(obj.GetNamespace(), obj.GetName())},
}
},
),
)
// native resources
NewWatchResource("v1", "PersistentVolumeClaim"),
NewWatchResource("v1", "PersistentVolume"),
NewWatchResource("networking.k8s.io/v1", "Ingress"),
}
for _, object := range watchList {

for i := range watchList {
builder.Watches(
object,
fn.NewUnstructured(watchList[i].TypeMeta),
handler.EnqueueRequestsFromMapFunc(
func(ctx context.Context, obj client.Object) []reconcile.Request {
v, ok := obj.GetAnnotations()[constants.GVKKey]
if !ok {
return nil
}
b64Group := base64.StdEncoding.EncodeToString([]byte(v))
func(_ context.Context, obj client.Object) []reconcile.Request {
gvk := watchList[i].GetObjectKind().GroupVersionKind().String()

b64Group := base64.StdEncoding.EncodeToString([]byte(gvk))
if len(b64Group) == 0 {
return nil
}

wName, err := types.WrappedName{Name: obj.GetName(), Group: b64Group}.String()
if err != nil {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/kloudlite/operator/grpc-interfaces/grpc/messages"
"github.com/kloudlite/operator/operators/resource-watcher/internal/env"
t "github.com/kloudlite/operator/operators/resource-watcher/types"
"github.com/kloudlite/operator/pkg/errors"
"github.com/kloudlite/operator/pkg/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand All @@ -22,8 +23,8 @@ type grpcMsgSender struct {
logger logging.Logger
}

// DispatchInfraUpdates implements MessageSender.
func (g *grpcMsgSender) DispatchInfraUpdates(ctx context.Context, ru t.ResourceUpdate) error {
// DispatchInfraResourceUpdates implements MessageSender.
func (g *grpcMsgSender) DispatchInfraResourceUpdates(ctx context.Context, ru t.ResourceUpdate) error {
b, err := json.Marshal(ru)
if err != nil {
return err
Expand Down Expand Up @@ -55,8 +56,8 @@ func (g *grpcMsgSender) DispatchInfraUpdates(ctx context.Context, ru t.ResourceU
}
}

// DispatchResourceUpdates implements MessageSender.
func (g *grpcMsgSender) DispatchResourceUpdates(ctx context.Context, ru t.ResourceUpdate) error {
// DispatchConsoleResourceUpdates implements MessageSender.
func (g *grpcMsgSender) DispatchConsoleResourceUpdates(ctx context.Context, ru t.ResourceUpdate) error {
b, err := json.Marshal(ru)
if err != nil {
return err
Expand Down Expand Up @@ -102,7 +103,8 @@ func NewGRPCMessageSender(ctx context.Context, cc *grpc.ClientConn, ev *env.Env,
}

if validationOut == nil || !validationOut.Valid {
logger.Infof("accessToken is invalid, aborting")
err := errors.Newf("accessToken is invalid, aborting")
logger.Error(err)
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

type MessageSender interface {
DispatchResourceUpdates(ctx context.Context, stu t.ResourceUpdate) error
DispatchInfraUpdates(ctx context.Context, stu t.ResourceUpdate) error
DispatchConsoleResourceUpdates(ctx context.Context, stu t.ResourceUpdate) error
DispatchInfraResourceUpdates(ctx context.Context, stu t.ResourceUpdate) error
}

1 change: 1 addition & 0 deletions operators/resource-watcher/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type Env struct {
ReconcilePeriod time.Duration `env:"RECONCILE_PERIOD" required:"true"`
IsDev bool `env:"IS_DEV"`
MaxConcurrentReconciles int `env:"MAX_CONCURRENT_RECONCILES" required:"true"`
AccountName string `env:"ACCOUNT_NAME" required:"true"`
ClusterName string `env:"CLUSTER_NAME" required:"true"`
Expand Down
Loading