From 48647fb9b5dce25411c5fbae511d2bf920a52f05 Mon Sep 17 00:00:00 2001 From: Anthony Howe Date: Fri, 3 Feb 2017 08:36:18 -0800 Subject: [PATCH 1/4] add tcp or udp proxy for service addresses --- cmd/kube-proxy/app/server.go | 15 +- pkg/proxy/types.go | 12 ++ pkg/proxy/winuserspace/proxier.go | 275 ++++++++++++++++---------- pkg/proxy/winuserspace/proxysocket.go | 14 +- pkg/util/netsh/netsh.go | 4 +- 5 files changed, 205 insertions(+), 115 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index da6727d8e5b2..d996dc7d9e89 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -249,15 +249,15 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err userspace.CleanupLeftovers(iptInterface) } else { glog.V(0).Info("Using userspace Proxier.") - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := userspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer - endpointsHandler = loadBalancer var proxierUserspace proxy.ProxyProvider if runtime.GOOS == "windows" { + // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for + // our config.EndpointsConfigHandler. + loadBalancer := winuserspace.NewLoadBalancerRR() + // set EndpointsConfigHandler to our loadBalancer + endpointsHandler = loadBalancer proxierUserspace, err = winuserspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), @@ -268,6 +268,11 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err config.UDPIdleTimeout.Duration, ) } else { + // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for + // our config.EndpointsConfigHandler. + loadBalancer := userspace.NewLoadBalancerRR() + // set EndpointsConfigHandler to our loadBalancer + endpointsHandler = loadBalancer proxierUserspace, err = userspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index d9ff569cbb70..72f68afc7b40 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -47,3 +47,15 @@ type ServicePortName struct { func (spn ServicePortName) String() string { return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port) } + +// ServicePortPortalName carries a namespace + name + portname + portalip. This is the unique +// identfier for a load-balanced service. +type ServicePortPortalName struct { + types.NamespacedName + Port string + PortalIPName string +} + +func (spn ServicePortPortalName) String() string { + return fmt.Sprintf("%s:%s:%s", spn.NamespacedName.String(), spn.Port, spn.PortalIPName) +} diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index c9d61e70d3b6..32c91983613e 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -19,7 +19,6 @@ package winuserspace import ( "fmt" "net" - "strconv" "strings" "sync" "sync/atomic" @@ -28,7 +27,7 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" + //utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" @@ -37,25 +36,25 @@ import ( ) type portal struct { - ip net.IP + ip string port int isExternal bool } type serviceInfo struct { - isAliveAtomic int32 // Only access this with atomic ops - portal portal - protocol api.Protocol - proxyPort int - socket proxySocket - timeout time.Duration - activeClients *clientCache - nodePort int - loadBalancerStatus api.LoadBalancerStatus + isAliveAtomic int32 // Only access this with atomic ops + portal portal + protocol api.Protocol + proxyPort int + socket proxySocket + timeout time.Duration + activeClients *clientCache + //nodePort int + //loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity stickyMaxAgeMinutes int // Deprecated, but required for back-compat (including e2e) - externalIPs []string + //externalIPs []string } func (info *serviceInfo) setAlive(b bool) { @@ -85,16 +84,16 @@ func logTimeout(err error) bool { type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortName]*serviceInfo + serviceMap map[proxy.ServicePortPortalName]*serviceInfo syncPeriod time.Duration udpIdleTimeout time.Duration portMapMutex sync.Mutex portMap map[portMapKey]*portMapValue numProxyLoops int32 // use atomic ops to access this; mostly for testing - listenIP net.IP - netsh netsh.Interface - hostIP net.IP - proxyPorts PortAllocator + //listenIP net.IP + netsh netsh.Interface + hostIP net.IP + //proxyPorts PortAllocator } // assert Proxier is a ProxyProvider @@ -114,7 +113,7 @@ func (k *portMapKey) String() string { // A value for the portMap type portMapValue struct { - owner proxy.ServicePortName + owner proxy.ServicePortPortalName socket interface { Close() error } @@ -153,31 +152,23 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interfac return nil, fmt.Errorf("failed to select a host interface: %v", err) } - proxyPorts := newPortAllocator(pr) - glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP) - return createProxier(loadBalancer, listenIP, netsh, hostIP, proxyPorts, syncPeriod, udpIdleTimeout) + return createProxier(loadBalancer, listenIP, netsh, hostIP, syncPeriod, udpIdleTimeout) } -func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { - // convenient to pass nil for tests.. - if proxyPorts == nil { - proxyPorts = newPortAllocator(utilnet.PortRange{}) - } +func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { return &Proxier{ loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + serviceMap: make(map[proxy.ServicePortPortalName]*serviceInfo), portMap: make(map[portMapKey]*portMapValue), syncPeriod: syncPeriod, udpIdleTimeout: udpIdleTimeout, - listenIP: listenIP, netsh: netsh, hostIP: hostIP, - proxyPorts: proxyPorts, }, nil } -// Sync is called to immediately synchronize the proxier state to iptables +// Sync is called to immediately synchronize the proxier state func (proxier *Proxier) Sync() { proxier.ensurePortals() proxier.cleanupStaleStickySessions() @@ -196,7 +187,7 @@ func (proxier *Proxier) SyncLoop() { // Ensure that portals exist for all services. func (proxier *Proxier) ensurePortals() { - proxier.mu.Lock() + /*proxier.mu.Lock() defer proxier.mu.Unlock() // NB: This does not remove rules that should not be present. for name, info := range proxier.serviceMap { @@ -204,52 +195,56 @@ func (proxier *Proxier) ensurePortals() { if err != nil { glog.Errorf("Failed to ensure portal for %q: %v", name, err) } - } + }*/ } // cleanupStaleStickySessions cleans up any stale sticky session records in the hash map. func (proxier *Proxier) cleanupStaleStickySessions() { proxier.mu.Lock() defer proxier.mu.Unlock() - for name := range proxier.serviceMap { + spnMap := make(map[proxy.ServicePortName]bool) + for k := range proxier.serviceMap { + spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: k.Namespace, Name: k.Name}, Port: k.Port} + spnMap[spn] = true + } + for name := range spnMap { proxier.loadBalancer.CleanupStaleStickySessions(name) } } // This assumes proxier.mu is not locked. -func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxy(service proxy.ServicePortPortalName, info *serviceInfo) error { proxier.mu.Lock() defer proxier.mu.Unlock() return proxier.stopProxyInternal(service, info) } // This assumes proxier.mu is locked. -func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortPortalName, info *serviceInfo) error { delete(proxier.serviceMap, service) info.setAlive(false) err := info.socket.Close() - port := info.socket.ListenPort() - proxier.proxyPorts.Release(port) return err } -func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) { +func (proxier *Proxier) getServiceInfo(service proxy.ServicePortPortalName) (*serviceInfo, bool) { proxier.mu.Lock() defer proxier.mu.Unlock() info, ok := proxier.serviceMap[service] return info, ok } -func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) { +func (proxier *Proxier) setServiceInfo(service proxy.ServicePortPortalName, info *serviceInfo) { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.serviceMap[service] = info } +/* // addServiceOnPort starts listening for a new service, returning the serviceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortPortalName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -286,13 +281,14 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol return si, nil } +*/ // OnServiceUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. func (proxier *Proxier) OnServiceUpdate(services []api.Service) { glog.V(4).Infof("Received update notice: %+v", services) - activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set + activeServices := make(map[proxy.ServicePortPortalName]bool) // use a map as a set for i := range services { service := &services[i] @@ -304,84 +300,143 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} - activeServices[serviceName] = true - serviceIP := net.ParseIP(service.Spec.ClusterIP) - info, exists := proxier.getServiceInfo(serviceName) - // TODO: check health of the socket? What if ProxyLoop exited? - if exists && sameConfig(info, service, servicePort) { - // Nothing changed. - continue + // create a slice of source IPs + var listenIPs []string + listenIPs = append(listenIPs, service.Spec.ClusterIP) + + /*for _, ip := range service.Spec.ExternalIPs { + listenIPs = append(listenIPs, ip) } - if exists { - glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) - err := proxier.closePortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", serviceName, err) + + for _, ip := range service.Status.LoadBalancer.Ingress { + listenIPs = append(listenIPs, ip) + } + + if int(service.Spec.Ports[i]) != 0 { + listenIPs = append(listenIPs, "") + }*/ + + for _, listenIP := range listenIPs { + serviceName := proxy.ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name, PortalIPName: listenIP} + activeServices[serviceName] = true + serviceIP := net.ParseIP(listenIP) + info, exists := proxier.getServiceInfo(serviceName) + if exists && sameConfig(info, service, servicePort) { + // Nothing changed. + continue } - err = proxier.stopProxy(serviceName, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", serviceName, err) + if exists { + glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) + + // turn off the proxy + err := proxier.stopProxy(serviceName, info) + if err != nil { + glog.Errorf("Failed to stop service %q: %v", serviceName, err) + } + + // close the PortalProxy if it is not a node port + if serviceIP != nil { + args := proxier.netshIpv4AddressDeleteArgs(serviceIP) + if err := proxier.netsh.DeleteIPAddress(args); err != nil { + glog.Errorf("Failed to delete IP address for service %q, error %s", serviceName, err.Error()) + } + } else { + // TODO(ajh) release the node port + } } - } - proxyPort, err := proxier.proxyPorts.AllocateNext() - if err != nil { - glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err) - continue - } + glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) + + // add the IP address if it is not a node port + if serviceIP != nil { + args := proxier.netshIpv4AddressAddArgs(serviceIP) + existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP) + if err != nil { + glog.Errorf("Failed to add ip address for service %q, args:%v", serviceName, args) + continue + } + if !existed { + glog.V(3).Infof("Added ip address to fowarder interface for service %q on %s %s:%d", serviceName, servicePort.Protocol, serviceIP, int(servicePort.Port)) + } + } else { + // TODO(ajh) handle the node port + } - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) - if err != nil { - glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) - continue + // add the listener, proxy + sock, err := newProxySocket(servicePort.Protocol, serviceIP, int(servicePort.Port)) + if err != nil { + glog.Errorf("failed to create a new proxy socket for service %q: %v", serviceName, err) + continue + } + si := &serviceInfo{ + isAliveAtomic: 1, + portal: portal{ip: listenIP, port: int(servicePort.Port), isExternal: false}, + protocol: servicePort.Protocol, + proxyPort: int(servicePort.Port), + socket: sock, + timeout: proxier.udpIdleTimeout, + activeClients: newClientCache(), + sessionAffinityType: service.Spec.SessionAffinity, // default + stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. + } + glog.V(4).Infof("info: %#v", si) + proxier.setServiceInfo(serviceName, si) + + glog.V(2).Infof("Proxying for service %q on %s port %d", serviceName, servicePort.Protocol, int(servicePort.Port)) + go func(service proxy.ServicePortPortalName, proxier *Proxier) { + defer runtime.HandleCrash() + atomic.AddInt32(&proxier.numProxyLoops, 1) + sock.ProxyLoop(service, si, proxier) + atomic.AddInt32(&proxier.numProxyLoops, -1) + }(serviceName, proxier) } - info.portal.ip = serviceIP - info.portal.port = int(servicePort.Port) - info.externalIPs = service.Spec.ExternalIPs - // Deep-copy in case the service instance changes - info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) - info.nodePort = int(servicePort.NodePort) - info.sessionAffinityType = service.Spec.SessionAffinity - glog.V(4).Infof("info: %#v", info) - - err = proxier.openPortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to open portal for %q: %v", serviceName, err) + if len(listenIPs) > 0 { + // only one loadbalancer per service endpoint + servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} + proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, 180) } - proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) } } proxier.mu.Lock() defer proxier.mu.Unlock() + + spnMap := make(map[proxy.ServicePortName]bool) + for name, info := range proxier.serviceMap { + spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: name.Namespace, Name: name.Name}, Port: name.Port} + spnMap[spn] = spnMap[spn] || activeServices[name] + if !activeServices[name] { + serviceIP := net.ParseIP(info.portal.ip) glog.V(1).Infof("Stopping service %q", name) - err := proxier.closePortal(name, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", name, err) - } - err = proxier.stopProxyInternal(name, info) + + // turn off the proxy + err := proxier.stopProxy(name, info) if err != nil { glog.Errorf("Failed to stop service %q: %v", name, err) } - proxier.loadBalancer.DeleteService(name) + + // close the PortalProxy if it is not a node port + if serviceIP != nil { + args := proxier.netshIpv4AddressDeleteArgs(serviceIP) + if err := proxier.netsh.DeleteIPAddress(args); err != nil { + glog.Errorf("Failed to stop service %q: %v", name, err) + } + } else { + // TODO(ajh) release the node port + } + } + } + // only delete spn if all listen ips show inactive + for k := range spnMap { + if !spnMap[k] { + proxier.loadBalancer.DeleteService(k) } } } func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { - return false - } - if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) { - return false - } - if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) { - return false - } - if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) { + if info.protocol != port.Protocol || info.portal.port != int(port.Port) { return false } if info.sessionAffinityType != service.Spec.SessionAffinity { @@ -402,6 +457,7 @@ func ipsEqual(lhs, rhs []string) bool { return true } +/* func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error { err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) if err != nil { @@ -429,7 +485,9 @@ func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceI } return nil } +*/ +/* func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { if protocol == api.ProtocolUDP { glog.Warningf("Not adding rule for %q on %s:%d as UDP protocol is not supported by netsh portproxy", name, portal.ip, portal.port) @@ -461,7 +519,9 @@ func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, prox return nil } +*/ +/* // claimNodePort marks a port as being owned by a particular service, or returns error if already claimed. // Idempotent: reclaiming with the same owner is not an error func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error { @@ -494,7 +554,9 @@ func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol } return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing) } +*/ +/* // releaseNodePort releases a claim on a port. Returns an error if the owner does not match the claim. // Tolerates release on an unclaimed port, to simplify . func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error { @@ -515,7 +577,9 @@ func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protoc existing.socket.Close() return nil } +*/ +/* func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { if protocol == api.ProtocolUDP { glog.Warningf("Not adding node port rule for %q on port %d as UDP protocol is not supported by netsh portproxy", name, nodePort) @@ -539,8 +603,9 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI } return nil -} +}*/ +/* func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error { // Collect errors and report them all at the end. el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) @@ -561,8 +626,9 @@ func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *service glog.Errorf("Some errors closing iptables portals for service %q", service) } return utilerrors.NewAggregate(el) -} +}*/ +/* func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { el := []error{} @@ -587,8 +653,9 @@ func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, pro } return el -} +}*/ +/* func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { el := []error{} @@ -603,7 +670,7 @@ func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxy } return el -} +}*/ func isLocalIP(ip net.IP) (bool, error) { addrs, err := net.InterfaceAddrs() @@ -633,6 +700,7 @@ func isClosedError(err error) bool { return strings.HasSuffix(err.Error(), "use of closed network connection") } +/* func (proxier *Proxier) netshPortProxyAddArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := []string{ "interface", "portproxy", "set", "v4tov4", @@ -646,6 +714,7 @@ func (proxier *Proxier) netshPortProxyAddArgs(destIP net.IP, destPort int, proxy return args } +*/ func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string { intName := proxier.netsh.GetInterfaceToAddIP() @@ -658,6 +727,7 @@ func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string { return args } +/* func (proxier *Proxier) netshPortProxyDeleteArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := []string{ "interface", "portproxy", "delete", "v4tov4", @@ -669,6 +739,7 @@ func (proxier *Proxier) netshPortProxyDeleteArgs(destIP net.IP, destPort int, pr return args } +*/ func (proxier *Proxier) netshIpv4AddressDeleteArgs(destIP net.IP) []string { intName := proxier.netsh.GetInterfaceToAddIP() diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go index dbe0fb0a7a69..63012387ca2e 100644 --- a/pkg/proxy/winuserspace/proxysocket.go +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -26,6 +26,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" @@ -40,7 +41,7 @@ type proxySocket interface { // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier) + ProxyLoop(service proxy.ServicePortPortalName, info *serviceInfo, proxier *Proxier) // ListenPort returns the host port that the proxySocket is listening on ListenPort() int } @@ -86,10 +87,11 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { +func tryConnect(service proxy.ServicePortPortalName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { sessionAffinityReset := false for _, dialTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) + servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port} + endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -111,7 +113,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { for { if !myInfo.isAlive() { // The service port was closed or replaced. @@ -197,7 +199,7 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { var buffer [4096]byte // 4KiB should be enough for most whole-packets for { if !myInfo.isAlive() { @@ -241,7 +243,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortPortalName, timeout time.Duration) (net.Conn, error) { activeClients.mu.Lock() defer activeClients.mu.Unlock() diff --git a/pkg/util/netsh/netsh.go b/pkg/util/netsh/netsh.go index 59d1a18b61b2..39e2058460ef 100644 --- a/pkg/util/netsh/netsh.go +++ b/pkg/util/netsh/netsh.go @@ -167,12 +167,12 @@ func (runner *runner) DeleteIPAddress(args []string) error { // GetInterfaceToAddIP returns the interface name where Service IP needs to be added // IP Address needs to be added for netsh portproxy to redirect traffic -// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (HNS Internal NIC)" is returned +// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (forwarder)" is returned func (runner *runner) GetInterfaceToAddIP() string { if iface := os.Getenv("INTERFACE_TO_ADD_SERVICE_IP"); len(iface) > 0 { return iface } - return "vEthernet (HNS Internal NIC)" + return "vEthernet (forwarder)" } // Restore is part of Interface. From 0e37f0a890589bf0658be51a8a5164762c690df5 Mon Sep 17 00:00:00 2001 From: Anthony Howe Date: Wed, 8 Feb 2017 07:35:55 -0800 Subject: [PATCH 2/4] cleanup proxier --- pkg/proxy/types.go | 12 - pkg/proxy/winuserspace/BUILD | 3 +- pkg/proxy/winuserspace/port_allocator.go | 158 ----- pkg/proxy/winuserspace/port_allocator_test.go | 178 ----- pkg/proxy/winuserspace/proxier.go | 611 +++++------------- pkg/proxy/winuserspace/proxier_test.go | 248 ++++--- pkg/proxy/winuserspace/proxysocket.go | 18 +- pkg/proxy/winuserspace/types.go | 35 + pkg/util/netsh/netsh.go | 4 +- 9 files changed, 353 insertions(+), 914 deletions(-) delete mode 100644 pkg/proxy/winuserspace/port_allocator.go delete mode 100644 pkg/proxy/winuserspace/port_allocator_test.go create mode 100644 pkg/proxy/winuserspace/types.go diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 72f68afc7b40..d9ff569cbb70 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -47,15 +47,3 @@ type ServicePortName struct { func (spn ServicePortName) String() string { return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port) } - -// ServicePortPortalName carries a namespace + name + portname + portalip. This is the unique -// identfier for a load-balanced service. -type ServicePortPortalName struct { - types.NamespacedName - Port string - PortalIPName string -} - -func (spn ServicePortPortalName) String() string { - return fmt.Sprintf("%s:%s:%s", spn.NamespacedName.String(), spn.Port, spn.PortalIPName) -} diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index f2fe3cd49628..9dc5e6188ffb 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -12,10 +12,10 @@ go_library( name = "go_default_library", srcs = [ "loadbalancer.go", - "port_allocator.go", "proxier.go", "proxysocket.go", "roundrobin.go", + "types.go", "udp_server.go", ], tags = ["automanaged"], @@ -36,7 +36,6 @@ go_library( go_test( name = "go_default_test", srcs = [ - "port_allocator_test.go", "proxier_test.go", "roundrobin_test.go", ], diff --git a/pkg/proxy/winuserspace/port_allocator.go b/pkg/proxy/winuserspace/port_allocator.go deleted file mode 100644 index 65ad777504b6..000000000000 --- a/pkg/proxy/winuserspace/port_allocator.go +++ /dev/null @@ -1,158 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package winuserspace - -import ( - "errors" - "math/big" - "math/rand" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/net" - "k8s.io/apimachinery/pkg/util/wait" -) - -var ( - errPortRangeNoPortsRemaining = errors.New("port allocation failed; there are no remaining ports left to allocate in the accepted range") -) - -type PortAllocator interface { - AllocateNext() (int, error) - Release(int) -} - -// randomAllocator is a PortAllocator implementation that allocates random ports, yielding -// a port value of 0 for every call to AllocateNext(). -type randomAllocator struct{} - -// AllocateNext always returns 0 -func (r *randomAllocator) AllocateNext() (int, error) { - return 0, nil -} - -// Release is a noop -func (r *randomAllocator) Release(_ int) { - // noop -} - -// newPortAllocator builds PortAllocator for a given PortRange. If the PortRange is empty -// then a random port allocator is returned; otherwise, a new range-based allocator -// is returned. -func newPortAllocator(r net.PortRange) PortAllocator { - if r.Base == 0 { - return &randomAllocator{} - } - return newPortRangeAllocator(r, true) -} - -const ( - portsBufSize = 16 - nextFreePortCooldown = 500 * time.Millisecond - allocateNextTimeout = 1 * time.Second -) - -type rangeAllocator struct { - net.PortRange - ports chan int - used big.Int - lock sync.Mutex - rand *rand.Rand -} - -func newPortRangeAllocator(r net.PortRange, autoFill bool) PortAllocator { - if r.Base == 0 || r.Size == 0 { - panic("illegal argument: may not specify an empty port range") - } - ra := &rangeAllocator{ - PortRange: r, - ports: make(chan int, portsBufSize), - rand: rand.New(rand.NewSource(time.Now().UnixNano())), - } - if autoFill { - go wait.Forever(func() { ra.fillPorts() }, nextFreePortCooldown) - } - return ra -} - -// fillPorts loops, always searching for the next free port and, if found, fills the ports buffer with it. -// this func blocks unless there are no remaining free ports. -func (r *rangeAllocator) fillPorts() { - for { - if !r.fillPortsOnce() { - return - } - } -} - -func (r *rangeAllocator) fillPortsOnce() bool { - port := r.nextFreePort() - if port == -1 { - return false - } - r.ports <- port - return true -} - -// nextFreePort finds a free port, first picking a random port. if that port is already in use -// then the port range is scanned sequentially until either a port is found or the scan completes -// unsuccessfully. an unsuccessful scan returns a port of -1. -func (r *rangeAllocator) nextFreePort() int { - r.lock.Lock() - defer r.lock.Unlock() - - // choose random port - j := r.rand.Intn(r.Size) - if b := r.used.Bit(j); b == 0 { - r.used.SetBit(&r.used, j, 1) - return j + r.Base - } - - // search sequentially - for i := j + 1; i < r.Size; i++ { - if b := r.used.Bit(i); b == 0 { - r.used.SetBit(&r.used, i, 1) - return i + r.Base - } - } - for i := 0; i < j; i++ { - if b := r.used.Bit(i); b == 0 { - r.used.SetBit(&r.used, i, 1) - return i + r.Base - } - } - return -1 -} - -func (r *rangeAllocator) AllocateNext() (port int, err error) { - select { - case port = <-r.ports: - case <-time.After(allocateNextTimeout): - err = errPortRangeNoPortsRemaining - } - return -} - -func (r *rangeAllocator) Release(port int) { - port -= r.Base - if port < 0 || port >= r.Size { - return - } - r.lock.Lock() - defer r.lock.Unlock() - r.used.SetBit(&r.used, port, 0) -} diff --git a/pkg/proxy/winuserspace/port_allocator_test.go b/pkg/proxy/winuserspace/port_allocator_test.go deleted file mode 100644 index 208874617462..000000000000 --- a/pkg/proxy/winuserspace/port_allocator_test.go +++ /dev/null @@ -1,178 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package winuserspace - -import ( - "reflect" - "testing" - - "k8s.io/apimachinery/pkg/util/net" -) - -func TestRangeAllocatorEmpty(t *testing.T) { - r := &net.PortRange{} - r.Set("0-0") - defer func() { - if rv := recover(); rv == nil { - t.Fatalf("expected panic because of empty port range: %#v", r) - } - }() - _ = newPortRangeAllocator(*r, true) -} - -func TestRangeAllocatorFullyAllocated(t *testing.T) { - r := &net.PortRange{} - r.Set("1-1") - // Don't auto-fill ports, we'll manually turn the crank - pra := newPortRangeAllocator(*r, false) - a := pra.(*rangeAllocator) - - // Fill in the one available port - if !a.fillPortsOnce() { - t.Fatalf("Expected to be able to fill ports") - } - - // There should be no ports available - if a.fillPortsOnce() { - t.Fatalf("Expected to be unable to fill ports") - } - - p, err := a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if p != 1 { - t.Fatalf("unexpected allocated port: %d", p) - } - - a.lock.Lock() - if bit := a.used.Bit(p - a.Base); bit != 1 { - a.lock.Unlock() - t.Fatalf("unexpected used bit for allocated port: %d", p) - } - a.lock.Unlock() - - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - - a.Release(p) - a.lock.Lock() - if bit := a.used.Bit(p - a.Base); bit != 0 { - a.lock.Unlock() - t.Fatalf("unexpected used bit for allocated port: %d", p) - } - a.lock.Unlock() - - // Fill in the one available port - if !a.fillPortsOnce() { - t.Fatalf("Expected to be able to fill ports") - } - - p, err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if p != 1 { - t.Fatalf("unexpected allocated port: %d", p) - } - a.lock.Lock() - if bit := a.used.Bit(p - a.Base); bit != 1 { - a.lock.Unlock() - t.Fatalf("unexpected used bit for allocated port: %d", p) - } - a.lock.Unlock() - - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } -} - -func TestRangeAllocator_RandomishAllocation(t *testing.T) { - r := &net.PortRange{} - r.Set("1-100") - pra := newPortRangeAllocator(*r, false) - a := pra.(*rangeAllocator) - - // allocate all the ports - var err error - ports := make([]int, 100, 100) - for i := 0; i < 100; i++ { - if !a.fillPortsOnce() { - t.Fatalf("Expected to be able to fill ports") - } - ports[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if ports[i] < 1 || ports[i] > 100 { - t.Fatalf("unexpected allocated port: %d", ports[i]) - } - a.lock.Lock() - if bit := a.used.Bit(ports[i] - a.Base); bit != 1 { - a.lock.Unlock() - t.Fatalf("unexpected used bit for allocated port: %d", ports[i]) - } - a.lock.Unlock() - } - - if a.fillPortsOnce() { - t.Fatalf("Expected to be unable to fill ports") - } - - // release them all - for i := 0; i < 100; i++ { - a.Release(ports[i]) - a.lock.Lock() - if bit := a.used.Bit(ports[i] - a.Base); bit != 0 { - a.lock.Unlock() - t.Fatalf("unexpected used bit for allocated port: %d", ports[i]) - } - a.lock.Unlock() - } - - // allocate the ports again - rports := make([]int, 100, 100) - for i := 0; i < 100; i++ { - if !a.fillPortsOnce() { - t.Fatalf("Expected to be able to fill ports") - } - rports[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if rports[i] < 1 || rports[i] > 100 { - t.Fatalf("unexpected allocated port: %d", rports[i]) - } - a.lock.Lock() - if bit := a.used.Bit(rports[i] - a.Base); bit != 1 { - a.lock.Unlock() - t.Fatalf("unexpected used bit for allocated port: %d", rports[i]) - } - a.lock.Unlock() - } - - if a.fillPortsOnce() { - t.Fatalf("Expected to be unable to fill ports") - } - - if reflect.DeepEqual(ports, rports) { - t.Fatalf("expected re-allocated ports to be in a somewhat random order") - } -} diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 32c91983613e..f77ddbf0c30e 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -27,7 +27,6 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" - //utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" @@ -35,6 +34,9 @@ import ( "k8s.io/kubernetes/pkg/util/netsh" ) +const allAvailableInterfaces string = "" +const stickyMaxAgeMinutes int = 180 // TODO: parameterize this in the API. + type portal struct { ip string port int @@ -42,19 +44,13 @@ type portal struct { } type serviceInfo struct { - isAliveAtomic int32 // Only access this with atomic ops - portal portal - protocol api.Protocol - proxyPort int - socket proxySocket - timeout time.Duration - activeClients *clientCache - //nodePort int - //loadBalancerStatus api.LoadBalancerStatus + isAliveAtomic int32 // Only access this with atomic ops + portal portal + protocol api.Protocol + socket proxySocket + timeout time.Duration + activeClients *clientCache sessionAffinityType api.ServiceAffinity - stickyMaxAgeMinutes int - // Deprecated, but required for back-compat (including e2e) - //externalIPs []string } func (info *serviceInfo) setAlive(b bool) { @@ -84,16 +80,14 @@ func logTimeout(err error) bool { type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortPortalName]*serviceInfo + serviceMap map[ServicePortPortalName]*serviceInfo syncPeriod time.Duration udpIdleTimeout time.Duration portMapMutex sync.Mutex portMap map[portMapKey]*portMapValue numProxyLoops int32 // use atomic ops to access this; mostly for testing - //listenIP net.IP - netsh netsh.Interface - hostIP net.IP - //proxyPorts PortAllocator + netsh netsh.Interface + hostIP net.IP } // assert Proxier is a ProxyProvider @@ -113,7 +107,7 @@ func (k *portMapKey) String() string { // A value for the portMap type portMapValue struct { - owner proxy.ServicePortPortalName + owner ServicePortPortalName socket interface { Close() error } @@ -159,7 +153,7 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interfac func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { return &Proxier{ loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortPortalName]*serviceInfo), + serviceMap: make(map[ServicePortPortalName]*serviceInfo), portMap: make(map[portMapKey]*portMapValue), syncPeriod: syncPeriod, udpIdleTimeout: udpIdleTimeout, @@ -170,7 +164,6 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Inter // Sync is called to immediately synchronize the proxier state func (proxier *Proxier) Sync() { - proxier.ensurePortals() proxier.cleanupStaleStickySessions() } @@ -185,110 +178,146 @@ func (proxier *Proxier) SyncLoop() { } } -// Ensure that portals exist for all services. -func (proxier *Proxier) ensurePortals() { - /*proxier.mu.Lock() - defer proxier.mu.Unlock() - // NB: This does not remove rules that should not be present. - for name, info := range proxier.serviceMap { - err := proxier.openPortal(name, info) - if err != nil { - glog.Errorf("Failed to ensure portal for %q: %v", name, err) - } - }*/ -} - // cleanupStaleStickySessions cleans up any stale sticky session records in the hash map. func (proxier *Proxier) cleanupStaleStickySessions() { proxier.mu.Lock() defer proxier.mu.Unlock() - spnMap := make(map[proxy.ServicePortName]bool) - for k := range proxier.serviceMap { - spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: k.Namespace, Name: k.Name}, Port: k.Port} - spnMap[spn] = true - } - for name := range spnMap { - proxier.loadBalancer.CleanupStaleStickySessions(name) + servicePortNameMap := make(map[proxy.ServicePortName]bool) + for name := range proxier.serviceMap { + servicePortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: name.Namespace, + Name: name.Name, + }, + Port: name.Port, + } + if servicePortNameMap[servicePortName] == false { + // ensure cleanup sticky sessions only gets called once per serviceportname + servicePortNameMap[servicePortName] = true + proxier.loadBalancer.CleanupStaleStickySessions(servicePortName) + } } } // This assumes proxier.mu is not locked. -func (proxier *Proxier) stopProxy(service proxy.ServicePortPortalName, info *serviceInfo) error { +func (proxier *Proxier) stopProxy(service ServicePortPortalName, info *serviceInfo) error { proxier.mu.Lock() defer proxier.mu.Unlock() return proxier.stopProxyInternal(service, info) } // This assumes proxier.mu is locked. -func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortPortalName, info *serviceInfo) error { +func (proxier *Proxier) stopProxyInternal(service ServicePortPortalName, info *serviceInfo) error { delete(proxier.serviceMap, service) info.setAlive(false) err := info.socket.Close() return err } -func (proxier *Proxier) getServiceInfo(service proxy.ServicePortPortalName) (*serviceInfo, bool) { +func (proxier *Proxier) getServiceInfo(service ServicePortPortalName) (*serviceInfo, bool) { proxier.mu.Lock() defer proxier.mu.Unlock() info, ok := proxier.serviceMap[service] return info, ok } -func (proxier *Proxier) setServiceInfo(service proxy.ServicePortPortalName, info *serviceInfo) { +func (proxier *Proxier) setServiceInfo(service ServicePortPortalName, info *serviceInfo) { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.serviceMap[service] = info } -/* -// addServiceOnPort starts listening for a new service, returning the serviceInfo. -// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP -// connections, for now. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortPortalName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { - sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) - if err != nil { - return nil, err - } - _, portStr, err := net.SplitHostPort(sock.Addr().String()) - if err != nil { - sock.Close() - return nil, err +// addServicePortPortal starts listening for a new service, returning the serviceInfo. +// The timeout only applies to UDP connections, for now. +func (proxier *Proxier) addServicePortPortal(servicePortPortalName ServicePortPortalName, protocol api.Protocol, listenIP string, port int, timeout time.Duration) (*serviceInfo, error) { + var serviceIP net.IP + if listenIP != allAvailableInterfaces { + if serviceIP = net.ParseIP(listenIP); serviceIP == nil { + return nil, fmt.Errorf("could not parse ip '%q'", listenIP) + } + // add the IP address. Node port binds to all interfaces. + args := proxier.netshIpv4AddressAddArgs(serviceIP) + if existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP); err != nil { + return nil, err + } else if !existed { + glog.V(3).Infof("Added ip address to fowarder interface for service %q at %s:%d/%s", servicePortPortalName, listenIP, port, protocol) + } } - portNum, err := strconv.Atoi(portStr) + + // add the listener, proxy + sock, err := newProxySocket(protocol, serviceIP, port) if err != nil { - sock.Close() return nil, err } si := &serviceInfo{ - isAliveAtomic: 1, - proxyPort: portNum, + isAliveAtomic: 1, + portal: portal{ + ip: listenIP, + port: port, + isExternal: false, + }, protocol: protocol, socket: sock, timeout: timeout, activeClients: newClientCache(), sessionAffinityType: api.ServiceAffinityNone, // default - stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. } - proxier.setServiceInfo(service, si) + proxier.setServiceInfo(servicePortPortalName, si) - glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) - go func(service proxy.ServicePortName, proxier *Proxier) { + glog.V(2).Infof("Proxying for service %q at %s:%d/%s", servicePortPortalName, listenIP, port, protocol) + go func(service ServicePortPortalName, proxier *Proxier) { defer runtime.HandleCrash() atomic.AddInt32(&proxier.numProxyLoops, 1) sock.ProxyLoop(service, si, proxier) atomic.AddInt32(&proxier.numProxyLoops, -1) - }(service, proxier) + }(servicePortPortalName, proxier) return si, nil } -*/ + +func (proxier *Proxier) closeServicePortPortal(servicePortPortalName ServicePortPortalName, info *serviceInfo) error { + // turn off the proxy + if err := proxier.stopProxy(servicePortPortalName, info); err != nil { + return err + } + + // close the PortalProxy by deleting the service IP address + if info.portal.ip != allAvailableInterfaces { + serviceIP := net.ParseIP(info.portal.ip) + args := proxier.netshIpv4AddressDeleteArgs(serviceIP) + if err := proxier.netsh.DeleteIPAddress(args); err != nil { + return err + } + } + return nil +} + +// getListenIPPortMap returns a slice of all listen IPs for a service. +func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[string]int { + listenIPPortMap := make(map[string]int) + listenIPPortMap[service.Spec.ClusterIP] = listenPort + + for _, ip := range service.Spec.ExternalIPs { + listenIPPortMap[ip] = listenPort + } + + for _, ingress := range service.Status.LoadBalancer.Ingress { + listenIPPortMap[ingress.IP] = listenPort + } + + if nodePort != 0 { + listenIPPortMap[allAvailableInterfaces] = nodePort + } + + return listenIPPortMap +} // OnServiceUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. func (proxier *Proxier) OnServiceUpdate(services []api.Service) { glog.V(4).Infof("Received update notice: %+v", services) - activeServices := make(map[proxy.ServicePortPortalName]bool) // use a map as a set + activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set for i := range services { service := &services[i] @@ -300,393 +329,91 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] - // create a slice of source IPs - var listenIPs []string - listenIPs = append(listenIPs, service.Spec.ClusterIP) - - /*for _, ip := range service.Spec.ExternalIPs { - listenIPs = append(listenIPs, ip) - } - - for _, ip := range service.Status.LoadBalancer.Ingress { - listenIPs = append(listenIPs, ip) - } - - if int(service.Spec.Ports[i]) != 0 { - listenIPs = append(listenIPs, "") - }*/ - - for _, listenIP := range listenIPs { - serviceName := proxy.ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name, PortalIPName: listenIP} - activeServices[serviceName] = true - serviceIP := net.ParseIP(listenIP) - info, exists := proxier.getServiceInfo(serviceName) - if exists && sameConfig(info, service, servicePort) { + // create a slice of all the source IPs to use for service port portals + listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) + protocol := servicePort.Protocol + + for listenIP, listenPort := range listenIPPortMap { + servicePortPortalName := ServicePortPortalName{ + NamespacedName: types.NamespacedName{ + Namespace: service.Namespace, + Name: service.Name, + }, + Port: servicePort.Name, + PortalIPName: listenIP, + } + activeServicePortPortals[servicePortPortalName] = true + info, exists := proxier.getServiceInfo(servicePortPortalName) + if exists && sameConfig(info, service, protocol, listenPort) { // Nothing changed. continue } if exists { - glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) - - // turn off the proxy - err := proxier.stopProxy(serviceName, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", serviceName, err) + glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName) + if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil { + glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err) } - - // close the PortalProxy if it is not a node port - if serviceIP != nil { - args := proxier.netshIpv4AddressDeleteArgs(serviceIP) - if err := proxier.netsh.DeleteIPAddress(args); err != nil { - glog.Errorf("Failed to delete IP address for service %q, error %s", serviceName, err.Error()) - } - } else { - // TODO(ajh) release the node port - } - } - - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - - // add the IP address if it is not a node port - if serviceIP != nil { - args := proxier.netshIpv4AddressAddArgs(serviceIP) - existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP) - if err != nil { - glog.Errorf("Failed to add ip address for service %q, args:%v", serviceName, args) - continue - } - if !existed { - glog.V(3).Infof("Added ip address to fowarder interface for service %q on %s %s:%d", serviceName, servicePort.Protocol, serviceIP, int(servicePort.Port)) - } - } else { - // TODO(ajh) handle the node port } - - // add the listener, proxy - sock, err := newProxySocket(servicePort.Protocol, serviceIP, int(servicePort.Port)) + glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol) + info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout) if err != nil { - glog.Errorf("failed to create a new proxy socket for service %q: %v", serviceName, err) + glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err) continue } - si := &serviceInfo{ - isAliveAtomic: 1, - portal: portal{ip: listenIP, port: int(servicePort.Port), isExternal: false}, - protocol: servicePort.Protocol, - proxyPort: int(servicePort.Port), - socket: sock, - timeout: proxier.udpIdleTimeout, - activeClients: newClientCache(), - sessionAffinityType: service.Spec.SessionAffinity, // default - stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. - } - glog.V(4).Infof("info: %#v", si) - proxier.setServiceInfo(serviceName, si) - - glog.V(2).Infof("Proxying for service %q on %s port %d", serviceName, servicePort.Protocol, int(servicePort.Port)) - go func(service proxy.ServicePortPortalName, proxier *Proxier) { - defer runtime.HandleCrash() - atomic.AddInt32(&proxier.numProxyLoops, 1) - sock.ProxyLoop(service, si, proxier) - atomic.AddInt32(&proxier.numProxyLoops, -1) - }(serviceName, proxier) + info.sessionAffinityType = service.Spec.SessionAffinity + glog.V(10).Infof("info: %#v", info) } - if len(listenIPs) > 0 { - // only one loadbalancer per service endpoint - servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} - proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, 180) + if len(listenIPPortMap) > 0 { + // only one loadbalancer per service port portal + servicePortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: service.Namespace, + Name: service.Name, + }, + Port: servicePort.Name, + } + proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes) } } } - proxier.mu.Lock() - defer proxier.mu.Unlock() - - spnMap := make(map[proxy.ServicePortName]bool) for name, info := range proxier.serviceMap { - spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: name.Namespace, Name: name.Name}, Port: name.Port} - spnMap[spn] = spnMap[spn] || activeServices[name] - - if !activeServices[name] { - serviceIP := net.ParseIP(info.portal.ip) + if !activeServicePortPortals[name] { glog.V(1).Infof("Stopping service %q", name) - // turn off the proxy - err := proxier.stopProxy(name, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", name, err) - } - - // close the PortalProxy if it is not a node port - if serviceIP != nil { - args := proxier.netshIpv4AddressDeleteArgs(serviceIP) - if err := proxier.netsh.DeleteIPAddress(args); err != nil { - glog.Errorf("Failed to stop service %q: %v", name, err) - } - } else { - // TODO(ajh) release the node port + if err := proxier.closeServicePortPortal(name, info); err != nil { + glog.Errorf("Failed to close service port portal %q: %v", name, err) } } } - // only delete spn if all listen ips show inactive - for k := range spnMap { - if !spnMap[k] { - proxier.loadBalancer.DeleteService(k) - } - } -} -func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - if info.protocol != port.Protocol || info.portal.port != int(port.Port) { - return false - } - if info.sessionAffinityType != service.Spec.SessionAffinity { - return false - } - return true -} + proxier.mu.Lock() + defer proxier.mu.Unlock() -func ipsEqual(lhs, rhs []string) bool { - if len(lhs) != len(rhs) { - return false - } - for i := range lhs { - if lhs[i] != rhs[i] { - return false + // servicePortNameMap tracks all service port portals with the same name/port. + // A value of true means there is one or more service port portals with name/port pair. + servicePortNameMap := make(map[proxy.ServicePortName]bool) + for name := range proxier.serviceMap { + servicePortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: name.Namespace, + Name: name.Name, + }, + Port: name.Port, } + servicePortNameMap[servicePortName] = servicePortNameMap[servicePortName] || activeServicePortPortals[name] } - return true -} -/* -func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error { - err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) - if err != nil { - return err - } - for _, publicIP := range info.externalIPs { - err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service) - if err != nil { - return err - } - } - for _, ingress := range info.loadBalancerStatus.Ingress { - if ingress.IP != "" { - err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service) - if err != nil { - return err - } - } - } - if info.nodePort != 0 { - err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service) - if err != nil { - return err + // Only delete load balancer if all listen ips per name/port show inactive. + for name := range servicePortNameMap { + if !servicePortNameMap[name] { + proxier.loadBalancer.DeleteService(name) } } - return nil } -*/ -/* -func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { - if protocol == api.ProtocolUDP { - glog.Warningf("Not adding rule for %q on %s:%d as UDP protocol is not supported by netsh portproxy", name, portal.ip, portal.port) - return nil - } - - // Add IP address to "vEthernet (HNSTransparent)" so that portproxy could be used to redirect the traffic - args := proxier.netshIpv4AddressAddArgs(portal.ip) - existed, err := proxier.netsh.EnsureIPAddress(args, portal.ip) - - if err != nil { - glog.Errorf("Failed to add ip address for service %q, args:%v", name, args) - return err - } - if !existed { - glog.V(3).Infof("Added ip address to HNSTransparent interface for service %q on %s %s:%d", name, protocol, portal.ip, portal.port) - } - - args = proxier.netshPortProxyAddArgs(portal.ip, portal.port, proxyIP, proxyPort, name) - existed, err = proxier.netsh.EnsurePortProxyRule(args) - - if err != nil { - glog.Errorf("Failed to run portproxy rule for service %q, args:%v", name, args) - return err - } - if !existed { - glog.V(3).Infof("Added portproxy rule for service %q on %s %s:%d", name, protocol, portal.ip, portal.port) - } - - return nil -} -*/ - -/* -// claimNodePort marks a port as being owned by a particular service, or returns error if already claimed. -// Idempotent: reclaiming with the same owner is not an error -func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error { - proxier.portMapMutex.Lock() - defer proxier.portMapMutex.Unlock() - - // TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports - - key := portMapKey{ip: ip.String(), port: port, protocol: protocol} - existing, found := proxier.portMap[key] - if !found { - // Hold the actual port open, even though we use iptables to redirect - // it. This ensures that a) it's safe to take and b) that stays true. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - socket, err := newProxySocket(protocol, ip, port) - if err != nil { - return fmt.Errorf("can't open node port for %s: %v", key.String(), err) - } - proxier.portMap[key] = &portMapValue{owner: owner, socket: socket} - glog.V(2).Infof("Claimed local port %s", key.String()) - return nil - } - if existing.owner == owner { - // We are idempotent - return nil - } - return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing) -} -*/ - -/* -// releaseNodePort releases a claim on a port. Returns an error if the owner does not match the claim. -// Tolerates release on an unclaimed port, to simplify . -func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error { - proxier.portMapMutex.Lock() - defer proxier.portMapMutex.Unlock() - - key := portMapKey{ip: ip.String(), port: port, protocol: protocol} - existing, found := proxier.portMap[key] - if !found { - // We tolerate this, it happens if we are cleaning up a failed allocation - glog.Infof("Ignoring release on unowned port: %v", key) - return nil - } - if existing.owner != owner { - return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing) - } - delete(proxier.portMap, key) - existing.socket.Close() - return nil -} -*/ - -/* -func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { - if protocol == api.ProtocolUDP { - glog.Warningf("Not adding node port rule for %q on port %d as UDP protocol is not supported by netsh portproxy", name, nodePort) - return nil - } - - err := proxier.claimNodePort(nil, nodePort, protocol, name) - if err != nil { - return err - } - - args := proxier.netshPortProxyAddArgs(nil, nodePort, proxyIP, proxyPort, name) - existed, err := proxier.netsh.EnsurePortProxyRule(args) - - if err != nil { - glog.Errorf("Failed to run portproxy rule for service %q", name) - return err - } - if !existed { - glog.Infof("Added portproxy rule for service %q on %s port %d", name, protocol, nodePort) - } - - return nil -}*/ - -/* -func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error { - // Collect errors and report them all at the end. - el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) - for _, publicIP := range info.externalIPs { - el = append(el, proxier.closeOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)...) - } - for _, ingress := range info.loadBalancerStatus.Ingress { - if ingress.IP != "" { - el = append(el, proxier.closeOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)...) - } - } - if info.nodePort != 0 { - el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...) - } - if len(el) == 0 { - glog.V(3).Infof("Closed iptables portals for service %q", service) - } else { - glog.Errorf("Some errors closing iptables portals for service %q", service) - } - return utilerrors.NewAggregate(el) -}*/ - -/* -func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { - el := []error{} - - if local, err := isLocalIP(portal.ip); err != nil { - el = append(el, fmt.Errorf("can't determine if IP is local, assuming not: %v", err)) - } else if local { - if err := proxier.releaseNodePort(portal.ip, portal.port, protocol, name); err != nil { - el = append(el, err) - } - } - - args := proxier.netshIpv4AddressDeleteArgs(portal.ip) - if err := proxier.netsh.DeleteIPAddress(args); err != nil { - glog.Errorf("Failed to delete IP address for service %q", name) - el = append(el, err) - } - - args = proxier.netshPortProxyDeleteArgs(portal.ip, portal.port, proxyIP, proxyPort, name) - if err := proxier.netsh.DeletePortProxyRule(args); err != nil { - glog.Errorf("Failed to delete portproxy rule for service %q", name) - el = append(el, err) - } - - return el -}*/ - -/* -func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { - el := []error{} - - args := proxier.netshPortProxyDeleteArgs(nil, nodePort, proxyIP, proxyPort, name) - if err := proxier.netsh.DeletePortProxyRule(args); err != nil { - glog.Errorf("Failed to delete portproxy rule for service %q", name) - el = append(el, err) - } - - if err := proxier.releaseNodePort(nil, nodePort, protocol, name); err != nil { - el = append(el, err) - } - - return el -}*/ - -func isLocalIP(ip net.IP) (bool, error) { - addrs, err := net.InterfaceAddrs() - if err != nil { - return false, err - } - for i := range addrs { - intf, _, err := net.ParseCIDR(addrs[i].String()) - if err != nil { - return false, err - } - if ip.Equal(intf) { - return true, nil - } - } - return false, nil +func sameConfig(info *serviceInfo, service *api.Service, protocol api.Protocol, listenPort int) bool { + return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity } func isTooManyFDsError(err error) bool { @@ -700,22 +427,6 @@ func isClosedError(err error) bool { return strings.HasSuffix(err.Error(), "use of closed network connection") } -/* -func (proxier *Proxier) netshPortProxyAddArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { - args := []string{ - "interface", "portproxy", "set", "v4tov4", - "listenPort=" + strconv.Itoa(destPort), - "connectaddress=" + proxyIP.String(), - "connectPort=" + strconv.Itoa(proxyPort), - } - if destIP != nil { - args = append(args, "listenaddress="+destIP.String()) - } - - return args -} -*/ - func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string { intName := proxier.netsh.GetInterfaceToAddIP() args := []string{ @@ -727,20 +438,6 @@ func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string { return args } -/* -func (proxier *Proxier) netshPortProxyDeleteArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { - args := []string{ - "interface", "portproxy", "delete", "v4tov4", - "listenPort=" + strconv.Itoa(destPort), - } - if destIP != nil { - args = append(args, "listenaddress="+destIP.String()) - } - - return args -} -*/ - func (proxier *Proxier) netshIpv4AddressDeleteArgs(destIP net.IP) []string { intName := proxier.netsh.GetInterfaceToAddIP() args := []string{ diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 60da5fda939f..925ea068c176 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -198,6 +198,21 @@ func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time t.Errorf("expected %d ProxyClients live, got %d", want, got) } +func getPortNum(t *testing.T, addr string) int { + _, portStr, err := net.SplitHostPort(addr) + if err != nil { + t.Errorf("error getting port from %s", addr) + return 0 + } + portNum, err := strconv.Atoi(portStr) + if err != nil { + t.Errorf("error getting port from %s", addr) + return 0 + } + + return portNum +} + func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} @@ -211,17 +226,19 @@ func TestTCPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) } @@ -238,17 +255,19 @@ func TestUDPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) } @@ -265,18 +284,20 @@ func TestUDPProxyTimeout(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } waitForNumProxyLoops(t, p, 1) - testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) // When connecting to a UDP service endpoint, there should be a Conn for proxy. waitForNumProxyClients(t, svcInfo, 1, time.Second) // If conn has no activity for serviceInfo.timeout since last Read/Write, it should be closed because of timeout. @@ -301,24 +322,27 @@ func TestMultiPortProxy(t *testing.T) { }}, }}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) + servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP} + svcInfoP, err := p.addServicePortPortal(servicePortPortalNameP, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfoP.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second) + servicePortPortalNameQ := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceQ.Namespace, Name: serviceQ.Name}, Port: serviceQ.Port, PortalIPName: listenIP} + svcInfoQ, err := p.addServicePortPortal(servicePortPortalNameQ, "UDP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoUDP(t, "127.0.0.1", svcInfoQ.proxyPort) + testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfoQ.socket.Addr().String())) waitForNumProxyLoops(t, p, 2) } @@ -328,7 +352,8 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -336,41 +361,45 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{ Name: "p", - Port: 80, + Port: 0, Protocol: "TCP", }, { Name: "q", - Port: 81, + Port: 0, Protocol: "UDP", }}}, }}) waitForNumProxyLoops(t, p, 2) - svcInfo, exists := p.getServiceInfo(serviceP) + + servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP} + svcInfo, exists := p.getServiceInfo(servicePortPortalNameP) if !exists { - t.Fatalf("can't find serviceInfo for %s", serviceP) + t.Fatalf("can't find serviceInfo for %s", servicePortPortalNameP) } - if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 80 || svcInfo.protocol != "TCP" { + if svcInfo.portal.ip != "0.0.0.0" || svcInfo.portal.port != 0 || svcInfo.protocol != "TCP" { t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo) } - svcInfo, exists = p.getServiceInfo(serviceQ) + servicePortPortalNameQ := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceQ.Namespace, Name: serviceQ.Name}, Port: serviceQ.Port, PortalIPName: listenIP} + svcInfo, exists = p.getServiceInfo(servicePortPortalNameQ) if !exists { - t.Fatalf("can't find serviceInfo for %s", serviceQ) + t.Fatalf("can't find serviceInfo for %s", servicePortPortalNameQ) } - if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 81 || svcInfo.protocol != "UDP" { + if svcInfo.portal.ip != "0.0.0.0" || svcInfo.portal.port != 0 || svcInfo.protocol != "UDP" { t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo) } - svcInfo, exists = p.getServiceInfo(serviceX) + servicePortPortalNameX := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceX.Namespace, Name: serviceX.Name}, Port: serviceX.Port, PortalIPName: listenIP} + svcInfo, exists = p.getServiceInfo(servicePortPortalNameX) if exists { t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo) } } // Helper: Stops the proxy for the named service. -func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { +func stopProxyByName(proxier *Proxier, service ServicePortPortalName) error { info, found := proxier.getServiceInfo(service) if !found { return fmt.Errorf("unknown service: %s", service) @@ -391,32 +420,34 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } if !svcInfo.isAlive() { t.Fatalf("wrong value for isAlive(): expected true") } - conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String()))) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() waitForNumProxyLoops(t, p, 1) - stopProxyByName(p, service) + stopProxyByName(p, servicePortPortalName) if svcInfo.isAlive() { t.Fatalf("wrong value for isAlive(): expected false") } // Wait for the port to really close. - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -435,26 +466,28 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String()))) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() waitForNumProxyLoops(t, p, 1) - stopProxyByName(p, service) + stopProxyByName(p, servicePortPortalName) // Wait for the port to really close. - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -473,17 +506,20 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) + fmt.Println("here0") + conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String()))) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -491,7 +527,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { waitForNumProxyLoops(t, p, 1) p.OnServiceUpdate([]api.Service{}) - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -510,17 +546,19 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String()))) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -528,7 +566,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { waitForNumProxyLoops(t, p, 1) p.OnServiceUpdate([]api.Service{}) - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -546,17 +584,19 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String()))) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -564,7 +604,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 1) p.OnServiceUpdate([]api.Service{}) - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -573,17 +613,17 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Protocol: "TCP", }}}, }}) - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { - t.Fatalf("can't find serviceInfo for %s", service) + t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) } @@ -599,17 +639,19 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String()))) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -617,7 +659,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 1) p.OnServiceUpdate([]api.Service{}) - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -626,17 +668,17 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Protocol: "UDP", }}}, }}) - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { - t.Fatalf("can't find serviceInfo") + t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) } - testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) } @@ -653,36 +695,38 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", - Port: 99, + Port: 0, Protocol: "TCP", }}}, }}) // Wait for the socket to actually get free. - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { - t.Fatalf("can't find serviceInfo") + t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) // This is a bit async, but this should be sufficient. time.Sleep(500 * time.Millisecond) waitForNumProxyLoops(t, p, 1) @@ -701,13 +745,15 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -715,21 +761,21 @@ func TestUDPProxyUpdatePort(t *testing.T) { p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", - Port: 99, + Port: 0, Protocol: "UDP", }}}, }}) // Wait for the socket to actually get free. - if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { - t.Fatalf("can't find serviceInfo") + t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) } - testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) } @@ -746,17 +792,19 @@ func TestProxyUpdatePublicIPs(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) p.OnServiceUpdate([]api.Service{{ @@ -767,19 +815,19 @@ func TestProxyUpdatePublicIPs(t *testing.T) { Port: int32(svcInfo.portal.port), Protocol: "TCP", }}, - ClusterIP: svcInfo.portal.ip.String(), - ExternalIPs: []string{"4.3.2.1"}, + ClusterIP: svcInfo.portal.ip, + ExternalIPs: []string{"0.0.0.0"}, }, }}) // Wait for the socket to actually get free. - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { - t.Fatalf("can't find serviceInfo") + t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) // This is a bit async, but this should be sufficient. time.Sleep(500 * time.Millisecond) waitForNumProxyLoops(t, p, 1) @@ -797,28 +845,30 @@ func TestProxyUpdatePortal(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + listenIP := "0.0.0.0" + p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP} + svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: int32(svcInfo.portal.port), Protocol: "TCP", }}}, }}) - _, exists := p.getServiceInfo(service) + _, exists := p.getServiceInfo(servicePortPortalName) if exists { t.Fatalf("service with empty ClusterIP should not be included in the proxy") } @@ -827,29 +877,29 @@ func TestProxyUpdatePortal(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Protocol: "TCP", }}}, }}) - _, exists = p.getServiceInfo(service) + _, exists = p.getServiceInfo(servicePortPortalName) if exists { t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") } p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: int32(svcInfo.portal.port), Protocol: "TCP", }}}, }}) lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - svcInfo, exists = p.getServiceInfo(service) + svcInfo, exists = p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") } - testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) } diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go index 63012387ca2e..2cf61241d2a9 100644 --- a/pkg/proxy/winuserspace/proxysocket.go +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -41,7 +41,7 @@ type proxySocket interface { // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service proxy.ServicePortPortalName, info *serviceInfo, proxier *Proxier) + ProxyLoop(service ServicePortPortalName, info *serviceInfo, proxier *Proxier) // ListenPort returns the host port that the proxySocket is listening on ListenPort() int } @@ -87,10 +87,16 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service proxy.ServicePortPortalName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { +func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { sessionAffinityReset := false for _, dialTimeout := range endpointDialTimeout { - servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port} + servicePortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: service.Namespace, + Name: service.Name, + }, + Port: service.Port, + } endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) @@ -113,7 +119,7 @@ func tryConnect(service proxy.ServicePortPortalName, srcAddr net.Addr, protocol return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { for { if !myInfo.isAlive() { // The service port was closed or replaced. @@ -199,7 +205,7 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { var buffer [4096]byte // 4KiB should be enough for most whole-packets for { if !myInfo.isAlive() { @@ -243,7 +249,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortPortalName, timeout time.Duration) (net.Conn, error) { +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration) (net.Conn, error) { activeClients.mu.Lock() defer activeClients.mu.Unlock() diff --git a/pkg/proxy/winuserspace/types.go b/pkg/proxy/winuserspace/types.go new file mode 100644 index 000000000000..cf31999704cd --- /dev/null +++ b/pkg/proxy/winuserspace/types.go @@ -0,0 +1,35 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package winuserspace + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/types" +) + +// ServicePortPortalName carries a namespace + name + portname + portalip. This is the unique +// identifier for a windows service port portal. +type ServicePortPortalName struct { + types.NamespacedName + Port string + PortalIPName string +} + +func (spn ServicePortPortalName) String() string { + return fmt.Sprintf("%s:%s:%s", spn.NamespacedName.String(), spn.Port, spn.PortalIPName) +} diff --git a/pkg/util/netsh/netsh.go b/pkg/util/netsh/netsh.go index 39e2058460ef..59d1a18b61b2 100644 --- a/pkg/util/netsh/netsh.go +++ b/pkg/util/netsh/netsh.go @@ -167,12 +167,12 @@ func (runner *runner) DeleteIPAddress(args []string) error { // GetInterfaceToAddIP returns the interface name where Service IP needs to be added // IP Address needs to be added for netsh portproxy to redirect traffic -// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (forwarder)" is returned +// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (HNS Internal NIC)" is returned func (runner *runner) GetInterfaceToAddIP() string { if iface := os.Getenv("INTERFACE_TO_ADD_SERVICE_IP"); len(iface) > 0 { return iface } - return "vEthernet (forwarder)" + return "vEthernet (HNS Internal NIC)" } // Restore is part of Interface. From 7e2c71f698a83595ea5b33d7af68b66c7fcd0e3d Mon Sep 17 00:00:00 2001 From: Anthony Howe Date: Tue, 28 Feb 2017 02:43:35 +0000 Subject: [PATCH 3/4] per Jenkin's test instructions run Run ./hack/update-bazel.sh --- pkg/proxy/winuserspace/BUILD | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index 9dc5e6188ffb..e5921605acd2 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -26,10 +26,8 @@ go_library( "//pkg/util/slice:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/types", - "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/net", "//vendor:k8s.io/apimachinery/pkg/util/runtime", - "//vendor:k8s.io/apimachinery/pkg/util/wait", ], ) @@ -47,7 +45,6 @@ go_test( "//pkg/util/netsh/testing:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", - "//vendor:k8s.io/apimachinery/pkg/util/net", "//vendor:k8s.io/apimachinery/pkg/util/runtime", ], ) From b9dfb69dd7679ae31bad1fb4980561fea22f8ed4 Mon Sep 17 00:00:00 2001 From: Jiangtian Li Date: Sat, 11 Feb 2017 15:19:40 -0800 Subject: [PATCH 4/4] Fix DNS suffix search list issue for Windows container and workaround in kube-proxy. kube-proxy iterates over DNS suffix search list and appends to DNS query for client. --- pkg/proxy/winuserspace/BUILD | 3 + pkg/proxy/winuserspace/proxier.go | 2 + pkg/proxy/winuserspace/proxysocket.go | 467 ++++++++++++++++++++- pkg/proxy/winuserspace/proxysocket_test.go | 129 ++++++ pkg/util/BUILD | 1 + pkg/util/ipconfig/BUILD | 43 ++ pkg/util/ipconfig/doc.go | 18 + pkg/util/ipconfig/ipconfig.go | 99 +++++ pkg/util/ipconfig/ipconfig_test.go | 33 ++ 9 files changed, 780 insertions(+), 15 deletions(-) create mode 100644 pkg/proxy/winuserspace/proxysocket_test.go create mode 100644 pkg/util/ipconfig/BUILD create mode 100644 pkg/util/ipconfig/doc.go create mode 100644 pkg/util/ipconfig/ipconfig.go create mode 100644 pkg/util/ipconfig/ipconfig_test.go diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index e5921605acd2..77e619950459 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -22,6 +22,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/util/exec:go_default_library", + "//pkg/util/ipconfig:go_default_library", "//pkg/util/netsh:go_default_library", "//pkg/util/slice:go_default_library", "//vendor:github.com/golang/glog", @@ -35,6 +37,7 @@ go_test( name = "go_default_test", srcs = [ "proxier_test.go", + "proxysocket_test.go", "roundrobin_test.go", ], library = ":go_default_library", diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index f77ddbf0c30e..86bd9083a2d1 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -50,6 +50,7 @@ type serviceInfo struct { socket proxySocket timeout time.Duration activeClients *clientCache + dnsClients *dnsClientCache sessionAffinityType api.ServiceAffinity } @@ -260,6 +261,7 @@ func (proxier *Proxier) addServicePortPortal(servicePortPortalName ServicePortPo socket: sock, timeout: timeout, activeClients: newClientCache(), + dnsClients: newDnsClientCache(), sessionAffinityType: api.ServiceAffinityNone, // default } proxier.setServiceInfo(servicePortPortalName, si) diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go index 2cf61241d2a9..2f0b361dc4f6 100644 --- a/pkg/proxy/winuserspace/proxysocket.go +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -17,12 +17,14 @@ limitations under the License. package winuserspace import ( + "encoding/binary" "fmt" "io" "net" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/golang/glog" @@ -30,6 +32,36 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/ipconfig" +) + +const ( + // Kubernetes DNS suffix search list + // TODO: Get DNS suffix search list from docker containers. + // --dns-search option doesn't work on Windows containers and has been + // fixed recently in docker. + + // Kubernetes cluster domain + clusterDomain = "cluster.local" + + // Kubernetes service domain + serviceDomain = "svc." + clusterDomain + + // Kubernetes default namespace domain + namespaceServiceDomain = "default." + serviceDomain + + // Kubernetes DNS service port name + dnsPortName = "dns" + + // DNS TYPE value A (a host address) + dnsTypeA uint16 = 0x01 + + // DNS TYPE value AAAA (a host IPv6 address) + dnsTypeAAAA uint16 = 0x1c + + // DNS CLASS value IN (the Internet) + dnsClassInternet uint16 = 0x01 ) // Abstraction over TCP/UDP sockets which are proxied. @@ -205,8 +237,399 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } +// TODO: use Go net dnsmsg library to walk DNS message format +// DNS packet header +type dnsHeader struct { + id uint16 + bits uint16 + qdCount uint16 + anCount uint16 + nsCount uint16 + arCount uint16 +} + +// DNS domain name +type dnsDomainName struct { + name string +} + +// DNS packet question section +type dnsQuestion struct { + qName dnsDomainName + qType uint16 + qClass uint16 +} + +// DNS message, only interested in question now +type dnsMsg struct { + header dnsHeader + question []dnsQuestion +} + +type dnsStruct interface { + walk(f func(field interface{}) (ok bool)) (ok bool) +} + +func (header *dnsHeader) walk(f func(field interface{}) bool) bool { + return f(&header.id) && + f(&header.bits) && + f(&header.qdCount) && + f(&header.anCount) && + f(&header.nsCount) && + f(&header.arCount) +} + +func (question *dnsQuestion) walk(f func(field interface{}) bool) bool { + return f(&question.qName) && + f(&question.qType) && + f(&question.qClass) +} + +func packDomainName(name string, buffer []byte, index int) (newIndex int, ok bool) { + if name == "" { + buffer[index] = 0 + index++ + return index, true + } + + // one more dot plus trailing 0 + if index+len(name)+2 > len(buffer) { + return len(buffer), false + } + + domains := strings.Split(name, ".") + for _, domain := range domains { + domainLen := len(domain) + if domainLen == 0 { + return len(buffer), false + } + buffer[index] = byte(domainLen) + index++ + copy(buffer[index:index+domainLen], domain) + index += domainLen + } + + buffer[index] = 0 + index++ + return index, true +} + +func unpackDomainName(buffer []byte, index int) (name string, newIndex int, ok bool) { + name = "" + + for index < len(buffer) { + cnt := int(buffer[index]) + index++ + if cnt == 0 { + break + } + + if index+cnt > len(buffer) { + return "", len(buffer), false + } + if name != "" { + name += "." + } + name += string(buffer[index : index+cnt]) + index += cnt + } + + if index >= len(buffer) { + return "", len(buffer), false + } + return name, index, true +} + +func packStruct(any dnsStruct, buffer []byte, index int) (newIndex int, ok bool) { + ok = any.walk(func(field interface{}) bool { + switch value := field.(type) { + case *uint16: + if index+2 > len(buffer) { + return false + } + binary.BigEndian.PutUint16(buffer[index:index+2], *value) + index += 2 + return true + case *dnsDomainName: + index, ok = packDomainName((*value).name, buffer, index) + return ok + default: + return false + } + }) + + if !ok { + return len(buffer), false + } + return index, true +} + +func unpackStruct(any dnsStruct, buffer []byte, index int) (newIndex int, ok bool) { + ok = any.walk(func(field interface{}) bool { + switch value := field.(type) { + case *uint16: + if index+2 > len(buffer) { + return false + } + *value = binary.BigEndian.Uint16(buffer[index : index+2]) + index += 2 + return true + case *dnsDomainName: + (*value).name, index, ok = unpackDomainName(buffer, index) + return ok + default: + return false + } + }) + + if !ok { + return len(buffer), false + } + return index, true +} + +// Pack the message structure into buffer +func (msg *dnsMsg) packDnsMsg(buffer []byte) (length int, ok bool) { + index := 0 + + if index, ok = packStruct(&msg.header, buffer, index); !ok { + return len(buffer), false + } + + for i := 0; i < len(msg.question); i++ { + if index, ok = packStruct(&msg.question[i], buffer, index); !ok { + return len(buffer), false + } + } + return index, true +} + +// Unpack the buffer into the message structure +func (msg *dnsMsg) unpackDnsMsg(buffer []byte) (ok bool) { + index := 0 + + if index, ok = unpackStruct(&msg.header, buffer, index); !ok { + return false + } + + msg.question = make([]dnsQuestion, msg.header.qdCount) + for i := 0; i < len(msg.question); i++ { + if index, ok = unpackStruct(&msg.question[i], buffer, index); !ok { + return false + } + } + return true +} + +// DNS query client classified by address and QTYPE +type dnsClientQuery struct { + clientAddress string + dnsQType uint16 +} + +// Holds DNS client query, the value contains the index in DNS suffix search list, +// the original DNS message and length for the same client and QTYPE +type dnsClientCache struct { + mu sync.Mutex + clients map[dnsClientQuery]*dnsQueryState +} + +type dnsQueryState struct { + searchIndex int32 + msg *dnsMsg +} + +func newDnsClientCache() *dnsClientCache { + return &dnsClientCache{clients: map[dnsClientQuery]*dnsQueryState{}} +} + +func packetRequiresDnsSuffix(dnsType, dnsClass uint16) bool { + return (dnsType == dnsTypeA || dnsType == dnsTypeAAAA) && dnsClass == dnsClassInternet +} + +func isDnsService(portName string) bool { + return portName == dnsPortName +} + +func appendDnsSuffix(msg *dnsMsg, buffer []byte, length int, dnsSuffix string) int { + if msg == nil || len(msg.question) == 0 { + glog.Warning("DNS message parameter is invalid.") + return length + } + + // Save the original name since it will be reused for next iteration + origName := msg.question[0].qName.name + if dnsSuffix != "" { + msg.question[0].qName.name += "." + dnsSuffix + } + len, ok := msg.packDnsMsg(buffer) + msg.question[0].qName.name = origName + + if !ok { + glog.Warning("Unable to pack DNS packet.") + return length + } + + return len +} + +func processUnpackedDnsQueryPacket(dnsClients *dnsClientCache, msg *dnsMsg, host string, dnsQType uint16, buffer []byte, length int, dnsSearch []string) int { + if dnsSearch == nil || len(dnsSearch) == 0 { + glog.V(1).Infof("DNS search list is not initialized and is empty.") + return length + } + + // TODO: handle concurrent queries from a client + dnsClients.mu.Lock() + state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}] + if !found { + state = &dnsQueryState{0, msg} + dnsClients.clients[dnsClientQuery{host, dnsQType}] = state + } + dnsClients.mu.Unlock() + + index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1) + // Also update message ID if the client retries due to previous query time out + state.msg.header.id = msg.header.id + + if index < 0 || index >= int32(len(dnsSearch)) { + glog.V(1).Infof("Search index %d is out of range.", index) + return length + } + + length = appendDnsSuffix(msg, buffer, length, dnsSearch[index]) + return length +} + +func processUnpackedDnsResponsePacket(svrConn net.Conn, dnsClients *dnsClientCache, rcode uint16, host string, dnsQType uint16, buffer []byte, length int, dnsSearch []string) bool { + var drop bool + if dnsSearch == nil || len(dnsSearch) == 0 { + glog.V(1).Infof("DNS search list is not initialized and is empty.") + return drop + } + + dnsClients.mu.Lock() + state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}] + dnsClients.mu.Unlock() + + if found { + index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1) + if rcode != 0 && index >= 0 && index < int32(len(dnsSearch)) { + // If the reponse has failure and iteration through the search list has not + // reached the end, retry on behalf of the client using the original query message + drop = true + length = appendDnsSuffix(state.msg, buffer, length, dnsSearch[index]) + + _, err := svrConn.Write(buffer[0:length]) + if err != nil { + if !logTimeout(err) { + glog.Errorf("Write failed: %v", err) + } + } + } else { + dnsClients.mu.Lock() + delete(dnsClients.clients, dnsClientQuery{host, dnsQType}) + dnsClients.mu.Unlock() + } + } + + return drop +} + +func processDnsQueryPacket(dnsClients *dnsClientCache, cliAddr net.Addr, buffer []byte, length int, dnsSearch []string) int { + msg := &dnsMsg{} + if !msg.unpackDnsMsg(buffer[:length]) { + glog.Warning("Unable to unpack DNS packet.") + return length + } + + // Query - Response bit that specifies whether this message is a query (0) or a response (1). + qr := msg.header.bits & 0x8000 + if qr != 0 { + glog.Warning("DNS packet should be a query message.") + return length + } + + // QDCOUNT + if msg.header.qdCount != 1 { + glog.V(1).Infof("Number of entries in the question section of the DNS packet is: %d", msg.header.qdCount) + glog.V(1).Infof("DNS suffix appending does not support more than one question.") + return length + } + + // ANCOUNT, NSCOUNT, ARCOUNT + if msg.header.anCount != 0 || msg.header.nsCount != 0 || msg.header.arCount != 0 { + glog.V(1).Infof("DNS packet contains more than question section.") + return length + } + + dnsQType := msg.question[0].qType + dnsQClass := msg.question[0].qClass + if packetRequiresDnsSuffix(dnsQType, dnsQClass) { + host, _, err := net.SplitHostPort(cliAddr.String()) + if err != nil { + glog.V(1).Infof("Failed to get host from client address: %v", err) + host = cliAddr.String() + } + + length = processUnpackedDnsQueryPacket(dnsClients, msg, host, dnsQType, buffer, length, dnsSearch) + } + + return length +} + +func processDnsResponsePacket(svrConn net.Conn, dnsClients *dnsClientCache, cliAddr net.Addr, buffer []byte, length int, dnsSearch []string) bool { + var drop bool + msg := &dnsMsg{} + if !msg.unpackDnsMsg(buffer[:length]) { + glog.Warning("Unable to unpack DNS packet.") + return drop + } + + // Query - Response bit that specifies whether this message is a query (0) or a response (1). + qr := msg.header.bits & 0x8000 + if qr == 0 { + glog.Warning("DNS packet should be a response message.") + return drop + } + + // QDCOUNT + if msg.header.qdCount != 1 { + glog.V(1).Infof("Number of entries in the reponse section of the DNS packet is: %d", msg.header.qdCount) + return drop + } + + dnsQType := msg.question[0].qType + dnsQClass := msg.question[0].qClass + if packetRequiresDnsSuffix(dnsQType, dnsQClass) { + host, _, err := net.SplitHostPort(cliAddr.String()) + if err != nil { + glog.V(1).Infof("Failed to get host from client address: %v", err) + host = cliAddr.String() + } + + rcode := msg.header.bits & 0xf + drop = processUnpackedDnsResponsePacket(svrConn, dnsClients, rcode, host, dnsQType, buffer, length, dnsSearch) + } + + return drop +} + func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { var buffer [4096]byte // 4KiB should be enough for most whole-packets + var dnsSearch []string + if isDnsService(service.Port) { + dnsSearch = []string{"", namespaceServiceDomain, serviceDomain, clusterDomain} + execer := exec.New() + ipconfigInterface := ipconfig.New(execer) + suffixList, err := ipconfigInterface.GetDnsSuffixSearchList() + if err == nil { + for _, suffix := range suffixList { + dnsSearch = append(dnsSearch, suffix) + } + } + } + for { if !myInfo.isAlive() { // The service port was closed or replaced. @@ -226,8 +649,14 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err) break } + + // If this is DNS query packet + if isDnsService(service.Port) { + n = processDnsQueryPacket(myInfo.dnsClients, cliAddr, buffer[:], n, dnsSearch) + } + // If this is a client we know already, reuse the connection and goroutine. - svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout) + svrConn, err := udp.getBackendConn(myInfo.activeClients, myInfo.dnsClients, cliAddr, proxier, service, myInfo.timeout, dnsSearch) if err != nil { continue } @@ -249,7 +678,7 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration) (net.Conn, error) { +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, dnsClients *dnsClientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) (net.Conn, error) { activeClients.mu.Lock() defer activeClients.mu.Unlock() @@ -268,17 +697,17 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne return nil, err } activeClients.clients[cliAddr.String()] = svrConn - go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { + go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) { defer runtime.HandleCrash() - udp.proxyClient(cliAddr, svrConn, activeClients, timeout) - }(cliAddr, svrConn, activeClients, timeout) + udp.proxyClient(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch) + }(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch) } return svrConn, nil } // This function is expected to be called as a goroutine. // TODO: Track and log bytes copied, like TCP -func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { +func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) { defer svrConn.Close() var buffer [4096]byte for { @@ -289,17 +718,25 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ } break } - err = svrConn.SetDeadline(time.Now().Add(timeout)) - if err != nil { - glog.Errorf("SetDeadline failed: %v", err) - break + + drop := false + if isDnsService(service.Port) { + drop = processDnsResponsePacket(svrConn, dnsClients, cliAddr, buffer[:], n, dnsSearch) } - n, err = udp.WriteTo(buffer[0:n], cliAddr) - if err != nil { - if !logTimeout(err) { - glog.Errorf("WriteTo failed: %v", err) + + if !drop { + err = svrConn.SetDeadline(time.Now().Add(timeout)) + if err != nil { + glog.Errorf("SetDeadline failed: %v", err) + break + } + n, err = udp.WriteTo(buffer[0:n], cliAddr) + if err != nil { + if !logTimeout(err) { + glog.Errorf("WriteTo failed: %v", err) + } + break } - break } } activeClients.mu.Lock() diff --git a/pkg/proxy/winuserspace/proxysocket_test.go b/pkg/proxy/winuserspace/proxysocket_test.go new file mode 100644 index 000000000000..66b94fc97b4f --- /dev/null +++ b/pkg/proxy/winuserspace/proxysocket_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package winuserspace + +import ( + "reflect" + "testing" +) + +func TestPackUnpackDnsMsgUnqualifiedName(t *testing.T) { + msg := &dnsMsg{} + var buffer [4096]byte + + msg.header.id = 1 + msg.header.qdCount = 1 + msg.question = make([]dnsQuestion, msg.header.qdCount) + msg.question[0].qClass = 0x01 + msg.question[0].qType = 0x01 + msg.question[0].qName.name = "kubernetes" + + length, ok := msg.packDnsMsg(buffer[:]) + if !ok { + t.Errorf("Pack DNS message failed.") + } + + unpackedMsg := &dnsMsg{} + if !unpackedMsg.unpackDnsMsg(buffer[:length]) { + t.Errorf("Unpack DNS message failed.") + } + + if !reflect.DeepEqual(msg, unpackedMsg) { + t.Errorf("Pack and Unpack DNS message are not consistent.") + } +} + +func TestPackUnpackDnsMsgFqdn(t *testing.T) { + msg := &dnsMsg{} + var buffer [4096]byte + + msg.header.id = 1 + msg.header.qdCount = 1 + msg.question = make([]dnsQuestion, msg.header.qdCount) + msg.question[0].qClass = 0x01 + msg.question[0].qType = 0x01 + msg.question[0].qName.name = "kubernetes.default.svc.cluster.local" + + length, ok := msg.packDnsMsg(buffer[:]) + if !ok { + t.Errorf("Pack DNS message failed.") + } + + unpackedMsg := &dnsMsg{} + if !unpackedMsg.unpackDnsMsg(buffer[:length]) { + t.Errorf("Unpack DNS message failed.") + } + + if !reflect.DeepEqual(msg, unpackedMsg) { + t.Errorf("Pack and Unpack DNS message are not consistent.") + } +} + +func TestPackUnpackDnsMsgEmptyName(t *testing.T) { + msg := &dnsMsg{} + var buffer [4096]byte + + msg.header.id = 1 + msg.header.qdCount = 1 + msg.question = make([]dnsQuestion, msg.header.qdCount) + msg.question[0].qClass = 0x01 + msg.question[0].qType = 0x01 + msg.question[0].qName.name = "" + + length, ok := msg.packDnsMsg(buffer[:]) + if !ok { + t.Errorf("Pack DNS message failed.") + } + + unpackedMsg := &dnsMsg{} + if !unpackedMsg.unpackDnsMsg(buffer[:length]) { + t.Errorf("Unpack DNS message failed.") + } + + if !reflect.DeepEqual(msg, unpackedMsg) { + t.Errorf("Pack and Unpack DNS message are not consistent.") + } +} + +func TestPackUnpackDnsMsgMultipleQuestions(t *testing.T) { + msg := &dnsMsg{} + var buffer [4096]byte + + msg.header.id = 1 + msg.header.qdCount = 2 + msg.question = make([]dnsQuestion, msg.header.qdCount) + msg.question[0].qClass = 0x01 + msg.question[0].qType = 0x01 + msg.question[0].qName.name = "kubernetes" + msg.question[1].qClass = 0x01 + msg.question[1].qType = 0x1c + msg.question[1].qName.name = "kubernetes.default" + + length, ok := msg.packDnsMsg(buffer[:]) + if !ok { + t.Errorf("Pack DNS message failed.") + } + + unpackedMsg := &dnsMsg{} + if !unpackedMsg.unpackDnsMsg(buffer[:length]) { + t.Errorf("Unpack DNS message failed.") + } + + if !reflect.DeepEqual(msg, unpackedMsg) { + t.Errorf("Pack and Unpack DNS message are not consistent.") + } +} diff --git a/pkg/util/BUILD b/pkg/util/BUILD index 179c9614e372..d5f7ae7e40f2 100644 --- a/pkg/util/BUILD +++ b/pkg/util/BUILD @@ -65,6 +65,7 @@ filegroup( "//pkg/util/interrupt:all-srcs", "//pkg/util/intstr:all-srcs", "//pkg/util/io:all-srcs", + "//pkg/util/ipconfig:all-srcs", "//pkg/util/iptables:all-srcs", "//pkg/util/json:all-srcs", "//pkg/util/keymutex:all-srcs", diff --git a/pkg/util/ipconfig/BUILD b/pkg/util/ipconfig/BUILD new file mode 100644 index 000000000000..78cde65e574a --- /dev/null +++ b/pkg/util/ipconfig/BUILD @@ -0,0 +1,43 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "ipconfig.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/util/exec:go_default_library", + "//vendor:github.com/golang/glog", + ], +) + +go_test( + name = "go_default_test", + srcs = ["ipconfig_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = ["//pkg/util/exec:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/util/ipconfig/doc.go b/pkg/util/ipconfig/doc.go new file mode 100644 index 000000000000..49a14534a1ea --- /dev/null +++ b/pkg/util/ipconfig/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package ipconfig provides an interface and implementations for running Windows ipconfig commands. +package ipconfig // import "k8s.io/kubernetes/pkg/util/ipconfig" diff --git a/pkg/util/ipconfig/ipconfig.go b/pkg/util/ipconfig/ipconfig.go new file mode 100644 index 000000000000..ce1571719f62 --- /dev/null +++ b/pkg/util/ipconfig/ipconfig.go @@ -0,0 +1,99 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipconfig + +import ( + "runtime" + "strings" + "sync" + + "github.com/golang/glog" + + utilexec "k8s.io/kubernetes/pkg/util/exec" +) + +// Interface is an injectable interface for running ipconfig commands. Implementations must be goroutine-safe. +type Interface interface { + // GetDnsSuffixSearchList returns the list of DNS suffix to search + GetDnsSuffixSearchList() ([]string, error) +} + +const ( + cmdIpconfig string = "ipconfig" + + cmdDefaultArgs string = "/all" + + dnsSuffixSearchLisLabel string = "DNS Suffix Search List" +) + +// runner implements Interface in terms of exec("ipconfig"). +type runner struct { + mu sync.Mutex + exec utilexec.Interface +} + +// New returns a new Interface which will exec ipconfig. +func New(exec utilexec.Interface) Interface { + runner := &runner{ + exec: exec, + } + return runner +} + +// GetDnsSuffixSearchList returns the list of DNS suffix to search +func (runner *runner) GetDnsSuffixSearchList() ([]string, error) { + // Parse the DNS suffix search list from ipconfig output + // ipconfig /all on Windows displays the entry of DNS suffix search list + // An example output contains: + // + // DNS Suffix Search List. . . . . . : example1.com + // example2.com + // + // TODO: this does not work when the label is localized + suffixList := []string{} + if runtime.GOOS != "windows" { + glog.V(1).Infof("ipconfig not supported on GOOS=%s", runtime.GOOS) + return suffixList, nil + } + + out, err := runner.exec.Command(cmdIpconfig, cmdDefaultArgs).Output() + + if err == nil { + lines := strings.Split(string(out), "\n") + for i, line := range lines { + if trimmed := strings.TrimSpace(line); strings.HasPrefix(trimmed, dnsSuffixSearchLisLabel) { + if parts := strings.Split(trimmed, ":"); len(parts) > 1 { + if trimmed := strings.TrimSpace(parts[1]); trimmed != "" { + suffixList = append(suffixList, strings.TrimSpace(parts[1])) + } + for j := i + 1; j < len(lines); j++ { + if trimmed := strings.TrimSpace(lines[j]); trimmed != "" && !strings.Contains(trimmed, ":") { + suffixList = append(suffixList, trimmed) + } else { + break + } + } + } + break + } + } + } else { + glog.V(1).Infof("Running %s %s failed: %v", cmdIpconfig, cmdDefaultArgs, err) + } + + return suffixList, err +} diff --git a/pkg/util/ipconfig/ipconfig_test.go b/pkg/util/ipconfig/ipconfig_test.go new file mode 100644 index 000000000000..e378cfba2f4b --- /dev/null +++ b/pkg/util/ipconfig/ipconfig_test.go @@ -0,0 +1,33 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipconfig + +import ( + "testing" + + "k8s.io/kubernetes/pkg/util/exec" +) + +func TestGetDnsSuffixSearchList(t *testing.T) { + // Simple test + ipconfigInterface := New(exec.New()) + + _, err := ipconfigInterface.GetDnsSuffixSearchList() + if err != nil { + t.Errorf("expected success, got %v", err) + } +}