Skip to content

Commit

Permalink
Don't bind NodePort services to INADDR_ANY
Browse files Browse the repository at this point in the history
port_claim.go opened NodePort services on INADDR_ANY, which caused
discrete issues with NodePort related NAT-ing. NodePort type services
should only bind to the node IP as it's only node-ingress-specific.

Signed-off-by: Alexander Constantinescu <aconstan@redhat.com>
  • Loading branch information
alexanderConstantinescu committed Feb 4, 2021
1 parent 30941cb commit 479ec02
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 68 deletions.
21 changes: 10 additions & 11 deletions go-controller/pkg/node/gateway_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,6 @@ func (n *OvnNode) initGateway(subnets []*net.IPNet, nodeAnnotator kube.Annotator
klog.Info("Initializing Gateway Functionality")
var err error

var loadBalancerHealthChecker *loadBalancerHealthChecker
var portClaimWatcher *portClaimWatcher

if config.Gateway.NodeportEnable {
loadBalancerHealthChecker = newLoadBalancerHealthChecker(n.name)
portClaimWatcher, err = newPortClaimWatcher(n.recorder)
if err != nil {
return err
}
}

gatewayNextHops, gatewayIntf, err := getGatewayNextHops()
if err != nil {
return err
Expand All @@ -184,6 +173,16 @@ func (n *OvnNode) initGateway(subnets []*net.IPNet, nodeAnnotator kube.Annotator
klog.Errorf("Unable to set primary IP net label on node, err: %v", err)
}
}
var loadBalancerHealthChecker *loadBalancerHealthChecker
var portClaimWatcher *portClaimWatcher

if config.Gateway.NodeportEnable {
loadBalancerHealthChecker = newLoadBalancerHealthChecker(n.name)
portClaimWatcher, err = newPortClaimWatcher(n.recorder, v4IfAddr, v6IfAddr)
if err != nil {
return err
}
}

var gw *gateway
switch config.Gateway.Mode {
Expand Down
92 changes: 50 additions & 42 deletions go-controller/pkg/node/port_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

kapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand All @@ -26,25 +25,27 @@ type handler func(desc string, ip string, port int32, protocol kapi.Protocol, sv
type portManager interface {
open(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error
close(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error
handleService(svc *kapi.Service, handler handler) []error
}

type localPortManager struct {
recorder record.EventRecorder
activeSocketsLock sync.Mutex
localAddrSet map[string]net.IPNet
v4NodeAddr *net.IPNet
v6NodeAddr *net.IPNet
portsMap map[utilnet.LocalPort]utilnet.Closeable
portOpener utilnet.PortOpener
}

func (p *localPortManager) open(desc string, ip string, port int32, protocol kapi.Protocol, svc *kapi.Service) error {
klog.V(5).Infof("Opening socket for service: %s/%s, port: %v and protocol %s", svc.Namespace, svc.Name, port, protocol)

if ip != "" {
if _, exists := p.localAddrSet[ip]; !exists {
klog.V(5).Infof("The IP %s is not one of the node local ports", ip)
return nil
}
if _, exists := p.localAddrSet[ip]; !exists {
klog.V(5).Infof("The IP %s is not one of the node local IPs, skipping port claim", ip)
return nil
}

var localPort *utilnet.LocalPort
var portError error
switch protocol {
Expand All @@ -66,14 +67,13 @@ func (p *localPortManager) open(desc string, ip string, port int32, protocol kap

if _, exists := p.portsMap[*localPort]; exists {
return fmt.Errorf("error try to open socket for svc: %s/%s on port: %v again", svc.Namespace, svc.Name, port)
} else {
closeable, err := p.portOpener.OpenLocalPort(localPort)
if err != nil {
p.emitPortClaimEvent(svc, port, err)
return err
}
p.portsMap[*localPort] = closeable
}
closeable, err := p.portOpener.OpenLocalPort(localPort)
if err != nil {
p.emitPortClaimEvent(svc, port, err)
return err
}
p.portsMap[*localPort] = closeable
return nil
}

Expand Down Expand Up @@ -108,6 +108,36 @@ func (p *localPortManager) close(desc string, ip string, port int32, protocol ka
return fmt.Errorf("error closing socket for svc: %s/%s on port: %v, port was never opened...?", svc.Namespace, svc.Name, port)
}

func (p *localPortManager) handleService(svc *kapi.Service, handler handler) []error {
errors := []error{}
if !util.ServiceTypeHasNodePort(svc) && len(svc.Spec.ExternalIPs) == 0 {
return errors
}
for _, svcPort := range svc.Spec.Ports {
if util.ServiceTypeHasNodePort(svc) {
if p.v4NodeAddr != nil {
klog.V(5).Infof("Handle IPv4 NodePort service %s port %d", svc.Name, svcPort.NodePort)
if err := handlePort(getDescription(svcPort.Name, svc, true), svc, p.v4NodeAddr.IP.String(), svcPort.NodePort, svcPort.Protocol, handler); err != nil {
errors = append(errors, err)
}
}
if p.v6NodeAddr != nil {
klog.V(5).Infof("Handle IPv6 NodePort service %s port %d", svc.Name, svcPort.NodePort)
if err := handlePort(getDescription(svcPort.Name, svc, true), svc, p.v6NodeAddr.IP.String(), svcPort.NodePort, svcPort.Protocol, handler); err != nil {
errors = append(errors, err)
}
}
}
for _, externalIP := range svc.Spec.ExternalIPs {
klog.V(5).Infof("Handle ExternalIPs service %s external IP %s port %d", svc.Name, externalIP, svcPort.Port)
if err := handlePort(getDescription(svcPort.Name, svc, false), svc, externalIP, svcPort.Port, svcPort.Protocol, handler); err != nil {
errors = append(errors, err)
}
}
}
return errors
}

func (p *localPortManager) emitPortClaimEvent(svc *kapi.Service, port int32, err error) {
serviceRef := kapi.ObjectReference{
Kind: "Service",
Expand All @@ -116,14 +146,13 @@ func (p *localPortManager) emitPortClaimEvent(svc *kapi.Service, port int32, err
}
p.recorder.Eventf(&serviceRef, kapi.EventTypeWarning,
"PortClaim", "Service: %s/%s requires port: %v to be opened on node, but port cannot be opened, err: %v", svc.Namespace, svc.Name, port, err)
klog.Warningf("PortClaim for svc: %s/%s on port: %v, err: %v", svc.Namespace, svc.Name, port, err)
}

type portClaimWatcher struct {
port portManager
}

func newPortClaimWatcher(recorder record.EventRecorder) (*portClaimWatcher, error) {
func newPortClaimWatcher(recorder record.EventRecorder, v4NodeAddr, v6NodeAddr *net.IPNet) (*portClaimWatcher, error) {
localAddrSet, err := getLocalAddrs()
if err != nil {
return nil, err
Expand All @@ -134,13 +163,15 @@ func newPortClaimWatcher(recorder record.EventRecorder) (*portClaimWatcher, erro
activeSocketsLock: sync.Mutex{},
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
localAddrSet: localAddrSet,
v4NodeAddr: v4NodeAddr,
v6NodeAddr: v6NodeAddr,
portOpener: &utilnet.ListenPortOpener,
},
}, nil
}

func (p *portClaimWatcher) AddService(svc *kapi.Service) {
if errors := handleService(svc, p.port.open); len(errors) > 0 {
if errors := p.port.handleService(svc, p.port.open); len(errors) > 0 {
for _, err := range errors {
klog.Errorf("Error claiming port for service: %s/%s: %v", svc.Namespace, svc.Name, err)
}
Expand All @@ -152,8 +183,8 @@ func (p *portClaimWatcher) UpdateService(old, new *kapi.Service) {
return
}
errors := []error{}
errors = append(errors, handleService(old, p.port.close)...)
errors = append(errors, handleService(new, p.port.open)...)
errors = append(errors, p.port.handleService(old, p.port.close)...)
errors = append(errors, p.port.handleService(new, p.port.open)...)
if len(errors) > 0 {
for _, err := range errors {
klog.Errorf("Error updating port claim for service: %s/%s: %v", old.Namespace, old.Name, err)
Expand All @@ -162,7 +193,7 @@ func (p *portClaimWatcher) UpdateService(old, new *kapi.Service) {
}

func (p *portClaimWatcher) DeleteService(svc *kapi.Service) {
if errors := handleService(svc, p.port.close); len(errors) > 0 {
if errors := p.port.handleService(svc, p.port.close); len(errors) > 0 {
for _, err := range errors {
klog.Errorf("Error removing port claim for service: %s/%s: %v", svc.Namespace, svc.Name, err)
}
Expand All @@ -171,29 +202,6 @@ func (p *portClaimWatcher) DeleteService(svc *kapi.Service) {

func (p *portClaimWatcher) SyncServices(objs []interface{}) {}

func handleService(svc *kapi.Service, handler handler) []error {
errors := []error{}
if !util.ServiceTypeHasNodePort(svc) && len(svc.Spec.ExternalIPs) == 0 {
return errors
}

for _, svcPort := range svc.Spec.Ports {
if util.ServiceTypeHasNodePort(svc) {
klog.V(5).Infof("Handle NodePort service %s port %d", svc.Name, svcPort.NodePort)
if err := handlePort(getDescription(svcPort.Name, svc, true), svc, "", svcPort.NodePort, svcPort.Protocol, handler); err != nil {
errors = append(errors, err)
}
}
for _, externalIP := range svc.Spec.ExternalIPs {
klog.V(5).Infof("Handle ExternalIPs service %s external IP %s port %d", svc.Name, externalIP, svcPort.Port)
if err := handlePort(getDescription(svcPort.Name, svc, false), svc, externalIP, svcPort.Port, svcPort.Protocol, handler); err != nil {
errors = append(errors, err)
}
}
}
return errors
}

// LocalPorts allows to add an arbitrary description, which can be used to distinguish LocalPorts instances having the
// same networking parameters by created for different services.
// kube-proxy and this implementation use the following format of the description: "
Expand Down

0 comments on commit 479ec02

Please sign in to comment.