Skip to content

Commit

Permalink
e2e/virtual/initializingworkspaces: adapt TestInitializingWorkspacesV…
Browse files Browse the repository at this point in the history
…irtualWorkspaceAccess to multi shard
  • Loading branch information
p0lyn0mial authored and sttts committed Jan 27, 2023
1 parent f2b7e8f commit 5aa4278
Showing 1 changed file with 96 additions and 90 deletions.
186 changes: 96 additions & 90 deletions test/e2e/virtual/initializingworkspaces/virtualworkspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"math/rand"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -181,6 +182,7 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {

t.Log("Create workspaces using the new types, which will get stuck in initializing")
wsNames := make([]string, 0, 3)

for _, workspaceType := range []string{
"alpha",
"beta",
Expand Down Expand Up @@ -211,7 +213,7 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
t.Logf("got %d workspaces, expected 3", len(workspaces.Items))
return false
}
return workspacesStuckInInitializing(t, workspaces.Items...)
return workspacesStuckInInitializing(t, sourceKcpClusterClient, workspaces.Items...)
}, wait.ForeverTestTimeout, 100*time.Millisecond)
workspacesByType := map[string]tenancyv1alpha1.Workspace{}
for i := range workspaces.Items {
Expand Down Expand Up @@ -246,8 +248,25 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
"beta",
"gamma",
} {
wt, hasWt := workspacetypes[initializer]
require.True(t, hasWt, "didn't find a WorkspaceType for %v initializer", initializer)

initialWs, hasInitialWs := workspacesByType[wt.Name]
require.True(t, hasInitialWs, "didn't find a Workspace for %v initializer with type %v", initializer, wt.Name)

ws, err := sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Get(ctx, initialWs.Name, metav1.GetOptions{})
require.NoError(t, err)
vwURLs := []string{}
for _, vwURL := range workspacetypes[initializer].Status.VirtualWorkspaces {
vwURLs = append(vwURLs, vwURL.URL)
}

targetVwURL, foundTargetVwURL, err := framework.VirtualWorkspaceURL(ctx, sourceKcpClusterClient, ws, vwURLs)
require.NoError(t, err)
require.True(t, foundTargetVwURL, "didn't find a VirtualWorkspace URL for %v initializer and %v workspace", initializer, ws.Name)

virtualWorkspaceConfig := rest.AddUserAgent(rest.CopyConfig(sourceConfig), t.Name()+"-virtual")
virtualWorkspaceConfig.Host = workspacetypes[initializer].Status.VirtualWorkspaces[0].URL
virtualWorkspaceConfig.Host = targetVwURL
virtualKcpClusterClient, err := kcpclientset.NewForConfig(framework.StaticTokenUserConfig("user-1", virtualWorkspaceConfig))
require.NoError(t, err)
virtualKubeClusterClient, err := kcpkubernetesclientset.NewForConfig(framework.StaticTokenUserConfig("user-1", virtualWorkspaceConfig))
Expand Down Expand Up @@ -341,22 +360,29 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
}, wait.ForeverTestTimeout, 100*time.Millisecond)
}

