Replace the "in-house" listening port creation with one from k8s.io/utils/net.

Kube-proxy and kubelet/CRI-O follow a rule that when they create an iptables rule redirecting traffic from a host port to a pod, they also open a listening socket on that port, to ensure that there can never be a conflict between any of the following (a minimal sketch of the port-claiming idea follows the list):

* system daemon listening on a port
* hostNetwork pod listening on a port
* NodePort service
* pod with a HostPort
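
A minimal sketch of that port-claiming idea, using only the Go standard library (the function name and port number below are illustrative, not taken from this commit): the process opens a listening socket on the host port and holds it for as long as the redirect rule exists, so the kernel refuses any later attempt by another process to bind the same port.

package main

import (
	"fmt"
	"net"
)

// claimTCPPort opens and holds a listening socket on the given host port.
// While the returned listener stays open, any other process that tries to
// bind the same TCP port gets "address already in use".
func claimTCPPort(port int) (net.Listener, error) {
	l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return nil, fmt.Errorf("port %d is already taken: %v", port, err)
	}
	return l, nil
}

func main() {
	l, err := claimTCPPort(30080) // e.g. a NodePort that iptables will redirect
	if err != nil {
		panic(err)
	}
	defer l.Close() // release the claim when the redirect rule is removed
	// ... keep running while the iptables rule exists ...
}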

OVN-kubernetes had its own implementation of this functionality. This commit replaces it with the "standard" one from k8s.io/utils/net.

In addition, the commit changes the binding procedure for services with an ExternalIP defined: the listening socket is now bound to the defined external IPs.
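
For reference, the k8s.io/utils/net API that the diff below adopts is used roughly as follows; the call shapes mirror port_claim.go in this commit, while the service description, IP, and port are made-up values for illustration.

package main

import (
	"k8s.io/klog"
	utilnet "k8s.io/utils/net"
)

func main() {
	// Describe the port the same way kube-proxy does, so ports claimed for
	// different services with identical parameters stay distinguishable.
	lp, err := utilnet.NewLocalPort(
		"externalIP for default/my-svc:http", // description (hypothetical service)
		"192.0.2.10",                         // bind only on this external IP; "" would mean all addresses
		"",                                   // IP family; left empty, as in the commit
		8080,                                 // port
		utilnet.Protocol("TCP"),
	)
	if err != nil {
		klog.Fatalf("invalid local port: %v", err)
	}

	// ListenPortOpener opens the actual socket; holding the returned
	// Closeable keeps the port claimed until Close() is called.
	var opener utilnet.PortOpener = &utilnet.ListenPortOpener
	closer, err := opener.OpenLocalPort(lp)
	if err != nil {
		klog.Fatalf("port already in use: %v", err)
	}
	defer closer.Close()
}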

Signed-off-by: Alexey Roytman <roytman@il.ibm.com>
roytman committed Oct 28, 2020
1 parent 350943c commit 4efbb59
Showing 27 changed files with 3,106 additions and 229 deletions.
2 changes: 1 addition & 1 deletion go-controller/go.mod
@@ -41,7 +41,7 @@ require (
k8s.io/apimachinery v0.18.6
k8s.io/client-go v0.18.6
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
k8s.io/utils v0.0.0-20201015054608-420da100c033
)

replace github.com/ebay/go-ovn v0.1.0 => github.com/ebay/go-ovn v0.1.1-0.20200810162212-30abed5fb968
5 changes: 5 additions & 0 deletions go-controller/go.sum
@@ -134,6 +134,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v0.1.0 h1:M1Tv3VzNlEHg6uyACnRdtrploV2P7wZqH8BoQMtz0cg=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI=
github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
@@ -619,10 +620,14 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0 h1:Foj74zO6RbjjP4hBEKjnYtjjAhGg4jNynUdYF6fJrok=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6 h1:Oh3Mzx5pJ+yIumsAD0MOECPVeXsVot0UkiaCGVyfGQY=
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E=
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89 h1:d4vVOjXm687F1iLSP2q3lyPPuyvTUt3aVoBpi2DqRsU=
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20201015054608-420da100c033 h1:Pqyrvq79s/H2+6GSEIfeVHifPjJ03sVEggHnXw9KRMs=
k8s.io/utils v0.0.0-20201015054608-420da100c033/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7/go.mod h1:PHgbrJT7lCHcxMU+mDHEm+nx46H4zuuHZkDP6icnhu0=
sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=
sigs.k8s.io/structured-merge-diff/v3 v3.0.0 h1:dOmIZBMfhcHS09XZkMyUgkq5trg3/jRyJYFZUiaOp8E=
5 changes: 4 additions & 1 deletion go-controller/pkg/node/gateway_init.go
@@ -160,7 +160,10 @@ func (n *OvnNode) initGateway(subnets []*net.IPNet, nodeAnnotator kube.Annotator

if config.Gateway.NodeportEnable {
initLoadBalancerHealthChecker(n.name, n.watchFactory)
initPortClaimWatcher(n.recorder, n.watchFactory)
err := initPortClaimWatcher(n.recorder, n.watchFactory)
if err != nil {
return err
}
}

gatewayNextHops, gatewayIntf, err := getGatewayNextHops()
160 changes: 104 additions & 56 deletions go-controller/pkg/node/port_claim.go
@@ -8,41 +8,55 @@ import (

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

kapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
utilnet "k8s.io/utils/net"
)

type handler func(port int32, protocol kapi.Protocol, svc *kapi.Service) error
type handler func(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error

type localPort interface {
open(port int32, protocol kapi.Protocol, svc *kapi.Service) error
close(port int32, protocol kapi.Protocol, svc *kapi.Service) error
type localPortHandler interface {
open(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error
close(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error
}

var port localPort
var portHandler localPortHandler

type activeSocket interface {
Close() error
}
var portOpener utilnet.PortOpener

type portClaimWatcher struct {
recorder record.EventRecorder
activeSocketsLock sync.Mutex
activeSockets map[kapi.Protocol]map[int32]activeSocket
localAddrSet map[string]net.IPNet
portsMap map[utilnet.LocalPort]utilnet.Closeable
}

func newPortClaimWatcher(recorder record.EventRecorder) localPort {
// Constants for valid LocalPort descriptions:
const (
nodePortDescr = "nodePort for"
externalPortDescr = "externalIP for"
)

func newPortClaimWatcher(recorder record.EventRecorder, localAddrSet map[string]net.IPNet) localPortHandler {
return &portClaimWatcher{
recorder: recorder,
activeSocketsLock: sync.Mutex{},
activeSockets: make(map[kapi.Protocol]map[int32]activeSocket),
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
localAddrSet: localAddrSet,
}
}

func initPortClaimWatcher(recorder record.EventRecorder, wf factory.NodeWatchFactory) {
port = newPortClaimWatcher(recorder)
func initPortClaimWatcher(recorder record.EventRecorder, wf factory.NodeWatchFactory) error {
localAddrSet, err := getLocalAddrs()
if err != nil {
return err
}
portHandler = newPortClaimWatcher(recorder, localAddrSet)
portOpener = &utilnet.ListenPortOpener
wf.AddServiceHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*kapi.Service)
@@ -70,41 +84,63 @@ func initPortClaimWatcher(recorder record.EventRecorder, wf factory.NodeWatchFac
}
},
}, nil)
return nil
}

func addServicePortClaim(svc *kapi.Service) []error {
return handleService(svc, port.open)
return handleService(svc, portHandler.open)
}

func deleteServicePortClaim(svc *kapi.Service) []error {
return handleService(svc, port.close)
return handleService(svc, portHandler.close)
}

func handleService(svc *kapi.Service, handler handler) []error {
errors := []error{}
if !util.ServiceTypeHasNodePort(svc) && len(svc.Spec.ExternalIPs) == 0 {
return errors
}

for _, svcPort := range svc.Spec.Ports {
if util.ServiceTypeHasNodePort(svc) {
if err := handlePort(svc, svcPort.NodePort, svcPort.Protocol, handler); err != nil {
klog.V(5).Infof("Handle NodePort service %s port %d", svc.Name, svcPort.NodePort)
if err := handlePort(getDescription(svcPort.Name, svc, true), svc, "", svcPort.NodePort, svcPort.Protocol, handler); err != nil {
errors = append(errors, err)
}
}
if len(svc.Spec.ExternalIPs) > 0 {
if err := handlePort(svc, svcPort.Port, svcPort.Protocol, handler); err != nil {
for _, externalIP := range svc.Spec.ExternalIPs {
klog.V(5).Infof("Handle ExternalIPs service %s external IP %s port %d", svc.Name, externalIP, svcPort.Port)
if err := handlePort(getDescription(svcPort.Name, svc, false), svc, externalIP, svcPort.Port, svcPort.Protocol, handler); err != nil {
errors = append(errors, err)
}
}
}
return errors
}

func handlePort(svc *kapi.Service, port int32, protocol kapi.Protocol, handler handler) error {
// LocalPort allows adding an arbitrary description, which can be used to distinguish LocalPort instances that have
// the same networking parameters but were created for different services.
// kube-proxy and this implementation use the following description formats:
// for NodePort services - "nodePort for namespace/name[:portName]"
// for services with external IPs - "externalIP for namespace/name[:portName]"
func getDescription(portName string, svc *kapi.Service, nodePort bool) string {
svcName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
prefix := externalPortDescr
if nodePort {
prefix = nodePortDescr
}
if len(portName) == 0 {
return fmt.Sprintf("%s %s", prefix, svcName.String())
} else {
return fmt.Sprintf("%s %s:%s", prefix, svcName.String(), portName)
}
}
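
// Illustrative examples (not part of this change) of the descriptions that
// getDescription produces, assuming a Service named "my-svc" in namespace "default":
//   getDescription("http", svc, true)  -> "nodePort for default/my-svc:http"
//   getDescription("", svc, false)     -> "externalIP for default/my-svc"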

func handlePort(desc string, svc *kapi.Service, ip string, port int32, protocol kapi.Protocol, handler handler) error {
if err := util.ValidatePort(protocol, port); err != nil {
return fmt.Errorf("invalid service port %s, err: %v", svc.Name, err)
}
if err := handler(port, protocol, svc); err != nil {
if err := handler(desc, ip, port, protocol, svc); err != nil {
return err
}
return nil
@@ -120,61 +156,73 @@ func updateServicePortClaim(oldSvc, newSvc *kapi.Service) []error {
return errors
}

func (p *portClaimWatcher) open(port int32, protocol kapi.Protocol, svc *kapi.Service) error {
klog.V(5).Infof("Opening socket for service: %s/%s and port: %v", svc.Namespace, svc.Name, port)
var socket activeSocket
var socketError error
switch protocol {
case kapi.ProtocolTCP:
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
socketError = err
break
}
socket = listener
case kapi.ProtocolUDP:
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", port))
if err != nil {
socketError = err
break
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
socketError = err
break
func (p *portClaimWatcher) open(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error {
klog.V(5).Infof("Opening socket for service: %s/%s, port: %v and protocol %s", svc.Namespace, svc.Name, port, protocol)

if ip != "" {
if _, exists := p.localAddrSet[ip]; !exists {
klog.V(5).Infof("The IP %s is not one of the node local ports", ip)
return nil
}
socket = conn
}
var localPort *utilnet.LocalPort
var portError error
switch protocol {
case kapi.ProtocolTCP, kapi.ProtocolUDP:
localPort, portError = utilnet.NewLocalPort(desc, ip, "", int(port), utilnet.Protocol(protocol))
case kapi.ProtocolSCTP:
// Do not open ports for SCTP, ref: https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/0015-20180614-SCTP-support.md#the-solution-in-the-kubernetes-sctp-support-implementation
return nil
default:
socketError = fmt.Errorf("unknown protocol %q", protocol)
portError = fmt.Errorf("unknown protocol %q", protocol)
}
if socketError != nil {
p.emitPortClaimEvent(svc, port, socketError)
return socketError
if portError != nil {
p.emitPortClaimEvent(svc, port, portError)
return portError
}
klog.V(5).Infof("Opening socket for LocalPort %v", localPort)
p.activeSocketsLock.Lock()
defer p.activeSocketsLock.Unlock()
if _, exists := p.activeSockets[protocol]; exists {
p.activeSockets[protocol][port] = socket

if _, exists := p.portsMap[*localPort]; exists {
return fmt.Errorf("error try to open socket for svc: %s/%s on port: %v again", svc.Namespace, svc.Name, port)
} else {
p.activeSockets[protocol] = map[int32]activeSocket{
port: socket,
closeable, err := portOpener.OpenLocalPort(localPort)
if err != nil {
p.emitPortClaimEvent(svc, port, err)
return err
}
p.portsMap[*localPort] = closeable
}
return nil
}

func (p *portClaimWatcher) close(port int32, protocol kapi.Protocol, svc *kapi.Service) error {
func (p *portClaimWatcher) close(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error {
klog.V(5).Infof("Closing socket claimed for service: %s/%s and port: %v", svc.Namespace, svc.Name, port)

if protocol != kapi.ProtocolTCP && protocol != kapi.ProtocolUDP {
return nil
}
if ip != "" {
if _, exists := p.localAddrSet[ip]; !exists {
klog.V(5).Infof("The IP %s is not one of the node local ports", ip)
return nil
}
}
localPort, err := utilnet.NewLocalPort(desc, ip, "", int(port), utilnet.Protocol(protocol))
if err != nil {
return fmt.Errorf("error localPort creation for svc: %s/%s on port: %v, err: %v", svc.Namespace, svc.Name, port, err)
}
klog.V(5).Infof("Closing socket for LocalPort %v", localPort)

p.activeSocketsLock.Lock()
defer p.activeSocketsLock.Unlock()
klog.V(5).Infof("Closing socket claimed for service: %s/%s and port: %v", svc.Namespace, svc.Name, port)
if socket, exists := p.activeSockets[protocol][port]; exists {
if err := socket.Close(); err != nil {

if _, exists := p.portsMap[*localPort]; exists {
if err = p.portsMap[*localPort].Close(); err != nil {
return fmt.Errorf("error closing socket for svc: %s/%s on port: %v, err: %v", svc.Namespace, svc.Name, port, err)
}
delete(p.activeSockets[protocol], port)
delete(p.portsMap, *localPort)
return nil
}
return fmt.Errorf("error closing socket for svc: %s/%s on port: %v, port was never opened...?", svc.Namespace, svc.Name, port)
