Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #13345 upstream release 1.0 #13498

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type portal struct {
}

type serviceInfo struct {
isAliveAtomic int32 // Only access this with atomic ops
portal portal
protocol api.Protocol
proxyPort int
Expand All @@ -52,6 +53,18 @@ type serviceInfo struct {
deprecatedPublicIPs []string
}

func (info *serviceInfo) setAlive(b bool) {
var i int32
if b {
i = 1
}
atomic.StoreInt32(&info.isAliveAtomic, i)
}

func (info *serviceInfo) isAlive() bool {
return atomic.LoadInt32(&info.isAliveAtomic) != 0
}

func logTimeout(err error) bool {
if e, ok := err.(net.Error); ok {
if e.Timeout() {
Expand Down Expand Up @@ -197,6 +210,7 @@ func (proxier *Proxier) stopProxy(service ServicePortName, info *serviceInfo) er
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service ServicePortName, info *serviceInfo) error {
delete(proxier.serviceMap, service)
info.setAlive(false)
err := info.socket.Close()
port := info.socket.ListenPort()
proxier.proxyPorts.Release(port)
Expand Down Expand Up @@ -235,6 +249,7 @@ func (proxier *Proxier) addServiceOnPort(service ServicePortName, protocol api.P
return nil, err
}
si := &serviceInfo{
isAliveAtomic: 1,
proxyPort: portNum,
protocol: protocol,
socket: sock,
Expand Down
6 changes: 6 additions & 0 deletions pkg/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ func TestTCPProxyStop(t *testing.T) {
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
if !svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected true")
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
Expand All @@ -373,6 +376,9 @@ func TestTCPProxyStop(t *testing.T) {
waitForNumProxyLoops(t, p, 1)

stopProxyByName(p, service)
if svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected false")
}
// Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
Expand Down
6 changes: 3 additions & 3 deletions pkg/proxy/proxysocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func tryConnect(service ServicePortName, srcAddr net.Addr, protocol string, prox

func (tcp *tcpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !myInfo.isAlive() {
// The service port was closed or replaced.
return
}
Expand All @@ -120,7 +120,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInf
if isClosedError(err) {
return
}
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !myInfo.isAlive() {
// Then the service port was just closed so the accept failure is to be expected.
return
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortName, myInfo *serviceInf
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
if !myInfo.isAlive() {
// The service port was closed or replaced.
break
}
Expand Down