for initializer, expected := range map[string][]tenancyv1alpha1.Workspace{
"alpha": {workspacesByType[workspacetypeNames["alpha"]], workspacesByType[workspacetypeNames["gamma"]]},
"beta": {workspacesByType[workspacetypeNames["beta"]], workspacesByType[workspacetypeNames["gamma"]]},
"gamma": {workspacesByType[workspacetypeNames["gamma"]]},
for initializers, expected := range map[string][]tenancyv1alpha1.Workspace{
"alpha,gamma": {workspacesByType[workspacetypeNames["alpha"]], workspacesByType[workspacetypeNames["gamma"]]},
"beta,gamma": {workspacesByType[workspacetypeNames["beta"]], workspacesByType[workspacetypeNames["gamma"]]},
"gamma": {workspacesByType[workspacetypeNames["gamma"]]},
} {
sort.Slice(expected, func(i, j int) bool {
return expected[i].UID < expected[j].UID
})
var actual *corev1alpha1.LogicalClusterList
require.Eventually(t, func() bool {
actual, err = user1VwKcpClusterClients[initializer].CoreV1alpha1().LogicalClusters().List(ctx, metav1.ListOptions{}) // no list options, all filtering is implicit
if err != nil && !errors.IsForbidden(err) {
require.NoError(t, err)
}
return err == nil
}, wait.ForeverTestTimeout, 100*time.Millisecond)
actual := &corev1alpha1.LogicalClusterList{}
for _, initializer := range strings.Split(initializers, ",") {
require.Eventually(t, func() bool {
clusters, err := user1VwKcpClusterClients[initializer].CoreV1alpha1().LogicalClusters().List(ctx, metav1.ListOptions{}) // no list options, all filtering is implicit
if err != nil {
if !errors.IsForbidden(err) {
require.NoError(t, err)
}
return false // wait until cr, crb are replicated
}
actual.Items = append(actual.Items, clusters.Items...)
return true
}, wait.ForeverTestTimeout, 100*time.Millisecond)
}

lclusters, expectedClusters := sets.NewString(), sets.NewString()
for i := range actual.Items {
lclusters.Insert(logicalcluster.From(&actual.Items[i]).String())
Expand All @@ -367,44 +393,48 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
sort.Slice(actual.Items, func(i, j int) bool {
return actual.Items[i].UID < actual.Items[j].UID
})
require.Equal(t, expectedClusters.List(), lclusters.List(), "unexpected clusters for initializer %q", initializer)
require.Equal(t, expectedClusters.List(), lclusters.List(), "unexpected clusters for initializers %q", initializers)
}

t.Log("Start WATCH streams to confirm behavior on changes")
watchers := map[string]watch.Interface{}
for _, initializer := range []string{
"alpha",
"beta",
"gamma",
} {
watcher, err := user1VwKcpClusterClients[initializer].CoreV1alpha1().LogicalClusters().Watch(ctx, metav1.ListOptions{
ResourceVersion: workspaces.ResourceVersion,
})
watcher, err := user1VwKcpClusterClients[initializer].CoreV1alpha1().LogicalClusters().Watch(ctx, metav1.ListOptions{})
require.NoError(t, err)
watchers[initializer] = watcher
}

t.Log("Adding a new workspace that the watchers should see")
ws, err := sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Create(ctx, workspaceForType(workspacetypes["gamma"], testLabelSelector), metav1.CreateOptions{})
require.NoError(t, err)
source.Artifact(t, func() (runtime.Object, error) {
return sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Get(ctx, ws.Name, metav1.GetOptions{})
})
require.Eventually(t, func() bool {
workspace, err := sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Get(ctx, ws.Name, metav1.GetOptions{})
if err != nil {
t.Logf("error listing workspaces: %v", err)
return false
}
return workspacesStuckInInitializing(t, *workspace)
}, wait.ForeverTestTimeout, 100*time.Millisecond)
t.Logf("Adding a new workspace that the watcher for %s initializer should see", initializer)
wt, ok := workspacetypes[initializer]
require.True(t, ok, "didn't find WorkspaceType for %s initializer", initializer)
initializerWs, ok := workspacesByType[wt.Name]
require.True(t, ok, "didn't find Workspace for %v type", wt.Name)
initializerWsShard := framework.WorkspaceShardOrDie(t, sourceKcpClusterClient, &initializerWs)

ws, err := sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Create(ctx, func() *tenancyv1alpha1.Workspace {
w := workspaceForType(workspacetypes["gamma"], testLabelSelector)
framework.WithShard(initializerWsShard.Name)(w)
return w
}(), metav1.CreateOptions{})
require.NoError(t, err)
source.Artifact(t, func() (runtime.Object, error) {
return sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Get(ctx, ws.Name, metav1.GetOptions{})
})
require.Eventually(t, func() bool {
workspace, err := sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Get(ctx, ws.Name, metav1.GetOptions{})
if err != nil {
t.Logf("error listing workspaces: %v", err)
return false
}
return workspacesStuckInInitializing(t, sourceKcpClusterClient, *workspace)
}, wait.ForeverTestTimeout, 100*time.Millisecond)

ws, err = sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Get(ctx, ws.Name, metav1.GetOptions{})
require.NoError(t, err)
wsClusterName := logicalcluster.Name(ws.Spec.Cluster)
ws, err = sourceKcpClusterClient.TenancyV1alpha1().Cluster(wsPath).Workspaces().Get(ctx, ws.Name, metav1.GetOptions{})
require.NoError(t, err)
wsClusterName := logicalcluster.Name(ws.Spec.Cluster)

t.Logf("Waiting for watchers to see the logicalcluster in %s for workspace %s", ws.Spec.Cluster, ws.Name)
for initializer, watcher := range watchers {
t.Logf("Waiting for a watcher for %s initializer to see the logicalcluster in %s for workspace %s", initializer, ws.Spec.Cluster, ws.Name)
for {
select {
case evt := <-watcher.ResultChan():
Expand All @@ -419,18 +449,12 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
}
break
}
}

t.Log("Access an object inside of the workspace")
for _, initializer := range []string{
"alpha",
"beta",
"gamma",
} {
t.Log("Access an object inside of the workspace")
coreClusterClient := user1VwKubeClusterClients[initializer]

nsName := "testing"
_, err := coreClusterClient.Cluster(wsClusterName.Path()).CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}, metav1.CreateOptions{})
_, err = coreClusterClient.Cluster(wsClusterName.Path()).CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
require.NoError(t, err)
}
Expand Down Expand Up @@ -468,33 +492,27 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
configMaps, err = coreClusterClient.Cluster(wsClusterName.Path()).CoreV1().ConfigMaps(nsName).List(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labelSelector).String()})
require.NoError(t, err)
require.Empty(t, cmp.Diff(configMaps.Items, []corev1.ConfigMap{}))
}

