Skip to content

Commit

Permalink
Get the DNS service to resolve its IP
Browse files Browse the repository at this point in the history
  • Loading branch information
lionelvillard committed Nov 16, 2022
1 parent 5339705 commit 245230e
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 68 deletions.
35 changes: 13 additions & 22 deletions pkg/syncer/spec/mutators/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package mutators

import (
"fmt"
"net"
"net/url"
"sort"

Expand All @@ -30,27 +29,22 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
listerscorev1 "k8s.io/client-go/listers/core/v1"
utilspointer "k8s.io/utils/pointer"

"github.com/kcp-dev/kcp/pkg/syncer/shared"
)

var (
// DefaultLookupIPFn is the IP lookup function used by the deployment mutator.
// Override for testing only
DefaultLookupIPFn = net.LookupIP
)

type ListSecretFunc func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error)

type DeploymentMutator struct {
upstreamURL *url.URL
listSecrets ListSecretFunc
serviceLister listerscorev1.ServiceLister
syncTargetLogicalClusterName logicalcluster.Name
syncTargetUID types.UID
syncTargetName string
dnsNamespace string
dnsIPs map[string]string
}

func (dm *DeploymentMutator) GVR() schema.GroupVersionResource {
Expand All @@ -61,18 +55,18 @@ func (dm *DeploymentMutator) GVR() schema.GroupVersionResource {
}
}

func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, syncTargetLogicalClusterName logicalcluster.Name,
func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
syncTargetLogicalClusterName logicalcluster.Name,
syncTargetUID types.UID, syncTargetName, dnsNamespace string) *DeploymentMutator {

return &DeploymentMutator{
upstreamURL: upstreamURL,
listSecrets: secretLister,
serviceLister: serviceLister,
syncTargetLogicalClusterName: syncTargetLogicalClusterName,
syncTargetUID: syncTargetUID,
syncTargetName: syncTargetName,

dnsNamespace: dnsNamespace,
dnsIPs: map[string]string{}, // map workspace ID to DNS IP
dnsNamespace: dnsNamespace,
}
}

Expand Down Expand Up @@ -273,21 +267,18 @@ func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name)
// Retrieve the DNS IP associated to the workspace
dnsServiceName := shared.GetDNSID(workspace, dm.syncTargetUID, dm.syncTargetName)

// cached?
if ip, ok := dm.dnsIPs[dnsServiceName]; ok {
return ip, nil
svc, err := dm.serviceLister.Services(dm.dnsNamespace).Get(dnsServiceName)
if err != nil {
return "", fmt.Errorf("failed to get DNS service: %w", err)
}

// Not cached: do actual lookup
qname := fmt.Sprintf("%s.%s.svc.cluster.local", dnsServiceName, dm.dnsNamespace)

ips, err := DefaultLookupIPFn(qname)
if len(ips) == 0 || err != nil {
ip := svc.Spec.ClusterIP
if ip == "" {
// not available (yet)
return "", fmt.Errorf("failed to get DNS nameserver IP address: %w", err)
return "", fmt.Errorf("DNS service IP address not found")
}

return ips[0].String(), nil
return ip, nil
}

// resolveDownwardAPIFieldRefEnv replaces the downwardAPI FieldRef EnvVars with the value from the deployment, right now it only replaces the metadata.namespace
Expand Down
22 changes: 20 additions & 2 deletions pkg/syncer/spec/mutators/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
utilspointer "k8s.io/utils/pointer"
Expand Down Expand Up @@ -839,9 +840,14 @@ func TestDeploymentMutate(t *testing.T) {

workspace := logicalcluster.New("root:default:testing")

dm := NewDeploymentMutator(upstreamURL, secretLister, workspace, "syncTargetUID", "syncTargetName", "dnsNamespace")
serviceIndexer := cache.NewIndexer(cache.LegacyMetaNamespaceKeyFunc, cache.Indexers{})

dm.dnsIPs[shared.GetDNSID(workspace, "syncTargetUID", "syncTargetName")] = "8.8.8.8"
dnsServiceName := shared.GetDNSID(workspace, "syncTargetUID", "syncTargetName")
err = serviceIndexer.Add(service(dnsServiceName, "dnsNamespace"))
require.NoError(t, err, "Service Add() = %v", err)
svcLister := listerscorev1.NewServiceLister(serviceIndexer)

dm := NewDeploymentMutator(upstreamURL, secretLister, svcLister, workspace, "syncTargetUID", "syncTargetName", "dnsNamespace")

unstrOriginalDeployment, err := toUnstructured(c.originalDeployment)
require.NoError(t, err, "toUnstructured() = %v", err)
Expand Down Expand Up @@ -883,3 +889,15 @@ func toDeployment(obj *unstructured.Unstructured) (*appsv1.Deployment, error) {
}
return d, nil
}

func service(name, namespace string) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "8.8.8.8",
},
}
}
2 changes: 1 addition & 1 deletion pkg/syncer/spec/spec_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetWorkspace logicalcluster.
_ = upstreamInformers.ForResource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}).Informer()
deploymentMutator := specmutators.NewDeploymentMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
return upstreamInformers.ForResource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}).Lister().ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything())
}, syncTargetWorkspace, syncTargetUID, syncTargetName, dnsNamespace)
}, serviceLister, syncTargetWorkspace, syncTargetUID, syncTargetName, dnsNamespace)

