Skip to content

Commit

Permalink
Support multi-port services.
Browse files Browse the repository at this point in the history
  • Loading branch information
timoreimann authored and traefiker committed Apr 16, 2018
1 parent 6b82a77 commit 21b8b2d
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 18 deletions.
4 changes: 2 additions & 2 deletions provider/kubernetes/builder_endpoint_test.go
Expand Up @@ -47,11 +47,11 @@ func subset(opts ...func(*corev1.EndpointSubset)) func(*corev1.Endpoints) {

func eAddresses(opts ...func(*corev1.EndpointAddress)) func(*corev1.EndpointSubset) {
return func(subset *corev1.EndpointSubset) {
a := &corev1.EndpointAddress{}
for _, opt := range opts {
a := &corev1.EndpointAddress{}
opt(a)
subset.Addresses = append(subset.Addresses, *a)
}
subset.Addresses = append(subset.Addresses, *a)
}
}

Expand Down
33 changes: 21 additions & 12 deletions provider/kubernetes/kubernetes.go
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"os"
"reflect"
"strconv"
"strings"
"text/template"
"time"
Expand Down Expand Up @@ -301,8 +300,13 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error)
}

for _, subset := range endpoints.Subsets {
endpointPort := endpointPortNumber(port, subset.Ports)
if endpointPort == 0 {
// endpoint port does not match service.
continue
}
for _, address := range subset.Addresses {
url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports))
url := fmt.Sprintf("%s://%s:%d", protocol, address.IP, endpointPort)
name := url
if address.TargetRef != nil && address.TargetRef.Name != "" {
name = address.TargetRef.Name
Expand Down Expand Up @@ -468,18 +472,23 @@ func getTLS(ingress *extensionsv1beta1.Ingress, k8sClient Client) ([]*tls.Config
return tlsConfigs, nil
}

func endpointPortNumber(servicePort corev1.ServicePort, endpointPorts []corev1.EndpointPort) int {
if len(endpointPorts) > 0 {
// name is optional if there is only one port
port := endpointPorts[0]
for _, endpointPort := range endpointPorts {
if servicePort.Name == endpointPort.Name {
port = endpointPort
}
// endpointPortNumber returns the port to be used for this endpoint. It is zero
// if the endpoint does not match the given service port.
func endpointPortNumber(servicePort corev1.ServicePort, endpointPorts []corev1.EndpointPort) int32 {
// Is this reasonable to assume?
if len(endpointPorts) == 0 {
return servicePort.Port
}

for _, endpointPort := range endpointPorts {
// For matching endpoints, the port names must correspond, either by
// being empty or non-empty. Multi-port services mandate non-empty
// names and allow us to filter for the right addresses.
if servicePort.Name == endpointPort.Name {
return endpointPort.Port
}
return int(port.Port)
}
return int(servicePort.Port)
return 0
}

func equalPorts(servicePort corev1.ServicePort, ingressPort intstr.IntOrString) bool {
Expand Down
100 changes: 96 additions & 4 deletions provider/kubernetes/kubernetes_test.go
Expand Up @@ -470,7 +470,7 @@ retryexpression: IsNetworkError() && Attempts() <= 2
`),
sSpec(
clusterIP("10.0.0.3"),
sPorts(sPort(803, ""))),
sPorts(sPort(803, "http"))),
),
buildService(
sName("service4"),
Expand All @@ -480,7 +480,7 @@ retryexpression: IsNetworkError() && Attempts() <= 2
sAnnotation(annotationKubernetesMaxConnAmount, "6"),
sSpec(
clusterIP("10.0.0.4"),
sPorts(sPort(804, ""))),
sPorts(sPort(804, "http"))),
),
}

Expand All @@ -502,10 +502,10 @@ retryexpression: IsNetworkError() && Attempts() <= 2
eUID("2"),
subset(
eAddresses(eAddress("10.15.0.1")),
ePorts(ePort(8080, "http"))),
ePorts(ePort(8080, ""))),
subset(
eAddresses(eAddress("10.15.0.2")),
ePorts(ePort(8080, "http"))),
ePorts(ePort(8080, ""))),
),
buildEndpoint(
eNamespace("testing"),
Expand Down Expand Up @@ -1926,3 +1926,95 @@ func TestGetTLS(t *testing.T) {
})
}
}

func TestMultiPortServices(t *testing.T) {
ingresses := []*extensionsv1beta1.Ingress{
buildIngress(
iNamespace("testing"),
iRules(
iRule(iPaths(
onePath(iPath("/cheddar"), iBackend("service", intstr.FromString("cheddar"))),
onePath(iPath("/stilton"), iBackend("service", intstr.FromString("stilton"))),
)),
),
),
}

services := []*corev1.Service{
buildService(
sName("service"),
sNamespace("testing"),
sUID("1"),
sSpec(
clusterIP("10.0.0.1"),
sPorts(sPort(80, "cheddar")),
sPorts(sPort(81, "stilton")),
),
),
}

endpoints := []*corev1.Endpoints{
buildEndpoint(
eNamespace("testing"),
eName("service"),
eUID("1"),
subset(
eAddresses(
eAddress("10.10.0.1"),
eAddress("10.10.0.2"),
),
ePorts(ePort(8080, "cheddar")),
),
subset(
eAddresses(
eAddress("10.20.0.1"),
eAddress("10.20.0.2"),
),
ePorts(ePort(8081, "stilton")),
),
),
}

watchChan := make(chan interface{})
client := clientMock{
ingresses: ingresses,
services: services,
endpoints: endpoints,
watchChan: watchChan,
}
provider := Provider{}

actual, err := provider.loadIngresses(client)
require.NoError(t, err, "error loading ingresses")

expected := buildConfiguration(
backends(
backend("/cheddar",
lbMethod("wrr"),
servers(
server("http://10.10.0.1:8080", weight(1)),
server("http://10.10.0.2:8080", weight(1)),
),
),
backend("/stilton",
lbMethod("wrr"),
servers(
server("http://10.20.0.1:8081", weight(1)),
server("http://10.20.0.2:8081", weight(1)),
),
),
),
frontends(
frontend("/cheddar",
passHostHeader(),
routes(route("/cheddar", "PathPrefix:/cheddar")),
),
frontend("/stilton",
passHostHeader(),
routes(route("/stilton", "PathPrefix:/stilton")),
),
),
)

assert.Equal(t, expected, actual)
}

0 comments on commit 21b8b2d

Please sign in to comment.