Commit 6dae242

Fixes after PR review comments

Signed-off-by: David Festal <dfestal@redhat.com>
davidfestal committed Feb 7, 2023
1 parent a1476bf commit 6dae242

Showing 7 changed files with 24 additions and 21 deletions.
4 changes: 2 additions & 2 deletions pkg/syncer/syncer.go
```diff
@@ -402,12 +402,12 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 		go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
 	}
 
-	StartHeartbeatKeeper(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)
+	StartHeartbeat(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)
 
 	return nil
 }
 
-func StartHeartbeatKeeper(ctx context.Context, kcpSyncTargetClient kcpclientset.Interface, syncTargetName, syncTargetUID string) {
+func StartHeartbeat(ctx context.Context, kcpSyncTargetClient kcpclientset.Interface, syncTargetName, syncTargetUID string) {
 	logger := klog.FromContext(ctx)
 
 	// Attempt to heartbeat every interval
```
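Note: per the `// Attempt to heartbeat every interval` comment above, the renamed `StartHeartbeat` keeps the SyncTarget ready by retrying a heartbeat on a fixed interval. A minimal self-contained sketch of that pattern follows; the `heartbeatPatcher` interface and the interval value are illustrative assumptions, not the actual `kcpclientset.Interface` API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// heartbeatPatcher is a hypothetical stand-in for the kcp SyncTarget client;
// the real StartHeartbeat patches the SyncTarget through kcpclientset.Interface.
type heartbeatPatcher interface {
	PatchHeartbeat(ctx context.Context, syncTargetName, syncTargetUID string, at time.Time) error
}

// startHeartbeat mirrors the shape of syncer.StartHeartbeat: attempt a
// heartbeat on every tick until the context is cancelled, retrying on error.
func startHeartbeat(ctx context.Context, client heartbeatPatcher, syncTargetName, syncTargetUID string, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := client.PatchHeartbeat(ctx, syncTargetName, syncTargetUID, time.Now()); err != nil {
					// Log and try again on the next tick rather than giving up.
					fmt.Printf("heartbeat failed for %s: %v\n", syncTargetName, err)
				}
			}
		}
	}()
}

type fakePatcher struct{}

func (fakePatcher) PatchHeartbeat(_ context.Context, name, uid string, at time.Time) error {
	fmt.Printf("heartbeat for %s (uid %s) at %s\n", name, uid, at.Format(time.RFC3339))
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	startHeartbeat(ctx, fakePatcher{}, "my-sync-target", "1234", time.Second)
	<-ctx.Done()
}
```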
24 changes: 13 additions & 11 deletions test/e2e/framework/syncer.go
```diff
@@ -130,7 +130,8 @@ func WithDownstreamPreparation(prepare func(config *rest.Config, isFakePCluster
 	}
 }
 
-// CreateAndStart creates SyncTarget resource, applies it in the physical cluster,
+// CreateAndStart creates a SyncTarget resource through the `workload sync` CLI command,
+// applies the syncer-related resources in the physical cluster,
 // and then starts a new syncer against the given upstream kcp workspace.
 // Whether the syncer runs in-process or deployed on a pcluster will depend
 // whether --pcluster-kubeconfig and --syncer-image are supplied to the test invocation.
@@ -144,7 +145,7 @@ func (sf *syncerFixture) CreateAndStart(t *testing.T) *StartedSyncerFixture {
 		t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
 	}
 
-	downstreamConfig, downstreamKubeconfigPath, downstreamKubeClient, syncerConfig, syncerID := sf.createAndApplySyncTarget(t, useDeployedSyncer, artifactDir)
+	downstreamConfig, downstreamKubeconfigPath, downstreamKubeClient, syncerConfig, syncerID := sf.createSyncTargetAndApplySyncerResources(t, useDeployedSyncer, artifactDir)
 
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	t.Cleanup(cancelFunc)
@@ -157,12 +158,12 @@ func (sf *syncerFixture) CreateAndStart(t *testing.T) *StartedSyncerFixture {
 
 	// The sync target becoming ready indicates the syncer is healthy and has
 	// successfully sent a heartbeat to kcp.
-	startedSyncer.WaitForClusterReady(ctx, t)
+	startedSyncer.WaitForSyncTargetReady(ctx, t)
 
 	return startedSyncer
 }
 
-func (sf *syncerFixture) createAndApplySyncTarget(t *testing.T, useDeployedSyncer bool, artifactDir string) (downstreamConfig *rest.Config, downstreamKubeconfigPath string, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID string) {
+func (sf *syncerFixture) createSyncTargetAndApplySyncerResources(t *testing.T, useDeployedSyncer bool, artifactDir string) (downstreamConfig *rest.Config, downstreamKubeconfigPath string, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID string) {
 	// Write the upstream logical cluster config to disk for the workspace plugin
 	upstreamRawConfig, err := sf.upstreamServer.RawConfig()
 	require.NoError(t, err)
@@ -528,7 +529,8 @@ func (sf *syncerFixture) buildAppliedSyncerFixture(ctx context.Context, t *testi
 	}
 }
 
-// Create creates the SyncTarget and applies it to the physical cluster.
+// CreateAndStart creates a SyncTarget resource through the `workload sync` CLI command,
+// applies the syncer-related resources in the physical cluster.
 // No resource will be effectively synced after calling this method.
 func (sf *syncerFixture) Create(t *testing.T) *appliedSyncerFixture {
 	t.Helper()
@@ -538,7 +540,7 @@ func (sf *syncerFixture) Create(t *testing.T) *appliedSyncerFixture {
 		t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
 	}
 
-	downstreamConfig, _, downstreamKubeClient, syncerConfig, syncerID := sf.createAndApplySyncTarget(t, false, artifactDir)
+	downstreamConfig, _, downstreamKubeClient, syncerConfig, syncerID := sf.createSyncTargetAndApplySyncerResources(t, false, artifactDir)
 
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	t.Cleanup(cancelFunc)
@@ -579,15 +581,15 @@ func (sf *appliedSyncerFixture) StartHeartBeat(t *testing.T) *StartedSyncerFixtu
 	kcpSyncTargetClient := kcpBootstrapClusterClient.Cluster(sf.SyncerConfig.SyncTargetPath)
 
 	// Start the heartbeat keeper to have the SyncTarget always ready during the e2e test.
-	syncer.StartHeartbeatKeeper(ctx, kcpSyncTargetClient, sf.SyncerConfig.SyncTargetName, sf.SyncerConfig.SyncTargetUID)
+	syncer.StartHeartbeat(ctx, kcpSyncTargetClient, sf.SyncerConfig.SyncTargetName, sf.SyncerConfig.SyncTargetUID)
 
 	startedSyncer := &StartedSyncerFixture{
 		sf,
 	}
 
 	// The sync target becoming ready indicates the syncer is healthy and has
 	// successfully sent a heartbeat to kcp.
-	startedSyncer.WaitForClusterReady(ctx, t)
+	startedSyncer.WaitForSyncTargetReady(ctx, t)
 
 	return startedSyncer
 }
@@ -636,17 +638,17 @@ type StartedSyncerFixture struct {
 	*appliedSyncerFixture
 }
 
-// StopHeartBeat stop maitining the heartbeat for this Syncer SyncTarget.
+// StopHeartBeat stop maintaining the heartbeat for this Syncer SyncTarget.
 func (sf *StartedSyncerFixture) StopHeartBeat(t *testing.T) {
 	t.Helper()
 
 	sf.stopHeartBeat()
 }
 
-// WaitForClusterReady waits for the SyncTarget to be ready.
+// WaitForSyncTargetReady waits for the SyncTarget to be ready.
 // The SyncTarget becoming ready indicates that the syncer on the related
 // physical cluster is healthy and has successfully sent a heartbeat to kcp.
-func (sf *StartedSyncerFixture) WaitForClusterReady(ctx context.Context, t *testing.T) {
+func (sf *StartedSyncerFixture) WaitForSyncTargetReady(ctx context.Context, t *testing.T) {
 	t.Helper()
 
 	cfg := sf.SyncerConfig
```
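Note: after these renames the fixture exposes two flows, both exercised by the test changes below: `CreateAndStart` (create the SyncTarget via the `workload sync` CLI command, apply the syncer resources downstream, and start the syncer) and `Create` followed by `StartAPIImporter` and `StartHeartBeat` (nothing is effectively synced, but the heartbeat is enough for the SyncTarget to become ready). A sketch of both call chains; the `framework.RunningServer` and `logicalcluster.Path` parameter types are assumptions inferred from the call sites in this diff, not confirmed signatures.

```go
package framework_test

import (
	"context"
	"testing"

	"github.com/kcp-dev/kcp/test/e2e/framework"
	"github.com/kcp-dev/logicalcluster/v3"
)

// exerciseBothFlows shows the two fixture flows renamed in this commit.
// The caller supplies the running test server and the workspace path, as
// the real e2e tests do; both parameter types are assumptions based on
// the call sites visible in this diff.
func exerciseBothFlows(ctx context.Context, t *testing.T, server framework.RunningServer, locationPath logicalcluster.Path) {
	// Full flow: create the SyncTarget via the `workload sync` CLI,
	// apply the syncer resources downstream, and start the syncer.
	syncer := framework.NewSyncerFixture(t, server, locationPath).CreateAndStart(t)
	syncer.WaitForSyncTargetReady(ctx, t)

	// Split flow: no resources are effectively synced; only the APIImporter
	// and the heartbeat run, which still makes the SyncTarget ready.
	heartbeatOnly := framework.NewSyncerFixture(t, server, locationPath).
		Create(t).
		StartAPIImporter(t).
		StartHeartBeat(t)
	heartbeatOnly.WaitForSyncTargetReady(ctx, t)
}
```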
4 changes: 2 additions & 2 deletions test/e2e/reconciler/deployment/deployment_coordinator_test.go
```diff
@@ -84,8 +84,8 @@ func TestDeploymentCoordinator(t *testing.T) {
 	_, err = kcpClusterClient.Cluster(locationWorkspacePath).WorkloadV1alpha1().SyncTargets().Patch(ctx, "west", types.JSONPatchType, []byte(`[{"op":"add","path":"/metadata/labels/region","value":"west"}]`), metav1.PatchOptions{})
 	require.NoError(t, err)
 
-	eastSyncer.WaitForClusterReady(ctx, t)
-	westSyncer.WaitForClusterReady(ctx, t)
+	eastSyncer.WaitForSyncTargetReady(ctx, t)
+	westSyncer.WaitForSyncTargetReady(ctx, t)
 
 	t.Logf("Create 2 locations, one for each SyncTargets")
 	err = framework.CreateResources(ctx, locations.FS, upstreamConfig, locationWorkspacePath)
```
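Note: `WaitForSyncTargetReady` (formerly `WaitForClusterReady`) blocks until the SyncTarget reports ready, i.e. until the syncer's heartbeat has landed. A self-contained sketch of that condition-polling pattern; the `isReady` callback is a hypothetical stand-in for reading the SyncTarget's conditions through the kcp client.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForSyncTargetReady polls a readiness check until it reports true or
// the context expires; the check stands in for reading the SyncTarget's
// Ready condition through the kcp client.
func waitForSyncTargetReady(ctx context.Context, isReady func(ctx context.Context) (bool, error)) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		ready, err := isReady(ctx)
		if err != nil {
			return err
		}
		if ready {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.New("timed out waiting for SyncTarget to become ready")
		case <-ticker.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	start := time.Now()
	// Fake check: report ready once ~500ms have passed, standing in for
	// the heartbeat landing on the SyncTarget.
	err := waitForSyncTargetReady(ctx, func(context.Context) (bool, error) {
		return time.Since(start) > 500*time.Millisecond, nil
	})
	fmt.Println("ready:", err == nil)
}
```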
5 changes: 3 additions & 2 deletions test/e2e/reconciler/namespace/controller_test.go
```diff
@@ -90,8 +90,9 @@ func TestNamespaceScheduler(t *testing.T) {
 		return &workloadnamespace.NamespaceConditionsAdapter{Namespace: ns}, err
 	}, framework.IsNot(workloadnamespace.NamespaceScheduled).WithReason(workloadnamespace.NamespaceReasonUnschedulable))
 
-	t.Log("Deploy a syncer")
-	// Create and Start a syncer against a workload cluster so that there's a ready cluster to schedule to.
+	t.Log("Create the SyncTarget and start both the Syncer APIImporter and Syncer HeartBeat")
+	// Create the SyncTarget and start both the Syncer APIImporter and Syncer HeartBeat against a workload cluster
+	// so that there's a ready cluster to schedule to.
 	syncerFixture := framework.NewSyncerFixture(t, server, server.path,
 		framework.WithExtraResources("services"),
 	).Create(t).StartAPIImporter(t).StartHeartBeat(t)
```
4 changes: 2 additions & 2 deletions test/e2e/reconciler/scheduling/api_compatibility_test.go
```diff
@@ -50,15 +50,15 @@ func TestSchedulingOnSupportedAPI(t *testing.T) {
 	require.NoError(t, err)
 
 	firstSyncTargetName := fmt.Sprintf("firstsynctarget-%d", +rand.Intn(1000000))
-	t.Logf("Creating a SyncTarget with no supported APIExports and syncer in %s", locationPath)
+	t.Logf("Creating a SyncTarget with no supported APIExports in %s, and start both the Syncer APIImporter and Syncer HeartBeat", locationPath)
 	_ = framework.NewSyncerFixture(t, source, locationPath,
 		framework.WithSyncTargetName(firstSyncTargetName),
 		framework.WithSyncedUserWorkspaces(userWS),
 		framework.WithAPIExports(""),
 	).Create(t).StartAPIImporter(t).StartHeartBeat(t)
 
 	secondSyncTargetName := fmt.Sprintf("secondsynctarget-%d", +rand.Intn(1000000))
-	t.Logf("Creating a SyncTarget with global kubernetes APIExports and syncer in %s", locationPath)
+	t.Logf("Creating a SyncTarget with global kubernetes APIExports in %s,and start both the Syncer APIImporter and Syncer HeartBeat", locationPath)
 	_ = framework.NewSyncerFixture(t, source, locationPath,
 		framework.WithSyncTargetName(secondSyncTargetName),
 		framework.WithSyncedUserWorkspaces(userWS),
```
2 changes: 1 addition & 1 deletion test/e2e/syncer/dns/dns_test.go
```diff
@@ -61,7 +61,7 @@ func TestDNSResolution(t *testing.T) {
 	syncer := framework.NewSyncerFixture(t, upstreamServer, locationWorkspacePath,
 		framework.WithSyncedUserWorkspaces(workloadWorkspace1, workloadWorkspace2),
 	).CreateAndStart(t)
-	syncer.WaitForClusterReady(ctx, t)
+	syncer.WaitForSyncTargetReady(ctx, t)
 
 	downstreamKubeClient, err := kubernetes.NewForConfig(syncer.DownstreamConfig)
 	require.NoError(t, err)
```
2 changes: 1 addition & 1 deletion test/e2e/syncer/tunnels_test.go
```diff
@@ -75,7 +75,7 @@ func TestSyncerTunnel(t *testing.T) {
 		framework.WithSyncedUserWorkspaces(userWs),
 	).CreateAndStart(t)
 
-	syncerFixture.WaitForClusterReady(ctx, t)
+	syncerFixture.WaitForSyncTargetReady(ctx, t)
 
 	t.Log("Binding the consumer workspace to the location workspace")
 	framework.NewBindCompute(t, userWsName.Path(), upstreamServer,
```
