Skip to content

Commit

Permalink
Support named TargetPort in service resources
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinpollet committed Jun 25, 2020
1 parent 6f3117c commit ad8ab80
Show file tree
Hide file tree
Showing 19 changed files with 493 additions and 103 deletions.
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewAPI(log logrus.FieldLogger, port int32, host string, client kubernetes.I
// SetReadiness sets the readiness flag in the API.
func (a *API) SetReadiness(isReady bool) {
a.readiness.Set(isReady)
a.log.Debugf("API readiness: %s", isReady)
a.log.Debugf("API readiness: %t", isReady)
}

// SetConfig sets the current dynamic configuration.
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,16 @@ func NewMeshController(clients k8s.Client, cfg Config, store SharedStore, logger
c.clients.KubernetesClient(),
)

c.topologyBuilder = &topology.Builder{
ServiceLister: c.serviceLister,
EndpointsLister: c.endpointsLister,
PodLister: c.podLister,
TrafficTargetLister: c.trafficTargetLister,
TrafficSplitLister: c.trafficSplitLister,
HTTPRouteGroupLister: c.httpRouteGroupLister,
TCPRoutesLister: c.tcpRouteLister,
Logger: c.logger,
}
c.topologyBuilder = topology.NewBuilder(
c.serviceLister,
c.endpointsLister,
c.podLister,
c.trafficTargetLister,
c.trafficSplitLister,
c.httpRouteGroupLister,
c.tcpRouteLister,
c.logger,
)

providerCfg := provider.Config{
MinHTTPPort: c.cfg.MinHTTPPort,
Expand Down
21 changes: 17 additions & 4 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,23 @@ func (s *ShadowServiceManager) getTargetPort(trafficType string, portID int, nam
switch trafficType {
case annotations.ServiceTypeHTTP:
return s.getHTTPPort(portID)

case annotations.ServiceTypeTCP:
return s.getMappedPort(s.tcpStateTable, name, namespace, port)
mappedPort, err := s.getMappedPort(s.tcpStateTable, name, namespace, port)
if err != nil {
return 0, fmt.Errorf("unable to map TCP service port: %w", err)
}

return mappedPort, nil

case annotations.ServiceTypeUDP:
return s.getMappedPort(s.udpStateTable, name, namespace, port)
mappedPort, err := s.getMappedPort(s.udpStateTable, name, namespace, port)
if err != nil {
return 0, fmt.Errorf("unable to map UDP service port: %w", err)
}

return mappedPort, nil

default:
return 0, errors.New("unknown service mode")
}
Expand All @@ -243,10 +256,10 @@ func (s *ShadowServiceManager) getMappedPort(stateTable PortMapper, name, namesp

mappedPort, err := stateTable.Add(namespace, name, port)
if err != nil {
return 0, fmt.Errorf("unable to add service to the TCP state table: %w", err)
return 0, fmt.Errorf("unable to add service port to the state table: %w", err)
}

s.logger.Debugf("Service %s/%s %d as been assigned port %d", namespace, name, port, mappedPort)
s.logger.Debugf("Service %s/%s %d has been assigned port %d", namespace, name, port, mappedPort)

return mappedPort, nil
}
Expand Down
99 changes: 66 additions & 33 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (p *Provider) buildServicesAndRoutersForHTTPService(t *topology.Topology, c
}

key := getServiceRouterKeyFromService(svc, svcPort.Port)
cfg.HTTP.Services[key] = p.buildHTTPServiceFromService(t, svc, scheme, svcPort.TargetPort.IntVal)
cfg.HTTP.Services[key] = p.buildHTTPServiceFromService(t, svc, scheme, svcPort)
cfg.HTTP.Routers[key] = buildHTTPRouter(httpRule, entrypoint, middlewares, key, priorityService)
}
}
Expand All @@ -268,7 +268,7 @@ func (p *Provider) buildServicesAndRoutersForTCPService(t *topology.Topology, cf
}

key := getServiceRouterKeyFromService(svc, svcPort.Port)
cfg.TCP.Services[key] = p.buildTCPServiceFromService(t, svc, svcPort.TargetPort.IntVal)
cfg.TCP.Services[key] = p.buildTCPServiceFromService(t, svc, svcPort)
cfg.TCP.Routers[key] = buildTCPRouter(rule, entrypoint, key)
}
}
Expand All @@ -285,7 +285,7 @@ func (p *Provider) buildServicesAndRoutersForUDPService(t *topology.Topology, cf
}

