Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ $(YQ):
yq_*

KCP = _tools/kcp
KCP_VERSION ?= 0.28.1
export KCP_VERSION ?= 0.28.1

.PHONY: $(KCP)
$(KCP):
Expand Down
29 changes: 24 additions & 5 deletions internal/controller/apiexport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (r *Reconciler) reconcile(ctx context.Context) error {

// for each PR, we note down the created ARS and also the GVKs of related resources
arsList := sets.New[string]()
claimedResources := sets.New[string]()
claimedResources := sets.New[kcpdevv1alpha1.GroupResource]()

// PublishedResources use kinds, but the PermissionClaims use resource names (plural),
// so we must translate accordingly
Expand All @@ -139,7 +139,17 @@ func (r *Reconciler) reconcile(ctx context.Context) error {

// to evaluate the namespace filter, the agent needs to fetch the namespace
if filter := pubResource.Spec.Filter; filter != nil && filter.Namespace != nil {
claimedResources.Insert("namespaces")
claimedResources.Insert(kcpdevv1alpha1.GroupResource{
Group: "",
Resource: "namespaces",
})
}

if pubResource.Spec.EnableWorkspacePaths {
claimedResources.Insert(kcpdevv1alpha1.GroupResource{
Group: "core.kcp.io",
Resource: "logicalclusters",
})
}

for _, rr := range pubResource.Spec.Related {
Expand All @@ -150,18 +160,27 @@ func (r *Reconciler) reconcile(ctx context.Context) error {
return fmt.Errorf("unknown related resource kind %q: %w", rr.Kind, err)
}

claimedResources.Insert(resource.Resource)
claimedResources.Insert(kcpdevv1alpha1.GroupResource{
Group: "",
Resource: resource.Resource,
})
}
}

// Related resources (Secrets, ConfigMaps) are namespaced and so the Sync Agent will
// always need to be able to see and manage namespaces.
if claimedResources.Len() > 0 {
claimedResources.Insert("namespaces")
claimedResources.Insert(kcpdevv1alpha1.GroupResource{
Group: "",
Resource: "namespaces",
})
}

// We always want to create events.
claimedResources.Insert("events")
claimedResources.Insert(kcpdevv1alpha1.GroupResource{
Group: "",
Resource: "events",
})

if arsList.Len() == 0 {
r.log.Debug("No ready PublishedResources available.")
Expand Down
41 changes: 30 additions & 11 deletions internal/controller/apiexport/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apiexport

import (
"fmt"
"slices"
"strings"

Expand All @@ -36,7 +37,7 @@ import (
// by a controller in kcp. Make sure you don't create a reconciling conflict!
func (r *Reconciler) createAPIExportReconciler(
availableResourceSchemas sets.Set[string],
claimedResourceKinds sets.Set[string],
claimedResourceKinds sets.Set[kcpdevv1alpha1.GroupResource],
agentName string,
apiExportName string,
recorder record.EventRecorder,
Expand All @@ -59,28 +60,38 @@ func (r *Reconciler) createAPIExportReconciler(
// only ensure the ones originating from the published resources;
// step 1 is to collect all existing claims with the same properties
// as ours.
existingClaims := sets.New[string]()
existingClaims := sets.New[kcpdevv1alpha1.GroupResource]()
for _, claim := range existing.Spec.PermissionClaims {
if claim.All && claim.Group == "" && len(claim.ResourceSelector) == 0 {
existingClaims.Insert(claim.Resource)
if claim.All && len(claim.ResourceSelector) == 0 {
existingClaims.Insert(claim.GroupResource)
}
}

missingClaims := claimedResourceKinds.Difference(existingClaims)

claimsToAdd := missingClaims.UnsortedList()
slices.SortStableFunc(claimsToAdd, func(a, b kcpdevv1alpha1.GroupResource) int {
if a.Group != b.Group {
return strings.Compare(a.Group, b.Group)
}

return strings.Compare(a.Resource, b.Resource)
})

// add our missing claims
for _, claimed := range sets.List(missingClaims) {
for _, claimed := range claimsToAdd {
existing.Spec.PermissionClaims = append(existing.Spec.PermissionClaims, kcpdevv1alpha1.PermissionClaim{
GroupResource: kcpdevv1alpha1.GroupResource{
Group: "",
Resource: claimed,
},
All: true,
GroupResource: claimed,
All: true,
})
}

if missingClaims.Len() > 0 {
recorder.Eventf(existing, corev1.EventTypeNormal, "AddingPermissionClaims", "Added new permission claim(s) for all %s.", strings.Join(sets.List(missingClaims), ", "))
claims := make([]string, 0, len(claimsToAdd))
for _, claimed := range claimsToAdd {
claims = append(claims, groupResourceToString(claimed))
}
recorder.Eventf(existing, corev1.EventTypeNormal, "AddingPermissionClaims", "Added new permission claim(s) for all %s.", strings.Join(claims, ", "))
}

// prevent reconcile loops by ensuring a stable order
Expand All @@ -101,6 +112,14 @@ func (r *Reconciler) createAPIExportReconciler(
}
}

func groupResourceToString(gr kcpdevv1alpha1.GroupResource) string {
if gr.Group == "" {
return gr.Resource
}

return fmt.Sprintf("%s/%s", gr.Group, gr.Resource)
}

func mergeResourceSchemas(existing []string, configured sets.Set[string]) []string {
var result []string

Expand Down
5 changes: 5 additions & 0 deletions internal/controller/syncmanager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1"

kcpapisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
kcpcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
apiexportprovider "github.com/kcp-dev/multicluster-provider/apiexport"
mccontroller "sigs.k8s.io/multicluster-runtime/pkg/controller"
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
Expand Down Expand Up @@ -248,6 +249,10 @@ func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
return fmt.Errorf("failed to register scheme %s: %w", kcpapisv1alpha1.SchemeGroupVersion, err)
}

if err := kcpcorev1alpha1.AddToScheme(scheme); err != nil {
return fmt.Errorf("failed to register scheme %s: %w", kcpcorev1alpha1.SchemeGroupVersion, err)
}

if r.vwProvider == nil {
log.Debug("Setting up APIExport provider…")

Expand Down
100 changes: 100 additions & 0 deletions test/e2e/sync/primary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,3 +739,103 @@ spec:
t.Fatalf("Failed to wait for object to be synced down: %v", err)
}
}

func TestSyncWithWorkspacePath(t *testing.T) {
const (
apiExportName = "kcp.example.com"
kcpGroupName = "kcp.example.com"
orgWorkspace = "sync-with-paths"
)

ctx := t.Context()
ctrlruntime.SetLogger(logr.Discard())

// setup a test environment in kcp
orgKubconfig := utils.CreateOrganization(t, ctx, orgWorkspace, apiExportName)

// start a service cluster
envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{
"test/crds/crontab.yaml",
})

// publish Crontabs and Backups
t.Logf("Publishing CRDs…")
prCrontabs := &syncagentv1alpha1.PublishedResource{
ObjectMeta: metav1.ObjectMeta{
Name: "publish-crontabs",
},
Spec: syncagentv1alpha1.PublishedResourceSpec{
Resource: syncagentv1alpha1.SourceResourceDescriptor{
APIGroup: "example.com",
Version: "v1",
Kind: "CronTab",
},
// These rules make finding the local object easier, but should not be used in production.
Naming: &syncagentv1alpha1.ResourceNaming{
Name: "{{ .Object.metadata.name }}",
Namespace: "synced-{{ .Object.metadata.namespace }}",
},
Projection: &syncagentv1alpha1.ResourceProjection{
Group: kcpGroupName,
},
EnableWorkspacePaths: true,
},
}

if err := envtestClient.Create(ctx, prCrontabs); err != nil {
t.Fatalf("Failed to create PublishedResource: %v", err)
}

// start the agent in the background to update the APIExport with the CronTabs API
utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName)

// wait until the API is available
kcpClusterClient := utils.GetKcpAdminClusterClient(t)

teamClusterPath := logicalcluster.NewPath("root").Join(orgWorkspace).Join("team-1")
teamClient := kcpClusterClient.Cluster(teamClusterPath)

utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionResource{
Group: kcpGroupName,
Version: "v1",
Resource: "crontabs",
})

// create a Crontab object in a team workspace
t.Log("Creating CronTab in kcp…")
crontab := utils.YAMLToUnstructured(t, `
apiVersion: kcp.example.com/v1
kind: CronTab
metadata:
namespace: default
name: my-crontab
spec:
cronSpec: '* * *'
image: ubuntu:latest
`)

if err := teamClient.Create(ctx, crontab); err != nil {
t.Fatalf("Failed to create CronTab in kcp: %v", err)
}

// wait for the agent to sync the object down into the service cluster

t.Logf("Wait for CronTab to be synced…")
copy := &unstructured.Unstructured{}
copy.SetAPIVersion("example.com/v1")
copy.SetKind("CronTab")

err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) {
copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-crontab"}
return envtestClient.Get(ctx, copyKey, copy) == nil, nil
})
if err != nil {
t.Fatalf("Failed to wait for object to be synced down: %v", err)
}

ann := "syncagent.kcp.io/remote-object-workspace-path"

if value := copy.GetAnnotations()[ann]; value != teamClusterPath.String() {
t.Fatalf("Expected %s annotation to be %q, but is %q.", ann, teamClusterPath.String(), value)
}
}
10 changes: 10 additions & 0 deletions test/utils/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ func BindToAPIExport(t *testing.T, ctx context.Context, client ctrlruntimeclient
},
State: kcpapisv1alpha1.ClaimAccepted,
},
{
PermissionClaim: kcpapisv1alpha1.PermissionClaim{
GroupResource: kcpapisv1alpha1.GroupResource{
Group: "core.kcp.io",
Resource: "logicalclusters",
},
All: true,
},
State: kcpapisv1alpha1.ClaimAccepted,
},
},
},
}
Expand Down