Skip to content

Commit

Permalink
enable endpoint test
Browse files Browse the repository at this point in the history
  • Loading branch information
lionelvillard committed Nov 14, 2022
1 parent 3a01c8e commit 530fab8
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 43 deletions.
12 changes: 1 addition & 11 deletions cmd/syncer/cmd/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,14 @@ func Run(ctx context.Context, options *synceroptions.Options) error {
if err := syncer.StartSyncer(
ctx,
&syncer.SyncerConfig{
<<<<<<< HEAD
UpstreamConfig: upstreamConfig,
DownstreamConfig: downstreamConfig,
ResourcesToSync: sets.NewString(options.SyncedResourceTypes...),
SyncTargetWorkspace: logicalcluster.New(options.FromClusterName),
SyncTargetName: options.SyncTargetName,
SyncTargetUID: options.SyncTargetUID,
DNSServer: options.DNSServer,
DNSImage: options.DNSImage,
DownstreamNamespaceCleanDelay: options.DownstreamNamespaceCleanDelay,
=======
UpstreamConfig: upstreamConfig,
DownstreamConfig: downstreamConfig,
ResourcesToSync: sets.NewString(options.SyncedResourceTypes...),
SyncTargetWorkspace: logicalcluster.New(options.FromClusterName),
SyncTargetName: options.SyncTargetName,
SyncTargetUID: options.SyncTargetUID,
DNSImage: options.DNSImage,
>>>>>>> ef48ca9b (Create one DNS nameserver per workspace)
},
numThreads,
options.APIImportPollInterval,
Expand Down
37 changes: 19 additions & 18 deletions pkg/syncer/spec/dns/dns_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package dns

import (
"context"
"errors"

"github.com/kcp-dev/logicalcluster/v2"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -104,15 +106,15 @@ func (d *DNSProcessor) Process(ctx context.Context, workspace logicalcluster.Nam
return err
}

// TODO: check endpoints. It's disabled until we figure out how to update e2e tests.
//endpoints, err := d.endpointLister.Endpoints(d.dnsNamespace).Get(dnsID)
//if err != nil {
// return err
//}
//
//if hasAtLeastOneReadyAddress(endpoints) {
// return nil
//}
// Check at least one endpoint is ready.
endpoints, err := d.endpointLister.Endpoints(d.dnsNamespace).Get(dnsID)
if err != nil {
return err
}

if !hasAtLeastOneReadyAddress(endpoints) {
return errors.New("no DNS endpoints available yet (retrying)")
}

return nil
}
Expand Down Expand Up @@ -232,12 +234,11 @@ func (d *DNSProcessor) processService(ctx context.Context, name string) error {
return nil
}

//
//func hasAtLeastOneReadyAddress(endpoints *corev1.Endpoints) bool {
// for _, s := range endpoints.Subsets {
// if len(s.Addresses) > 0 {
// return true
// }
// }
// return false
//}
// hasAtLeastOneReadyAddress reports whether endpoints contains at least one
// ready address with a non-empty IP. Only Subsets[i].Addresses is inspected
// (in core/v1, Addresses holds the ready endpoints; NotReadyAddresses is a
// separate field and is intentionally ignored here).
func hasAtLeastOneReadyAddress(endpoints *corev1.Endpoints) bool {
	for _, s := range endpoints.Subsets {
		// Check every address, not just the first: a subset whose first
		// address has an empty IP may still contain a usable one later.
		for _, a := range s.Addresses {
			if a.IP != "" {
				return true
			}
		}
	}
	return false
}
32 changes: 32 additions & 0 deletions test/e2e/framework/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -351,13 +352,19 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
return []net.IP{net.ParseIP("8.8.8.8")}, nil
}
})

// Manually create the DNS Endpoints for the main workspace
dnsID := shared.GetDNSID(sf.workspaceClusterName, types.UID(syncerConfig.SyncTargetUID), sf.syncTargetName)
_, err = downstreamKubeClient.CoreV1().Endpoints(syncerID).Create(ctx, endpoints(dnsID, syncerID), metav1.CreateOptions{})
require.NoError(t, err)
}

