diff --git a/pkg/controller/capsule_controller.go b/pkg/controller/capsule_controller.go index 3e1e4b461..8a50d67e0 100644 --- a/pkg/controller/capsule_controller.go +++ b/pkg/controller/capsule_controller.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "strconv" + "sync" + "time" cmv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" monitorv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" @@ -29,6 +31,7 @@ import ( "github.com/rigdev/rig/pkg/service/capabilities" "github.com/rigdev/rig/pkg/service/objectstatus" svc_pipeline "github.com/rigdev/rig/pkg/service/pipeline" + "go.uber.org/fx" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" batchv1 "k8s.io/api/batch/v1" @@ -55,6 +58,9 @@ type CapsuleReconciler struct { CapabilitiesService capabilities.Service PipelineService svc_pipeline.Service ObjectStatusService objectstatus.Service + Lifecycle fx.Lifecycle + initialize sync.WaitGroup + mgr ctrl.Manager } const ( @@ -70,6 +76,11 @@ const ( func (r *CapsuleReconciler) SetupWithManager(mgr ctrl.Manager) error { ctx := context.TODO() + r.initialize.Add(1) + r.mgr = mgr + + r.Lifecycle.Append(fx.StartHook(r.initializeReconciler)) + if err := mgr.GetFieldIndexer().IndexField( context.Background(), &v1alpha2.Capsule{}, @@ -275,6 +286,28 @@ func findCapsulesForConfig(mgr ctrl.Manager) handler.MapFunc { } } +func (r *CapsuleReconciler) initializeReconciler(ctx context.Context) { + r.mgr.GetCache().WaitForCacheSync(ctx) + + for { + var capsules v1alpha2.CapsuleList + if err := r.mgr.GetCache().List(ctx, &capsules); err != nil { + r.mgr.GetLogger().Error(err, "error getting initial capsule list") + time.Sleep(1 * time.Second) + continue + } + + for _, capsule := range capsules.Items { + r.ObjectStatusService.RegisterCapsule(capsule.GetNamespace(), capsule.GetName()) + } + + break + } + + r.ObjectStatusService.CapsulesInitialized() + r.initialize.Done() +} + //+kubebuilder:rbac:groups=rig.dev,resources=capsules,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=rig.dev,resources=capsules/status,verbs=get;update;patch //+kubebuilder:rbac:groups=rig.dev,resources=capsules/finalizers,verbs=update @@ -289,6 +322,8 @@ func findCapsulesForConfig(mgr ctrl.Manager) handler.MapFunc { // actual cluster state, and then performs operations to make the cluster state // reflect the state specified by the Capsule. func (r *CapsuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.initialize.Wait() + // TODO: use rig logger log := log.FromContext(ctx) log.Info("reconciliation started") diff --git a/pkg/controller/plugin/external_plugin.go b/pkg/controller/plugin/external_plugin.go index 3c354c902..59d270753 100644 --- a/pkg/controller/plugin/external_plugin.go +++ b/pkg/controller/plugin/external_plugin.go @@ -32,6 +32,7 @@ type pluginExecutor struct { binaryPath string args []string tag string + id uuid.UUID } func newPluginExecutor( @@ -50,6 +51,7 @@ func newPluginExecutor( binaryPath: path, args: args, tag: tag, + id: uuid.New(), } return p, p.start(context.Background(), pluginConfig, restConfig) @@ -126,7 +128,7 @@ func (p *pluginExecutor) WatchObjectStatus( capsule string, callback pipeline.ObjectStatusCallback, ) error { - return p.pluginClient.WatchObjectStatus(ctx, namespace, capsule, callback) + return p.pluginClient.WatchObjectStatus(ctx, namespace, capsule, callback, p.id) } type rigOperatorPlugin struct { @@ -214,6 +216,7 @@ func (m *pluginClient) WatchObjectStatus( namespace string, capsule string, callback pipeline.ObjectStatusCallback, + pluginID uuid.UUID, ) error { c, err := m.client.WatchObjectStatus(ctx, &apiplugin.WatchObjectStatusRequest{ Namespace: namespace, @@ -223,15 +226,13 @@ func (m *pluginClient) WatchObjectStatus( return err } - id := uuid.New() - for { res, err := c.Recv() if err != nil { return err } - callback.UpdateStatus(namespace, capsule, id, res.GetChange()) + callback.UpdateStatus(namespace, capsule, pluginID, res.GetChange()) } } diff --git a/pkg/controller/plugin/step.go b/pkg/controller/plugin/step.go index 4a28d6ff4..308f8b013 100644 --- a/pkg/controller/plugin/step.go +++ b/pkg/controller/plugin/step.go @@ -6,9 +6,11 @@ import ( "github.com/go-logr/logr" "github.com/gobwas/glob" + "github.com/rigdev/rig-go-api/operator/api/v1/plugin" "github.com/rigdev/rig/pkg/api/config/v1alpha1" "github.com/rigdev/rig/pkg/errors" "github.com/rigdev/rig/pkg/pipeline" + "github.com/rigdev/rig/pkg/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" ) @@ -70,15 +72,25 @@ func (s *Step) WatchObjectStatus( // TODO: We need annotations here. if !s.matcher.Match(namespace, capsule, nil) { + for _, p := range s.plugins { + callback.UpdateStatus(namespace, capsule, p.id, &plugin.ObjectStatusChange{ + Change: &plugin.ObjectStatusChange_Checkpoint_{}, + }) + } return nil } for _, p := range s.plugins { wg.Add(1) + go func(p *pluginExecutor) { err := p.WatchObjectStatus(ctx, namespace, capsule, callback) if !errors.IsUnimplemented(err) { s.logger.Error(err, "error getting status") + } else { + callback.UpdateStatus(namespace, capsule, p.id, &plugin.ObjectStatusChange{ + Change: &plugin.ObjectStatusChange_Checkpoint_{}, + }) } wg.Done() }(p) @@ -89,6 +101,14 @@ func (s *Step) WatchObjectStatus( return nil } +func (s *Step) PluginIDs() []uuid.UUID { + var plugins []uuid.UUID + for _, p := range s.plugins { + plugins = append(plugins, p.id) + } + return plugins +} + type Matcher struct { namespaces []glob.Glob capsules []glob.Glob diff --git a/pkg/controller/plugin/watcher.go b/pkg/controller/plugin/watcher.go index bde3e7b33..abc49e7cf 100644 --- a/pkg/controller/plugin/watcher.go +++ b/pkg/controller/plugin/watcher.go @@ -66,6 +66,14 @@ func (w *capsuleWatcher) flush() { }: case <-w.ctx.Done(): } + select { + case w.c <- &apiplugin.ObjectStatusChange{ + Change: &apiplugin.ObjectStatusChange_Checkpoint_{ + Checkpoint: &apiplugin.ObjectStatusChange_Checkpoint{}, + }, + }: + case <-w.ctx.Done(): + } w.initialized = true } @@ -90,6 +98,7 @@ func (w *capsuleWatcher) updated(os *apipipeline.ObjectStatus) { } func (w *capsuleWatcher) deleted(or *apipipeline.ObjectRef) { + // TODO: This is probably not a good idea - delay instead?. w.flush() select { case w.c <- &apiplugin.ObjectStatusChange{ diff --git a/pkg/controller/project_controller.go b/pkg/controller/project_controller.go index 824cc990d..b88d8c67a 100644 --- a/pkg/controller/project_controller.go +++ b/pkg/controller/project_controller.go @@ -8,6 +8,7 @@ import ( configv1alpha1 "github.com/rigdev/rig/pkg/api/config/v1alpha1" "github.com/rigdev/rig/pkg/api/v1alpha2" "github.com/rigdev/rig/pkg/pipeline" + "github.com/rigdev/rig/pkg/uuid" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -102,3 +103,7 @@ func (s namespaceStep) WatchObjectStatus( ) error { return nil } + +func (s namespaceStep) PluginIDs() []uuid.UUID { + return nil +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 40c143747..659b3c351 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -12,6 +12,7 @@ import ( "github.com/rigdev/rig/pkg/service/config" "github.com/rigdev/rig/pkg/service/objectstatus" "github.com/rigdev/rig/pkg/service/pipeline" + "go.uber.org/fx" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" @@ -36,6 +37,7 @@ func New( objectstatus objectstatus.Service, restConfig *rest.Config, logger logr.Logger, + lc fx.Lifecycle, ) (manager.Manager, error) { cfg := cfgS.Operator() @@ -60,6 +62,7 @@ func New( CapabilitiesService: capabilitiesService, PipelineService: pipeline, ObjectStatusService: objectstatus, + Lifecycle: lc, } if err := cr.SetupWithManager(mgr); err != nil { diff --git a/pkg/pipeline/step.go b/pkg/pipeline/step.go index 982fc9b39..d4d3a6ad3 100644 --- a/pkg/pipeline/step.go +++ b/pkg/pipeline/step.go @@ -14,4 +14,5 @@ type ObjectStatusCallback interface { type Step[T Request] interface { Apply(ctx context.Context, req T) error WatchObjectStatus(ctx context.Context, namespace, capsule string, callback ObjectStatusCallback) error + PluginIDs() []uuid.UUID } diff --git a/pkg/service/objectstatus/service.go b/pkg/service/objectstatus/service.go index 8f560836b..aa3f8c3f7 100644 --- a/pkg/service/objectstatus/service.go +++ b/pkg/service/objectstatus/service.go @@ -2,7 +2,6 @@ package objectstatus import ( "context" - "fmt" "slices" "sync" "time" @@ -24,6 +23,7 @@ type Service interface { // TODO: Adopt iterators. Watch(ctx context.Context, namespace string, c chan<- *apipipeline.ObjectStatusChange) error + CapsulesInitialized() RegisterCapsule(namespace string, capsule string) UnregisterCapsule(namespace string, capsule string) @@ -46,19 +46,23 @@ func NewService( } type service struct { - cfg *v1alpha1.OperatorConfig - logger logr.Logger - pipeline svc_pipeline.Service - lock sync.RWMutex - capsules map[string]map[string]*capsuleCache - watchers []*watcher + cfg *v1alpha1.OperatorConfig + logger logr.Logger + pipeline svc_pipeline.Service + lock sync.RWMutex + capsules map[string]map[string]*capsuleCache + watchers []*watcher + initialized bool } -func (s *service) runForCapsule(ctx context.Context, namespace, capsule string) { +func (s *service) runForCapsule(ctx context.Context, namespace string, c *capsuleCache) { p := s.pipeline.GetDefaultPipeline() for _, step := range p.Steps() { - go s.runStepForCapsule(ctx, namespace, capsule, step) + for _, pluginID := range step.PluginIDs() { + c.plugins[pluginID] = false + } + go s.runStepForCapsule(ctx, namespace, c.capsule, step) } } @@ -95,7 +99,8 @@ func (s *service) Watch(ctx context.Context, namespace string, c chan<- *apipipe // Keep lock (and unlock in go-routine) when all statuses are read. go func() { - s.readStatusForNamespace(w.namespace, w) + s.readStatusForNamespace(namespace, w) + s.sendCheckpoint(namespace, []*watcher{w}) s.lock.Unlock() }() @@ -116,6 +121,16 @@ func (s *service) readStatusForNamespace(namespace string, w *watcher) []*apipip return res } +func (s *service) CapsulesInitialized() { + // Initialized! + s.lock.Lock() + defer s.lock.Unlock() + s.initialized = true + for namespace := range s.capsules { + s.sendCheckpoint(namespace, s.watchers) + } +} + func (s *service) RegisterCapsule(namespace string, capsule string) { if s.cfg.EnableObjectStatusCache == nil || !*s.cfg.EnableObjectStatusCache { return @@ -132,14 +147,15 @@ func (s *service) RegisterCapsule(namespace string, capsule string) { if _, ok := cs[capsule]; !ok { ctx, cancel := context.WithCancel(context.Background()) - cs[capsule] = &capsuleCache{ + c := &capsuleCache{ + plugins: map[uuid.UUID]bool{}, capsule: capsule, cancel: cancel, objects: map[pipeline.ObjectKey]*objectCache{}, } + cs[capsule] = c - fmt.Println("RUN FOR CAPSULE", namespace, capsule) - s.runForCapsule(ctx, namespace, capsule) + s.runForCapsule(ctx, namespace, c) } } @@ -202,9 +218,34 @@ func (s *service) UpdateStatus( } } } + + if change.GetCheckpoint() != nil { + s.sendCheckpoint(namespace, s.watchers) + } + s.lock.RUnlock() } +func (s *service) sendCheckpoint(namespace string, watchers []*watcher) { + if !s.initialized { + return + } + + for _, c := range s.capsules[namespace] { + for _, initialized := range c.plugins { + if !initialized { + return + } + } + } + + for _, w := range watchers { + if w.namespace == namespace { + w.checkpoint() + } + } +} + func (s *service) getCapsule(namespace string, capsule string) *capsuleCache { s.lock.RLock() defer s.lock.RUnlock() @@ -223,8 +264,10 @@ func (s *service) getCapsule(namespace string, capsule string) *capsuleCache { } type capsuleCache struct { - lock sync.RWMutex + // This property is owned by the service. + plugins map[uuid.UUID]bool + lock sync.RWMutex capsule string objects map[pipeline.ObjectKey]*objectCache cancel context.CancelFunc @@ -311,6 +354,9 @@ func (c *capsuleCache) update(pluginID uuid.UUID, change *apiplugin.ObjectStatus delete(c.objects, key) } } + + case *apiplugin.ObjectStatusChange_Checkpoint_: + c.plugins[pluginID] = true } return keys diff --git a/pkg/service/objectstatus/watcher.go b/pkg/service/objectstatus/watcher.go index 3c646907a..3a33e62e9 100644 --- a/pkg/service/objectstatus/watcher.go +++ b/pkg/service/objectstatus/watcher.go @@ -8,10 +8,11 @@ import ( ) type watcher struct { - namespace string - c chan<- *apipipeline.ObjectStatusChange - queue []*apipipeline.ObjectStatusChange - cond *sync.Cond + namespace string + c chan<- *apipipeline.ObjectStatusChange + queue []*apipipeline.ObjectStatusChange + cond *sync.Cond + sentCheckpoint bool } func newWatcher(namespace string, c chan<- *apipipeline.ObjectStatusChange) *watcher { @@ -64,3 +65,18 @@ func (w *watcher) pushChange(change *apipipeline.ObjectStatusChange) { w.cond.Signal() w.cond.L.Unlock() } + +func (w *watcher) checkpoint() { + w.cond.L.Lock() + defer w.cond.L.Unlock() + if w.sentCheckpoint { + return + } + w.sentCheckpoint = true + w.queue = append(w.queue, &apipipeline.ObjectStatusChange{ + Change: &apipipeline.ObjectStatusChange_Checkpoint_{ + Checkpoint: &apipipeline.ObjectStatusChange_Checkpoint{}, + }, + }) + w.cond.Signal() +} diff --git a/proto/rig/operator/api/v1/pipeline/object_status.proto b/proto/rig/operator/api/v1/pipeline/object_status.proto new file mode 100644 index 000000000..7e77a2a0a --- /dev/null +++ b/proto/rig/operator/api/v1/pipeline/object_status.proto @@ -0,0 +1,59 @@ +syntax = "proto3"; + +package api.v1.pipeline; + +import "google/protobuf/timestamp.proto"; + +message ObjectStatusChange { + string capsule = 1; + + message Checkpoint {} + + oneof change { + ObjectStatus updated = 2; + ObjectRef deleted = 3; + // Checkpoint events indicates that all existing object statuses for + // this namespace has been sent in the current "watch" session. + Checkpoint checkpoint = 4; + } +} + +enum ObjectType { + OBJECT_TYPE_UNSPECIFIED = 0; + OBJECT_TYPE_PRIMARY = 1; + OBJECT_TYPE_SECONDARY = 2; + OBJECT_TYPE_POD = 3; +} + +message ObjectStatus { + ObjectRef object_ref = 1; + ObjectType type = 2; + repeated ObjectCondition conditions = 3; + map properties = 4; +} + +enum ObjectState { + OBJECT_STATE_UNSPECIFIED = 0; + OBJECT_STATE_HEALTHY = 1; + OBJECT_STATE_PENDING = 2; + OBJECT_STATE_ERROR = 3; +} + +message ObjectCondition { + string name = 1; + google.protobuf.Timestamp updated_at = 2; + ObjectState state = 3; + string message = 4; +} + +message GVK { + string group = 1; + string version = 2; + string kind = 3; +} + +message ObjectRef { + GVK gvk = 1; + string namespace = 2; + string name = 3; +} diff --git a/proto/rig/operator/api/v1/pipeline/service.proto b/proto/rig/operator/api/v1/pipeline/service.proto index f2997099b..4efddc403 100644 --- a/proto/rig/operator/api/v1/pipeline/service.proto +++ b/proto/rig/operator/api/v1/pipeline/service.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package api.v1.pipeline; -import "google/protobuf/timestamp.proto"; +import "operator/api/v1/pipeline/object_status.proto"; // The service for interacting with the operator pipeline service Service { @@ -20,43 +20,6 @@ message WatchObjectStatusResponse { ObjectStatusChange change = 1; } -message ObjectStatusChange { - string capsule = 1; - - oneof change { - ObjectStatus updated = 2; - ObjectRef deleted = 3; - } -} - -enum ObjectType { - OBJECT_TYPE_UNSPECIFIED = 0; - OBJECT_TYPE_PRIMARY = 1; - OBJECT_TYPE_SECONDARY = 2; - OBJECT_TYPE_POD = 3; -} - -message ObjectStatus { - ObjectRef object_ref = 1; - ObjectType type = 2; - repeated ObjectCondition conditions = 3; - map properties = 4; -} - -enum ObjectState { - OBJECT_STATE_UNSPECIFIED = 0; - OBJECT_STATE_HEALTHY = 1; - OBJECT_STATE_PENDING = 2; - OBJECT_STATE_ERROR = 3; -} - -message ObjectCondition { - string name = 1; - google.protobuf.Timestamp updated_at = 2; - ObjectState state = 3; - string message = 4; -} - message DryRunRequest { string namespace = 1; string capsule = 2; @@ -96,15 +59,3 @@ message Object { string name = 2; string content = 3; } - -message GVK { - string group = 1; - string version = 2; - string kind = 3; -} - -message ObjectRef { - GVK gvk = 1; - string namespace = 2; - string name = 3; -} diff --git a/proto/rig/operator/api/v1/plugin/service.proto b/proto/rig/operator/api/v1/plugin/service.proto index 2ebdf6019..60579cafa 100644 --- a/proto/rig/operator/api/v1/plugin/service.proto +++ b/proto/rig/operator/api/v1/plugin/service.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package api.v1.plugin; -import "operator/api/v1/pipeline/service.proto"; +import "operator/api/v1/pipeline/object_status.proto"; service PluginService { rpc Initialize(InitializeRequest) returns (InitializeResponse) {} @@ -33,10 +33,13 @@ message ObjectStatusChange { repeated api.v1.pipeline.ObjectStatus objects = 1; } + message Checkpoint {} + oneof change { AllObjects all_objects = 1; api.v1.pipeline.ObjectStatus updated = 2; api.v1.pipeline.ObjectRef deleted = 3; + Checkpoint checkpoint = 4; } } diff --git a/test/integration/k8s/plugin_suite_test.go b/test/integration/k8s/plugin_suite_test.go index bc6535fc0..ecf9dfc05 100644 --- a/test/integration/k8s/plugin_suite_test.go +++ b/test/integration/k8s/plugin_suite_test.go @@ -137,7 +137,6 @@ container: require.NoError(t, err) lc := fxtest.NewLifecycle(t) ps := pipeline.NewService(opConfig, cc, cs, ctrl.Log, pmanager, lc) - require.NoError(t, lc.Start(context.Background())) capsuleReconciler := &controller.CapsuleReconciler{ Client: manager.GetClient(), Scheme: scheme, @@ -145,6 +144,7 @@ container: CapabilitiesService: cs, PipelineService: ps, ObjectStatusService: objectstatus.NewService(opConfig, ps, ctrl.Log), + Lifecycle: lc, } require.NoError(t, capsuleReconciler.SetupWithManager(manager)) @@ -154,6 +154,8 @@ container: require.NoError(t, manager.Start(ctx)) }() + require.NoError(t, lc.Start(context.Background())) + s.cancel = cancel setupDone = true } diff --git a/test/integration/k8s/suite_test.go b/test/integration/k8s/suite_test.go index 3c2e35bb0..91300b9b1 100644 --- a/test/integration/k8s/suite_test.go +++ b/test/integration/k8s/suite_test.go @@ -122,7 +122,6 @@ portName: "metricsport"`, lc := fxtest.NewLifecycle(t) ps := pipeline.NewService(opConfig, cc, cs, ctrl.Log, pmanager, lc) - require.NoError(t, lc.Start(context.Background())) capsuleReconciler := &controller.CapsuleReconciler{ Client: manager.GetClient(), @@ -131,6 +130,7 @@ portName: "metricsport"`, CapabilitiesService: cs, PipelineService: ps, ObjectStatusService: objectstatus.NewService(opConfig, ps, ctrl.Log), + Lifecycle: lc, } require.NoError(t, capsuleReconciler.SetupWithManager(manager)) @@ -140,6 +140,8 @@ portName: "metricsport"`, require.NoError(t, manager.Start(ctx)) }() + require.NoError(t, lc.Start(context.Background())) + s.cancel = cancel setupDone = true }