internal/controller/apiresourceschema/controller.go (2 changes: 1 addition & 1 deletion)
@@ -174,7 +174,7 @@ func (r *Reconciler) reconcile(ctx context.Context, log *zap.SugaredLogger, pubR
if apierrors.IsNotFound(err) {
ars, err := kcp.CreateAPIResourceSchema(projectedCRD, arsName, r.agentName)
if err != nil {
return nil, fmt.Errorf("failed to create APIResourceSchema: %w", err)
return nil, fmt.Errorf("failed to construct APIResourceSchema: %w", err)
}

log.With("name", arsName).Info("Creating APIResourceSchema…")
internal/controller/syncmanager/controller.go (117 changes: 93 additions & 24 deletions)
@@ -18,11 +18,13 @@ package syncmanager

import (
"context"
"errors"
"fmt"
"sync"

"go.uber.org/zap"

"github.com/kcp-dev/api-syncagent/internal/controller/sync"
controllersync "github.com/kcp-dev/api-syncagent/internal/controller/sync"
"github.com/kcp-dev/api-syncagent/internal/controllerutil"
"github.com/kcp-dev/api-syncagent/internal/controllerutil/predicate"
"github.com/kcp-dev/api-syncagent/internal/discovery"
@@ -91,12 +93,20 @@ type Reconciler struct {

// The provider based on the APIExport; like the vwManager, this is stopped and recreated
// whenever the APIExport's URL changes.
-	vwProvider *apiexportprovider.Provider
+	providerOnce sync.Once
+	vwProvider   *apiexportprovider.Provider

syncWorkersLock sync.RWMutex
// A map of sync controllers, one for each PublishedResource, using their
// UIDs and resourceVersion as the map keys; using the version ensures that
// when a PR changes, the old controller is orphaned and will be shut down.
syncWorkers map[string]syncWorker

clustersLock sync.RWMutex
// A map of clusters that have been engaged with the shim layer. Since this
// reconciler dynamically starts and stops controllers, we need to keep track
// of clusters and engage them with sync controllers started at a later point in time.
clusters map[string]engagedCluster
}
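// A minimal sketch of such a composite key, assuming the UID and
// resourceVersion fields promoted from ObjectMeta (the actual derivation
// lives in getPublishedResourceKey further below, collapsed in this diff):
//
//	key := fmt.Sprintf("%s-%s", pubRes.UID, pubRes.ResourceVersion)
//
// Any update to a PublishedResource bumps its resourceVersion, so the worker
// stored under the old key no longer matches and gets shut down.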

type syncWorker struct {
Expand Down Expand Up @@ -135,7 +145,14 @@ func Add(
prFilter: prFilter,
stateNamespace: stateNamespace,
agentName: agentName,

providerOnce: sync.Once{},

syncWorkersLock: sync.RWMutex{},
syncWorkers: map[string]syncWorker{},

clustersLock: sync.RWMutex{},
clusters: make(map[string]engagedCluster),
}

bldr := builder.ControllerManagedBy(localManager).
@@ -161,7 +178,7 @@
return err
}

-func (r *Reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
+func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := r.log.Named(ControllerName)
log.Debug("Processing")

@@ -232,9 +249,11 @@ func (r *Reconciler) reconcile(ctx context.Context, log *zap.SugaredLogger, vwUR
}

func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
-	// Use the global app context so this provider is independent of the reconcile
-	// context, which might get cancelled right after Reconcile() is done.
-	r.vwManagerCtx, r.vwManagerCancel = context.WithCancel(r.ctx)
+	if r.vwManagerCtx == nil {
+		// Use the global app context so this provider is independent of the reconcile
+		// context, which might get cancelled right after Reconcile() is done.
+		r.vwManagerCtx, r.vwManagerCancel = context.WithCancel(r.ctx)
+	}

vwConfig := rest.CopyConfig(r.kcpRestConfig)
vwConfig.Host = vwURL
@@ -303,23 +322,47 @@ func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
r.vwManager = manager
}

-	// start the provider
-	go func() {
-		// Use the global app context so this provider is independent of the reconcile
-		// context, which might get cancelled right after Reconcile() is done.
-		if err := r.vwProvider.Run(r.vwManagerCtx, r.vwManager); err != nil {
-			log.Fatalw("Failed to start apiexport provider.", zap.Error(err))
-		}
-	}()
+	r.providerOnce.Do(func() {
+		log.Debug("Starting virtual workspace provider…")
+		// start the provider
+		go func() {
+			// Use the global app context so this provider is independent of the reconcile
+			// context, which might get cancelled right after Reconcile() is done.
+			if err := r.vwProvider.Run(r.vwManagerCtx, r.vwManager); err != nil {
+				log.Fatalw("Failed to start apiexport provider", zap.Error(err))
+			}
+		}()
+	})

return nil
}

type engagedCluster struct {
ctx context.Context
cl cluster.Cluster
}

type controllerShim struct {
reconciler *Reconciler
}
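// Note: the shim exists because the apiexport provider engages a single
// consumer. controllerShim fans each Engage call out to all currently
// running sync controllers (loop below) and records the cluster so that
// controllers created later can be engaged retroactively (see
// ensureSyncControllers).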

func (s *controllerShim) Engage(ctx context.Context, clusterName string, cl cluster.Cluster) error {
	// Check and update the map under the lock so concurrent Engage calls cannot race.
	s.reconciler.clustersLock.Lock()
	if _, ok := s.reconciler.clusters[clusterName]; !ok {
		s.reconciler.clusters[clusterName] = engagedCluster{ctx: ctx, cl: cl}

		// start a goroutine to make sure we remove the cluster when the context is done
		go func() {
			<-ctx.Done()
			s.reconciler.clustersLock.Lock()
			delete(s.reconciler.clusters, clusterName)
			s.reconciler.clustersLock.Unlock()
		}()
	}
	s.reconciler.clustersLock.Unlock()

s.reconciler.syncWorkersLock.RLock()
defer s.reconciler.syncWorkersLock.RUnlock()
for _, worker := range s.reconciler.syncWorkers {
if err := worker.controller.Engage(ctx, clusterName, cl); err != nil {
return err
Expand Down Expand Up @@ -348,10 +391,17 @@ func (r *Reconciler) shutdown(log *zap.SugaredLogger) {
r.vwManagerCtx = nil
r.vwManagerCancel = nil
r.vwURL = ""
r.providerOnce = sync.Once{}

r.clustersLock.Lock()
r.clusters = make(map[string]engagedCluster)
r.clustersLock.Unlock()

// Free all workers; since their contexts are based on the manager's context,
// they have also been cancelled already above.
-	r.syncWorkers = nil
+	r.syncWorkersLock.Lock()
+	r.syncWorkers = make(map[string]syncWorker)
+	r.syncWorkersLock.Unlock()
}
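// A minimal illustration of the sync.Once reset used in shutdown() above,
// assuming no Do call races with the reset: the zero value of sync.Once is
// ready for use, so reassigning it re-arms the guard and lets the next
// ensureManager call start a fresh provider.
//
//	var once sync.Once
//	once.Do(start) // runs start()
//	once.Do(start) // no-op
//	once = sync.Once{} // reset; safe only without concurrent Do calls
//	once.Do(start) // runs start() again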

func getPublishedResourceKey(pr *syncagentv1alpha1.PublishedResource) string {
@@ -373,7 +423,10 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
log.Infow("Stopping sync controller…", "key", key)

worker.cancel()

r.syncWorkersLock.Lock()
delete(r.syncWorkers, key)
r.syncWorkersLock.Unlock()
}

// start missing controllers
@@ -386,13 +439,14 @@
continue
}

log.Infow("Starting new sync controller…", "key", key)

prlog := log.With("key", key, "name", pubRes.Name)
ctrlCtx, ctrlCancel := context.WithCancel(r.vwManagerCtx)

prlog.Info("Creating new sync controller…")

// create the sync controller;
// use the reconciler's log without any additional reconciling context
syncController, err := sync.Create(
syncController, err := controllersync.Create(
// This can be the reconciling context, as it's only used to find the target CRD during setup;
// this context *must not* be stored in the sync controller!
ctx,
Expand All @@ -410,18 +464,33 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
return fmt.Errorf("failed to create sync controller: %w", err)
}

log.Infof("storing worker at %s", key)
r.syncWorkersLock.Lock()
r.syncWorkers[key] = syncWorker{
controller: syncController,
cancel: ctrlCancel,
}
r.syncWorkersLock.Unlock()

// let 'er rip (remember to use the long-lived context here)
-		if err := syncController.Start(ctrlCtx); err != nil {
-			ctrlCancel()
-			log.Info("deleting again")
+		go func() {
+			log.Infow("Starting sync controller…", "key", key)
+			if err := syncController.Start(ctrlCtx); err != nil && !errors.Is(err, context.Canceled) {
+				ctrlCancel()
+				prlog.Errorw("Failed to start sync controller", zap.Error(err))
+			}
+
+			prlog.Debug("Stopped sync controller")
+
+			r.syncWorkersLock.Lock()
			delete(r.syncWorkers, key)
-			return fmt.Errorf("failed to start sync controller: %w", err)
+			r.syncWorkersLock.Unlock()
+		}()

		// Engage all known clusters; release the read lock explicitly rather than
		// deferring it, since this runs once per PublishedResource in the loop.
		r.clustersLock.RLock()
		for name, ec := range r.clusters {
			if err := syncController.Engage(ec.ctx, name, ec.cl); err != nil {
				prlog.Errorw("Failed to engage cluster", zap.Error(err), "cluster", name)
			}
		}
		r.clustersLock.RUnlock()
}
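// For orientation, the context hierarchy wired up above:
//
//	r.ctx (application lifetime)
//	└── r.vwManagerCtx (cancelled by shutdown())
//	    └── ctrlCtx (one per sync worker, cancelled individually)
//
// Cancelling vwManagerCtx therefore stops the manager, the provider and all
// sync workers at once; shutdown() only needs to reset the bookkeeping maps.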

test/e2e/sync/primary_test.go (150 changes: 149 additions & 1 deletion)
@@ -60,7 +60,7 @@ func TestSyncSimpleObject(t *testing.T) {
"test/crds/crontab.yaml",
})

-	// publish Crontabs and Backups
+	// publish Crontabs
t.Logf("Publishing CRDs…")
prCrontabs := &syncagentv1alpha1.PublishedResource{
ObjectMeta: metav1.ObjectMeta{
@@ -839,3 +839,151 @@ spec:
t.Fatalf("Expected %s annotation to be %q, but is %q.", ann, teamClusterPath.String(), value)
}
}

func TestSyncMultiResources(t *testing.T) {
const (
apiExportName = "kcp.example.com"
kcpGroupName = "kcp.example.com"
orgWorkspace = "sync-multi-resources"
)

ctx := t.Context()
ctrlruntime.SetLogger(logr.Discard())

// setup a test environment in kcp
orgKubeconfig := utils.CreateOrganization(t, ctx, orgWorkspace, apiExportName)

// start a service cluster
envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{
"test/crds/crontab.yaml",
})

// publish Crontabs and ConfigMaps
t.Logf("Publishing CRDs…")
prCrontabs := &syncagentv1alpha1.PublishedResource{
ObjectMeta: metav1.ObjectMeta{
Name: "publish-crontabs",
},
Spec: syncagentv1alpha1.PublishedResourceSpec{
Resource: syncagentv1alpha1.SourceResourceDescriptor{
APIGroup: "example.com",
Version: "v1",
Kind: "CronTab",
},
// These rules make finding the local object easier, but should not be used in production.
Naming: &syncagentv1alpha1.ResourceNaming{
Name: "{{ .Object.metadata.name }}",
Namespace: "synced-{{ .Object.metadata.namespace }}",
},
Projection: &syncagentv1alpha1.ResourceProjection{
Group: kcpGroupName,
},
},
}

prConfigMaps := &syncagentv1alpha1.PublishedResource{
ObjectMeta: metav1.ObjectMeta{
Name: "publish-configmaps",
},
Spec: syncagentv1alpha1.PublishedResourceSpec{
Resource: syncagentv1alpha1.SourceResourceDescriptor{
APIGroup: "",
Version: "v1",
Kind: "ConfigMap",
},
// These rules make finding the local object easier, but should not be used in production.
Naming: &syncagentv1alpha1.ResourceNaming{
Name: "{{ .Object.metadata.name }}",
Namespace: "synced-{{ .Object.metadata.namespace }}",
},
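// Projecting the Kind renames the resource as it appears inside kcp;
// presumably this avoids clashing with the built-in core/v1 ConfigMap kind.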
Projection: &syncagentv1alpha1.ResourceProjection{
Group: kcpGroupName,
Kind: "KubeConfigMap",
},
},
}

if err := envtestClient.Create(ctx, prCrontabs); err != nil {
t.Fatalf("Failed to create PublishedResource for CronTabs: %v", err)
}

if err := envtestClient.Create(ctx, prConfigMaps); err != nil {
t.Fatalf("Failed to create PublishedResource for ConfigMaps: %v", err)
}

// start the agent in the background to update the APIExport with the CronTabs API
utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName)

// wait until the API is available
kcpClusterClient := utils.GetKcpAdminClusterClient(t)

teamClusterPath := logicalcluster.NewPath("root").Join(orgWorkspace).Join("team-1")
teamClient := kcpClusterClient.Cluster(teamClusterPath)

utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionResource{
Group: kcpGroupName,
Version: "v1",
Resource: "crontabs",
})

// create a Crontab object in a team workspace
t.Log("Creating CronTab in kcp…")
crontab := utils.YAMLToUnstructured(t, `
apiVersion: kcp.example.com/v1
kind: CronTab
metadata:
namespace: default
name: my-crontab
spec:
cronSpec: '* * *'
image: ubuntu:latest
`)

if err := teamClient.Create(ctx, crontab); err != nil {
t.Fatalf("Failed to create CronTab in kcp: %v", err)
}

// wait for the agent to sync the object down into the service cluster

t.Logf("Wait for CronTab to be synced…")
copy := &unstructured.Unstructured{}
copy.SetAPIVersion("example.com/v1")
copy.SetKind("CronTab")

err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) {
copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-crontab"}
return envtestClient.Get(ctx, copyKey, copy) == nil, nil
})
if err != nil {
t.Fatalf("Failed to wait for CronTab object to be synced down: %v", err)
}

// create a ConfigMap object in a team workspace
t.Log("Creating KubeConfigMap in kcp…")
configmap := utils.YAMLToUnstructured(t, `
apiVersion: kcp.example.com/v1
kind: KubeConfigMap
metadata:
namespace: default
name: my-configmap
data:
dummydata: dummydata
`)

if err := teamClient.Create(ctx, configmap); err != nil {
t.Fatalf("Failed to create KubeConfigMap in kcp: %v", err)
}

// wait for the agent to sync the object down into the service cluster

t.Logf("Wait for KubeConfigMap to be synced…")
copyCM := &corev1.ConfigMap{}

err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) {
copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-configmap"}
return envtestClient.Get(ctx, copyKey, copyCM) == nil, nil
})
if err != nil {
t.Fatalf("Failed to wait for ConfigMap object to be synced down: %v", err)
}
}