Skip to content

Commit

Permalink
buildPortsToEndpointsMap should use flattened value type
Browse files Browse the repository at this point in the history
  • Loading branch information
tedyu authored and yutedz committed Aug 15, 2019
1 parent 46e58df commit 2f67134
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 172 deletions.
54 changes: 7 additions & 47 deletions pkg/proxy/userspace/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"fmt"
"net"
"reflect"
"strconv"
"sync"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/slice"
)

Expand Down Expand Up @@ -188,28 +188,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
return endpoint, nil
}

type hostPortPair struct {
host string
port int
}

func isValidEndpoint(hpp *hostPortPair) bool {
return hpp.host != "" && hpp.port > 0
}

func flattenValidEndpoints(endpoints []hostPortPair) []string {
// Convert Endpoint objects into strings for easier use later. Ignore
// the protocol field - we'll get that from the Service objects.
var result []string
for i := range endpoints {
hpp := &endpoints[i]
if isValidEndpoint(hpp) {
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
}
}
return result
}

// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
for _, affinity := range state.affinity.affinityMap {
Expand Down Expand Up @@ -243,33 +221,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
}
}

// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
// portname. Expode Endpoints.Subsets[*] into this structure.
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair {
portsToEndpoints := map[string][]hostPortPair{}
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
// Ignore the protocol field - we'll get that from the Service objects.
}
}
}
return portsToEndpoints
}

func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)

lb.lock.Lock()
defer lb.lock.Unlock()

for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
newEndpoints := portsToEndpoints[portname]
state, exists := lb.services[svcPort]

if !exists || state == nil || len(newEndpoints) > 0 {
Expand All @@ -289,16 +249,16 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
}

func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints)
registeredEndpoints := make(map[proxy.ServicePortName]bool)

lb.lock.Lock()
defer lb.lock.Unlock()

for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
newEndpoints := portsToEndpoints[portname]
state, exists := lb.services[svcPort]

curEndpoints := []string{}
Expand Down Expand Up @@ -344,7 +304,7 @@ func (lb *LoadBalancerRR) resetService(svcPort proxy.ServicePortName) {
}

func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)

lb.lock.Lock()
defer lb.lock.Unlock()
Expand Down
39 changes: 0 additions & 39 deletions pkg/proxy/userspace/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,6 @@ import (
"k8s.io/kubernetes/pkg/proxy"
)

func TestValidateWorks(t *testing.T) {
if isValidEndpoint(&hostPortPair{}) {
t.Errorf("Didn't fail for empty set")
}
if isValidEndpoint(&hostPortPair{host: "foobar"}) {
t.Errorf("Didn't fail with invalid port")
}
if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
t.Errorf("Didn't fail with a negative port")
}
if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
t.Errorf("Failed a valid config.")
}
}

func TestFilterWorks(t *testing.T) {
endpoints := []hostPortPair{
{host: "foobar", port: 1},
{host: "foobar", port: 2},
{host: "foobar", port: -1},
{host: "foobar", port: 3},
{host: "foobar", port: -2},
}
filtered := flattenValidEndpoints(endpoints)

if len(filtered) != 3 {
t.Errorf("Failed to filter to the correct size")
}
if filtered[0] != "foobar:1" {
t.Errorf("Index zero is not foobar:1")
}
if filtered[1] != "foobar:2" {
t.Errorf("Index one is not foobar:2")
}
if filtered[2] != "foobar:3" {
t.Errorf("Index two is not foobar:3")
}
}

func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
Expand Down
25 changes: 25 additions & 0 deletions pkg/proxy/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net"
"strconv"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -48,6 +49,30 @@ var (
ErrNoAddresses = errors.New("No addresses for hostname")
)

// isValidEndpoint checks that the given host / port pair are valid endpoint
func isValidEndpoint(host string, port int) bool {
return host != "" && port > 0
}

// BuildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
func BuildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]string {
portsToEndpoints := map[string][]string{}
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
if isValidEndpoint(addr.IP, int(port.Port)) {
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))))
}
}
}
}
return portsToEndpoints
}

