Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #15797 upstream release 1.0 #16281

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type ProxyServer struct {
Master string
Kubeconfig string
PortRange util.PortRange
UDPIdleTimeout time.Duration
}

// NewProxyServer creates a new ProxyServer object with default parameters
Expand All @@ -59,6 +60,7 @@ func NewProxyServer() *ProxyServer {
HealthzBindAddress: util.IP(net.ParseIP("127.0.0.1")),
OOMScoreAdj: -899,
ResourceContainer: "/kube-proxy",
UDPIdleTimeout: 10 * time.Second,
}
}

Expand All @@ -72,6 +74,7 @@ func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ResourceContainer, "resource-container", s.ResourceContainer, "Absolute name of the resource-only container to create and run the Kube-proxy in (Default: /kube-proxy).")
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization information (the master location is set by the master flag).")
fs.Var(&s.PortRange, "proxy-port-range", "Range of host ports (beginPort-endPort, inclusive) that may be consumed in order to proxy service traffic. If unspecified (0-0) then ports will be randomly chosen.")
fs.DurationVar(&s.UDPIdleTimeout, "udp-timeout", s.UDPIdleTimeout, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0.")
}

// Run runs the specified ProxyServer. This should never exit.
Expand All @@ -96,7 +99,7 @@ func (s *ProxyServer) Run(_ []string) error {
protocol = iptables.ProtocolIpv6
}
loadBalancer := proxy.NewLoadBalancerRR()
proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol), s.PortRange)
proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol), s.PortRange, s.UDPIdleTimeout)
if err != nil {
glog.Fatalf("Unable to create proxer: %v", err)
}
Expand Down
47 changes: 23 additions & 24 deletions pkg/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,17 @@ func logTimeout(err error) bool {
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[ServicePortName]*serviceInfo
portMapMutex sync.Mutex
portMap map[portMapKey]ServicePortName
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
proxyPorts PortAllocator
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[ServicePortName]*serviceInfo
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]ServicePortName
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
proxyPorts PortAllocator
}

// A key for the portMap
Expand Down Expand Up @@ -119,7 +120,7 @@ func IsProxyLocked(err error) bool {
// if iptables fails to update or acquire the initial lock. Once a proxier is
// created, it will keep iptables up to date in the background and will not
// terminate if a particular iptables call fails.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange) (*Proxier, error) {
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange, udpIdleTimeout time.Duration) (*Proxier, error) {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
return nil, ErrProxyOnLocalhost
}
Expand All @@ -132,10 +133,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
proxyPorts := newPortAllocator(pr)

glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, udpIdleTimeout)
}

func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator) (*Proxier, error) {
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, udpIdleTimeout time.Duration) (*Proxier, error) {
// convenient to pass nil for tests..
if proxyPorts == nil {
proxyPorts = newPortAllocator(util.PortRange{})
Expand All @@ -150,13 +151,14 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
return nil, fmt.Errorf("failed to flush iptables: %v", err)
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]ServicePortName),
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
proxyPorts: proxyPorts,
loadBalancer: loadBalancer,
serviceMap: make(map[ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]ServicePortName),
udpIdleTimeout: udpIdleTimeout,
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
proxyPorts: proxyPorts,
}, nil
}

Expand Down Expand Up @@ -270,9 +272,6 @@ func (proxier *Proxier) addServiceOnPort(service ServicePortName, protocol api.P
return si, nil
}

// How long we leave idle UDP connections open.
const udpIdleTimeout = 10 * time.Second

// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
Expand Down Expand Up @@ -319,7 +318,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
}

glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, udpIdleTimeout)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
Expand Down
32 changes: 18 additions & 14 deletions pkg/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
"k8s.io/kubernetes/pkg/util/iptables"
)

const (
udpIdleTimeoutForTest = 250 * time.Millisecond
)

func joinHostPort(host string, port int) string {
return net.JoinHostPort(host, fmt.Sprintf("%d", port))
}
Expand Down Expand Up @@ -206,7 +210,7 @@ func TestTCPProxy(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand All @@ -233,7 +237,7 @@ func TestUDPProxy(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -265,7 +269,7 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand All @@ -292,7 +296,7 @@ func TestMultiPortOnUpdate(t *testing.T) {
serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
serviceX := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "x"}

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -355,7 +359,7 @@ func TestTCPProxyStop(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -399,7 +403,7 @@ func TestUDPProxyStop(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -437,7 +441,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -474,7 +478,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -511,7 +515,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -563,7 +567,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -615,7 +619,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -663,7 +667,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -708,7 +712,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -760,7 +764,7 @@ func TestProxyUpdatePortal(t *testing.T) {
},
})

p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil)
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
Expand Down