startedSyncer := &StartedSyncerFixture{
SyncerConfig: syncerConfig,
SyncerID: syncerID,
DownstreamConfig: downstreamConfig,
DownstreamKubeClient: downstreamKubeClient,
useDeployedSyncer: useDeployedSyncer,
}

// The sync target becoming ready indicates the syncer is healthy and has
Expand All @@ -376,6 +383,7 @@ type StartedSyncerFixture struct {
// SyncerConfig will be less privileged.
DownstreamConfig *rest.Config
DownstreamKubeClient kubernetesclient.Interface
useDeployedSyncer bool
}

// WaitForClusterReady waits for the cluster to be ready with the given reason.
Expand All @@ -390,6 +398,15 @@ func (sf *StartedSyncerFixture) WaitForClusterReady(t *testing.T, ctx context.Co
t.Logf("Cluster %q is %s", cfg.SyncTargetName, conditionsv1alpha1.ReadyCondition)
}

// BoundWorkspace must be called whenever a new workspace is bound to this
// workload workspace. For in-process syncers it manually creates the DNS
// Endpoints object the syncer expects for that workspace; a deployed syncer
// gets real endpoints from the cluster, so nothing is done in that case.
func (sf *StartedSyncerFixture) BoundWorkspace(t *testing.T, ctx context.Context, workspace logicalcluster.Name) {
	if sf.useDeployedSyncer {
		return
	}
	dnsID := shared.GetDNSID(workspace, types.UID(sf.SyncerConfig.SyncTargetUID), sf.SyncerConfig.SyncTargetName)
	_, err := sf.DownstreamKubeClient.CoreV1().Endpoints(sf.SyncerID).Create(ctx, endpoints(dnsID, sf.SyncerID), metav1.CreateOptions{})
	require.NoError(t, err)
}

// syncerConfigFromCluster reads the configuration needed to start an in-process
// syncer from the resources applied to a cluster for a deployed syncer.
func syncerConfigFromCluster(t *testing.T, downstreamConfig *rest.Config, namespace, syncerID string) *syncer.SyncerConfig {
Expand Down Expand Up @@ -484,3 +501,18 @@ func syncerArgsToMap(args []string) (map[string][]string, error) {
}
return argMap, nil
}

