Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kube-proxy userspace, enable RR if connect endpoint which is session affinity fails #24655

Merged
merged 1 commit into from
May 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/proxy/userspace/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// service-port and source address.
NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr) (string, error)
NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service proxy.ServicePortName)
}
4 changes: 3 additions & 1 deletion pkg/proxy/userspace/proxysocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ func (tcp *tcpProxySocket) ListenPort() int {
}

func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
sessionAffinityReset := false
for _, dialTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err
Expand All @@ -102,6 +103,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string
panic("Dial failed: " + err.Error())
}
glog.Errorf("Dial failed: %v", err)
sessionAffinityReset = true
continue
}
return outConn, nil
Expand Down
18 changes: 10 additions & 8 deletions pkg/proxy/userspace/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func isSessionAffinity(affinity *affinityPolicy) bool {

// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr) (string, error) {
func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
Expand All @@ -139,13 +139,15 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
if err != nil {
return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
}
sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
// Affinity wins.
endpoint := sessionAffinity.endpoint
sessionAffinity.lastUsed = time.Now()
glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", svcPort, ipaddr, sessionAffinity, endpoint)
return endpoint, nil
if !sessionAffinityReset {
sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
// Affinity wins.
endpoint := sessionAffinity.endpoint
sessionAffinity.lastUsed = time.Now()
glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", svcPort, ipaddr, sessionAffinity, endpoint)
return endpoint, nil
}
}
}
// Take the next endpoint.
Expand Down
106 changes: 84 additions & 22 deletions pkg/proxy/userspace/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
var endpoints []api.Endpoints
loadBalancer.OnEndpointsUpdate(endpoints)
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil {
t.Errorf("Didn't fail with non-existent service")
}
Expand All @@ -79,7 +79,17 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
}

func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
}
if endpoint != expected {
t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
}
}

func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
}
Expand All @@ -91,7 +101,7 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.Se
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -129,7 +139,7 @@ func stringsInSlice(haystack []string, needles ...string) bool {
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -157,7 +167,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil)
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -200,7 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil)
endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -281,7 +291,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints)

endpoint, err = loadBalancer.NextEndpoint(serviceP, nil)
endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand All @@ -291,7 +301,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"}
endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil)
endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -331,7 +341,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {

// Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil)
endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand All @@ -346,7 +356,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand All @@ -368,23 +378,23 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}

ep1, err := loadBalancer.NextEndpoint(service, client1)
ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1)

ep2, err := loadBalancer.NextEndpoint(service, client2)
ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2)

ep3, err := loadBalancer.NextEndpoint(service, client3)
ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
Expand All @@ -403,7 +413,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand All @@ -424,23 +434,23 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}

ep1, err := loadBalancer.NextEndpoint(service, client1)
ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1)
expectEndpoint(t, loadBalancer, service, ep1, client1)

ep2, err := loadBalancer.NextEndpoint(service, client2)
ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpoint(t, loadBalancer, service, ep2, client2)

ep3, err := loadBalancer.NextEndpoint(service, client3)
ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}
Expand All @@ -465,7 +475,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -539,7 +549,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -588,7 +598,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnEndpointsUpdate(endpoints)

endpoint, err = loadBalancer.NextEndpoint(service, nil)
endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand All @@ -600,7 +610,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand Down Expand Up @@ -649,7 +659,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {

// Then update the configuration by removing foo
loadBalancer.OnEndpointsUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
Expand All @@ -663,3 +673,55 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
}

func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}

// Call NewService() before OnEndpointsUpdate()
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
},
}
loadBalancer.OnEndpointsUpdate(endpoints)

client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}

ep1, err := loadBalancer.NextEndpoint(service, client1, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}

ep2, err := loadBalancer.NextEndpoint(service, client2, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}

ep3, err := loadBalancer.NextEndpoint(service, client3, false)
if err != nil {
t.Errorf("Didn't find a service for %s: %v", service, err)
}

expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)

expectEndpoint(t, loadBalancer, service, ep2, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
}