Skip to content

Commit

Permalink
Dedupe the service pods indexed in the topology
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinpollet committed Jun 26, 2020
1 parent ad8ab80 commit e32cc85
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 32 deletions.
16 changes: 13 additions & 3 deletions pkg/topology/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,21 +729,29 @@ func (r *resources) indexPodsByService(resourceFilter *mk8s.ResourceFilter, eps
continue
}

// This map keeps track of service pods already indexed. A service pod can be listed in multiple endpoint
// subset in function of the matched service ports.
indexedServicePods := make(map[Key]struct{})

for _, subset := range ep.Subsets {
for _, address := range subset.Addresses {
r.indexPodByService(ep, address, podsByName)
r.indexPodByService(ep, address, podsByName, indexedServicePods)
}
}
}
}

func (r *resources) indexPodByService(ep *corev1.Endpoints, address corev1.EndpointAddress, podsByName map[Key]*corev1.Pod) {
func (r *resources) indexPodByService(ep *corev1.Endpoints, address corev1.EndpointAddress, podsByName map[Key]*corev1.Pod, indexedServicePods map[Key]struct{}) {
if address.TargetRef == nil {
return
}

keyPod := Key{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}

if _, exists := indexedServicePods[keyPod]; exists {
return
}

pod, ok := podsByName[keyPod]
if !ok {
return
Expand All @@ -752,12 +760,14 @@ func (r *resources) indexPodByService(ep *corev1.Endpoints, address corev1.Endpo
keySA := Key{Name: pod.Spec.ServiceAccountName, Namespace: pod.Namespace}
keyEP := Key{Name: ep.Name, Namespace: ep.Namespace}

if _, ok := r.PodsBySvcBySa[keySA]; !ok {
if _, exists := r.PodsBySvcBySa[keySA]; !exists {
r.PodsBySvcBySa[keySA] = make(map[Key][]*corev1.Pod)
}

r.PodsBySvcBySa[keySA][keyEP] = append(r.PodsBySvcBySa[keySA][keyEP], pod)
r.PodsBySvc[keyEP] = append(r.PodsBySvc[keyEP], pod)

indexedServicePods[keyPod] = struct{}{}
}

func (r *resources) indexSMIResources(resourceFilter *mk8s.ResourceFilter, tts []*access.TrafficTarget, tss []*split.TrafficSplit, tcpRts []*spec.TCPRoute, httpRtGrps []*spec.HTTPRouteGroup) {
Expand Down
122 changes: 93 additions & 29 deletions pkg/topology/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ func TestTopologyBuilder_HandleCircularReferenceOnTrafficSplit(t *testing.T) {
svcE := createService("my-ns", "svc-e", annotations, svcPorts, selectorAppE, "10.10.1.19")
podE := createPod("my-ns", "app-e", saE, svcE.Spec.Selector, "10.10.2.4")

epB := createEndpoints(svcB, []*corev1.Pod{podB})
epC := createEndpoints(svcC, []*corev1.Pod{podC})
epD := createEndpoints(svcD, []*corev1.Pod{podD})
epE := createEndpoints(svcE, []*corev1.Pod{podE})
epB := createEndpoints(svcB, createEndpointSubset(svcPorts, podB))
epC := createEndpoints(svcC, createEndpointSubset(svcPorts, podC))
epD := createEndpoints(svcD, createEndpointSubset(svcPorts, podD))
epE := createEndpoints(svcE, createEndpointSubset(svcPorts, podE))

apiMatch := createHTTPMatch("api", []string{"GET", "POST"}, "/api")
metricMatch := createHTTPMatch("metric", []string{"GET"}, "/metric")
Expand Down Expand Up @@ -180,9 +180,9 @@ func TestTopologyBuilder_TrafficTargetSourcesForbiddenTrafficSplit(t *testing.T)
svcD := createService("my-ns", "svc-d", annotations, svcPorts, selectorAppD, "10.10.1.18")
podD := createPod("my-ns", "app-d", saD, svcD.Spec.Selector, "10.10.2.3")

epB := createEndpoints(svcB, []*corev1.Pod{podB})
epC := createEndpoints(svcC, []*corev1.Pod{podC})
epD := createEndpoints(svcD, []*corev1.Pod{podD})
epB := createEndpoints(svcB, createEndpointSubset(svcPorts, podB))
epC := createEndpoints(svcC, createEndpointSubset(svcPorts, podC))
epD := createEndpoints(svcD, createEndpointSubset(svcPorts, podD))

apiMatch := createHTTPMatch("api", []string{"GET", "POST"}, "/api")
metricMatch := createHTTPMatch("metric", []string{"GET"}, "/metric")
Expand Down Expand Up @@ -244,10 +244,10 @@ func TestTopologyBuilder_EvaluatesIncomingTrafficSplit(t *testing.T) {
svcE := createService("my-ns", "svc-e", annotations, svcPorts, selectorAppE, "10.10.1.19")
podE := createPod("my-ns", "app-e", saE, svcE.Spec.Selector, "10.10.2.4")

epB := createEndpoints(svcB, []*corev1.Pod{podB})
epC := createEndpoints(svcC, []*corev1.Pod{podC})
epD := createEndpoints(svcD, []*corev1.Pod{podD})
epE := createEndpoints(svcE, []*corev1.Pod{podE})
epB := createEndpoints(svcB, createEndpointSubset(svcPorts, podB))
epC := createEndpoints(svcC, createEndpointSubset(svcPorts, podC))
epD := createEndpoints(svcD, createEndpointSubset(svcPorts, podD))
epE := createEndpoints(svcE, createEndpointSubset(svcPorts, podE))

apiMatch := createHTTPMatch("api", []string{"GET", "POST"}, "/api")
metricMatch := createHTTPMatch("metric", []string{"GET"}, "/metric")
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestTopologyBuilder_BuildWithTrafficTarget(t *testing.T) {
svcB := createService("my-ns", "svc-b", annotations, svcPorts, selectorAppB, "10.10.1.16")
podB := createPod("my-ns", "app-b", saB, svcB.Spec.Selector, "10.10.2.1")

epB := createEndpoints(svcB, []*corev1.Pod{podB})
epB := createEndpoints(svcB, createEndpointSubset(svcPorts, podB))

apiMatch := createHTTPMatch("api", []string{"GET", "POST"}, "/api")
metricMatch := createHTTPMatch("metric", []string{"GET"}, "/metric")
Expand Down Expand Up @@ -361,9 +361,9 @@ func TestTopologyBuilder_BuildWithTrafficTargetAndTrafficSplitOnSameService(t *t
svcD := createService("my-ns", "svc-d", annotations, svcPorts, selectorAppD, "10.10.1.18")
podD := createPod("my-ns", "app-d", saD, svcD.Spec.Selector, "10.10.2.3")

epB := createEndpoints(svcB, []*corev1.Pod{podB})
epC := createEndpoints(svcC, []*corev1.Pod{podC})
epD := createEndpoints(svcD, []*corev1.Pod{podD})
epB := createEndpoints(svcB, createEndpointSubset(svcPorts, podB))
epC := createEndpoints(svcC, createEndpointSubset(svcPorts, podC))
epD := createEndpoints(svcD, createEndpointSubset(svcPorts, podD))

apiMatch := createHTTPMatch("api", []string{"GET", "POST"}, "/api")
metricMatch := createHTTPMatch("metric", []string{"GET"}, "/metric")
Expand Down Expand Up @@ -407,7 +407,7 @@ func TestTopologyBuilder_BuildWithTrafficTargetSpecEmptyMatch(t *testing.T) {
svcB := createService("my-ns", "svc-b", annotations, svcbPorts, selectorAppB, "10.10.1.16")
podB := createPod("my-ns", "app-b", saB, svcB.Spec.Selector, "10.10.2.1")

epB := createEndpoints(svcB, []*corev1.Pod{podB})
epB := createEndpoints(svcB, createEndpointSubset(svcbPorts, podB))

apiMatch := createHTTPMatch("api", []string{"GET", "POST"}, "/api")
metricMatch := createHTTPMatch("metric", []string{"GET"}, "/metric")
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestTopologyBuilder_BuildWithTrafficTargetEmptyDestinationPort(t *testing.T
svcB := createService("my-ns", "svc-b", annotations, svcbPorts, selectorAppB, "10.10.1.16")
podB := createPod("my-ns", "app-b", saB, svcB.Spec.Selector, "10.10.2.1")

epB := createEndpoints(svcB, []*corev1.Pod{podB})
epB := createEndpoints(svcB, createEndpointSubset(svcbPorts, podB))

tt := createTrafficTarget("my-ns", "tt", saB, "", []*corev1.ServiceAccount{saA}, nil, []string{})

Expand Down Expand Up @@ -485,13 +485,13 @@ func TestTopologyBuilder_BuildWithTrafficTargetAndMismatchServicePort(t *testing
svcB1Ports := []corev1.ServicePort{svcPort("port-8080", 8080, 8080)}
podB1 := createPod("my-ns", "app-b1", saB, selectorAppB1, "10.10.1.2")
svcB1 := createService("my-ns", "svc-b1", annotations, svcB1Ports, selectorAppB1, "10.10.1.16")
epB1 := createEndpoints(svcB1, []*corev1.Pod{podB1})
epB1 := createEndpoints(svcB1, createEndpointSubset(svcB1Ports, podB1))

selectorAppB2 := map[string]string{"app": "app-b2"}
svcB2Ports := []corev1.ServicePort{svcPort("port-80", 80, 80)}
podB2 := createPod("my-ns", "app-b2", saB, selectorAppB2, "10.10.1.3")
svcB2 := createService("my-ns", "svc-b2", annotations, svcB2Ports, selectorAppB2, "10.10.1.17")
epB2 := createEndpoints(svcB2, []*corev1.Pod{podB2})
epB2 := createEndpoints(svcB2, createEndpointSubset(svcB2Ports, podB2))

tt := createTrafficTarget("my-ns", "tt", saB, "80", []*corev1.ServiceAccount{saA}, nil, []string{})

Expand Down Expand Up @@ -530,7 +530,7 @@ func TestTopologyBuilder_BuildTrafficTargetMultipleSourcesAndDestinations(t *tes
podC1 := createPod("my-ns", "app-c-1", saC, svcC.Spec.Selector, "10.10.3.1")
podC2 := createPod("my-ns", "app-c-2", saC, svcC.Spec.Selector, "10.10.3.2")

epC := createEndpoints(svcC, []*corev1.Pod{podC1, podC2})
epC := createEndpoints(svcC, createEndpointSubset(svccPorts, podC1, podC2))

tt := createTrafficTarget("my-ns", "tt", saC, "8080", []*corev1.ServiceAccount{saA, saB}, nil, []string{})

Expand Down Expand Up @@ -584,6 +584,68 @@ func TestTopologyBuilder_EmptyTrafficTargetDestinationNamespace(t *testing.T) {
assert.Equal(t, namespace, actual.Destination.Namespace)
}

func TestTopologyBuilder_BuildServiceWithPodPortMixture(t *testing.T) {
serviceAccount := createServiceAccount("my-ns", "service-account")

podV1 := createPod(
"my-ns",
"pod-v1",
serviceAccount,
map[string]string{"app": "my-app", "version": "v1"},
"10.10.1.1",
)

podV2 := createPod(
"my-ns",
"pod-v2",
serviceAccount,
map[string]string{"app": "my-app", "version": "v2"},
"10.10.1.2",
)

svcPorts := []corev1.ServicePort{
{
Name: "port-80",
Port: 80,
TargetPort: intstr.FromString("name"),
},
{
Name: "port-8080",
Port: 8080,
TargetPort: intstr.FromInt(8080),
},
}

svc := createService(
"my-ns",
"svc",
nil,
svcPorts,
map[string]string{"app": "my-app"},
"10.10.1.3",
)

endpoints := createEndpoints(svc,
createEndpointSubset([]corev1.ServicePort{svcPorts[0]}, podV1),
createEndpointSubset([]corev1.ServicePort{svcPorts[1]}, podV1, podV2),
)

k8sClient := fake.NewSimpleClientset(endpoints, svc, podV1, podV2)
smiAccessClient := accessfake.NewSimpleClientset()
smiSplitClient := splitfake.NewSimpleClientset()
smiSpecClient := specfake.NewSimpleClientset()

builder, err := createBuilder(k8sClient, smiAccessClient, smiSpecClient, smiSplitClient)
require.NoError(t, err)

resourceFilter := mk8s.NewResourceFilter()

got, err := builder.Build(resourceFilter)
require.NoError(t, err)

assertTopology(t, "testdata/topology-service-with-pod-port-mixture.json", got)
}

// createBuilder initializes the different k8s factories and start them, initializes listers and create
// a new topology.Builder.
func createBuilder(k8sClient k8s.Interface, smiAccessClient accessclient.Interface, smiSpecClient specsclient.Interface, smiSplitClient splitclient.Interface) (*Builder, error) {
Expand Down Expand Up @@ -773,9 +835,9 @@ func createService(namespace, name string, annotations map[string]string, target
}
}

func createEndpoints(svc *corev1.Service, pods []*corev1.Pod) *corev1.Endpoints {
ports := make([]corev1.EndpointPort, len(svc.Spec.Ports))
for i, port := range svc.Spec.Ports {
func createEndpointSubset(svcPorts []corev1.ServicePort, pods ...*corev1.Pod) corev1.EndpointSubset {
ports := make([]corev1.EndpointPort, len(svcPorts))
for i, port := range svcPorts {
ports[i] = corev1.EndpointPort{
Name: port.Name,
Port: port.TargetPort.IntVal,
Expand All @@ -797,6 +859,13 @@ func createEndpoints(svc *corev1.Service, pods []*corev1.Pod) *corev1.Endpoints
}
}

return corev1.EndpointSubset{
Addresses: addresses,
Ports: ports,
}
}

func createEndpoints(svc *corev1.Service, subsets ...corev1.EndpointSubset) *corev1.Endpoints {
return &corev1.Endpoints{
TypeMeta: metav1.TypeMeta{
Kind: "Endpoints",
Expand All @@ -806,12 +875,7 @@ func createEndpoints(svc *corev1.Service, pods []*corev1.Pod) *corev1.Endpoints
Name: svc.Name,
Namespace: svc.Namespace,
},
Subsets: []corev1.EndpointSubset{
{
Addresses: addresses,
Ports: ports,
},
},
Subsets: subsets,
}
}

Expand Down
47 changes: 47 additions & 0 deletions pkg/topology/testdata/topology-service-with-pod-port-mixture.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"services": {
"svc@my-ns": {
"name": "svc",
"namespace": "my-ns",
"selector": {
"app": "my-app"
},
"ports": [
{
"name": "port-80",
"port": 80,
"targetPort": "name"
},
{
"name": "port-8080",
"port": 8080,
"targetPort": 8080
}
],
"clusterIp": "10.10.1.3",
"pods": [
"pod-v1@my-ns",
"pod-v2@my-ns"
],
"trafficTargets": []
}
},
"pods": {
"pod-v1@my-ns": {
"name": "pod-v1",
"namespace": "my-ns",
"serviceAccount": "service-account",
"ip": "10.10.1.1",
"sourceOf": []
},
"pod-v2@my-ns": {
"name": "pod-v2",
"namespace": "my-ns",
"serviceAccount": "service-account",
"ip": "10.10.1.2",
"destinationOf": []
}
},
"serviceTrafficTargets": {},
"trafficSplits": {}
}

0 comments on commit e32cc85

Please sign in to comment.