Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #15797 upstream release 1.1 #16167

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type ProxyServerConfig struct {
nodeRef *api.ObjectReference // Reference to this node.
MasqueradeAll bool
CleanupAndExit bool
UDPIdleTimeout time.Duration
}

type ProxyServer struct {
Expand Down Expand Up @@ -93,6 +94,7 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.SyncPeriod, "iptables-sync-period", 30*time.Second, "How often iptables rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.BoolVar(&s.MasqueradeAll, "masquerade-all", false, "If using the pure iptables proxy, SNAT everything")
fs.BoolVar(&s.CleanupAndExit, "cleanup-iptables", false, "If true cleanup iptables rules and exit.")
fs.DurationVar(&s.UDPIdleTimeout, "udp-timeout", s.UDPIdleTimeout, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace")
}

const (
Expand All @@ -117,6 +119,7 @@ func NewProxyConfig() *ProxyServerConfig {
OOMScoreAdj: qos.KubeProxyOomScoreAdj,
ResourceContainer: "/kube-proxy",
SyncPeriod: 30 * time.Second,
UDPIdleTimeout: 250 * time.Millisecond,
}
}

Expand Down Expand Up @@ -241,7 +244,7 @@ func NewProxyServerDefault(config *ProxyServerConfig) (*ProxyServer, error) {
// set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer

proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.SyncPeriod)
proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.SyncPeriod, config.UDPIdleTimeout)
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ token-auth-file
ttl-secs
type-src
udp-port
udp-timeout
unix-socket
update-period
upgrade-target
Expand Down
53 changes: 26 additions & 27 deletions pkg/proxy/userspace/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,18 @@ func logTimeout(err error) bool {
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortName]*serviceInfo
syncPeriod time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
proxyPorts PortAllocator
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortName]*serviceInfo
syncPeriod time.Duration
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
proxyPorts PortAllocator
}

// assert Proxier is a ProxyProvider
Expand Down Expand Up @@ -137,7 +138,7 @@ func IsProxyLocked(err error) bool {
// if iptables fails to update or acquire the initial lock. Once a proxier is
// created, it will keep iptables up to date in the background and will not
// terminate if a particular iptables call fails.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange, syncPeriod time.Duration) (*Proxier, error) {
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
return nil, ErrProxyOnLocalhost
}
Expand All @@ -155,14 +156,14 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
proxyPorts := newPortAllocator(pr)

glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, udpIdleTimeout)
}

func setRLimit(limit uint64) error {
return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &syscall.Rlimit{Max: limit, Cur: limit})
}

func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod time.Duration) (*Proxier, error) {
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
// convenient to pass nil for tests..
if proxyPorts == nil {
proxyPorts = newPortAllocator(util.PortRange{})
Expand All @@ -177,14 +178,15 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
return nil, fmt.Errorf("failed to flush iptables: %v", err)
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
proxyPorts: proxyPorts,
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
udpIdleTimeout: udpIdleTimeout,
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
proxyPorts: proxyPorts,
}, nil
}

Expand Down Expand Up @@ -345,10 +347,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
return si, nil
}

// How long we leave idle UDP connections open.
const udpIdleTimeout = 250 * time.Millisecond

// OnUpdate manages the active set of service proxies.
// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
Expand Down Expand Up @@ -393,7 +392,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
}

glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, udpIdleTimeout)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
Expand Down
34 changes: 19 additions & 15 deletions pkg/proxy/userspace/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
"k8s.io/kubernetes/pkg/util/iptables"
)

const (
udpIdleTimeoutForTest = 250 * time.Millisecond
)

func joinHostPort(host string, port int) string {
return net.JoinHostPort(host, fmt.Sprintf("%d", port))
}
Expand Down Expand Up @@ -245,7 +249,7 @@ func TestTCPProxy(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand All @@ -272,7 +276,7 @@ func TestUDPProxy(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand All @@ -299,7 +303,7 @@ func TestUDPProxyTimeout(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -335,7 +339,7 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand All @@ -362,7 +366,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -425,7 +429,7 @@ func TestTCPProxyStop(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -469,7 +473,7 @@ func TestUDPProxyStop(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -507,7 +511,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -544,7 +548,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -581,7 +585,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -633,7 +637,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -685,7 +689,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -733,7 +737,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -778,7 +782,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -830,7 +834,7 @@ func TestProxyUpdatePortal(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down