
Commit

add e2e test
lionelvillard committed Feb 8, 2023
1 parent 5d7672c commit 4f500f8
Showing 7 changed files with 124 additions and 14 deletions.
6 changes: 6 additions & 0 deletions pkg/cliplugins/workload/plugin/sync_test.go
@@ -60,6 +60,12 @@ rules:
- "list"
- "watch"
- "delete"
- apiGroups:
- ""
resources:
- endpoints
verbs:
- "get"
- apiGroups:
- "apiextensions.k8s.io"
resources:
6 changes: 6 additions & 0 deletions pkg/cliplugins/workload/plugin/syncer.yaml
@@ -34,6 +34,12 @@ rules:
- "list"
- "watch"
- "delete"
- apiGroups:
- ""
resources:
- endpoints
verbs:
- "get"
- apiGroups:
- "apiextensions.k8s.io"
resources:
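Both the plugin test fixture and syncer.yaml gain the same RBAC rule: the syncer's service account may now get Endpoints, which the DNS processor change below uses to read the downstream API server addresses from default/kubernetes. A minimal sketch of the call this rule authorizes, assuming a configured client-go clientset for the downstream cluster (the helper name is illustrative):

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// apiServerAddresses reads default/kubernetes, the object the new
// "get endpoints" rule makes readable to the syncer.
func apiServerAddresses(ctx context.Context, client kubernetes.Interface) ([]string, error) {
	eps, err := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	var ips []string
	for _, subset := range eps.Subsets {
		for _, addr := range subset.Addresses {
			ips = append(ips, addr.IP)
		}
	}
	return ips, nil
}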
12 changes: 11 additions & 1 deletion pkg/syncer/spec/dns/dns_process.go
@@ -18,6 +18,7 @@ package dns

import (
"context"
"errors"
"sync"

"github.com/kcp-dev/logicalcluster/v3"
@@ -247,9 +248,18 @@ func (d *DNSProcessor) processService(ctx context.Context, name string) error {
func (d *DNSProcessor) processNetworkPolicy(ctx context.Context, name string) error {
logger := klog.FromContext(ctx)

var kubeEndpoints *corev1.Endpoints
_, err := d.networkPolicyLister.NetworkPolicies(d.dnsNamespace).Get(name)
if apierrors.IsNotFound(err) {
expected := MakeNetworkPolicy(name, d.dnsNamespace, d.syncTargetKey)
kubeEndpoints, err = d.downstreamKubeClient.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
return err
}
if len(kubeEndpoints.Subsets) == 0 || len(kubeEndpoints.Subsets[0].Addresses) == 0 {
return errors.New("missing kubernetes API endpoints")
}

expected := MakeNetworkPolicy(name, d.dnsNamespace, d.syncTargetKey, &kubeEndpoints.Subsets[0])
_, err = d.downstreamKubeClient.NetworkingV1().NetworkPolicies(d.dnsNamespace).Create(ctx, expected, metav1.CreateOptions{})
if err == nil {
logger.Info("NetworkPolicy created")
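processNetworkPolicy now fetches the default/kubernetes Endpoints before creating the policy and bails out when no address is published, so the generated egress rule can never be empty. Only the first subset is consulted, which matches the usual single-subset shape of that object. A minimal restatement of the guard as a standalone helper (a sketch, not code from this commit):

import (
	"errors"

	corev1 "k8s.io/api/core/v1"
)

// firstAPIServerSubset mirrors the guard added above: the first subset of
// default/kubernetes must carry at least one address, otherwise there is
// nothing to put into the NetworkPolicy egress rule.
func firstAPIServerSubset(eps *corev1.Endpoints) (*corev1.EndpointSubset, error) {
	if len(eps.Subsets) == 0 || len(eps.Subsets[0].Addresses) == 0 {
		return nil, errors.New("missing kubernetes API endpoints")
	}
	return &eps.Subsets[0], nil
}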
30 changes: 19 additions & 11 deletions pkg/syncer/spec/dns/dns_process_test.go
@@ -46,6 +46,7 @@ var (
roleBindingGVR = schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "rolebindings"}
serviceGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
deploymentGVR = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
endpointGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}
networkPolicyGVR = schema.GroupVersionResource{Group: "networking.k8s.io", Version: "v1", Resource: "networkpolicies"}
)

@@ -96,7 +97,7 @@ func TestDNSProcess(t *testing.T) {
MakeService(dnsID, dnsns),
MakeDeployment(dnsID, dnsns, "dnsimage"),
endpoints(dnsID, dnsns, "8.8.8.8"),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
},
expectReady: true,
expectActions: []clienttesting.Action{},
@@ -111,7 +112,7 @@ func TestDNSProcess(t *testing.T) {
MakeService(dnsID, dnsns),
MakeDeployment(dnsID, dnsns, "dnsimage"),
endpoints(dnsID, dnsns, "8.8.8.8"),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
},
expectReady: false,
expectActions: []clienttesting.Action{
@@ -121,15 +122,20 @@ func TestDNSProcess(t *testing.T) {
dnsImage: "newdnsimage",
},
"endpoint does not exist, no DNS objects": {
resources: []runtime.Object{},
resources: []runtime.Object{
endpoints("kubernetes", "default", "10.0.0.0"),
},
expectReady: false,
expectActions: []clienttesting.Action{
clienttesting.NewCreateAction(serviceAccountGVR, dnsns, MakeServiceAccount(dnsID, dnsns)),
clienttesting.NewCreateAction(roleGVR, dnsns, MakeRole(dnsID, dnsns)),
clienttesting.NewCreateAction(roleBindingGVR, dnsns, MakeRoleBinding(dnsID, dnsns)),
clienttesting.NewCreateAction(deploymentGVR, dnsns, MakeDeployment(dnsID, dnsns, "dnsimage")),
clienttesting.NewCreateAction(serviceGVR, dnsns, MakeService(dnsID, dnsns)),
clienttesting.NewCreateAction(networkPolicyGVR, dnsns, MakeNetworkPolicy(dnsID, dnsns, syncTargetKey)),
clienttesting.NewGetAction(endpointGVR, "default", "kubernetes"),
clienttesting.NewCreateAction(networkPolicyGVR, dnsns, MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{
Addresses: []corev1.EndpointAddress{{IP: "10.0.0.0"}},
})),
},
initialized: true,
dnsImage: "dnsimage",
@@ -141,7 +147,7 @@ func TestDNSProcess(t *testing.T) {
MakeRoleBinding(dnsID, dnsns),
MakeService(dnsID, dnsns),
MakeDeployment(dnsID, dnsns, "dnsimage"),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
},
expectReady: false,
expectActions: []clienttesting.Action{},
@@ -155,7 +161,7 @@ func TestDNSProcess(t *testing.T) {
MakeRoleBinding(dnsID, dnsns),
MakeService(dnsID, dnsns),
MakeDeployment(dnsID, dnsns, "dnsimage"),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
},
expectReady: false,
expectActions: []clienttesting.Action{},
@@ -169,7 +175,7 @@ func TestDNSProcess(t *testing.T) {
MakeRoleBinding(dnsID, dnsns),
MakeService(dnsID, dnsns),
MakeDeployment(dnsID, dnsns, "dnsimage"),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
},
expectReady: false,
expectActions: []clienttesting.Action{
@@ -278,10 +284,12 @@ func endpoints(name, namespace, ip string) *corev1.Endpoints {
}
if ip != "" {
endpoint.Subsets = []corev1.EndpointSubset{
{Addresses: []corev1.EndpointAddress{
{
IP: ip,
}}},
{
Addresses: []corev1.EndpointAddress{
{
IP: ip,
}},
},
}
}
return endpoint
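The expectActions lists are compared against what the fake clientset records, so every new API call made by the controller, here the extra Get on default/kubernetes and the NetworkPolicy create carrying the endpoint subset, has to be mirrored in the expectations. A small, self-contained illustration of that recording mechanism (not part of this commit):

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func dumpRecordedActions() {
	client := fake.NewSimpleClientset()
	_, _ = client.CoreV1().Endpoints("default").Get(context.Background(), "kubernetes", metav1.GetOptions{})

	// The fake clientset records every call; the test above diffs this slice
	// against its expectActions entries.
	for _, action := range client.Actions() {
		fmt.Printf("%s %s %s\n", action.GetVerb(), action.GetNamespace(), action.GetResource().Resource)
	}
}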
14 changes: 13 additions & 1 deletion pkg/syncer/spec/dns/networkpolicy_dns.yaml
@@ -14,7 +14,7 @@ spec:
- from:
- namespaceSelector:
matchLabels:
internal.workload.kcp.dev/cluster: Cluster
internal.workload.kcp.io/cluster: Cluster
ports:
- protocol: TCP
port: 5353
@@ -26,8 +26,20 @@
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: kube-system
- podSelector:
matchLabels:
k8s-app: kube-dns
ports:
- protocol: TCP
port: 53
- protocol: UDP
port: 53
# Give access to the API server to watch its associated configmap
- to:
# one ipBlock per IP (dynamically filled)
- ipBlock:
cidr: APIServerIP
ports:
- protocol: TCP
port: 6443
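The cidr: APIServerIP and port: 6443 values are template placeholders only; MakeNetworkPolicy (next file) overwrites this egress rule with one /32 ipBlock per API server address and the ports taken from the Endpoints subset. For a single endpoint at 10.96.0.1:6443 the rendered rule would look roughly like this, expressed with the networking/v1 Go types (addresses are illustrative):

import (
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// renderedAPIServerEgress shows the shape the placeholder egress rule takes
// once it has been filled in from the cluster's real endpoints.
func renderedAPIServerEgress() networkingv1.NetworkPolicyEgressRule {
	tcp := corev1.ProtocolTCP
	port := intstr.FromInt(6443)
	return networkingv1.NetworkPolicyEgressRule{
		To:    []networkingv1.NetworkPolicyPeer{{IPBlock: &networkingv1.IPBlock{CIDR: "10.96.0.1/32"}}},
		Ports: []networkingv1.NetworkPolicyPort{{Protocol: &tcp, Port: &port}},
	}
}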

22 changes: 21 additions & 1 deletion pkg/syncer/spec/dns/resources.go
@@ -27,6 +27,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/yaml"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
@@ -109,14 +110,33 @@ func MakeService(name, namespace string) *corev1.Service {
return service
}

func MakeNetworkPolicy(name, namespace, cluster string) *networkingv1.NetworkPolicy {
func MakeNetworkPolicy(name, namespace, cluster string, kubeEndpoints *corev1.EndpointSubset) *networkingv1.NetworkPolicy {
np := networkPolicyTemplate.DeepCopy()

np.Name = name
np.Namespace = namespace
np.Spec.PodSelector.MatchLabels["app"] = name
np.Spec.Ingress[0].From[0].NamespaceSelector.MatchLabels[workloadv1alpha1.InternalDownstreamClusterLabel] = cluster

to := make([]networkingv1.NetworkPolicyPeer, len(kubeEndpoints.Addresses))
for i, endpoint := range kubeEndpoints.Addresses {
to[i] = networkingv1.NetworkPolicyPeer{
IPBlock: &networkingv1.IPBlock{
CIDR: endpoint.IP + "/32",
},
}
}
np.Spec.Egress[1].To = to

ports := make([]networkingv1.NetworkPolicyPort, len(kubeEndpoints.Ports))
for i, port := range kubeEndpoints.Ports {
pport := intstr.FromInt(int(port.Port))
ports[i].Port = &pport
pprotocol := port.Protocol
ports[i].Protocol = &pprotocol
}
np.Spec.Egress[1].Ports = ports

return np
}

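MakeNetworkPolicy now receives the API server EndpointSubset and rewrites the template's second egress rule: one /32 ipBlock per address plus the subset's ports. It relies on the caller passing Subsets[0] only, which is fine for the single-subset default/kubernetes object. A hedged usage sketch from within the same package (names and addresses are made up):

import (
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
)

func exampleDNSNetworkPolicy() *networkingv1.NetworkPolicy {
	subset := &corev1.EndpointSubset{
		Addresses: []corev1.EndpointAddress{{IP: "10.96.0.1"}, {IP: "10.96.0.2"}},
		Ports:     []corev1.EndpointPort{{Name: "https", Port: 6443, Protocol: corev1.ProtocolTCP}},
	}
	// Name, namespace and sync target key are illustrative.
	np := MakeNetworkPolicy("kcp-dns-example", "kcp-syncer-example", "sync-target-key", subset)
	// np.Spec.Egress[1].To    -> ipBlocks 10.96.0.1/32 and 10.96.0.2/32
	// np.Spec.Egress[1].Ports -> TCP 6443
	return np
}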
48 changes: 48 additions & 0 deletions test/e2e/syncer/dns/dns_test.go
@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"

"github.com/kcp-dev/kcp/test/e2e/framework"
"github.com/kcp-dev/kcp/test/e2e/syncer/dns/workspace1"
@@ -91,6 +92,18 @@ func TestDNSResolution(t *testing.T) {
downstreamWS2NS1 := syncer.DownstreamNamespaceFor(t, logicalcluster.Name(workloadWorkspace2.Spec.Cluster), "dns-ws2-ns1")
t.Logf("Downstream namespace 1 in workspace 2 is %s", downstreamWS2NS1)

t.Log("Checking network policies have been created")
framework.Eventually(t, func() (success bool, reason string) {
np, err := downstreamKubeClient.NetworkingV1().NetworkPolicies(syncer.SyncerID).List(ctx, metav1.ListOptions{})
if err != nil {
return false, fmt.Sprintf("error while getting network policies: %v\n", err)
}
if len(np.Items) != 2 {
return false, fmt.Sprintf("expecting 2 network policies, got: %d\n", len(np.Items))
}
return true, ""
}, wait.ForeverTestTimeout, time.Millisecond*500, "Network policies haven't been created")

t.Log("Checking fully qualified DNS name resolves")
framework.Eventually(t, checkLogs(ctx, t, downstreamKubeClient, downstreamWS1NS1, "ping-fully-qualified", "PING svc.dns-ws1-ns1.svc.cluster.local ("),
wait.ForeverTestTimeout, time.Millisecond*500, "Service name was not resolved")
@@ -106,6 +119,41 @@ func TestDNSResolution(t *testing.T) {
t.Log("Checking DNS name does not resolve across workspaces")
framework.Eventually(t, checkLogs(ctx, t, downstreamKubeClient, downstreamWS2NS1, "ping-fully-qualified-fail", "ping: bad"),
wait.ForeverTestTimeout, time.Millisecond*500, "Service name was resolved")

t.Log("Change ping-fully-qualified deployment DNS config to use workspace 2 nameserver")
dnsServices, err := downstreamKubeClient.CoreV1().Services(syncer.SyncerID).List(ctx, metav1.ListOptions{})
require.NoError(t, err)
require.True(t, len(dnsServices.Items) >= 2)

deployment, err := downstreamKubeClient.AppsV1().Deployments(downstreamWS1NS1).Get(ctx, "ping-fully-qualified", metav1.GetOptions{})
require.NoError(t, err)

existingDNSIP := deployment.Spec.Template.Spec.DNSConfig.Nameservers[0]
newDNSIP := ""
for _, svc := range dnsServices.Items {
if strings.HasPrefix(svc.Name, "kcp-dns-") {
if svc.Spec.ClusterIP != existingDNSIP {
newDNSIP = svc.Spec.ClusterIP
break
}
}
}
require.NotEmpty(t, newDNSIP, "could not find another DNS service")
deployment.Spec.Template.Spec.DNSConfig.Nameservers[0] = newDNSIP
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
deployment, err := downstreamKubeClient.AppsV1().Deployments(downstreamWS1NS1).Get(ctx, "ping-fully-qualified", metav1.GetOptions{})
if err != nil {
return err
}

deployment.Spec.Template.Spec.DNSConfig.Nameservers[0] = newDNSIP
_, err = downstreamKubeClient.AppsV1().Deployments(downstreamWS1NS1).Update(ctx, deployment, metav1.UpdateOptions{})
return err
})
require.NoError(t, err)

framework.Eventually(t, checkLogs(ctx, t, downstreamKubeClient, downstreamWS1NS1, "ping-fully-qualified", "ping: bad"),
wait.ForeverTestTimeout, time.Millisecond*500, "Service name was still not resolved")
}

func checkLogs(ctx context.Context, t *testing.T, downstreamKubeClient *kubernetes.Clientset, downstreamNamespace, containerName, expectedPrefix string) func() (success bool, reason string) {
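The new e2e steps wait for both syncers' NetworkPolicies, then repoint the ping-fully-qualified deployment at the other workspace's DNS service and expect resolution to start failing. This assumes the test workload pins its resolver through the pod dnsConfig, roughly like the sketch below (illustrative only; the real fixture lives under test/e2e/syncer/dns/workspace1):

import corev1 "k8s.io/api/core/v1"

func pingPodSpecSketch(dnsServiceIP string) corev1.PodSpec {
	return corev1.PodSpec{
		DNSPolicy: corev1.DNSNone,
		DNSConfig: &corev1.PodDNSConfig{
			// ClusterIP of the per-workspace kcp DNS service; the test swaps
			// this entry to point at the wrong workspace's nameserver.
			Nameservers: []string{dnsServiceIP},
		},
		Containers: []corev1.Container{{Name: "ping-fully-qualified", Image: "busybox"}},
	}
}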