// endpoints builds a minimal core/v1 Endpoints object with the given name and
// namespace, containing a single subset with one hard-coded ready address
// (8.8.8.8). Used by the test fixtures to simulate DNS endpoints so the
// syncer's readiness check passes.
func endpoints(name, namespace string) *corev1.Endpoints {
	subset := corev1.EndpointSubset{
		Addresses: []corev1.EndpointAddress{
			{IP: "8.8.8.8"},
		},
	}
	return &corev1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Subsets: []corev1.EndpointSubset{subset},
	}
}
1 change: 1 addition & 0 deletions test/e2e/reconciler/locationworkspace/rootcompute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func TestRootComputeWorkspace(t *testing.T) {
framework.WithAPIExportsWorkloadBindOption("root:compute:kubernetes"),
framework.WithLocationWorkspaceWorkloadBindOption(computeClusterName),
).Bind(t)
syncerFixture.BoundWorkspace(t, ctx, consumerWorkspace)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/reconciler/scheduling/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func TestScheduling(t *testing.T) {
framework.NewBindCompute(t, userClusterName, source,
framework.WithLocationWorkspaceWorkloadBindOption(negotiationClusterName),
).Bind(t)
syncerFixture.BoundWorkspace(t, ctx, userClusterName)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand All @@ -184,6 +185,7 @@ func TestScheduling(t *testing.T) {
framework.NewBindCompute(t, secondUserClusterName, source,
framework.WithLocationWorkspaceWorkloadBindOption(negotiationClusterName),
).Bind(t)
syncerFixture.BoundWorkspace(t, ctx, secondUserClusterName)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/reconciler/scheduling/multi_placements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,14 @@ func TestMultiPlacement(t *testing.T) {
framework.WithLocationWorkspaceWorkloadBindOption(locationClusterName),
framework.WithLocationSelectorWorkloadBindOption(metav1.LabelSelector{MatchLabels: map[string]string{"loc": "loc1"}}),
).Bind(t)
firstSyncerFixture.BoundWorkspace(t, ctx, userClusterName)

t.Logf("Bind user workspace to location workspace with loc 2")
framework.NewBindCompute(t, userClusterName, source,
framework.WithLocationWorkspaceWorkloadBindOption(locationClusterName),
framework.WithLocationSelectorWorkloadBindOption(metav1.LabelSelector{MatchLabels: map[string]string{"loc": "loc2"}}),
).Bind(t)
secondSyncerFixture.BoundWorkspace(t, ctx, userClusterName)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand Down
1 change: 1 addition & 0 deletions test/e2e/reconciler/scheduling/placement_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestPlacementUpdate(t *testing.T) {
framework.WithLocationWorkspaceWorkloadBindOption(locationClusterName),
framework.WithPlacementNameBindOption(placementName),
).Bind(t)
syncerFixture.BoundWorkspace(t, ctx, userClusterName)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand Down
44 changes: 30 additions & 14 deletions test/e2e/virtual/syncer/virtualworkspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,13 @@ func TestSyncerVirtualWorkspace(t *testing.T) {

var testCases = []struct {
name string
work func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string)
work func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture)
}{
{
name: "isolated API domains per syncer",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
kubelikeVWDiscoverClusterClient, err := clientgodiscovery.NewDiscoveryClientForConfig(kubelikeSyncerVWConfig)
require.NoError(t, err)

Expand Down Expand Up @@ -266,7 +268,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
{
name: "access is authorized",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand Down Expand Up @@ -364,7 +367,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
{
name: "access kcp resources through syncer virtual workspace",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand Down Expand Up @@ -477,7 +481,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
{
name: "access kcp resources through syncer virtual workspace, from a other workspace to the wildwest resources through an APIBinding",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand All @@ -488,6 +493,7 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
framework.WithAPIExportsWorkloadBindOption(wildwestClusterName.String()+":kubernetes"),
framework.WithLocationWorkspaceWorkloadBindOption(wildwestClusterName),
).Bind(t)
wildwestSyncer.BoundWorkspace(t, ctx, otherWorkspace)

wildwestClusterClient, err := wildwestclientset.NewForConfig(server.BaseConfig(t))
require.NoError(t, err)
Expand Down Expand Up @@ -582,7 +588,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
{
name: "Never promote overridden syncer view status to upstream when scheduled on 2 synctargets",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand Down Expand Up @@ -682,6 +689,7 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
}),
).Bind(t)
wildwestSyncer.BoundWorkspace(t, ctx, otherWorkspace)

framework.NewBindCompute(t, otherWorkspace, server,
framework.WithPlacementNameBindOption("secondplacement"),
Expand All @@ -693,6 +701,7 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
}),
).Bind(t)
wildwestSecondSyncer.BoundWorkspace(t, ctx, otherWorkspace)

wildwestClusterClient, err := wildwestclientset.NewForConfig(server.BaseConfig(t))
require.NoError(t, err)
Expand Down Expand Up @@ -806,7 +815,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
{
name: "Correctly manage status, with promote and unpromote, when moving a cowboy from one synctarget to the other",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand All @@ -816,6 +826,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
_, err = kcpClusterClient.WorkloadV1alpha1().SyncTargets().Patch(logicalcluster.WithCluster(ctx, wildwestClusterName), wildwestSyncTargetName, types.JSONPatchType, []byte(`[{"op":"add","path":"/metadata/labels/name","value":"`+wildwestSyncTargetName+`"}]`), metav1.PatchOptions{})
require.NoError(t, err)

otherWorkspace := framework.NewWorkspaceFixture(t, server, orgClusterName)

t.Logf("Deploying second syncer into workspace %s", wildwestClusterName)
wildwestSecondSyncTargetName := wildwestSyncTargetName + "second"
wildwestSecondSyncer := framework.NewSyncerFixture(t, server, wildwestClusterName,
Expand Down Expand Up @@ -899,7 +911,6 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
}, metav1.CreateOptions{})
require.NoError(t, err)

otherWorkspace := framework.NewWorkspaceFixture(t, server, orgClusterName)
t.Logf("Using User workspace: %s", otherWorkspace.String())

logWithTimestamp := func(format string, args ...interface{}) {
Expand All @@ -917,6 +928,7 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
}),
).Bind(t)
wildwestSyncer.BoundWorkspace(t, ctx, otherWorkspace)

