diff --git a/internal/controller/apiresourceschema/controller.go b/internal/controller/apiresourceschema/controller.go
index ed9dafb..f1bd0e0 100644
--- a/internal/controller/apiresourceschema/controller.go
+++ b/internal/controller/apiresourceschema/controller.go
@@ -174,7 +174,7 @@ func (r *Reconciler) reconcile(ctx context.Context, log *zap.SugaredLogger, pubR
 	if apierrors.IsNotFound(err) {
 		ars, err := kcp.CreateAPIResourceSchema(projectedCRD, arsName, r.agentName)
 		if err != nil {
-			return nil, fmt.Errorf("failed to create APIResourceSchema: %w", err)
+			return nil, fmt.Errorf("failed to construct APIResourceSchema: %w", err)
 		}
 
 		log.With("name", arsName).Info("Creating APIResourceSchema…")
diff --git a/internal/controller/syncmanager/controller.go b/internal/controller/syncmanager/controller.go
index e159d44..4458f3a 100644
--- a/internal/controller/syncmanager/controller.go
+++ b/internal/controller/syncmanager/controller.go
@@ -18,11 +18,13 @@ package syncmanager
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"sync"
 
 	"go.uber.org/zap"
 
-	"github.com/kcp-dev/api-syncagent/internal/controller/sync"
+	controllersync "github.com/kcp-dev/api-syncagent/internal/controller/sync"
 	"github.com/kcp-dev/api-syncagent/internal/controllerutil"
 	"github.com/kcp-dev/api-syncagent/internal/controllerutil/predicate"
 	"github.com/kcp-dev/api-syncagent/internal/discovery"
@@ -91,12 +93,20 @@ type Reconciler struct {
 
 	// The provider based on the APIExport; like the vwManager, this is stopped and recreated
 	// whenever the APIExport's URL changes.
-	vwProvider *apiexportprovider.Provider
+	providerOnce sync.Once
+	vwProvider   *apiexportprovider.Provider
 
+	syncWorkersLock sync.RWMutex
 	// A map of sync controllers, one for each PublishedResource, using their
 	// UIDs and resourceVersion as the map keys; using the version ensures that
 	// when a PR changes, the old controller is orphaned and will be shut down.
 	syncWorkers map[string]syncWorker
+
+	clustersLock sync.RWMutex
+	// A map of clusters that have been engaged with the shim layer. Since this
+	// reconciler dynamically starts and stops controllers, we need to keep track
+	// of clusters and engage them with sync controllers started at a later point in time.
+	clusters map[string]engagedCluster
 }
 
 type syncWorker struct {
@@ -135,7 +145,14 @@ func Add(
 		prFilter:       prFilter,
 		stateNamespace: stateNamespace,
 		agentName:      agentName,
+
+		providerOnce: sync.Once{},
+
+		syncWorkersLock: sync.RWMutex{},
 		syncWorkers:     map[string]syncWorker{},
+
+		clustersLock: sync.RWMutex{},
+		clusters:     make(map[string]engagedCluster),
 	}
 
 	bldr := builder.ControllerManagedBy(localManager).
@@ -161,7 +178,7 @@ func Add(
 	return err
 }
 
-func (r *Reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
+func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
 	log := r.log.Named(ControllerName)
 	log.Debug("Processing")
 
@@ -232,9 +249,11 @@ func (r *Reconciler) reconcile(ctx context.Context, log *zap.SugaredLogger, vwUR
 }
 
 func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
-	// Use the global app context so this provider is independent of the reconcile
-	// context, which might get cancelled right after Reconcile() is done.
-	r.vwManagerCtx, r.vwManagerCancel = context.WithCancel(r.ctx)
+	if r.vwManagerCtx == nil {
+		// Use the global app context so this provider is independent of the reconcile
+		// context, which might get cancelled right after Reconcile() is done.
+		r.vwManagerCtx, r.vwManagerCancel = context.WithCancel(r.ctx)
+	}
 
 	vwConfig := rest.CopyConfig(r.kcpRestConfig)
 	vwConfig.Host = vwURL
@@ -303,23 +322,47 @@ func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
 		r.vwManager = manager
 	}
 
-	// start the provider
-	go func() {
-		// Use the global app context so this provider is independent of the reconcile
-		// context, which might get cancelled right after Reconcile() is done.
-		if err := r.vwProvider.Run(r.vwManagerCtx, r.vwManager); err != nil {
-			log.Fatalw("Failed to start apiexport provider.", zap.Error(err))
-		}
-	}()
+	r.providerOnce.Do(func() {
+		log.Debug("Starting virtual workspace provider…")
+		// start the provider
+		go func() {
+			// Use the global app context so this provider is independent of the reconcile
+			// context, which might get cancelled right after Reconcile() is done.
+			if err := r.vwProvider.Run(r.vwManagerCtx, r.vwManager); err != nil {
+				log.Fatalw("Failed to start apiexport provider", zap.Error(err))
+			}
+		}()
+	})
 
 	return nil
 }
 
+type engagedCluster struct {
+	ctx context.Context
+	cl  cluster.Cluster
+}
+
 type controllerShim struct {
 	reconciler *Reconciler
 }
 
 func (s *controllerShim) Engage(ctx context.Context, clusterName string, cl cluster.Cluster) error {
+	if _, ok := s.reconciler.clusters[clusterName]; !ok {
+		s.reconciler.clustersLock.Lock()
+		s.reconciler.clusters[clusterName] = engagedCluster{ctx: ctx, cl: cl}
+		s.reconciler.clustersLock.Unlock()
+
+		// start a goroutine to make sure we remove the cluster when the context is done
+		go func() {
+			<-ctx.Done()
+			s.reconciler.clustersLock.Lock()
+			delete(s.reconciler.clusters, clusterName)
+			s.reconciler.clustersLock.Unlock()
+		}()
+	}
+
+	s.reconciler.syncWorkersLock.RLock()
+	defer s.reconciler.syncWorkersLock.RUnlock()
 	for _, worker := range s.reconciler.syncWorkers {
 		if err := worker.controller.Engage(ctx, clusterName, cl); err != nil {
 			return err
@@ -348,10 +391,17 @@ func (r *Reconciler) shutdown(log *zap.SugaredLogger) {
 	r.vwManagerCtx = nil
 	r.vwManagerCancel = nil
 	r.vwURL = ""
+	r.providerOnce = sync.Once{}
+
+	r.clustersLock.Lock()
+	r.clusters = make(map[string]engagedCluster)
+	r.clustersLock.Unlock()
 
 	// Free all workers; since their contexts are based on the manager's context,
 	// they have also been cancelled already above.
-	r.syncWorkers = nil
+	r.syncWorkersLock.Lock()
+	r.syncWorkers = make(map[string]syncWorker)
+	r.syncWorkersLock.Unlock()
 }
 
 func getPublishedResourceKey(pr *syncagentv1alpha1.PublishedResource) string {
@@ -373,7 +423,10 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
 
 		log.Infow("Stopping sync controller…", "key", key)
 		worker.cancel()
+
+		r.syncWorkersLock.Lock()
 		delete(r.syncWorkers, key)
+		r.syncWorkersLock.Unlock()
 	}
 
 	// start missing controllers
@@ -386,13 +439,14 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
 			continue
 		}
 
-		log.Infow("Starting new sync controller…", "key", key)
-
+		prlog := log.With("key", key, "name", pubRes.Name)
 		ctrlCtx, ctrlCancel := context.WithCancel(r.vwManagerCtx)
 
+		prlog.Info("Creating new sync controller…")
+
 		// create the sync controller;
 		// use the reconciler's log without any additional reconciling context
-		syncController, err := sync.Create(
+		syncController, err := controllersync.Create(
 			// This can be the reconciling context, as it's only used to find the target CRD during setup;
 			// this context *must not* be stored in the sync controller!
 			ctx,
@@ -410,18 +464,33 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
 			return fmt.Errorf("failed to create sync controller: %w", err)
 		}
 
-		log.Infof("storing worker at %s", key)
+		r.syncWorkersLock.Lock()
 		r.syncWorkers[key] = syncWorker{
 			controller: syncController,
 			cancel:     ctrlCancel,
 		}
+		r.syncWorkersLock.Unlock()
 
-		// let 'er rip (remember to use the long-lived context here)
-		if err := syncController.Start(ctrlCtx); err != nil {
-			ctrlCancel()
-			log.Info("deleting again")
+		go func() {
+			log.Infow("Starting sync controller…", "key", key)
+			if err := syncController.Start(ctrlCtx); err != nil && !errors.Is(err, context.Canceled) {
+				ctrlCancel()
+				prlog.Errorw("failed to start sync controller", zap.Error(err))
+			}
+
+			prlog.Debug("Stopped sync controller")
+
+			r.syncWorkersLock.Lock()
 			delete(r.syncWorkers, key)
-			return fmt.Errorf("failed to start sync controller: %w", err)
+			r.syncWorkersLock.Unlock()
+		}()
+
+		r.clustersLock.RLock()
+		defer r.clustersLock.RUnlock()
+		for name, ec := range r.clusters {
+			if err := syncController.Engage(ec.ctx, name, ec.cl); err != nil {
+				prlog.Errorw("failed to engage cluster", zap.Error(err), "cluster", name)
+			}
 		}
 	}
 
diff --git a/test/e2e/sync/primary_test.go b/test/e2e/sync/primary_test.go
index 42293e0..9778d12 100644
--- a/test/e2e/sync/primary_test.go
+++ b/test/e2e/sync/primary_test.go
@@ -60,7 +60,7 @@ func TestSyncSimpleObject(t *testing.T) {
 		"test/crds/crontab.yaml",
 	})
 
-	// publish Crontabs and Backups
+	// publish Crontabs
 	t.Logf("Publishing CRDs…")
 	prCrontabs := &syncagentv1alpha1.PublishedResource{
 		ObjectMeta: metav1.ObjectMeta{
@@ -839,3 +839,151 @@
 		t.Fatalf("Expected %s annotation to be %q, but is %q.", ann, teamClusterPath.String(), value)
 	}
 }
+
+func TestSyncMultiResources(t *testing.T) {
+	const (
+		apiExportName = "kcp.example.com"
+		kcpGroupName  = "kcp.example.com"
+		orgWorkspace  = "sync-multi-resources"
+	)
+
+	ctx := t.Context()
+	ctrlruntime.SetLogger(logr.Discard())
+
+	// setup a test environment in kcp
+	orgKubconfig := utils.CreateOrganization(t, ctx, orgWorkspace, apiExportName)
+
+	// start a service cluster
+	envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{
+		"test/crds/crontab.yaml",
+	})
+
+	// publish Crontabs and ConfigMaps
+	t.Logf("Publishing CRDs…")
+	prCrontabs := &syncagentv1alpha1.PublishedResource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "publish-crontabs",
+		},
+		Spec: syncagentv1alpha1.PublishedResourceSpec{
+			Resource: syncagentv1alpha1.SourceResourceDescriptor{
+				APIGroup: "example.com",
+				Version:  "v1",
+				Kind:     "CronTab",
+			},
+			// These rules make finding the local object easier, but should not be used in production.
+			Naming: &syncagentv1alpha1.ResourceNaming{
+				Name:      "{{ .Object.metadata.name }}",
+				Namespace: "synced-{{ .Object.metadata.namespace }}",
+			},
+			Projection: &syncagentv1alpha1.ResourceProjection{
+				Group: kcpGroupName,
+			},
+		},
+	}
+
+	prConfigMaps := &syncagentv1alpha1.PublishedResource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "publish-configmaps",
+		},
+		Spec: syncagentv1alpha1.PublishedResourceSpec{
+			Resource: syncagentv1alpha1.SourceResourceDescriptor{
+				APIGroup: "",
+				Version:  "v1",
+				Kind:     "ConfigMap",
+			},
+			// These rules make finding the local object easier, but should not be used in production.
+			Naming: &syncagentv1alpha1.ResourceNaming{
+				Name:      "{{ .Object.metadata.name }}",
+				Namespace: "synced-{{ .Object.metadata.namespace }}",
+			},
+			Projection: &syncagentv1alpha1.ResourceProjection{
+				Group: kcpGroupName,
+				Kind:  "KubeConfigMap",
+			},
+		},
+	}
+
+	if err := envtestClient.Create(ctx, prCrontabs); err != nil {
+		t.Fatalf("Failed to create PublishedResource for CronTabs: %v", err)
+	}
+
+	if err := envtestClient.Create(ctx, prConfigMaps); err != nil {
+		t.Fatalf("Failed to create PublishedResource for ConfigMaps: %v", err)
+	}
+
+	// start the agent in the background to update the APIExport with the CronTabs API
+	utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName)
+
+	// wait until the API is available
+	kcpClusterClient := utils.GetKcpAdminClusterClient(t)
+
+	teamClusterPath := logicalcluster.NewPath("root").Join(orgWorkspace).Join("team-1")
+	teamClient := kcpClusterClient.Cluster(teamClusterPath)
+
+	utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionResource{
+		Group:    kcpGroupName,
+		Version:  "v1",
+		Resource: "crontabs",
+	})
+
+	// create a Crontab object in a team workspace
+	t.Log("Creating CronTab in kcp…")
+	crontab := utils.YAMLToUnstructured(t, `
+apiVersion: kcp.example.com/v1
+kind: CronTab
+metadata:
+  namespace: default
+  name: my-crontab
+spec:
+  cronSpec: '* * *'
+  image: ubuntu:latest
+`)
+
+	if err := teamClient.Create(ctx, crontab); err != nil {
+		t.Fatalf("Failed to create CronTab in kcp: %v", err)
+	}
+
+	// wait for the agent to sync the object down into the service cluster
+
+	t.Logf("Wait for CronTab to be synced…")
+	copy := &unstructured.Unstructured{}
+	copy.SetAPIVersion("example.com/v1")
+	copy.SetKind("CronTab")
+
+	err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) {
+		copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-crontab"}
+		return envtestClient.Get(ctx, copyKey, copy) == nil, nil
+	})
+	if err != nil {
+		t.Fatalf("Failed to wait for CronTab object to be synced down: %v", err)
+	}
+
+	// create a ConfigMap object in a team workspace
+	t.Log("Creating KubeConfigMap in kcp…")
+	configmap := utils.YAMLToUnstructured(t, `
+apiVersion: kcp.example.com/v1
+kind: KubeConfigMap
+metadata:
+  namespace: default
+  name: my-configmap
+data:
+  dummydata: dummydata
+`)
+
+	if err := teamClient.Create(ctx, configmap); err != nil {
+		t.Fatalf("Failed to create KubeConfigMap in kcp: %v", err)
+	}
+
+	// wait for the agent to sync the object down into the service cluster
+
+	t.Logf("Wait for KubeConfigMap to be synced…")
+	copyCM := &corev1.ConfigMap{}
+
+	err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) {
+		copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-configmap"}
+		return envtestClient.Get(ctx, copyKey, copyCM) == nil, nil
+	})
+	if err != nil {
+		t.Fatalf("Failed to wait for ConfigMap object to be synced down: %v", err)
+	}
+}
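Note on the syncmanager changes above: sync controllers are now started asynchronously, so a controller may come up after the virtual workspace shim has already engaged some clusters. The diff therefore keeps a clusters map alongside the syncWorkers map and replays it into each newly created controller. The standalone sketch below is not part of the diff; the tracker and engager types, newTracker, addWorker, and fakeWorker are invented purely for illustration, and error handling is reduced to returns. It only isolates the bookkeeping idea: Engage records a cluster, forgets it when its context ends, and fans out to the workers already running, while addWorker replays every known cluster into a late-started worker.

// Illustrative sketch only; not code from the diff above.
package main

import (
	"context"
	"fmt"
	"sync"
)

// engager is the minimal piece of a sync controller used in this sketch.
type engager interface {
	Engage(ctx context.Context, clusterName string) error
}

type engagedCluster struct {
	ctx context.Context
}

// tracker plays the role the Reconciler plays in the diff: it owns the
// engaged-clusters map and the running-workers map behind one lock.
type tracker struct {
	lock     sync.RWMutex
	clusters map[string]engagedCluster
	workers  map[string]engager
}

func newTracker() *tracker {
	return &tracker{
		clusters: map[string]engagedCluster{},
		workers:  map[string]engager{},
	}
}

// Engage remembers the cluster, schedules its removal once the cluster's
// context ends, and engages it with every worker that is already running.
func (t *tracker) Engage(ctx context.Context, clusterName string) error {
	t.lock.Lock()
	if _, ok := t.clusters[clusterName]; !ok {
		t.clusters[clusterName] = engagedCluster{ctx: ctx}

		// forget the cluster again once its context is done
		go func() {
			<-ctx.Done()
			t.lock.Lock()
			delete(t.clusters, clusterName)
			t.lock.Unlock()
		}()
	}
	t.lock.Unlock()

	t.lock.RLock()
	defer t.lock.RUnlock()
	for _, w := range t.workers {
		if err := w.Engage(ctx, clusterName); err != nil {
			return err
		}
	}

	return nil
}

// addWorker registers a newly started worker and replays every cluster seen
// so far into it, so late-started controllers do not miss earlier Engage calls.
func (t *tracker) addWorker(key string, w engager) error {
	t.lock.Lock()
	t.workers[key] = w
	t.lock.Unlock()

	t.lock.RLock()
	defer t.lock.RUnlock()
	for name, ec := range t.clusters {
		if err := w.Engage(ec.ctx, name); err != nil {
			return err
		}
	}

	return nil
}

// fakeWorker just records engagements for the demonstration in main().
type fakeWorker struct{ seen []string }

func (f *fakeWorker) Engage(_ context.Context, clusterName string) error {
	f.seen = append(f.seen, clusterName)
	return nil
}

func main() {
	tr := newTracker()
	ctx := context.Background()

	_ = tr.Engage(ctx, "cluster-a") // no workers yet, only remembered

	w := &fakeWorker{}
	_ = tr.addWorker("worker-1", w) // replays cluster-a into the late worker
	_ = tr.Engage(ctx, "cluster-b") // fans out to the running worker

	fmt.Println(w.seen) // [cluster-a cluster-b]
}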