Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1822351: Fix hybrid proxier for iptables.Monitor #127

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 2 additions & 16 deletions pkg/network/node/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@ import (

"k8s.io/klog"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/iptables"
kexec "k8s.io/utils/exec"
)

type NodeIPTables struct {
ipt iptables.Interface
clusterNetworkCIDR []string
syncPeriod time.Duration
masqueradeServices bool
vxlanPort uint32
masqueradeBitHex string // the masquerade bit as hex value
Expand All @@ -31,11 +27,10 @@ type NodeIPTables struct {
egressIPs map[string]string
}

func newNodeIPTables(clusterNetworkCIDR []string, syncPeriod time.Duration, masqueradeServices bool, vxlanPort uint32, masqueradeBit uint32) *NodeIPTables {
func newNodeIPTables(ipt iptables.Interface, clusterNetworkCIDR []string, masqueradeServices bool, vxlanPort uint32, masqueradeBit uint32) *NodeIPTables {
return &NodeIPTables{
ipt: iptables.New(kexec.New(), iptables.ProtocolIpv4),
ipt: ipt,
clusterNetworkCIDR: clusterNetworkCIDR,
syncPeriod: syncPeriod,
masqueradeServices: masqueradeServices,
vxlanPort: vxlanPort,
masqueradeBitHex: fmt.Sprintf("%#x", 1<<masqueradeBit),
Expand All @@ -47,15 +42,6 @@ func (n *NodeIPTables) Setup() error {
if err := n.syncIPTableRules(); err != nil {
return err
}

go n.ipt.Monitor(iptables.Chain("OPENSHIFT-SDN-CANARY"),
[]iptables.Table{iptables.TableMangle, iptables.TableNAT, iptables.TableFilter},
func() {
if err := n.syncIPTableRules(); err != nil {
utilruntime.HandleError(fmt.Errorf("Reloading openshift iptables failed: %v", err))
}
},
n.syncPeriod, utilwait.NeverStop)
return nil
}

Expand Down
82 changes: 44 additions & 38 deletions pkg/network/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
ktypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/util/iptables"
taints "k8s.io/kubernetes/pkg/util/taints"
kexec "k8s.io/utils/exec"

Expand Down Expand Up @@ -72,27 +73,28 @@ type OsdnNodeConfig struct {
KubeInformers informers.SharedInformerFactory
NetworkInformers networkinformers.SharedInformerFactory

IPTablesSyncPeriod time.Duration
ProxyMode kubeproxyconfig.ProxyMode
MasqueradeBit *int32
IPTables iptables.Interface
ProxyMode kubeproxyconfig.ProxyMode
MasqueradeBit *int32
}

type OsdnNode struct {
policy osdnPolicy
kClient kubernetes.Interface
networkClient networkclient.Interface
recorder record.EventRecorder
oc *ovsController
networkInfo *common.ParsedClusterNetwork
podManager *podManager
clusterCIDRs []string
localSubnetCIDR string
localGatewayCIDR string
localIP string
hostName string
useConnTrack bool
iptablesSyncPeriod time.Duration
masqueradeBit uint32
policy osdnPolicy
kClient kubernetes.Interface
networkClient networkclient.Interface
recorder record.EventRecorder
oc *ovsController
networkInfo *common.ParsedClusterNetwork
podManager *podManager
ipt iptables.Interface
nodeIPTables *NodeIPTables
clusterCIDRs []string
localSubnetCIDR string
localGatewayCIDR string
localIP string
hostName string
useConnTrack bool
masqueradeBit uint32

// Synchronizes operations on egressPolicies
egressPoliciesLock sync.Mutex
Expand Down Expand Up @@ -162,23 +164,23 @@ func New(c *OsdnNodeConfig) (*OsdnNode, error) {
}

plugin := &OsdnNode{
policy: policy,
kClient: c.KClient,
networkClient: c.NetworkClient,
recorder: c.Recorder,
oc: oc,
networkInfo: networkInfo,
podManager: newPodManager(c.KClient, policy, networkInfo.MTU, oc),
localIP: c.NodeIP,
hostName: c.NodeName,
useConnTrack: useConnTrack,
iptablesSyncPeriod: c.IPTablesSyncPeriod,
masqueradeBit: masqBit,
egressPolicies: make(map[uint32][]networkapi.EgressNetworkPolicy),
egressDNS: egressDNS,
kubeInformers: c.KubeInformers,
networkInformers: c.NetworkInformers,
egressIP: newEgressIPWatcher(oc, c.NodeIP, c.MasqueradeBit),
policy: policy,
kClient: c.KClient,
networkClient: c.NetworkClient,
recorder: c.Recorder,
oc: oc,
networkInfo: networkInfo,
podManager: newPodManager(c.KClient, policy, networkInfo.MTU, oc),
localIP: c.NodeIP,
hostName: c.NodeName,
useConnTrack: useConnTrack,
ipt: c.IPTables,
masqueradeBit: masqBit,
egressPolicies: make(map[uint32][]networkapi.EgressNetworkPolicy),
egressDNS: egressDNS,
kubeInformers: c.KubeInformers,
networkInformers: c.NetworkInformers,
egressIP: newEgressIPWatcher(oc, c.NodeIP, c.MasqueradeBit),
}

RegisterMetrics()
Expand Down Expand Up @@ -342,9 +344,9 @@ func (node *OsdnNode) Start() error {
for _, cn := range node.networkInfo.ClusterNetworks {
node.clusterCIDRs = append(node.clusterCIDRs, cn.ClusterCIDR.String())
}
nodeIPTables := newNodeIPTables(node.clusterCIDRs, node.iptablesSyncPeriod, !node.useConnTrack, node.networkInfo.VXLANPort, node.masqueradeBit)

if err = nodeIPTables.Setup(); err != nil {
node.nodeIPTables = newNodeIPTables(node.ipt, node.clusterCIDRs, !node.useConnTrack, node.networkInfo.VXLANPort, node.masqueradeBit)
if err = node.nodeIPTables.Setup(); err != nil {
return fmt.Errorf("failed to set up iptables: %v", err)
}

Expand All @@ -363,7 +365,7 @@ func (node *OsdnNode) Start() error {
if err := node.SetupEgressNetworkPolicy(); err != nil {
return err
}
if err := node.egressIP.Start(node.networkInformers, nodeIPTables); err != nil {
if err := node.egressIP.Start(node.networkInformers, node.nodeIPTables); err != nil {
return err
}
}
Expand Down Expand Up @@ -562,3 +564,7 @@ func (node *OsdnNode) handleDeleteService(obj interface{}) {
klog.V(5).Infof("Watch %s event for Service %q", watch.Deleted, serv.Name)
node.DeleteServiceRules(serv)
}

func (node *OsdnNode) ReloadIPTables() error {
return node.nodeIPTables.syncIPTableRules()
}
5 changes: 5 additions & 0 deletions pkg/network/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (proxy *OsdnProxy) Start(proxier kubeproxy.Provider, waitChan chan<- bool)
return nil
}

func (proxy *OsdnProxy) ReloadIPTables() error {
proxy.Sync()
return nil
}

func (proxy *OsdnProxy) updateEgressNetworkPolicyLocked(policy networkv1.EgressNetworkPolicy) {
proxy.Lock()
defer proxy.Unlock()
Expand Down
5 changes: 1 addition & 4 deletions pkg/network/proxyimpl/hybrid/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type HybridProxier struct {

mainProxy RunnableProxy
unidlingProxy RunnableProxy
syncPeriod time.Duration
minSyncPeriod time.Duration
serviceLister corev1listers.ServiceLister

Expand All @@ -61,22 +60,20 @@ type HybridProxier struct {
func NewHybridProxier(
mainProxy RunnableProxy,
unidlingProxy RunnableProxy,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
serviceLister corev1listers.ServiceLister,
) (*HybridProxier, error) {
p := &HybridProxier{
mainProxy: mainProxy,
unidlingProxy: unidlingProxy,
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
serviceLister: serviceLister,

usingUserspace: make(map[types.NamespacedName]bool),
switchedToUserspace: make(map[types.NamespacedName]bool),
}

p.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", p.syncProxyRules, minSyncPeriod, syncPeriod, 4)
p.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", p.syncProxyRules, minSyncPeriod, time.Hour, 4)

// Hackery abound: we want to make sure that changes are applied
// to both proxies at approximately the same time. That means that we
Expand Down
25 changes: 25 additions & 0 deletions pkg/openshift-sdn/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ import (
"k8s.io/klog"

kerrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/util/iptables"
kexec "k8s.io/utils/exec"

"github.com/openshift/library-go/pkg/serviceability"
sdnnode "github.com/openshift/sdn/pkg/network/node"
Expand All @@ -35,6 +39,8 @@ type OpenShiftSDN struct {
OsdnNode *sdnnode.OsdnNode
sdnRecorder record.EventRecorder
OsdnProxy *sdnproxy.OsdnProxy

ipt iptables.Interface
}

var networkLong = `
Expand Down Expand Up @@ -131,6 +137,8 @@ func (sdn *OpenShiftSDN) Init() error {
return fmt.Errorf("failed to build informers: %v", err)
}

sdn.ipt = iptables.New(kexec.New(), iptables.ProtocolIpv4)

// Configure SDN
err = sdn.initSDN()
if err != nil {
Expand Down Expand Up @@ -166,9 +174,26 @@ func (sdn *OpenShiftSDN) Start(stopCh <-chan struct{}) error {
klog.Fatal(err)
}
klog.V(2).Infof("openshift-sdn network plugin ready")

go sdn.ipt.Monitor(iptables.Chain("OPENSHIFT-SDN-CANARY"),
[]iptables.Table{iptables.TableMangle, iptables.TableNAT, iptables.TableFilter},
sdn.reloadIPTables,
sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
utilwait.NeverStop)

return nil
}

// reloadIPTables reloads node and proxy iptables rules after a flush
func (sdn *OpenShiftSDN) reloadIPTables() {
if err := sdn.OsdnNode.ReloadIPTables(); err != nil {
utilruntime.HandleError(fmt.Errorf("Reloading openshift node iptables rules failed: %v", err))
}
if err := sdn.OsdnProxy.ReloadIPTables(); err != nil {
utilruntime.HandleError(fmt.Errorf("Reloading openshift proxy iptables rules failed: %v", err))
}
}

// watchForChanges closes stopCh if the configuration file changed.
func watchForChanges(configPath string, stopCh chan struct{}) error {
configPath, err := filepath.Abs(configPath)
Expand Down
1 change: 0 additions & 1 deletion pkg/openshift-sdn/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) {
proxier, err = hybrid.NewHybridProxier(
hp,
unidlingUserspaceProxy,
sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
sdn.ProxyConfig.IPTables.MinSyncPeriod.Duration,
sdn.informers.KubeInformers.Core().V1().Services().Lister(),
)
Expand Down
20 changes: 10 additions & 10 deletions pkg/openshift-sdn/sdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ func (sdn *OpenShiftSDN) initSDN() error {

var err error
sdn.OsdnNode, err = sdnnode.New(&sdnnode.OsdnNodeConfig{
NodeName: sdn.nodeName,
NodeIP: sdn.nodeIP,
NetworkClient: sdn.informers.NetworkClient,
KClient: sdn.informers.KubeClient,
KubeInformers: sdn.informers.KubeInformers,
NetworkInformers: sdn.informers.NetworkInformers,
IPTablesSyncPeriod: sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
MasqueradeBit: sdn.ProxyConfig.IPTables.MasqueradeBit,
ProxyMode: sdn.ProxyConfig.Mode,
Recorder: sdn.sdnRecorder,
NodeName: sdn.nodeName,
NodeIP: sdn.nodeIP,
NetworkClient: sdn.informers.NetworkClient,
KClient: sdn.informers.KubeClient,
KubeInformers: sdn.informers.KubeInformers,
NetworkInformers: sdn.informers.NetworkInformers,
IPTables: sdn.ipt,
MasqueradeBit: sdn.ProxyConfig.IPTables.MasqueradeBit,
ProxyMode: sdn.ProxyConfig.Mode,
Recorder: sdn.sdnRecorder,
})
return err
}
Expand Down