c.mutators = mutatorGvrMap{
deploymentMutator.GVR(): deploymentMutator.Mutate,
Expand Down
28 changes: 14 additions & 14 deletions pkg/syncer/spec/spec_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package spec
import (
"context"
"encoding/json"
"net"
"net/url"
"strings"
"testing"
Expand Down Expand Up @@ -55,7 +54,6 @@ import (
namespacecontroller "github.com/kcp-dev/kcp/pkg/syncer/namespace"
"github.com/kcp-dev/kcp/pkg/syncer/resourcesync"
"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
"github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
"github.com/kcp-dev/kcp/third_party/keyfunctions"
)

Expand Down Expand Up @@ -506,7 +504,7 @@ func TestSyncerProcess(t *testing.T) {
dns.MakeRole("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeRoleBinding("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeDeployment("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n", "dnsimage"),
dns.MakeService("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
service("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
endpoints("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
},
resourceToProcessLogicalClusterName: "root:org:ws",
Expand Down Expand Up @@ -560,7 +558,7 @@ func TestSyncerProcess(t *testing.T) {
dns.MakeRole("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeRoleBinding("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeDeployment("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n", "dnsimage"),
dns.MakeService("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
service("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
endpoints("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
},
resourceToProcessLogicalClusterName: "root:org:ws",
Expand Down Expand Up @@ -646,7 +644,7 @@ func TestSyncerProcess(t *testing.T) {
dns.MakeRole("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeRoleBinding("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeDeployment("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n", "dnsimage"),
dns.MakeService("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
service("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
endpoints("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
},
fromResources: []runtime.Object{
Expand Down Expand Up @@ -691,7 +689,7 @@ func TestSyncerProcess(t *testing.T) {
dns.MakeRole("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeRoleBinding("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeDeployment("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n", "dnsimage"),
dns.MakeService("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
service("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
endpoints("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
},
fromResources: []runtime.Object{
Expand Down Expand Up @@ -753,7 +751,7 @@ func TestSyncerProcess(t *testing.T) {
dns.MakeRole("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeRoleBinding("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeDeployment("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n", "dnsimage"),
dns.MakeService("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
service("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
endpoints("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
},
fromResources: []runtime.Object{
Expand Down Expand Up @@ -835,7 +833,7 @@ func TestSyncerProcess(t *testing.T) {
dns.MakeRole("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeRoleBinding("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeDeployment("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n", "dnsimage"),
dns.MakeService("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
service("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
endpoints("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
},
resourceToProcessLogicalClusterName: "root:org:ws",
Expand Down Expand Up @@ -976,7 +974,7 @@ func TestSyncerProcess(t *testing.T) {
dns.MakeRole("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeRoleBinding("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
dns.MakeDeployment("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n", "dnsimage"),
dns.MakeService("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
service("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
endpoints("kcp-dns-us-west1-1nuzj7pw-2fcy2vpi", "kcp-01c0zzvlqsi7n"),
},
fromResources: []runtime.Object{
Expand Down Expand Up @@ -1067,10 +1065,6 @@ func TestSyncerProcess(t *testing.T) {
serviceAccountLister, roleLister, roleBindingLister, deploymentLister, serviceLister, endpointLister, "kcp-01c0zzvlqsi7n", "dnsimage")
require.NoError(t, err)

mutators.DefaultLookupIPFn = func(host string) ([]net.IP, error) {
return []net.IP{net.ParseIP("8.8.8.8")}, nil
}

fromInformers.Start(ctx.Done())
toInformers.Start(ctx.Done())
toInformerFactory.Start(ctx.Done())
Expand Down Expand Up @@ -1175,12 +1169,18 @@ func endpoints(name, namespace string) *corev1.Endpoints {
Subsets: []corev1.EndpointSubset{
{Addresses: []corev1.EndpointAddress{
{
IP: "1.2.3.4",
IP: "8.8.8.8",
}}},
},
}
}

func service(name, namespace string) *corev1.Service {
svc := dns.MakeService(name, namespace)
svc.Spec.ClusterIP = "8.8.8.8"
return svc
}

func secret(name, namespace, clusterName string, labels, annotations map[string]string, data map[string][]byte) *corev1.Secret {
return secretWithFinalizers(name, namespace, clusterName, labels, annotations, nil, data)
}
Expand Down
55 changes: 37 additions & 18 deletions test/e2e/framework/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package framework
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand All @@ -33,6 +31,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -44,6 +43,7 @@ import (
kubernetesclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/yaml"

apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1"
Expand All @@ -53,13 +53,10 @@ import (
workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin"
"github.com/kcp-dev/kcp/pkg/syncer"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
)

type SyncerOption func(t *testing.T, fs *syncerFixture)

var dnsLookupIPOnce sync.Once

func NewSyncerFixture(t *testing.T, server RunningServer, clusterName logicalcluster.Name, opts ...SyncerOption) *syncerFixture {
if !sets.NewString(TestConfig.Suites()...).HasAny("transparent-multi-cluster", "transparent-multi-cluster:requires-kind") {
t.Fatalf("invalid to use a syncer fixture when only the following suites were requested: %v", TestConfig.Suites())
Expand Down Expand Up @@ -345,17 +342,10 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
err := syncer.StartSyncer(ctx, syncerConfig, 2, 5*time.Second)
require.NoError(t, err, "syncer failed to start")

dnsLookupIPOnce.Do(func() {
// DNS IP lookup always resolves
mutators.DefaultLookupIPFn = func(host string) ([]net.IP, error) {
return []net.IP{net.ParseIP("8.8.8.8")}, nil
}
})

// Manually create the DNS Endpoints for the main workspace
dnsID := shared.GetDNSID(sf.workspaceClusterName, types.UID(syncerConfig.SyncTargetUID), sf.syncTargetName)
_, err = downstreamKubeClient.CoreV1().Endpoints(syncerID).Create(ctx, endpoints(dnsID, syncerID), metav1.CreateOptions{})
require.NoError(t, err)
//// Manually create the DNS Endpoints for the main workspace
//dnsID := shared.GetDNSID(sf.workspaceClusterName, types.UID(syncerConfig.SyncTargetUID), sf.syncTargetName)
//_, err = downstreamKubeClient.CoreV1().Endpoints(syncerID).Create(ctx, endpoints(dnsID, syncerID), metav1.CreateOptions{})
//require.NoError(t, err)
}

startedSyncer := &StartedSyncerFixture{
Expand Down Expand Up @@ -397,12 +387,29 @@ func (sf *StartedSyncerFixture) WaitForClusterReady(t *testing.T, ctx context.Co
t.Logf("Cluster %q is %s", cfg.SyncTargetName, conditionsv1alpha1.ReadyCondition)
}

// BoundWorkspace is called when a new workspace is bound to this workload workspace
func (sf *StartedSyncerFixture) BoundWorkspace(t *testing.T, ctx context.Context, workspace logicalcluster.Name) {
// WorkspaceBound is called when a new workspace has been bound to this workload workspace
func (sf *StartedSyncerFixture) WorkspaceBound(t *testing.T, ctx context.Context, workspace logicalcluster.Name) {
if !sf.useDeployedSyncer {
dnsID := shared.GetDNSID(workspace, types.UID(sf.SyncerConfig.SyncTargetUID), sf.SyncerConfig.SyncTargetName)
_, err := sf.DownstreamKubeClient.CoreV1().Endpoints(sf.SyncerID).Create(ctx, endpoints(dnsID, sf.SyncerID), metav1.CreateOptions{})
require.NoError(t, err)

// The DNS service may or may not have been created by the spec controller. In any cases, we want to make sure
// the service ClusterIP is set
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
svc, err := sf.DownstreamKubeClient.CoreV1().Services(sf.SyncerID).Get(ctx, dnsID, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
_, err = sf.DownstreamKubeClient.CoreV1().Services(sf.SyncerID).Create(ctx, service(dnsID, sf.SyncerID), metav1.CreateOptions{})
}
return err
}

svc.Spec.ClusterIP = "8.8.8.8"
_, err = sf.DownstreamKubeClient.CoreV1().Services(sf.SyncerID).Update(ctx, svc, metav1.UpdateOptions{})
return err
})
require.NoError(t, err)
}
}

Expand Down Expand Up @@ -515,3 +522,15 @@ func endpoints(name, namespace string) *corev1.Endpoints {
},
}
}

func service(name, namespace string) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "8.8.8.8",
},
}
}
2 changes: 1 addition & 1 deletion test/e2e/reconciler/locationworkspace/rootcompute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestRootComputeWorkspace(t *testing.T) {
framework.WithAPIExportsWorkloadBindOption("root:compute:kubernetes"),
framework.WithLocationWorkspaceWorkloadBindOption(computeClusterName),
).Bind(t)
syncerFixture.BoundWorkspace(t, ctx, consumerWorkspace)
syncerFixture.WorkspaceBound(t, ctx, consumerWorkspace)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/reconciler/scheduling/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestScheduling(t *testing.T) {
framework.NewBindCompute(t, userClusterName, source,
framework.WithLocationWorkspaceWorkloadBindOption(negotiationClusterName),
).Bind(t)
syncerFixture.BoundWorkspace(t, ctx, userClusterName)
syncerFixture.WorkspaceBound(t, ctx, userClusterName)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand All @@ -185,7 +185,7 @@ func TestScheduling(t *testing.T) {
framework.NewBindCompute(t, secondUserClusterName, source,
framework.WithLocationWorkspaceWorkloadBindOption(negotiationClusterName),
).Bind(t)
syncerFixture.BoundWorkspace(t, ctx, secondUserClusterName)
syncerFixture.WorkspaceBound(t, ctx, secondUserClusterName)

t.Logf("Wait for being able to list Services in the user workspace")
require.Eventually(t, func() bool {
Expand Down
Loading

0 comments on commit 245230e

Please sign in to comment.