Skip to content

Commit

Permalink
Fix hybrid proxier for iptables.Monitor
Browse files Browse the repository at this point in the history
The iptables proxier has been fixed to not constantly resync its rules
when not needed... except that the hybrid proxier was forcing it to do
it anyway. Fix that by moving the NodeIPTables monitor up out to the
top level of openshift-sdn-node and sharing it between the node and
proxy code.
  • Loading branch information
danwinship committed Apr 4, 2020
1 parent 363a211 commit dece792
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 28 deletions.
18 changes: 2 additions & 16 deletions pkg/network/node/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@ import (

"k8s.io/klog"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/iptables"
kexec "k8s.io/utils/exec"
)

type NodeIPTables struct {
ipt iptables.Interface
clusterNetworkCIDR []string
syncPeriod time.Duration
masqueradeServices bool
vxlanPort uint32
masqueradeBitHex string // the masquerade bit as hex value
Expand All @@ -31,11 +27,10 @@ type NodeIPTables struct {
egressIPs map[string]string
}

func newNodeIPTables(clusterNetworkCIDR []string, syncPeriod time.Duration, masqueradeServices bool, vxlanPort uint32, masqueradeBit uint32) *NodeIPTables {
func newNodeIPTables(ipt iptables.Interface, clusterNetworkCIDR []string, masqueradeServices bool, vxlanPort uint32, masqueradeBit uint32) *NodeIPTables {
return &NodeIPTables{
ipt: iptables.New(kexec.New(), iptables.ProtocolIpv4),
ipt: ipt,
clusterNetworkCIDR: clusterNetworkCIDR,
syncPeriod: syncPeriod,
masqueradeServices: masqueradeServices,
vxlanPort: vxlanPort,
masqueradeBitHex: fmt.Sprintf("%#x", 1<<masqueradeBit),
Expand All @@ -47,15 +42,6 @@ func (n *NodeIPTables) Setup() error {
if err := n.syncIPTableRules(); err != nil {
return err
}

go n.ipt.Monitor(iptables.Chain("OPENSHIFT-SDN-CANARY"),
[]iptables.Table{iptables.TableMangle, iptables.TableNAT, iptables.TableFilter},
func() {
if err := n.syncIPTableRules(); err != nil {
utilruntime.HandleError(fmt.Errorf("Reloading openshift iptables failed: %v", err))
}
},
n.syncPeriod, utilwait.NeverStop)
return nil
}

Expand Down
18 changes: 12 additions & 6 deletions pkg/network/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
ktypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/util/iptables"
taints "k8s.io/kubernetes/pkg/util/taints"
kexec "k8s.io/utils/exec"

Expand Down Expand Up @@ -72,7 +73,7 @@ type OsdnNodeConfig struct {
KubeInformers informers.SharedInformerFactory
NetworkInformers networkinformers.SharedInformerFactory

IPTablesSyncPeriod time.Duration
IPTables iptables.Interface
ProxyMode kubeproxyconfig.ProxyMode
MasqueradeBit *int32
}
Expand All @@ -85,13 +86,14 @@ type OsdnNode struct {
oc *ovsController
networkInfo *common.ParsedClusterNetwork
podManager *podManager
ipt iptables.Interface
nodeIPTables *NodeIPTables
clusterCIDRs []string
localSubnetCIDR string
localGatewayCIDR string
localIP string
hostName string
useConnTrack bool
iptablesSyncPeriod time.Duration
masqueradeBit uint32

// Synchronizes operations on egressPolicies
Expand Down Expand Up @@ -172,7 +174,7 @@ func New(c *OsdnNodeConfig) (*OsdnNode, error) {
localIP: c.NodeIP,
hostName: c.NodeName,
useConnTrack: useConnTrack,
iptablesSyncPeriod: c.IPTablesSyncPeriod,
ipt: c.IPTables,
masqueradeBit: masqBit,
egressPolicies: make(map[uint32][]networkapi.EgressNetworkPolicy),
egressDNS: egressDNS,
Expand Down Expand Up @@ -342,9 +344,9 @@ func (node *OsdnNode) Start() error {
for _, cn := range node.networkInfo.ClusterNetworks {
node.clusterCIDRs = append(node.clusterCIDRs, cn.ClusterCIDR.String())
}
nodeIPTables := newNodeIPTables(node.clusterCIDRs, node.iptablesSyncPeriod, !node.useConnTrack, node.networkInfo.VXLANPort, node.masqueradeBit)

if err = nodeIPTables.Setup(); err != nil {
node.nodeIPTables = newNodeIPTables(node.ipt, node.clusterCIDRs, !node.useConnTrack, node.networkInfo.VXLANPort, node.masqueradeBit)
if err = node.nodeIPTables.Setup(); err != nil {
return fmt.Errorf("failed to set up iptables: %v", err)
}

Expand All @@ -363,7 +365,7 @@ func (node *OsdnNode) Start() error {
if err := node.SetupEgressNetworkPolicy(); err != nil {
return err
}
if err := node.egressIP.Start(node.networkInformers, nodeIPTables); err != nil {
if err := node.egressIP.Start(node.networkInformers, node.nodeIPTables); err != nil {
return err
}
}
Expand Down Expand Up @@ -562,3 +564,7 @@ func (node *OsdnNode) handleDeleteService(obj interface{}) {
klog.V(5).Infof("Watch %s event for Service %q", watch.Deleted, serv.Name)
node.DeleteServiceRules(serv)
}

func (node *OsdnNode) ReloadIPTables() error {
return node.nodeIPTables.syncIPTableRules()
}
5 changes: 5 additions & 0 deletions pkg/network/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (proxy *OsdnProxy) Start(proxier kubeproxy.Provider, waitChan chan<- bool)
return nil
}

func (proxy *OsdnProxy) ReloadIPTables() error {
proxy.Sync()
return nil
}

func (proxy *OsdnProxy) updateEgressNetworkPolicyLocked(policy networkv1.EgressNetworkPolicy) {
proxy.Lock()
defer proxy.Unlock()
Expand Down
5 changes: 1 addition & 4 deletions pkg/network/proxyimpl/hybrid/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type HybridProxier struct {

mainProxy RunnableProxy
unidlingProxy RunnableProxy
syncPeriod time.Duration
minSyncPeriod time.Duration
serviceLister corev1listers.ServiceLister

Expand All @@ -61,22 +60,20 @@ type HybridProxier struct {
func NewHybridProxier(
mainProxy RunnableProxy,
unidlingProxy RunnableProxy,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
serviceLister corev1listers.ServiceLister,
) (*HybridProxier, error) {
p := &HybridProxier{
mainProxy: mainProxy,
unidlingProxy: unidlingProxy,
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
serviceLister: serviceLister,

usingUserspace: make(map[types.NamespacedName]bool),
switchedToUserspace: make(map[types.NamespacedName]bool),
}

p.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", p.syncProxyRules, minSyncPeriod, syncPeriod, 4)
p.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", p.syncProxyRules, minSyncPeriod, time.Hour, 4)

// Hackery abound: we want to make sure that changes are applied
// to both proxies at approximately the same time. That means that we
Expand Down
25 changes: 25 additions & 0 deletions pkg/openshift-sdn/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ import (
"k8s.io/klog"

kerrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/util/iptables"
kexec "k8s.io/utils/exec"

"github.com/openshift/library-go/pkg/serviceability"
sdnnode "github.com/openshift/sdn/pkg/network/node"
Expand All @@ -35,6 +39,8 @@ type OpenShiftSDN struct {
OsdnNode *sdnnode.OsdnNode
sdnRecorder record.EventRecorder
OsdnProxy *sdnproxy.OsdnProxy

ipt iptables.Interface
}

var networkLong = `
Expand Down Expand Up @@ -131,6 +137,8 @@ func (sdn *OpenShiftSDN) Init() error {
return fmt.Errorf("failed to build informers: %v", err)
}

sdn.ipt = iptables.New(kexec.New(), iptables.ProtocolIpv4)

// Configure SDN
err = sdn.initSDN()
if err != nil {
Expand Down Expand Up @@ -166,9 +174,26 @@ func (sdn *OpenShiftSDN) Start(stopCh <-chan struct{}) error {
klog.Fatal(err)
}
klog.V(2).Infof("openshift-sdn network plugin ready")

go sdn.ipt.Monitor(iptables.Chain("OPENSHIFT-SDN-CANARY"),
[]iptables.Table{iptables.TableMangle, iptables.TableNAT, iptables.TableFilter},
sdn.reloadIPTables,
sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
utilwait.NeverStop)

return nil
}

// reloadIPTables reloads node and proxy iptables rules after a flush
func (sdn *OpenShiftSDN) reloadIPTables() {
if err := sdn.OsdnNode.ReloadIPTables(); err != nil {
utilruntime.HandleError(fmt.Errorf("Reloading openshift node iptables rules failed: %v", err))
}
if err := sdn.OsdnProxy.ReloadIPTables(); err != nil {
utilruntime.HandleError(fmt.Errorf("Reloading openshift proxy iptables rules failed: %v", err))
}
}

// watchForChanges closes stopCh if the configuration file changed.
func watchForChanges(configPath string, stopCh chan struct{}) error {
configPath, err := filepath.Abs(configPath)
Expand Down
1 change: 0 additions & 1 deletion pkg/openshift-sdn/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) {
proxier, err = hybrid.NewHybridProxier(
hp,
unidlingUserspaceProxy,
sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
sdn.ProxyConfig.IPTables.MinSyncPeriod.Duration,
sdn.informers.KubeInformers.Core().V1().Services().Lister(),
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/openshift-sdn/sdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (sdn *OpenShiftSDN) initSDN() error {
KClient: sdn.informers.KubeClient,
KubeInformers: sdn.informers.KubeInformers,
NetworkInformers: sdn.informers.NetworkInformers,
IPTablesSyncPeriod: sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
IPTables: sdn.ipt,
MasqueradeBit: sdn.ProxyConfig.IPTables.MasqueradeBit,
ProxyMode: sdn.ProxyConfig.Mode,
Recorder: sdn.sdnRecorder,
Expand Down

0 comments on commit dece792

Please sign in to comment.