key := getServiceRouterKeyFromService(svc, svcPort.Port)
cfg.UDP.Services[key] = p.buildUDPServiceFromService(t, svc, svcPort.TargetPort.IntVal)
cfg.UDP.Services[key] = p.buildUDPServiceFromService(t, svc, svcPort)
cfg.UDP.Routers[key] = buildUDPRouter(entrypoint, key)
}
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func (p *Provider) buildHTTPServicesAndRoutersForTrafficTarget(t *topology.Topol
}

svcKey := getServiceKeyFromTrafficTarget(tt, svcPort.Port)
cfg.HTTP.Services[svcKey] = p.buildHTTPServiceFromTrafficTarget(t, tt, scheme, svcPort.TargetPort.IntVal)
cfg.HTTP.Services[svcKey] = p.buildHTTPServiceFromTrafficTarget(t, tt, scheme, svcPort)

rtrMiddlewares := addToSliceCopy(middlewares, whitelistDirectKey)

Expand Down Expand Up @@ -373,7 +373,7 @@ func (p *Provider) buildTCPServicesAndRoutersForTrafficTarget(t *topology.Topolo
}

key := getServiceRouterKeyFromService(ttSvc, svcPort.Port)
cfg.TCP.Services[key] = p.buildTCPServiceFromTrafficTarget(t, tt, svcPort.TargetPort.IntVal)
cfg.TCP.Services[key] = p.buildTCPServiceFromTrafficTarget(t, tt, svcPort)
cfg.TCP.Routers[key] = buildTCPRouter(rule, entrypoint, key)
}
}
Expand Down Expand Up @@ -596,21 +596,26 @@ func (p Provider) buildUDPEntrypoint(svc *topology.Service, port int32) (string,
return fmt.Sprintf("udp-%d", meshPort), nil
}

func (p *Provider) buildHTTPServiceFromService(t *topology.Topology, svc *topology.Service, scheme string, port int32) *dynamic.Service {
func (p *Provider) buildHTTPServiceFromService(t *topology.Topology, svc *topology.Service, scheme string, svcPort corev1.ServicePort) *dynamic.Service {
var servers []dynamic.Server

for _, podKey := range svc.Pods {
pod, ok := t.Pods[podKey]
if !ok {
p.logger.Errorf("Unable to find Pod %q for HTTP service from Service %s@%s", podKey, topology.Key{Name: svc.Name, Namespace: svc.Namespace})
continue
}

hostPort, ok := topology.ResolveServicePort(svcPort, pod.ContainerPorts)
if !ok {
p.logger.Warnf("Unable to resolve HTTP service port %q for Pod %q", svcPort.Name, podKey)
continue
}

url := net.JoinHostPort(pod.IP, strconv.Itoa(int(port)))
address := net.JoinHostPort(pod.IP, strconv.Itoa(int(hostPort)))

servers = append(servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", scheme, url),
URL: fmt.Sprintf("%s://%s", scheme, address),
})
}

Expand All @@ -622,20 +627,31 @@ func (p *Provider) buildHTTPServiceFromService(t *topology.Topology, svc *topolo
}
}

