Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race in kube-proxy causing runaways #4080

Merged
merged 1 commit into from
Feb 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 26 additions & 22 deletions pkg/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand Down Expand Up @@ -52,11 +53,12 @@ var endpointDialTimeout = []time.Duration{1, 2, 4, 8}
type proxySocket interface {
// Addr gets the net.Addr for a proxySocket.
Addr() net.Addr
// Close stops the proxySocket from accepting incoming connections. Each implementation should comment
// on the impact of calling Close while sessions are active.
// Close stops the proxySocket from accepting incoming connections.
// Each implementation should comment on the impact of calling Close
// while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, proxier *Proxier)
ProxyLoop(service string, info *serviceInfo, proxier *Proxier)
}

// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
Expand Down Expand Up @@ -85,20 +87,19 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox
return nil, fmt.Errorf("failed to connect to an endpoint.")
}

func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
func (tcp *tcpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
for {
_, exists := proxier.getServiceInfo(service)
if !exists {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// The service port was closed or replaced.
break
}

// Block until a connection is made.
inConn, err := tcp.Accept()
if err != nil {
_, exists := proxier.getServiceInfo(service)
if !exists {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// Then the service port was just closed so the accept failure is to be expected.
return
break
}
glog.Errorf("Accept failed: %v", err)
continue
Expand Down Expand Up @@ -161,12 +162,12 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}

func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
info, exists := proxier.getServiceInfo(service)
if !exists {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// The service port was closed or replaced.
break
}

Expand All @@ -184,7 +185,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
break
}
// If this is a client we know already, reuse the connection and goroutine.
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout)
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, myInfo.timeout)
if err != nil {
continue
}
Expand All @@ -198,7 +199,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
}
continue
}
err = svrConn.SetDeadline(time.Now().Add(info.timeout))
err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout))
if err != nil {
glog.Errorf("SetDeadline failed: %v", err)
continue
Expand Down Expand Up @@ -267,7 +268,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
func logTimeout(err error) bool {
if e, ok := err.(net.Error); ok {
if e.Timeout() {
glog.V(1).Infof("connection to endpoint closed due to inactivity")
glog.V(3).Infof("connection to endpoint closed due to inactivity")
return true
}
}
Expand Down Expand Up @@ -300,12 +301,13 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
}

// NewProxier returns a new Proxier given a LoadBalancer and an address on
Expand Down Expand Up @@ -443,7 +445,9 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol,
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, proxier *Proxier) {
defer util.HandleCrash()
sock.ProxyLoop(service, proxier)
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
atomic.AddInt32(&proxier.numProxyLoops, -1)
}(service, proxier)

return si, nil
Expand Down
46 changes: 46 additions & 0 deletions pkg/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http/httptest"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -171,6 +172,18 @@ func testEchoUDP(t *testing.T, address string, port int) {
}
}

func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
var got int32
for i := 0; i < 4; i++ {
got = atomic.LoadInt32(&p.numProxyLoops)
if got == want {
return
}
time.Sleep(500 * time.Millisecond)
}
t.Errorf("expected %d ProxyLoops running, got %d", want, got)
}

func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{
Expand All @@ -181,12 +194,14 @@ func TestTCPProxy(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
}

func TestUDPProxy(t *testing.T) {
Expand All @@ -199,12 +214,14 @@ func TestUDPProxy(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
}

// Helper: Stops the proxy for the named service.
Expand All @@ -226,6 +243,7 @@ func TestTCPProxyStop(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
Expand All @@ -236,12 +254,14 @@ func TestTCPProxyStop(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)

stopProxyByName(p, "echo")
// Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
}

func TestUDPProxyStop(t *testing.T) {
Expand All @@ -254,6 +274,7 @@ func TestUDPProxyStop(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
Expand All @@ -264,12 +285,14 @@ func TestUDPProxyStop(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)

stopProxyByName(p, "echo")
// Wait for the port to really close.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
}

func TestTCPProxyUpdateDelete(t *testing.T) {
Expand All @@ -282,6 +305,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
Expand All @@ -292,11 +316,13 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
}

func TestUDPProxyUpdateDelete(t *testing.T) {
Expand All @@ -309,6 +335,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
Expand All @@ -319,11 +346,13 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
}

func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
Expand All @@ -336,6 +365,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
Expand All @@ -346,15 +376,18 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}},
})
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
}

func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
Expand All @@ -367,6 +400,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
Expand All @@ -377,15 +411,18 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}},
})
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
}

func TestTCPProxyUpdatePort(t *testing.T) {
Expand All @@ -398,11 +435,14 @@ func TestTCPProxyUpdatePort(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)

// add a new dummy listener in order to get a port that is free
l, _ := net.Listen("tcp", ":0")
Expand All @@ -424,6 +464,9 @@ func TestTCPProxyUpdatePort(t *testing.T) {
t.Fatalf(err.Error())
}
testEchoTCP(t, "127.0.0.1", newPort)
// This is a bit async, but this should be sufficient.
time.Sleep(500 * time.Millisecond)
waitForNumProxyLoops(t, p, 1)

// Ensure the old port is released and re-usable.
l, err = net.Listen("tcp", joinHostPort("", svcInfo.proxyPort))
Expand All @@ -443,11 +486,13 @@ func TestUDPProxyUpdatePort(t *testing.T) {
})

p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
waitForNumProxyLoops(t, p, 1)

// add a new dummy listener in order to get a port that is free
pc, _ := net.ListenPacket("udp", ":0")
Expand All @@ -469,6 +514,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
t.Fatalf(err.Error())
}
testEchoUDP(t, "127.0.0.1", newPort)
waitForNumProxyLoops(t, p, 1)

// Ensure the old port is released and re-usable.
pc, err = net.ListenPacket("udp", joinHostPort("", svcInfo.proxyPort))
Expand Down