From 8c3c9bc0810d33d6f494b217632951439bb02654 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Tue, 26 Dec 2023 16:38:19 +0530 Subject: [PATCH 1/2] fix(resource-watcher): fixes message dispatching - adds resource status to every message dispatched through resource watcher --- operators/clusters/controller/register.go | 20 +- .../resource-watcher/controller/register.go | 2 + .../watch-and-update/controller.go | 195 +++++++++--------- .../watch-and-update/grpc-message-sender.go | 12 +- .../watch-and-update/message-sender.go | 5 +- .../resource-watcher/internal/env/env.go | 1 + operators/resource-watcher/main.go | 3 - operators/resource-watcher/types/types.go | 34 +++ 8 files changed, 160 insertions(+), 112 deletions(-) diff --git a/operators/clusters/controller/register.go b/operators/clusters/controller/register.go index 79c43e55..689fa7e6 100644 --- a/operators/clusters/controller/register.go +++ b/operators/clusters/controller/register.go @@ -19,7 +19,6 @@ import ( func RegisterInto(mgr operator.Operator) { ev := env.GetEnvOrDie() mgr.AddToSchemes(clustersv1.AddToScheme) - // mgr.RegisterWebhooks(&clustersv1.Cluster{}) logger := mgr.Operator().Logger @@ -75,7 +74,24 @@ func RegisterInto(mgr operator.Operator) { accountName := obj.Spec.AccountName clusterName := obj.Name - msg, err := json.Marshal(types.ResourceUpdate{AccountName: accountName, ClusterName: clusterName, Object: m}) + if obj.GetDeletionTimestamp() == nil { + m[types.ResourceStatusKey] = types.ResourceStatusUpdated + } + + if obj.GetDeletionTimestamp() != nil { + m[types.ResourceStatusKey] = func() types.ResourceStatus { + if types.HasOtherKloudliteFinalizers(obj) { + return types.ResourceStatusDeleting + } + return types.ResourceStatusDeleted + }() + } + + msg, err := json.Marshal(types.ResourceUpdate{ + AccountName: accountName, + ClusterName: clusterName, + Object: m, + }) if err != nil { return err } diff --git a/operators/resource-watcher/controller/register.go b/operators/resource-watcher/controller/register.go index a727668d..d98ad67c 100644 --- a/operators/resource-watcher/controller/register.go +++ b/operators/resource-watcher/controller/register.go @@ -28,6 +28,8 @@ func RegisterInto(mgr operator.Operator) { panic(err) } + ev.IsDev = mgr.Operator().IsDev + mgr.AddToSchemes( crdsv1.AddToScheme, mongodbMsvcv1.AddToScheme, mysqlMsvcv1.AddToScheme, redisMsvcv1.AddToScheme, diff --git a/operators/resource-watcher/internal/controllers/watch-and-update/controller.go b/operators/resource-watcher/internal/controllers/watch-and-update/controller.go index a9a8b93f..c8d8169d 100644 --- a/operators/resource-watcher/internal/controllers/watch-and-update/controller.go +++ b/operators/resource-watcher/internal/controllers/watch-and-update/controller.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - networkingv1 "k8s.io/api/networking/v1" "strings" "time" @@ -20,10 +19,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" - clustersv1 "github.com/kloudlite/operator/apis/clusters/v1" - crdsv1 "github.com/kloudlite/operator/apis/crds/v1" - serverlessv1 "github.com/kloudlite/operator/apis/serverless/v1" - wireguardv1 "github.com/kloudlite/operator/apis/wireguard/v1" "github.com/kloudlite/operator/operators/resource-watcher/internal/env" "github.com/kloudlite/operator/operators/resource-watcher/internal/types" t "github.com/kloudlite/operator/operators/resource-watcher/types" @@ -37,30 +32,35 @@ import ( // Reconciler reconciles a StatusWatcher object type Reconciler struct { client.Client - Scheme *runtime.Scheme - logger logging.Logger - Name string - Env *env.Env - accessToken string - MsgSender MessageSender + Scheme *runtime.Scheme + logger logging.Logger + Name string + Env *env.Env + accessToken string + MsgSender MessageSender } func (r *Reconciler) GetName() string { return r.Name } -func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.Unstructured, logger logging.Logger) (ctrl.Result, error) { - obj.SetManagedFields(nil) - - // mctx, cf := context.WithTimeout(ctx, 100*time.Second) - _ = time.Second - mctx, cf := context.WithCancel(ctx) +func (r *Reconciler) dispatchEvent(ctx context.Context, obj *unstructured.Unstructured) (ctrl.Result, error) { + mctx, cf := func() (context.Context, context.CancelFunc) { + if r.Env.IsDev { + return context.WithCancel(context.TODO()) + } + return context.WithTimeout(ctx, 2*time.Second) + }() defer cf() + belongsTo := func(group string) bool { + return strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, group) + } + switch { - case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "infra.kloudlite.io"): + case belongsTo("infra.kloudlite.io"): { - if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{ + if err := r.MsgSender.DispatchInfraResourceUpdates(mctx, t.ResourceUpdate{ ClusterName: r.Env.ClusterName, AccountName: r.Env.AccountName, Object: obj.Object, @@ -68,32 +68,17 @@ func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.U return ctrl.Result{}, err } } - case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "clusters.kloudlite.io"): + case belongsTo("clusters.kloudlite.io"): { - switch obj.GetObjectKind().GroupVersionKind().Kind { - case "Cluster": - { - if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{ - ClusterName: obj.GetName(), - AccountName: obj.Object["spec"].(map[string]any)["accountName"].(string), - Object: obj.Object, - }); err != nil { - return ctrl.Result{}, err - } - } - default: - { - if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{ - ClusterName: r.Env.ClusterName, - AccountName: r.Env.AccountName, - Object: obj.Object, - }); err != nil { - return ctrl.Result{}, err - } - } + if err := r.MsgSender.DispatchInfraResourceUpdates(mctx, t.ResourceUpdate{ + ClusterName: r.Env.ClusterName, + AccountName: r.Env.AccountName, + Object: obj.Object, + }); err != nil { + return ctrl.Result{}, err } } - case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "wireguard.kloudlite.io"): + case belongsTo("wireguard.kloudlite.io"): { switch obj.GetObjectKind().GroupVersionKind().Kind { case "Device": @@ -111,7 +96,7 @@ func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.U } } - if err := r.MsgSender.DispatchInfraUpdates(mctx, t.ResourceUpdate{ + if err := r.MsgSender.DispatchInfraResourceUpdates(mctx, t.ResourceUpdate{ ClusterName: r.Env.ClusterName, AccountName: r.Env.AccountName, Object: obj.Object, @@ -125,9 +110,9 @@ func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.U } } } - case strings.HasSuffix(obj.GetObjectKind().GroupVersionKind().Group, "kloudlite.io"): + case belongsTo("kloudlite.io"): { - if err := r.MsgSender.DispatchResourceUpdates(mctx, t.ResourceUpdate{ + if err := r.MsgSender.DispatchConsoleResourceUpdates(mctx, t.ResourceUpdate{ ClusterName: r.Env.ClusterName, AccountName: r.Env.AccountName, Object: obj.Object, @@ -135,30 +120,48 @@ func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.U return ctrl.Result{}, err } } - default: - { - if err := r.MsgSender.DispatchResourceUpdates(mctx, t.ResourceUpdate{ - ClusterName: r.Env.ClusterName, - AccountName: r.Env.AccountName, - Object: obj.Object, - }); err != nil { - return ctrl.Result{}, err - } + case belongsTo(""): + if err := r.MsgSender.DispatchConsoleResourceUpdates(mctx, t.ResourceUpdate{ + ClusterName: r.Env.ClusterName, + AccountName: r.Env.AccountName, + Object: obj.Object, + }); err != nil { + return ctrl.Result{}, err } } + return ctrl.Result{}, nil +} + +func (r *Reconciler) SendResourceEvents(ctx context.Context, obj *unstructured.Unstructured, logger logging.Logger) (ctrl.Result, error) { + obj.SetManagedFields(nil) + if obj.GetDeletionTimestamp() != nil { + // resource is about to be deleted + if t.HasOtherKloudliteFinalizers(obj) { + // 1. send deleting event + obj.Object[t.ResourceStatusKey] = t.ResourceStatusDeleting + return r.dispatchEvent(ctx, obj) + } + + // 2. send deleted event + obj.Object[t.ResourceStatusKey] = t.ResourceStatusDeleted if controllerutil.ContainsFinalizer(obj, constants.StatusWatcherFinalizer) { - return r.RemoveWatcherFinalizer(mctx, obj) + if rr, err := r.RemoveWatcherFinalizer(ctx, obj); err != nil { + return rr, err + } } - return ctrl.Result{}, nil + return r.dispatchEvent(ctx, obj) } if !controllerutil.ContainsFinalizer(obj, constants.StatusWatcherFinalizer) { - return r.AddWatcherFinalizer(mctx, obj) + if rr, err := r.AddWatcherFinalizer(ctx, obj); err != nil { + return rr, err + } } - return ctrl.Result{}, nil + obj.Object[t.ResourceStatusKey] = t.ResourceStatusUpdated + return r.dispatchEvent(ctx, obj) } // +kubebuilder:rbac:groups=watcher.kloudlite.io,resources=statuswatchers,verbs=get;list;watch;create;update;patch;delete @@ -213,6 +216,19 @@ func (r *Reconciler) RemoveWatcherFinalizer(ctx context.Context, obj client.Obje return ctrl.Result{}, r.Update(ctx, obj) } +type WatchResource struct { + metav1.TypeMeta `json:",inline"` +} + +func NewWatchResource(apiVersion string, kind string) WatchResource { + return WatchResource{ + TypeMeta: metav1.TypeMeta{ + Kind: kind, + APIVersion: apiVersion, + }, + } +} + // SetupWithManager sets up the controllers with the Manager. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, logger logging.Logger) error { r.Client = mgr.GetClient() @@ -220,55 +236,36 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, logger logging.Logger) e r.logger = logger.WithName(r.Name).WithKV("accountName", r.Env.AccountName).WithKV("clusterName", r.Env.ClusterName) builder := ctrl.NewControllerManagedBy(mgr) - builder.For(&crdsv1.Project{}) + builder.For(&corev1.Node{}) - nativeWatchList := []client.Object{ - &corev1.PersistentVolumeClaim{}, - &corev1.PersistentVolume{}, - &networkingv1.Ingress{}, - } + watchList := []WatchResource{ + NewWatchResource("crds.kloudlite.io/v1", "Project"), + NewWatchResource("crds.kloudlite.io/v1", "App"), + NewWatchResource("crds.kloudlite.io/v1", "ManagedService"), + NewWatchResource("crds.kloudlite.io/v1", "ManagedResource"), + NewWatchResource("crds.kloudlite.io/v1", "Workspace"), + NewWatchResource("crds.kloudlite.io/v1", "Router"), + NewWatchResource("clusters.kloudlite.io/v1", "NodePool"), + NewWatchResource("wireguard.kloudlite.io/v1", "Device"), - watchList := []client.Object{ - &crdsv1.Project{}, - &crdsv1.App{}, - &serverlessv1.Lambda{}, - &crdsv1.ManagedService{}, - &crdsv1.ManagedResource{}, - &crdsv1.Router{}, - &crdsv1.Workspace{}, - &crdsv1.Config{}, - &crdsv1.Secret{}, - - &clustersv1.Cluster{}, - &clustersv1.NodePool{}, - - &wireguardv1.Device{}, - } - for _, object := range nativeWatchList { - builder.Watches( - object, - handler.EnqueueRequestsFromMapFunc( - func(ctx context.Context, obj client.Object) []reconcile.Request { - return []reconcile.Request{ - {NamespacedName: fn.NN(obj.GetNamespace(), obj.GetName())}, - } - }, - ), - ) + // native resources + NewWatchResource("v1", "PersistentVolumeClaim"), + NewWatchResource("v1", "PersistentVolume"), + NewWatchResource("networking.k8s.io/v1", "Ingress"), } - for _, object := range watchList { + + for i := range watchList { builder.Watches( - object, + fn.NewUnstructured(watchList[i].TypeMeta), handler.EnqueueRequestsFromMapFunc( - func(ctx context.Context, obj client.Object) []reconcile.Request { - v, ok := obj.GetAnnotations()[constants.GVKKey] - if !ok { - return nil - } - b64Group := base64.StdEncoding.EncodeToString([]byte(v)) + func(_ context.Context, obj client.Object) []reconcile.Request { + gvk := watchList[i].GetObjectKind().GroupVersionKind().String() + + b64Group := base64.StdEncoding.EncodeToString([]byte(gvk)) if len(b64Group) == 0 { return nil } + wName, err := types.WrappedName{Name: obj.GetName(), Group: b64Group}.String() if err != nil { return nil diff --git a/operators/resource-watcher/internal/controllers/watch-and-update/grpc-message-sender.go b/operators/resource-watcher/internal/controllers/watch-and-update/grpc-message-sender.go index 5b214c72..6ed369f6 100644 --- a/operators/resource-watcher/internal/controllers/watch-and-update/grpc-message-sender.go +++ b/operators/resource-watcher/internal/controllers/watch-and-update/grpc-message-sender.go @@ -8,6 +8,7 @@ import ( "github.com/kloudlite/operator/grpc-interfaces/grpc/messages" "github.com/kloudlite/operator/operators/resource-watcher/internal/env" t "github.com/kloudlite/operator/operators/resource-watcher/types" + "github.com/kloudlite/operator/pkg/errors" "github.com/kloudlite/operator/pkg/logging" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -22,8 +23,8 @@ type grpcMsgSender struct { logger logging.Logger } -// DispatchInfraUpdates implements MessageSender. -func (g *grpcMsgSender) DispatchInfraUpdates(ctx context.Context, ru t.ResourceUpdate) error { +// DispatchInfraResourceUpdates implements MessageSender. +func (g *grpcMsgSender) DispatchInfraResourceUpdates(ctx context.Context, ru t.ResourceUpdate) error { b, err := json.Marshal(ru) if err != nil { return err @@ -55,8 +56,8 @@ func (g *grpcMsgSender) DispatchInfraUpdates(ctx context.Context, ru t.ResourceU } } -// DispatchResourceUpdates implements MessageSender. -func (g *grpcMsgSender) DispatchResourceUpdates(ctx context.Context, ru t.ResourceUpdate) error { +// DispatchConsoleResourceUpdates implements MessageSender. +func (g *grpcMsgSender) DispatchConsoleResourceUpdates(ctx context.Context, ru t.ResourceUpdate) error { b, err := json.Marshal(ru) if err != nil { return err @@ -102,7 +103,8 @@ func NewGRPCMessageSender(ctx context.Context, cc *grpc.ClientConn, ev *env.Env, } if validationOut == nil || !validationOut.Valid { - logger.Infof("accessToken is invalid, aborting") + err := errors.Newf("accessToken is invalid, aborting") + logger.Error(err) return nil, err } diff --git a/operators/resource-watcher/internal/controllers/watch-and-update/message-sender.go b/operators/resource-watcher/internal/controllers/watch-and-update/message-sender.go index cb673cf3..e747f5f9 100644 --- a/operators/resource-watcher/internal/controllers/watch-and-update/message-sender.go +++ b/operators/resource-watcher/internal/controllers/watch-and-update/message-sender.go @@ -7,7 +7,6 @@ import ( ) type MessageSender interface { - DispatchResourceUpdates(ctx context.Context, stu t.ResourceUpdate) error - DispatchInfraUpdates(ctx context.Context, stu t.ResourceUpdate) error + DispatchConsoleResourceUpdates(ctx context.Context, stu t.ResourceUpdate) error + DispatchInfraResourceUpdates(ctx context.Context, stu t.ResourceUpdate) error } - diff --git a/operators/resource-watcher/internal/env/env.go b/operators/resource-watcher/internal/env/env.go index 021cac01..2e9afc93 100644 --- a/operators/resource-watcher/internal/env/env.go +++ b/operators/resource-watcher/internal/env/env.go @@ -8,6 +8,7 @@ import ( type Env struct { ReconcilePeriod time.Duration `env:"RECONCILE_PERIOD" required:"true"` + IsDev bool `env:"IS_DEV"` MaxConcurrentReconciles int `env:"MAX_CONCURRENT_RECONCILES" required:"true"` AccountName string `env:"ACCOUNT_NAME" required:"true"` ClusterName string `env:"CLUSTER_NAME" required:"true"` diff --git a/operators/resource-watcher/main.go b/operators/resource-watcher/main.go index 6e9c7055..d44b55d6 100644 --- a/operators/resource-watcher/main.go +++ b/operators/resource-watcher/main.go @@ -8,9 +8,6 @@ import ( ) func main() { - var runningOnTenant bool - flag.BoolVar(&runningOnTenant, "running-on-tenant", false, "--running-on-tenant") - mgr := operator.New("resource-watcher") controller.RegisterInto(mgr) mgr.Start() diff --git a/operators/resource-watcher/types/types.go b/operators/resource-watcher/types/types.go index 083ff5c8..89a010ef 100644 --- a/operators/resource-watcher/types/types.go +++ b/operators/resource-watcher/types/types.go @@ -1,8 +1,42 @@ package types +import ( + "strings" + + "github.com/kloudlite/operator/pkg/constants" + "sigs.k8s.io/controller-runtime/pkg/client" +) + type ResourceUpdate struct { AuthToken string `json:"authToken"` AccountName string `json:"accountName"` ClusterName string `json:"clusterName"` Object map[string]any `json:"object"` } + +type ResourceStatus string + +const ( + ResourceStatusUpdated ResourceStatus = "updated" + ResourceStatusDeleting ResourceStatus = "deleting" + ResourceStatusDeleted ResourceStatus = "deleted" +) + +const ( + ResourceStatusKey string = "resource-watcher-resource-status" +) + +func HasOtherKloudliteFinalizers(obj client.Object) bool { + hasOtherKloudliteFinalizers := false + + for _, f := range obj.GetFinalizers() { + if strings.Contains(f, ".kloudlite.io") { + if f == constants.StatusWatcherFinalizer { + continue + } + hasOtherKloudliteFinalizers = true + } + } + + return hasOtherKloudliteFinalizers +} From 2158ef2174674ed57f1c81637c6e329011f257ad Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Tue, 26 Dec 2023 16:41:34 +0530 Subject: [PATCH 2/2] fix(resource-watcher): cleanup imports --- operators/resource-watcher/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/operators/resource-watcher/main.go b/operators/resource-watcher/main.go index d44b55d6..7b36ff5e 100644 --- a/operators/resource-watcher/main.go +++ b/operators/resource-watcher/main.go @@ -1,8 +1,6 @@ package main import ( - "flag" - "github.com/kloudlite/operator/operator" "github.com/kloudlite/operator/operators/resource-watcher/controller" )