tmc e2e: Split SyncerFixture...
... to provide a way to only maintain the
heartbeat and import APIs without
effectively syncing.

And change all the candidate tests to use the new function.

Signed-off-by: David Festal <dfestal@redhat.com>
davidfestal committed Feb 7, 2023
1 parent 3a58b1c commit a1476bf
Showing 17 changed files with 258 additions and 124 deletions.
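
For existing tests the rename is mechanical. A minimal before/after sketch, where upstreamServer and wsPath are placeholder names for the usual framework setup, not identifiers from this commit:

	// Before this commit: Start(t) both created the SyncTarget and synced.
	syncer := framework.NewSyncerFixture(t, upstreamServer, wsPath).Start(t)

	// After this commit: identical behavior under the more explicit name.
	syncer = framework.NewSyncerFixture(t, upstreamServer, wsPath).CreateAndStart(t)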
17 changes: 12 additions & 5 deletions pkg/syncer/syncer.go
@@ -42,6 +42,7 @@ import (
"k8s.io/klog/v2"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
kcpclusterclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
kcpfeatures "github.com/kcp-dev/kcp/pkg/features"
@@ -401,6 +402,14 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
}

StartHeartbeatKeeper(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)

return nil
}

func StartHeartbeatKeeper(ctx context.Context, kcpSyncTargetClient kcpclientset.Interface, syncTargetName, syncTargetUID string) {
logger := klog.FromContext(ctx)

// Attempt to heartbeat every interval
go wait.UntilWithContext(ctx, func(ctx context.Context) {
var heartbeatTime time.Time
@@ -409,20 +418,18 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
// Attempt to heartbeat every second until successful. Errors are logged instead of being returned so the
// poll error can be safely ignored.
_ = wait.PollImmediateInfiniteWithContext(ctx, 1*time.Second, func(ctx context.Context) (bool, error) {
patchBytes := []byte(fmt.Sprintf(`[{"op":"test","path":"/metadata/uid","value":%q},{"op":"replace","path":"/status/lastSyncerHeartbeatTime","value":%q}]`, cfg.SyncTargetUID, time.Now().Format(time.RFC3339)))
syncTarget, err = kcpSyncTargetClient.WorkloadV1alpha1().SyncTargets().Patch(ctx, cfg.SyncTargetName, types.JSONPatchType, patchBytes, metav1.PatchOptions{}, "status")
patchBytes := []byte(fmt.Sprintf(`[{"op":"test","path":"/metadata/uid","value":%q},{"op":"replace","path":"/status/lastSyncerHeartbeatTime","value":%q}]`, syncTargetUID, time.Now().Format(time.RFC3339)))
syncTarget, err := kcpSyncTargetClient.WorkloadV1alpha1().SyncTargets().Patch(ctx, syncTargetName, types.JSONPatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
logger.Error(err, "failed to set status.lastSyncerHeartbeatTime")
return false, nil //nolint:nilerr
return false, nil
}

heartbeatTime = syncTarget.Status.LastSyncerHeartbeatTime.Time
return true, nil
})
logger.V(5).Info("Heartbeat set", "heartbeatTime", heartbeatTime)
}, heartbeatInterval)

return nil
}

type filteredGVRSource struct {
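The heartbeat itself is a two-op JSON Patch: a "test" op pins the SyncTarget's UID before the "replace" op writes the timestamp, so a heartbeat can never resurrect a SyncTarget that was deleted and recreated under the same name. A sketch of the rendered patch document, with illustrative UID and timestamp values:

	patch := []byte(`[
	  {"op": "test", "path": "/metadata/uid", "value": "0f8fad5b-d9cb-469f-a165-70867728950e"},
	  {"op": "replace", "path": "/status/lastSyncerHeartbeatTime", "value": "2023-02-07T12:00:00Z"}
	]`)
	// If the stored UID no longer matches, the "test" op fails, the whole
	// patch is rejected, and the keeper retries every second until it succeeds.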
174 changes: 146 additions & 28 deletions test/e2e/framework/syncer.go
@@ -33,6 +33,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -44,6 +45,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1"
@@ -52,6 +54,7 @@ import (
"github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin"
"github.com/kcp-dev/kcp/pkg/syncer"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
@@ -127,19 +130,44 @@ func WithDownstreamPreparation(prepare func(config *rest.Config, isFakePCluster
}
}

// Start starts a new syncer against the given upstream kcp workspace. Whether the syncer run
// in-process or deployed on a pcluster will depend whether --pcluster-kubeconfig and
// --syncer-image are supplied to the test invocation.
func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
// CreateAndStart creates the SyncTarget resource, applies it to the physical cluster,
// and then starts a new syncer against the given upstream kcp workspace.
// Whether the syncer runs in-process or is deployed on a pcluster depends on
// whether --pcluster-kubeconfig and --syncer-image are supplied to the test invocation.
func (sf *syncerFixture) CreateAndStart(t *testing.T) *StartedSyncerFixture {
t.Helper()

useDeployedSyncer := len(TestConfig.PClusterKubeconfig()) > 0

artifactDir, _, err := ScratchDirs(t)
if err != nil {
t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
}

downstreamConfig, downstreamKubeconfigPath, downstreamKubeClient, syncerConfig, syncerID := sf.createAndApplySyncTarget(t, useDeployedSyncer, artifactDir)

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

sf.startSyncer(ctx, t, useDeployedSyncer, downstreamKubeClient, syncerConfig, syncerID, artifactDir, downstreamKubeconfigPath)

startedSyncer := &StartedSyncerFixture{
sf.buildAppliedSyncerFixture(ctx, t, downstreamConfig, downstreamKubeClient, syncerConfig, syncerID),
}

// The sync target becoming ready indicates the syncer is healthy and has
// successfully sent a heartbeat to kcp.
startedSyncer.WaitForClusterReady(ctx, t)

return startedSyncer
}

func (sf *syncerFixture) createAndApplySyncTarget(t *testing.T, useDeployedSyncer bool, artifactDir string) (downstreamConfig *rest.Config, downstreamKubeconfigPath string, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID string) {
// Write the upstream logical cluster config to disk for the workspace plugin
upstreamRawConfig, err := sf.upstreamServer.RawConfig()
require.NoError(t, err)
_, kubeconfigPath := WriteLogicalClusterConfig(t, upstreamRawConfig, "base", sf.syncTargetPath)

useDeployedSyncer := len(TestConfig.PClusterKubeconfig()) > 0

syncerImage := TestConfig.SyncerImage()
if useDeployedSyncer {
require.NotZero(t, len(syncerImage), "--syncer-image must be specified if testing with a deployed syncer")
@@ -169,8 +197,6 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
}
syncerYAML := RunKcpCliPlugin(t, kubeconfigPath, pluginArgs)

var downstreamConfig *rest.Config
var downstreamKubeconfigPath string
if useDeployedSyncer {
// The syncer will target the pcluster identified by `--pcluster-kubeconfig`.
downstreamKubeconfigPath = TestConfig.PClusterKubeconfig()
@@ -200,11 +226,6 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
// Apply the yaml output from the plugin to the downstream server
KubectlApply(t, downstreamKubeconfigPath, syncerYAML)

artifactDir, _, err := ScratchDirs(t)
if err != nil {
t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
}

// collect both in deployed and in-process mode
t.Cleanup(func() {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(wait.ForeverTestTimeout))
@@ -266,7 +287,6 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
// Extract the configuration for an in-process syncer from the resources that were
// applied to the downstream server. This maximizes the parity between the
// configuration of a deployed and in-process syncer.
var syncerID string
for _, doc := range strings.Split(string(syncerYAML), "\n---\n") {
var manifest struct {
metav1.ObjectMeta `json:"metadata"`
@@ -280,14 +300,15 @@ }
}
require.NotEmpty(t, syncerID, "failed to extract syncer namespace from yaml produced by plugin:\n%s", string(syncerYAML))

syncerConfig := syncerConfigFromCluster(t, downstreamConfig, syncerID, syncerID)

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
syncerConfig = syncerConfigFromCluster(t, downstreamConfig, syncerID, syncerID)

downstreamKubeClient, err := kubernetesclient.NewForConfig(downstreamConfig)
downstreamKubeClient, err = kubernetesclient.NewForConfig(downstreamConfig)
require.NoError(t, err)

return
}

func (sf *syncerFixture) startSyncer(ctx context.Context, t *testing.T, useDeployedSyncer bool, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID, artifactDir, downstreamKubeconfigPath string) {
if useDeployedSyncer {
t.Cleanup(func() {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(wait.ForeverTestTimeout))
@@ -333,7 +354,7 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
}

t.Logf("Deleting syncer resources for sync target %s|%s", syncerConfig.SyncTargetPath, syncerConfig.SyncTargetName)
err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, syncerID, metav1.DeleteOptions{})
err := downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, syncerID, metav1.DeleteOptions{})
if err != nil {
t.Errorf("failed to delete Namespace %q: %v", syncerID, err)
}
@@ -456,7 +477,9 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
require.NoError(t, err)
}
}
}

func (sf *syncerFixture) buildAppliedSyncerFixture(ctx context.Context, t *testing.T, downstreamConfig *rest.Config, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID string) *appliedSyncerFixture {
rawConfig, err := sf.upstreamServer.RawConfig()
require.NoError(t, err)

@@ -493,7 +516,7 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
upsyncerVWConfig = rest.AddUserAgent(rest.CopyConfig(upsyncerVWConfig), t.Name())
require.NoError(t, err)

startedSyncer := &StartedSyncerFixture{
return &appliedSyncerFixture{
SyncerConfig: syncerConfig,
SyncerID: syncerID,
SyncTargetClusterName: syncTargetClusterName,
Expand All @@ -503,17 +526,29 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
SyncerVirtualWorkspaceConfig: syncerVWConfig,
UpsyncerVirtualWorkspaceConfig: upsyncerVWConfig,
}
}

// The sync target becoming ready indicates the syncer is healthy and has
// successfully sent a heartbeat to kcp.
startedSyncer.WaitForClusterReady(ctx, t)
// Create creates the SyncTarget and applies it to the physical cluster.
// No resource will be effectively synced after calling this method.
func (sf *syncerFixture) Create(t *testing.T) *appliedSyncerFixture {
t.Helper()

return startedSyncer
artifactDir, _, err := ScratchDirs(t)
if err != nil {
t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
}

downstreamConfig, _, downstreamKubeClient, syncerConfig, syncerID := sf.createAndApplySyncTarget(t, false, artifactDir)

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

return sf.buildAppliedSyncerFixture(ctx, t, downstreamConfig, downstreamKubeClient, syncerConfig, syncerID)
}

// StartedSyncerFixture contains the configuration used to start a syncer and interact with its
// appliedSyncerFixture contains the configuration required to start a syncer and interact with its
// downstream cluster.
type StartedSyncerFixture struct {
type appliedSyncerFixture struct {
SyncerConfig *syncer.SyncerConfig
SyncerID string
SyncTargetClusterName logicalcluster.Name
@@ -525,9 +560,92 @@ type StartedSyncerFixture struct {

SyncerVirtualWorkspaceConfig *rest.Config
UpsyncerVirtualWorkspaceConfig *rest.Config

stopHeartBeat context.CancelFunc
}

// StartHeartBeat starts the heartbeat keeper to maintain
// the SyncTarget in the Ready state.
// No resource will be effectively synced after calling this method.
func (sf *appliedSyncerFixture) StartHeartBeat(t *testing.T) *StartedSyncerFixture {
t.Helper()

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
sf.stopHeartBeat = cancelFunc

kcpBootstrapClusterClient, err := kcpclientset.NewForConfig(sf.SyncerConfig.UpstreamConfig)
require.NoError(t, err)
kcpSyncTargetClient := kcpBootstrapClusterClient.Cluster(sf.SyncerConfig.SyncTargetPath)

// Start the heartbeat keeper to have the SyncTarget always ready during the e2e test.
syncer.StartHeartbeatKeeper(ctx, kcpSyncTargetClient, sf.SyncerConfig.SyncTargetName, sf.SyncerConfig.SyncTargetUID)

startedSyncer := &StartedSyncerFixture{
sf,
}

// The sync target becoming ready indicates the syncer is healthy and has
// successfully sent a heartbeat to kcp.
startedSyncer.WaitForClusterReady(ctx, t)

return startedSyncer
}

// StartAPIImporter starts the APIImporter the same way the syncer would have done if started.
// This allows kcp to perform the API compatibility checks and update the SyncTarget accordingly.
// The real syncer is not started, and no resource will be effectively synced after calling this method.
func (sf *appliedSyncerFixture) StartAPIImporter(t *testing.T) *appliedSyncerFixture {
t.Helper()

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

kcpBootstrapClusterClient, err := kcpclientset.NewForConfig(sf.SyncerConfig.UpstreamConfig)
require.NoError(t, err)
kcpSyncTargetClient := kcpBootstrapClusterClient.Cluster(sf.SyncerConfig.SyncTargetPath)

// Import the resource schemas of the resources to sync from the physical cluster, to enable compatibility checks in kcp.
resources := sf.SyncerConfig.ResourcesToSync.List()
kcpSyncTargetInformerFactory := kcpinformers.NewSharedScopedInformerFactoryWithOptions(kcpSyncTargetClient, 10*time.Hour, kcpinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
listOptions.FieldSelector = fields.OneTermEqualSelector("metadata.name", sf.SyncerConfig.SyncTargetName).String()
},
))
kcpImporterInformerFactory := kcpinformers.NewSharedScopedInformerFactoryWithOptions(kcpSyncTargetClient, 10*time.Hour)
apiImporter, err := syncer.NewAPIImporter(
sf.SyncerConfig.UpstreamConfig, sf.SyncerConfig.DownstreamConfig,
kcpSyncTargetInformerFactory.Workload().V1alpha1().SyncTargets(),
kcpImporterInformerFactory.Apiresource().V1alpha1().APIResourceImports(),
resources,
sf.SyncerConfig.SyncTargetPath, sf.SyncerConfig.SyncTargetName, types.UID(sf.SyncerConfig.SyncTargetUID))
require.NoError(t, err)

kcpImporterInformerFactory.Start(ctx.Done())
kcpSyncTargetInformerFactory.Start(ctx.Done())
kcpSyncTargetInformerFactory.WaitForCacheSync(ctx.Done())

go apiImporter.Start(klog.NewContext(ctx, klog.FromContext(ctx).WithValues("resources", resources)), 5*time.Second)

return sf
}

// StartedSyncerFixture contains the configuration used to start a syncer and interact with its
// downstream cluster.
type StartedSyncerFixture struct {
*appliedSyncerFixture
}

// StopHeartBeat stops maintaining the heartbeat for this syncer's SyncTarget.
func (sf *StartedSyncerFixture) StopHeartBeat(t *testing.T) {
t.Helper()

sf.stopHeartBeat()
}

// WaitForClusterReady waits for the cluster to be ready with the given reason.
// WaitForClusterReady waits for the SyncTarget to be ready.
// The SyncTarget becoming ready indicates that the syncer on the related
// physical cluster is healthy and has successfully sent a heartbeat to kcp.
func (sf *StartedSyncerFixture) WaitForClusterReady(ctx context.Context, t *testing.T) {
t.Helper()

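For tests that only need a Ready SyncTarget with imported APIs, without any actual syncing, the split fixture composes as sketched below. This is a sketch assuming the usual framework setup; t, upstreamServer, and wsPath are placeholders:

	// Create and apply the SyncTarget without starting a syncer; nothing syncs.
	fixture := framework.NewSyncerFixture(t, upstreamServer, wsPath).Create(t)

	// Run the API importer so kcp can perform its compatibility checks and
	// record the imported resource schemas against the SyncTarget.
	fixture = fixture.StartAPIImporter(t)

	// Keep the SyncTarget Ready via heartbeats; this blocks until it is Ready.
	started := fixture.StartHeartBeat(t)

	// Stopping the heartbeat cancels the keeper's context; a test can then
	// let the SyncTarget go stale and assert on the readiness transition.
	started.StopHeartBeat(t)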
2 changes: 1 addition & 1 deletion test/e2e/reconciler/cluster/controller_test.go
@@ -195,7 +195,7 @@ func TestClusterController(t *testing.T) {
require.NoError(t, err)
t.Log("Installing test CRDs into sink cluster...")
fixturewildwest.FakePClusterCreate(t, sinkCrdClient.ApiextensionsV1().CustomResourceDefinitions(), metav1.GroupResource{Group: wildwest.GroupName, Resource: "cowboys"})
})).Start(t)
})).CreateAndStart(t)

t.Logf("Bind second user workspace to location workspace")
framework.NewBindCompute(t, wsPath, source).Bind(t)
4 changes: 2 additions & 2 deletions test/e2e/reconciler/deployment/deployment_coordinator_test.go
@@ -71,15 +71,15 @@ func TestDeploymentCoordinator(t *testing.T) {
eastSyncer := framework.NewSyncerFixture(t, upstreamServer, locationWorkspacePath,
framework.WithSyncTargetName("east"),
framework.WithSyncedUserWorkspaces(workloadWorkspace1, workloadWorkspace2),
).Start(t)
).CreateAndStart(t)

_, err = kcpClusterClient.Cluster(locationWorkspacePath).WorkloadV1alpha1().SyncTargets().Patch(ctx, "east", types.JSONPatchType, []byte(`[{"op":"add","path":"/metadata/labels/region","value":"east"}]`), metav1.PatchOptions{})
require.NoError(t, err)

westSyncer := framework.NewSyncerFixture(t, upstreamServer, locationWorkspacePath,
framework.WithSyncTargetName("west"),
framework.WithSyncedUserWorkspaces(workloadWorkspace1, workloadWorkspace2),
).Start(t)
).CreateAndStart(t)

_, err = kcpClusterClient.Cluster(locationWorkspacePath).WorkloadV1alpha1().SyncTargets().Patch(ctx, "west", types.JSONPatchType, []byte(`[{"op":"add","path":"/metadata/labels/region","value":"west"}]`), metav1.PatchOptions{})
require.NoError(t, err)
@@ -70,7 +70,7 @@ func TestSyncTargetLocalExport(t *testing.T) {
framework.WithExtraResources("services"),
framework.WithSyncTargetName(syncTargetName),
framework.WithSyncedUserWorkspaces(computeWorkspace),
).Start(t)
).CreateAndStart(t)

framework.Eventually(t, func() (bool, string) {
syncTarget, err := kcpClients.Cluster(computePath).WorkloadV1alpha1().SyncTargets().Get(ctx, syncTargetName, metav1.GetOptions{})
@@ -123,7 +123,7 @@ func TestMultipleExports(t *testing.T) {
)
require.NoError(t, err)
}),
).Start(t)
).CreateAndStart(t)

t.Logf("syncTarget should have one resource to sync")
require.Eventually(t, func() bool {
2 changes: 1 addition & 1 deletion test/e2e/reconciler/locationworkspace/rootcompute_test.go
@@ -80,7 +80,7 @@ func TestRootComputeWorkspace(t *testing.T) {
)
require.NoError(t, err)
}),
).Start(t)
).CreateAndStart(t)

require.Eventually(t, func() bool {
syncTarget, err := kcpClients.Cluster(computePath).WorkloadV1alpha1().SyncTargets().Get(ctx, syncTargetName, metav1.GetOptions{})
2 changes: 1 addition & 1 deletion test/e2e/reconciler/locationworkspace/synctarget_test.go
@@ -98,7 +98,7 @@ func TestSyncTargetExport(t *testing.T) {
syncTarget := framework.NewSyncerFixture(t, source, computePath,
framework.WithAPIExports(fmt.Sprintf("%s:%s", schemaPath.String(), cowboysAPIExport.Name)),
framework.WithSyncTargetName(syncTargetName),
).Start(t)
).CreateAndStart(t)

require.Eventually(t, func() bool {
syncTarget, err := kcpClients.Cluster(computePath).WorkloadV1alpha1().SyncTargets().Get(ctx, syncTargetName, metav1.GetOptions{})
