Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/controller/capsule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

cmv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
monitorv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/rigdev/rig/pkg/service/capabilities"
"github.com/rigdev/rig/pkg/service/objectstatus"
svc_pipeline "github.com/rigdev/rig/pkg/service/pipeline"
"go.uber.org/fx"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -55,6 +58,9 @@ type CapsuleReconciler struct {
CapabilitiesService capabilities.Service
PipelineService svc_pipeline.Service
ObjectStatusService objectstatus.Service
Lifecycle fx.Lifecycle
initialize sync.WaitGroup
mgr ctrl.Manager
}

const (
Expand All @@ -70,6 +76,11 @@ const (
func (r *CapsuleReconciler) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.TODO()

r.initialize.Add(1)
r.mgr = mgr

r.Lifecycle.Append(fx.StartHook(r.initializeReconciler))

if err := mgr.GetFieldIndexer().IndexField(
context.Background(),
&v1alpha2.Capsule{},
Expand Down Expand Up @@ -275,6 +286,28 @@ func findCapsulesForConfig(mgr ctrl.Manager) handler.MapFunc {
}
}

func (r *CapsuleReconciler) initializeReconciler(ctx context.Context) {
r.mgr.GetCache().WaitForCacheSync(ctx)

for {
var capsules v1alpha2.CapsuleList
if err := r.mgr.GetCache().List(ctx, &capsules); err != nil {
r.mgr.GetLogger().Error(err, "error getting initial capsule list")
time.Sleep(1 * time.Second)
continue
}

for _, capsule := range capsules.Items {
r.ObjectStatusService.RegisterCapsule(capsule.GetNamespace(), capsule.GetName())
}

break
}

r.ObjectStatusService.CapsulesInitialized()
r.initialize.Done()
}

//+kubebuilder:rbac:groups=rig.dev,resources=capsules,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=rig.dev,resources=capsules/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=rig.dev,resources=capsules/finalizers,verbs=update
Expand All @@ -289,6 +322,8 @@ func findCapsulesForConfig(mgr ctrl.Manager) handler.MapFunc {
// actual cluster state, and then performs operations to make the cluster state
// reflect the state specified by the Capsule.
func (r *CapsuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.initialize.Wait()

// TODO: use rig logger
log := log.FromContext(ctx)
log.Info("reconciliation started")
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/plugin/external_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type pluginExecutor struct {
binaryPath string
args []string
tag string
id uuid.UUID
}

func newPluginExecutor(
Expand All @@ -50,6 +51,7 @@ func newPluginExecutor(
binaryPath: path,
args: args,
tag: tag,
id: uuid.New(),
}

return p, p.start(context.Background(), pluginConfig, restConfig)
Expand Down Expand Up @@ -126,7 +128,7 @@ func (p *pluginExecutor) WatchObjectStatus(
capsule string,
callback pipeline.ObjectStatusCallback,
) error {
return p.pluginClient.WatchObjectStatus(ctx, namespace, capsule, callback)
return p.pluginClient.WatchObjectStatus(ctx, namespace, capsule, callback, p.id)
}

type rigOperatorPlugin struct {
Expand Down Expand Up @@ -214,6 +216,7 @@ func (m *pluginClient) WatchObjectStatus(
namespace string,
capsule string,
callback pipeline.ObjectStatusCallback,
pluginID uuid.UUID,
) error {
c, err := m.client.WatchObjectStatus(ctx, &apiplugin.WatchObjectStatusRequest{
Namespace: namespace,
Expand All @@ -223,15 +226,13 @@ func (m *pluginClient) WatchObjectStatus(
return err
}

id := uuid.New()

for {
res, err := c.Recv()
if err != nil {
return err
}

callback.UpdateStatus(namespace, capsule, id, res.GetChange())
callback.UpdateStatus(namespace, capsule, pluginID, res.GetChange())
}
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/controller/plugin/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (

"github.com/go-logr/logr"
"github.com/gobwas/glob"
"github.com/rigdev/rig-go-api/operator/api/v1/plugin"
"github.com/rigdev/rig/pkg/api/config/v1alpha1"
"github.com/rigdev/rig/pkg/errors"
"github.com/rigdev/rig/pkg/pipeline"
"github.com/rigdev/rig/pkg/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
Expand Down Expand Up @@ -70,15 +72,25 @@ func (s *Step) WatchObjectStatus(

// TODO: We need annotations here.
if !s.matcher.Match(namespace, capsule, nil) {
for _, p := range s.plugins {
callback.UpdateStatus(namespace, capsule, p.id, &plugin.ObjectStatusChange{
Change: &plugin.ObjectStatusChange_Checkpoint_{},
})
}
return nil
}

for _, p := range s.plugins {
wg.Add(1)

go func(p *pluginExecutor) {
err := p.WatchObjectStatus(ctx, namespace, capsule, callback)
if !errors.IsUnimplemented(err) {
s.logger.Error(err, "error getting status")
} else {
callback.UpdateStatus(namespace, capsule, p.id, &plugin.ObjectStatusChange{
Change: &plugin.ObjectStatusChange_Checkpoint_{},
})
}
wg.Done()
}(p)
Expand All @@ -89,6 +101,14 @@ func (s *Step) WatchObjectStatus(
return nil
}

func (s *Step) PluginIDs() []uuid.UUID {
var plugins []uuid.UUID
for _, p := range s.plugins {
plugins = append(plugins, p.id)
}
return plugins
}

type Matcher struct {
namespaces []glob.Glob
capsules []glob.Glob
Expand Down
9 changes: 9 additions & 0 deletions pkg/controller/plugin/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (w *capsuleWatcher) flush() {
}:
case <-w.ctx.Done():
}
select {
case w.c <- &apiplugin.ObjectStatusChange{
Change: &apiplugin.ObjectStatusChange_Checkpoint_{
Checkpoint: &apiplugin.ObjectStatusChange_Checkpoint{},
},
}:
case <-w.ctx.Done():
}

w.initialized = true
}
Expand All @@ -90,6 +98,7 @@ func (w *capsuleWatcher) updated(os *apipipeline.ObjectStatus) {
}

func (w *capsuleWatcher) deleted(or *apipipeline.ObjectRef) {
// TODO: This is probably not a good idea - delay instead?.
w.flush()
select {
case w.c <- &apiplugin.ObjectStatusChange{
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/project_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
configv1alpha1 "github.com/rigdev/rig/pkg/api/config/v1alpha1"
"github.com/rigdev/rig/pkg/api/v1alpha2"
"github.com/rigdev/rig/pkg/pipeline"
"github.com/rigdev/rig/pkg/uuid"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
kerrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -102,3 +103,7 @@ func (s namespaceStep) WatchObjectStatus(
) error {
return nil
}

func (s namespaceStep) PluginIDs() []uuid.UUID {
return nil
}
3 changes: 3 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/rigdev/rig/pkg/service/config"
"github.com/rigdev/rig/pkg/service/objectstatus"
"github.com/rigdev/rig/pkg/service/pipeline"
"go.uber.org/fx"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,6 +37,7 @@ func New(
objectstatus objectstatus.Service,
restConfig *rest.Config,
logger logr.Logger,
lc fx.Lifecycle,
) (manager.Manager, error) {
cfg := cfgS.Operator()

Expand All @@ -60,6 +62,7 @@ func New(
CapabilitiesService: capabilitiesService,
PipelineService: pipeline,
ObjectStatusService: objectstatus,
Lifecycle: lc,
}

if err := cr.SetupWithManager(mgr); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ type ObjectStatusCallback interface {
type Step[T Request] interface {
Apply(ctx context.Context, req T) error
WatchObjectStatus(ctx context.Context, namespace, capsule string, callback ObjectStatusCallback) error
PluginIDs() []uuid.UUID
}
74 changes: 60 additions & 14 deletions pkg/service/objectstatus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package objectstatus

import (
"context"
"fmt"
"slices"
"sync"
"time"
Expand All @@ -24,6 +23,7 @@ type Service interface {
// TODO: Adopt iterators.
Watch(ctx context.Context, namespace string, c chan<- *apipipeline.ObjectStatusChange) error

CapsulesInitialized()
RegisterCapsule(namespace string, capsule string)
UnregisterCapsule(namespace string, capsule string)

Expand All @@ -46,19 +46,23 @@ func NewService(
}

type service struct {
cfg *v1alpha1.OperatorConfig
logger logr.Logger
pipeline svc_pipeline.Service
lock sync.RWMutex
capsules map[string]map[string]*capsuleCache
watchers []*watcher
cfg *v1alpha1.OperatorConfig
logger logr.Logger
pipeline svc_pipeline.Service
lock sync.RWMutex
capsules map[string]map[string]*capsuleCache
watchers []*watcher
initialized bool
}

func (s *service) runForCapsule(ctx context.Context, namespace, capsule string) {
func (s *service) runForCapsule(ctx context.Context, namespace string, c *capsuleCache) {
p := s.pipeline.GetDefaultPipeline()

for _, step := range p.Steps() {
go s.runStepForCapsule(ctx, namespace, capsule, step)
for _, pluginID := range step.PluginIDs() {
c.plugins[pluginID] = false
}
go s.runStepForCapsule(ctx, namespace, c.capsule, step)
}
}

Expand Down Expand Up @@ -95,7 +99,8 @@ func (s *service) Watch(ctx context.Context, namespace string, c chan<- *apipipe

// Keep lock (and unlock in go-routine) when all statuses are read.
go func() {
s.readStatusForNamespace(w.namespace, w)
s.readStatusForNamespace(namespace, w)
s.sendCheckpoint(namespace, []*watcher{w})
s.lock.Unlock()
}()

Expand All @@ -116,6 +121,16 @@ func (s *service) readStatusForNamespace(namespace string, w *watcher) []*apipip
return res
}

func (s *service) CapsulesInitialized() {
// Initialized!
s.lock.Lock()
defer s.lock.Unlock()
s.initialized = true
for namespace := range s.capsules {
s.sendCheckpoint(namespace, s.watchers)
}
}

func (s *service) RegisterCapsule(namespace string, capsule string) {
if s.cfg.EnableObjectStatusCache == nil || !*s.cfg.EnableObjectStatusCache {
return
Expand All @@ -132,14 +147,15 @@ func (s *service) RegisterCapsule(namespace string, capsule string) {

if _, ok := cs[capsule]; !ok {
ctx, cancel := context.WithCancel(context.Background())
cs[capsule] = &capsuleCache{
c := &capsuleCache{
plugins: map[uuid.UUID]bool{},
capsule: capsule,
cancel: cancel,
objects: map[pipeline.ObjectKey]*objectCache{},
}
cs[capsule] = c

fmt.Println("RUN FOR CAPSULE", namespace, capsule)
s.runForCapsule(ctx, namespace, capsule)
s.runForCapsule(ctx, namespace, c)
}
}

Expand Down Expand Up @@ -202,9 +218,34 @@ func (s *service) UpdateStatus(
}
}
}

if change.GetCheckpoint() != nil {
s.sendCheckpoint(namespace, s.watchers)
}

s.lock.RUnlock()
}

func (s *service) sendCheckpoint(namespace string, watchers []*watcher) {
if !s.initialized {
return
}

for _, c := range s.capsules[namespace] {
for _, initialized := range c.plugins {
if !initialized {
return
}
}
}

for _, w := range watchers {
if w.namespace == namespace {
w.checkpoint()
}
}
}

func (s *service) getCapsule(namespace string, capsule string) *capsuleCache {
s.lock.RLock()
defer s.lock.RUnlock()
Expand All @@ -223,8 +264,10 @@ func (s *service) getCapsule(namespace string, capsule string) *capsuleCache {
}

type capsuleCache struct {
lock sync.RWMutex
// This property is owned by the service.
plugins map[uuid.UUID]bool

lock sync.RWMutex
capsule string
objects map[pipeline.ObjectKey]*objectCache
cancel context.CancelFunc
Expand Down Expand Up @@ -311,6 +354,9 @@ func (c *capsuleCache) update(pluginID uuid.UUID, change *apiplugin.ObjectStatus
delete(c.objects, key)
}
}

case *apiplugin.ObjectStatusChange_Checkpoint_:
c.plugins[pluginID] = true
}

return keys
Expand Down
Loading