// IsZeroCIDR checks whether the input CIDR string is either
// the IPv4 or IPv6 zero CIDR
func IsZeroCIDR(cidr string) bool {
Expand Down
67 changes: 67 additions & 0 deletions pkg/proxy/util/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package util
import (
"context"
"net"
"reflect"
"testing"

"k8s.io/api/core/v1"
Expand All @@ -28,6 +29,72 @@ import (
fake "k8s.io/kubernetes/pkg/proxy/util/testing"
)

func TestValidateWorks(t *testing.T) {
if isValidEndpoint("", 0) {
t.Errorf("Didn't fail for empty set")
}
if isValidEndpoint("foobar", 0) {
t.Errorf("Didn't fail with invalid port")
}
if isValidEndpoint("foobar", -1) {
t.Errorf("Didn't fail with a negative port")
}
if !isValidEndpoint("foobar", 8080) {
t.Errorf("Failed a valid config.")
}
}

func TestBuildPortsToEndpointsMap(t *testing.T) {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "testnamespace"},
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.2"},
},
Ports: []v1.EndpointPort{
{Name: "http", Port: 80},
{Name: "https", Port: 443},
},
},
{
Addresses: []v1.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.3"},
},
Ports: []v1.EndpointPort{
{Name: "http", Port: 8080},
{Name: "dns", Port: 53},
},
},
{
Addresses: []v1.EndpointAddress{},
Ports: []v1.EndpointPort{
{Name: "http", Port: 8888},
{Name: "ssh", Port: 22},
},
},
{
Addresses: []v1.EndpointAddress{
{IP: "10.0.0.1"},
},
Ports: []v1.EndpointPort{},
},
},
}
expectedPortsToEndpoints := map[string][]string{
"http": {"10.0.0.1:80", "10.0.0.2:80", "10.0.0.1:8080", "10.0.0.3:8080"},
"https": {"10.0.0.1:443", "10.0.0.2:443"},
"dns": {"10.0.0.1:53", "10.0.0.3:53"},
}

portsToEndpoints := BuildPortsToEndpointsMap(endpoints)
if !reflect.DeepEqual(expectedPortsToEndpoints, portsToEndpoints) {
t.Errorf("expected ports to endpoints not seen")
}
}

func TestIsProxyableIP(t *testing.T) {
testCases := []struct {
ip string
Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/winuserspace/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/ipconfig:go_default_library",
"//pkg/util/netsh:go_default_library",
"//pkg/util/slice:go_default_library",
Expand Down
54 changes: 7 additions & 47 deletions pkg/proxy/winuserspace/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"fmt"
"net"
"reflect"
"strconv"
"sync"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/slice"
)

Expand Down Expand Up @@ -178,28 +178,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
return endpoint, nil
}

type hostPortPair struct {
host string
port int
}

func isValidEndpoint(hpp *hostPortPair) bool {
return hpp.host != "" && hpp.port > 0
}

func flattenValidEndpoints(endpoints []hostPortPair) []string {
// Convert Endpoint objects into strings for easier use later. Ignore
// the protocol field - we'll get that from the Service objects.
var result []string
for i := range endpoints {
hpp := &endpoints[i]
if isValidEndpoint(hpp) {
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
}
}
return result
}

// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
for _, affinity := range state.affinity.affinityMap {
Expand Down Expand Up @@ -233,33 +211,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
}
}

// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair {
portsToEndpoints := map[string][]hostPortPair{}
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
// Ignore the protocol field - we'll get that from the Service objects.
}
}
}
return portsToEndpoints
}

func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)

lb.lock.Lock()
defer lb.lock.Unlock()

for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
newEndpoints := portsToEndpoints[portname]
state, exists := lb.services[svcPort]

if !exists || state == nil || len(newEndpoints) > 0 {
Expand All @@ -279,16 +239,16 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
}

func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)
oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints)
registeredEndpoints := make(map[proxy.ServicePortName]bool)

lb.lock.Lock()
defer lb.lock.Unlock()

for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
newEndpoints := portsToEndpoints[portname]
state, exists := lb.services[svcPort]

curEndpoints := []string{}
Expand Down Expand Up @@ -326,7 +286,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
}

func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
portsToEndpoints := buildPortsToEndpointsMap(endpoints)
portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints)

lb.lock.Lock()
defer lb.lock.Unlock()
Expand Down
Loading

0 comments on commit 2f67134

Please sign in to comment.