func (p *Provider) buildHTTPServiceFromTrafficTarget(t *topology.Topology, tt *topology.ServiceTrafficTarget, scheme string, port int32) *dynamic.Service {
servers := make([]dynamic.Server, len(tt.Destination.Pods))
func (p *Provider) buildHTTPServiceFromTrafficTarget(t *topology.Topology, tt *topology.ServiceTrafficTarget, scheme string, svcPort corev1.ServicePort) *dynamic.Service {
var servers []dynamic.Server

for i, podKey := range tt.Destination.Pods {
for _, podKey := range tt.Destination.Pods {
pod, ok := t.Pods[podKey]
if !ok {
p.logger.Errorf("Unable to find Pod %q for HTTP service from Traffic Target %s@%s", podKey, topology.Key{Name: tt.Name, Namespace: tt.Namespace})
p.logger.Errorf("Unable to find Pod %q for HTTP service from Traffic Target %q", podKey, topology.ServiceTrafficTargetKey{
Service: tt.Service,
TrafficTarget: topology.Key{Name: tt.Name, Namespace: tt.Namespace},
})

continue
}

url := net.JoinHostPort(pod.IP, strconv.Itoa(int(port)))
hostPort, ok := topology.ResolveServicePort(svcPort, pod.ContainerPorts)
if !ok {
p.logger.Warnf("Unable to resolve HTTP service port %q for Pod %q", svcPort.TargetPort, podKey)
continue
}

address := net.JoinHostPort(pod.IP, strconv.Itoa(int(hostPort)))

servers[i].URL = fmt.Sprintf("%s://%s", scheme, url)
servers = append(servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", scheme, address),
})
}

return &dynamic.Service{
Expand All @@ -646,18 +662,23 @@ func (p *Provider) buildHTTPServiceFromTrafficTarget(t *topology.Topology, tt *t
}
}

func (p *Provider) buildTCPServiceFromService(t *topology.Topology, svc *topology.Service, port int32) *dynamic.TCPService {
func (p *Provider) buildTCPServiceFromService(t *topology.Topology, svc *topology.Service, svcPort corev1.ServicePort) *dynamic.TCPService {
var servers []dynamic.TCPServer

for _, podKey := range svc.Pods {
pod, ok := t.Pods[podKey]
if !ok {
p.logger.Errorf("Unable to find Pod %q for TCP service from Service %s@%s", podKey, topology.Key{Name: svc.Name, Namespace: svc.Namespace})
continue
}

hostPort, ok := topology.ResolveServicePort(svcPort, pod.ContainerPorts)
if !ok {
p.logger.Warnf("Unable to resolve TCP service port %q for Pod %q", svcPort.Name, podKey)
continue
}

address := net.JoinHostPort(pod.IP, strconv.Itoa(int(port)))
address := net.JoinHostPort(pod.IP, strconv.Itoa(int(hostPort)))

servers = append(servers, dynamic.TCPServer{
Address: address,
Expand All @@ -671,47 +692,61 @@ func (p *Provider) buildTCPServiceFromService(t *topology.Topology, svc *topolog
}
}

func (p *Provider) buildUDPServiceFromService(t *topology.Topology, svc *topology.Service, port int32) *dynamic.UDPService {
var servers []dynamic.UDPServer
func (p *Provider) buildTCPServiceFromTrafficTarget(t *topology.Topology, tt *topology.ServiceTrafficTarget, svcPort corev1.ServicePort) *dynamic.TCPService {
var servers []dynamic.TCPServer

for _, podKey := range svc.Pods {
for _, podKey := range tt.Destination.Pods {
pod, ok := t.Pods[podKey]
if !ok {
p.logger.Errorf("Unable to find Pod %q for UDP service from Service %s@%s", podKey, topology.Key{Name: svc.Name, Namespace: svc.Namespace})
p.logger.Errorf("Unable to find Pod %q for TCP service from Traffic Target %s@%s", podKey, topology.Key{Name: tt.Name, Namespace: tt.Namespace})
continue
}

hostPort, ok := topology.ResolveServicePort(svcPort, pod.ContainerPorts)
if !ok {
p.logger.Warnf("Unable to resolve TCP service port %q for Pod %q", svcPort.Name, podKey)
continue
}

address := net.JoinHostPort(pod.IP, strconv.Itoa(int(port)))
address := net.JoinHostPort(pod.IP, strconv.Itoa(int(hostPort)))

servers = append(servers, dynamic.UDPServer{
servers = append(servers, dynamic.TCPServer{
Address: address,
})
}

return &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
return &dynamic.TCPService{
LoadBalancer: &dynamic.TCPServersLoadBalancer{
Servers: servers,
},
}
}

func (p *Provider) buildTCPServiceFromTrafficTarget(t *topology.Topology, tt *topology.ServiceTrafficTarget, port int32) *dynamic.TCPService {
servers := make([]dynamic.TCPServer, len(tt.Destination.Pods))
func (p *Provider) buildUDPServiceFromService(t *topology.Topology, svc *topology.Service, svcPort corev1.ServicePort) *dynamic.UDPService {
var servers []dynamic.UDPServer

for i, podKey := range tt.Destination.Pods {
for _, podKey := range svc.Pods {
pod, ok := t.Pods[podKey]
if !ok {
p.logger.Errorf("Unable to find Pod %q for TCP service from Traffic Target %s@%s", podKey, topology.Key{Name: tt.Name, Namespace: tt.Namespace})
p.logger.Errorf("Unable to find Pod %q for UDP service from Service %s@%s", podKey, topology.Key{Name: svc.Name, Namespace: svc.Namespace})
continue
}

hostPort, ok := topology.ResolveServicePort(svcPort, pod.ContainerPorts)
if !ok {
p.logger.Warnf("Unable to resolve UDP service port %q for Pod %q", svcPort.Name, podKey)
continue
}

servers[i].Address = net.JoinHostPort(pod.IP, strconv.Itoa(int(port)))
address := net.JoinHostPort(pod.IP, strconv.Itoa(int(hostPort)))

servers = append(servers, dynamic.UDPServer{
Address: address,
})
}

return &dynamic.TCPService{
LoadBalancer: &dynamic.TCPServersLoadBalancer{
return &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: servers,
},
}
Expand All @@ -728,7 +763,6 @@ func (p *Provider) buildWhitelistMiddlewareFromTrafficTargetDirect(t *topology.T
pod, ok := t.Pods[podKey]
if !ok {
p.logger.Errorf("Unable to find Pod %q for WhitelistMiddleware from Traffic Target %s@%s", podKey, topology.Key{Name: tt.Name, Namespace: tt.Namespace})

continue
}

Expand All @@ -753,7 +787,6 @@ func (p *Provider) buildWhitelistMiddlewareFromTrafficSplitDirect(t *topology.To
pod, ok := t.Pods[podKey]
if !ok {
p.logger.Errorf("Unable to find Pod %q for WhitelistMiddleware from Traffic Split %s@%s", podKey, topology.Key{Name: ts.Name, Namespace: ts.Namespace})

continue
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestProvider_BuildConfig(t *testing.T) {
defaultTrafficType: "tcp",
tcpStateTable: map[servicePort]int32{
{Namespace: "my-ns", Name: "svc-a", Port: 8080}: 5000,
{Namespace: "my-ns", Name: "svc-a", Port: 8081}: 5001,
},
topology: "testdata/acl-disabled-tcp-basic-topology.json",
wantConfig: "testdata/acl-disabled-tcp-basic-config.json",
Expand All @@ -77,6 +78,7 @@ func TestProvider_BuildConfig(t *testing.T) {
defaultTrafficType: "udp",
udpStateTable: map[servicePort]int32{
{Namespace: "my-ns", Name: "svc-a", Port: 8080}: 15000,
{Namespace: "my-ns", Name: "svc-a", Port: 8081}: 15001,
},
topology: "testdata/acl-disabled-udp-basic-topology.json",
wantConfig: "testdata/acl-disabled-udp-basic-config.json",
Expand All @@ -101,6 +103,7 @@ func TestProvider_BuildConfig(t *testing.T) {
defaultTrafficType: "tcp",
tcpStateTable: map[servicePort]int32{
{Namespace: "my-ns", Name: "svc-b", Port: 8080}: 5000,
{Namespace: "my-ns", Name: "svc-b", Port: 8081}: 5001,
},
topology: "testdata/acl-enabled-tcp-basic-topology.json",
wantConfig: "testdata/acl-enabled-tcp-basic-config.json",
Expand Down
21 changes: 21 additions & 0 deletions pkg/provider/testdata/acl-disabled-http-basic-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
"rule": "Host(`svc-a.my-ns.maesh`) || Host(`10.10.14.1`)",
"priority": 1001
},
"my-ns-svc-a-8081": {
"entryPoints": [
"http-10001"
],
"service": "my-ns-svc-a-8081",
"rule": "Host(`svc-a.my-ns.maesh`) || Host(`10.10.14.1`)",
"priority": 1001
},
"readiness": {
"entryPoints": [
"readiness"
Expand Down Expand Up @@ -36,6 +44,19 @@
"passHostHeader": true
}
},
"my-ns-svc-a-8081": {
"loadBalancer": {
"servers": [
{
"url": "http://10.10.2.1:8080"
},
{
"url": "http://10.10.2.2:8081"
}
],
"passHostHeader": true
}
},
"readiness": {
"loadBalancer": {
"servers": [
Expand Down

0 comments on commit ad8ab80

Please sign in to comment.