patchBytesFor := func(ws *corev1alpha1.LogicalCluster, mutator func(*corev1alpha1.LogicalCluster)) []byte {
previous := ws.DeepCopy()
oldData, err := json.Marshal(corev1alpha1.LogicalCluster{
Status: previous.Status,
})
require.NoError(t, err)
patchBytesFor := func(ws *corev1alpha1.LogicalCluster, mutator func(*corev1alpha1.LogicalCluster)) []byte {
previous := ws.DeepCopy()
oldData, err := json.Marshal(corev1alpha1.LogicalCluster{
Status: previous.Status,
})
require.NoError(t, err)

obj := ws.DeepCopy()
mutator(obj)
newData, err := json.Marshal(corev1alpha1.LogicalCluster{
Status: obj.Status,
})
require.NoError(t, err)
obj := ws.DeepCopy()
mutator(obj)
newData, err := json.Marshal(corev1alpha1.LogicalCluster{
Status: obj.Status,
})
require.NoError(t, err)

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
require.NoError(t, err)
return patchBytes
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
require.NoError(t, err)
return patchBytes
}

t.Log("Transitioning the new workspace out of initializing")
for _, initializer := range []string{
"alpha",
"beta",
"gamma",
} {
t.Log("Transitioning the new workspace out of initializing")
clusterClient := user1VwKcpClusterClients[initializer].CoreV1alpha1().LogicalClusters()
logicalCluster, err := clusterClient.Cluster(wsClusterName.Path()).Get(ctx, corev1alpha1.LogicalClusterName, metav1.GetOptions{})
require.NoError(t, err)
Expand All @@ -514,9 +532,8 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
})
_, err = clusterClient.Cluster(wsClusterName.Path()).Patch(ctx, corev1alpha1.LogicalClusterName, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
require.NoError(t, err)
}

for initializer, watcher := range watchers {
t.Logf("Waiting for a watcher for %s initializer to see an update to the logicalcluster in %s for workspace %s", initializer, ws.Spec.Cluster, ws.Name)
for {
select {
case evt := <-watcher.ResultChan():
Expand All @@ -534,29 +551,17 @@ func TestInitializingWorkspacesVirtualWorkspaceAccess(t *testing.T) {
}
break
}
}

t.Log("Ensure accessing objects in the workspace is forbidden now that it is not initializing")
for _, initializer := range []string{
"alpha",
"beta",
"gamma",
} {
t.Log("Ensure accessing objects in the workspace is forbidden now that it is not initializing")
kubeClusterClient := user1VwKubeClusterClients[initializer].Cluster(wsClusterName.Path()).CoreV1().ConfigMaps("testing")
_, err := kubeClusterClient.List(ctx, metav1.ListOptions{})
_, err = kubeClusterClient.List(ctx, metav1.ListOptions{})
if !errors.IsForbidden(err) {
t.Fatalf("got %#v error from initial list, expected unauthorized", err)
}
}

t.Log("Ensure get workspace requests are 404 now that it is not initializing")
for _, initializer := range []string{
"alpha",
"beta",
"gamma",
} {
t.Log("Ensure get workspace requests are 404 now that it is not initializing")
wsClient := user1VwKcpClusterClients[initializer].CoreV1alpha1().LogicalClusters()
_, err := wsClient.Cluster(wsClusterName.Path()).Get(ctx, corev1alpha1.LogicalClusterName, metav1.GetOptions{})
_, err = wsClient.Cluster(wsClusterName.Path()).Get(ctx, corev1alpha1.LogicalClusterName, metav1.GetOptions{})
if !errors.IsNotFound(err) {
t.Fatalf("got error from get, expected not found: %v", err)
}
Expand All @@ -578,7 +583,7 @@ func workspaceForType(workspaceType *tenancyv1alpha1.WorkspaceType, testLabelSel
}
}

func workspacesStuckInInitializing(t *testing.T, workspaces ...tenancyv1alpha1.Workspace) bool {
func workspacesStuckInInitializing(t *testing.T, kcpClient kcpclientset.ClusterInterface, workspaces ...tenancyv1alpha1.Workspace) bool {
t.Helper()

for _, workspace := range workspaces {
Expand All @@ -590,6 +595,7 @@ func workspacesStuckInInitializing(t *testing.T, workspaces ...tenancyv1alpha1.W
t.Logf("workspace %s has no initializers", workspace.Name)
return false
}
t.Logf("Workspace %s (accessible via /clusters/%s) on %s shard is stuck in initializing", workspace.Name, workspace.Spec.Cluster, framework.WorkspaceShardOrDie(t, kcpClient, &workspace).Name)
}
return true
}

0 comments on commit 5aa4278

Please sign in to comment.