Skip to content

Commit

Permalink
Fix destination controller
Browse files Browse the repository at this point in the history
  • Loading branch information
alpeb committed Apr 23, 2024
1 parent e2e8990 commit f8f73c4
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 92 deletions.
3 changes: 2 additions & 1 deletion controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ func newEmptyAddressSet() watcher.AddressSet {
// getInboundPort gets the inbound port from the proxy container's environment
// variable.
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
for _, containerSpec := range podSpec.Containers {
containers := append(podSpec.InitContainers, podSpec.Containers...)
for _, containerSpec := range containers {
if containerSpec.Name != pkgK8s.ProxyContainerName {
continue
}
Expand Down
181 changes: 94 additions & 87 deletions controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0"
const clusterIPv6 = "2001:db8::88"
const clusterIPOpaque = "172.17.12.1"
Expand Down Expand Up @@ -160,134 +159,142 @@ func TestGet(t *testing.T) {
})

t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
server, client := getServerWithClient(t)
testOpaque(t, "policy-test")
})

t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) {
testOpaque(t, "native")
})

t.Run("Remote discovery", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()

// Wait for cluster store to be synced.
time.Sleep(50 * time.Millisecond)

stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)

path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)

// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", "foo-target.ns.svc.mycluster.local", 80)}, stream)
if err != nil {
errs <- err
}
}()

select {
case err := <-errs:
t.Fatalf("Got error: %s", err)
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
if updateAddAddress(t, update)[0] != fmt.Sprintf("%s:%d", "172.17.55.1", 80) {
t.Fatalf("Expected %s but got %s", fmt.Sprintf("%s:%d", podIP1, port), updateAddAddress(t, update)[0])
}
}

// Update the Server's pod selector so that it no longer selects the
// pod. This should result in the proxy protocol no longer being marked
// as opaque.
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// PodSelector is updated to NOT select the pod
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
if len(stream.updates) != 0 {
t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
})
}

func testOpaque(t *testing.T, name string) {
server, client := getServerWithClient(t)
defer server.clusterStore.UnregisterGauges()

stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)

// Update the Server's pod selector so that it once again selects the
// pod. This should result in the proxy protocol once again being marked
// as opaque.
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}
path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80)

_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
t.Fatal(err)
errs <- err
}
}()

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}
select {
case err := <-errs:
t.Fatalf("Got error: %s", err)
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
})
}

t.Run("Remote discovery", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
// Update the Server's pod selector so that it no longer selects the
// pod. This should result in the proxy protocol no longer being marked
// as opaque.
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// PodSelector is updated to NOT select the pod
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

// Wait for cluster store to be synced.
time.Sleep(50 * time.Millisecond)
select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
}
defer stream.Cancel()
errs := make(chan error)
case err := <-errs:
t.Fatalf("Got error: %s", err)
}

// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", "foo-target.ns.svc.mycluster.local", 80)}, stream)
if err != nil {
errs <- err
}
}()
// Update the Server's pod selector so that it once again selects the
// pod. This should result in the proxy protocol once again being marked
// as opaque.
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name}

select {
case update := <-stream.updates:
if updateAddAddress(t, update)[0] != fmt.Sprintf("%s:%d", "172.17.55.1", 80) {
t.Fatalf("Expected %s but got %s", fmt.Sprintf("%s:%d", podIP1, port), updateAddAddress(t, update)[0])
}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

if len(stream.updates) != 0 {
t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
}
select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

case err := <-errs:
t.Fatalf("Got error: %s", err)
if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
})
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
}

func TestGetProfiles(t *testing.T) {
Expand Down
83 changes: 79 additions & 4 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ endpoints:
- 172.17.0.16
targetRef:
kind: Pod
name: pod-policyResources
name: policy-test
namespace: ns
ports:
- port: 80
Expand All @@ -388,7 +388,7 @@ metadata:
labels:
linkerd.io/control-plane-ns: linkerd
app: policy-test
name: pod-policyResources
name: policy-test
namespace: ns
status:
phase: Running
Expand All @@ -414,7 +414,7 @@ spec:
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
name: srv
name: policy-test
namespace: ns
spec:
podSelector:
Expand All @@ -426,7 +426,7 @@ spec:
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
name: srv-external-workload
name: policy-test-external-workload
namespace: ns
spec:
externalWorkloadSelector:
Expand All @@ -436,6 +436,80 @@ spec:
proxyProtocol: opaque`,
}

policyResourcesNativeSidecar := []string{
`
apiVersion: v1
kind: Service
metadata:
name: native
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.4
ports:
- port: 80`,
`
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: native
namespace: ns
labels:
kubernetes.io/service-name: native
addressType: IPv4
endpoints:
- addresses:
- 172.17.0.18
targetRef:
kind: Pod
name: native
namespace: ns
ports:
- port: 80
protocol: TCP`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
app: native
name: native
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.18
podIPs:
- ip: 172.17.0.18
spec:
initContainers:
- name: linkerd-proxy
env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
- name: app
image: nginx
ports:
- containerPort: 80
name: http
protocol: TCP`,
`
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
name: native
namespace: ns
spec:
podSelector:
matchLabels:
app: native
port: 80
proxyProtocol: opaque`,
}

hostPortMapping := []string{
`
kind: Pod
Expand Down Expand Up @@ -643,6 +717,7 @@ spec:
res = append(res, meshedSkippedPodResource...)
res = append(res, meshedStatefulSetPodResource...)
res = append(res, policyResources...)
res = append(res, policyResourcesNativeSidecar...)
res = append(res, hostPortMapping...)
res = append(res, mirrorServiceResources...)
res = append(res, destinationCredentialsResources...)
Expand Down

0 comments on commit f8f73c4

Please sign in to comment.