logWithTimestamp("Wait for being able to list cowboys in the other workspace (kubelike) through the virtual workspace")
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -1021,6 +1033,7 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
}),
).Bind(t)
wildwestSecondSyncer.BoundWorkspace(t, ctx, otherWorkspace)

logWithTimestamp("Wait for resource controller to schedule cowboy on the 2 synctargets, and for both syncers to own it")
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -1118,7 +1131,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
{
name: "Transform spec through spec-diff annotation",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand Down Expand Up @@ -1181,7 +1195,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
},
{
name: "Override summarizing rules to disable status promotion",
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string) {
work: func(t *testing.T, kubelikeSyncerVWConfig, wildwestSyncerVWConfig *rest.Config, kubelikeClusterName, wildwestClusterName logicalcluster.Name, wildwestSyncTargetName string,
wildwestSyncer *framework.StartedSyncerFixture) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

Expand Down Expand Up @@ -1353,7 +1368,6 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
}, wait.ForeverTestTimeout, time.Millisecond*100)

t.Log("Setting up an unrelated workspace with cowboys...")

unrelatedWorkspace := framework.NewWorkspaceFixture(t, server, orgClusterName)

sourceCrdClient, err := kcpapiextensionsclientset.NewForConfig(server.BaseConfig(t))
Expand Down Expand Up @@ -1382,10 +1396,11 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
}, metav1.CreateOptions{})
require.NoError(t, err)

wildwestWorkspace := framework.NewWorkspaceFixture(t, server, orgClusterName)
wildwestSyncTargetName := fmt.Sprintf("wildwest-%d", +rand.Intn(1000000))

wildwestWorkspace := framework.NewWorkspaceFixture(t, server, orgClusterName)
t.Logf("Deploying syncer into workspace %s", wildwestWorkspace)

wildwestSyncer := framework.NewSyncerFixture(t, server, wildwestWorkspace,
framework.WithExtraResources("cowboys.wildwest.dev", "roles.rbac.authorization.k8s.io", "rolebindings.rbac.authorization.k8s.io"),
// empty APIExports so we do not add global kubernetes APIExport.
Expand Down Expand Up @@ -1442,7 +1457,7 @@ func TestSyncerVirtualWorkspace(t *testing.T) {
require.NoError(t, err)

t.Log("Starting test...")
testCase.work(t, kubelikeVWConfig, wildwestVWConfig, kubelikeWorkspace, wildwestWorkspace, wildwestSyncTargetName)
testCase.work(t, kubelikeVWConfig, wildwestVWConfig, kubelikeWorkspace, wildwestWorkspace, wildwestSyncTargetName, wildwestSyncer)
})
}
}
Expand Down Expand Up @@ -1781,7 +1796,7 @@ func TestUpsyncerVirtualWorkspace(t *testing.T) {
t.Logf("Deploying syncer into workspace %s", kubelikeWorkspace)
kubelikeSyncer := framework.NewSyncerFixture(t, server, kubelikeWorkspace,
framework.WithSyncTarget(kubelikeWorkspace, "kubelike"),
framework.WithExtraResources("persistentvolumes"),
framework.WithExtraResources("persistentvolumes", "roles.rbac.authorization.k8s.io", "rolebindings.rbac.authorization.k8s.io"),
framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) {
if !isFakePCluster {
// Only need to install services,ingresses and persistentvolumes in a logical cluster
Expand All @@ -1793,6 +1808,7 @@ func TestUpsyncerVirtualWorkspace(t *testing.T) {
kubefixtures.Create(t, sinkCrdClient.ApiextensionsV1().CustomResourceDefinitions(),
metav1.GroupResource{Group: "core.k8s.io", Resource: "services"},
metav1.GroupResource{Group: "core.k8s.io", Resource: "persistentvolumes"},
metav1.GroupResource{Group: "core.k8s.io", Resource: "endpoints"},
)
require.NoError(t, err)
}),
Expand Down

0 comments on commit 530fab8

Please sign in to comment.