Skip to content

Commit

Permalink
Ensure required flows are present at node start
Browse files Browse the repository at this point in the history
In the scenario where an OVN GR may have the wrong mac, and traffic
disruption is present, the OVNK node process may stall at trying to
start the watch factory and wait for informer caches to sync. At this
point in time, ovnkube node is not able to start its OpenFlow manager
and program the needed flows to block the OVN GR from poisoning external
ip neighbor entries.

This commit adds a boostrap function which attempts to install basic
flows before anything else is done on the node during boot time. This is
a temporary flow installation that is only done when there is just a
single NORMAL flow in the external bridge and is overriden once OF
Manager starts.

Note this intentionally does not write the bootstrap flows if OVNK has
been killed, but the OVS flows from the previous run remain. It is only
for cases where OVS has no previously programmed flows, like on node
boot up.

Signed-off-by: Tim Rozet <trozet@redhat.com>
  • Loading branch information
trozet committed Nov 13, 2023
1 parent 73e0870 commit b629b17
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 11 deletions.
5 changes: 5 additions & 0 deletions go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,11 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
nc.routeManager.Run(nc.stopChan, 4*time.Minute)
}()

// Bootstrap flows in OVS if just normal flow is present
if err := bootstrapOVSFlows(); err != nil {
return fmt.Errorf("failed to bootstrap OVS flows: %w", err)
}

if node, err = nc.Kube.GetNode(nc.name); err != nil {
return fmt.Errorf("error retrieving node %s: %v", nc.name, err)
}
Expand Down
3 changes: 3 additions & 0 deletions go-controller/pkg/node/egress_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ var _ = Describe("Egress Service Operations", func() {
app.Flags = config.Flags
fExec = ovntest.NewLooseCompareFakeExec()
fakeOvnNode = NewFakeOVNNode(fExec)
fakeOvnNode.fakeExec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovs-vsctl --timeout=15 --no-heading --data=bare --format=csv --columns name list interface",
})

config.OVNKubernetesFeature.EnableEgressService = true
_, cidr4, _ := net.ParseCIDR("10.128.0.0/16")
Expand Down
1 change: 1 addition & 0 deletions go-controller/pkg/node/gateway_init_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ func localGatewayInterfaceTest(app *cli.App, testNS ns.NetNS,
)

fexec := ovntest.NewLooseCompareFakeExec()

fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovs-vsctl --timeout=15 port-to-br eth0",
Err: fmt.Errorf(""),
Expand Down
25 changes: 19 additions & 6 deletions go-controller/pkg/node/gateway_localnet_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
const (
v4localnetGatewayIP = "10.244.0.1"
v6localnetGatewayIP = "fd00:96:1::1"
gwMAC = "0a:0b:0c:0d:0e:0f"
)

func initFakeNodePortWatcher(iptV4, iptV6 util.IPTablesHelper) *nodePortWatcher {
Expand All @@ -47,14 +48,18 @@ func initFakeNodePortWatcher(iptV4, iptV6 util.IPTablesHelper) *nodePortWatcher
err = f6.MatchState(initIPTable)
Expect(err).NotTo(HaveOccurred())

gwMACParsed, _ := net.ParseMAC(gwMAC)

fNPW := nodePortWatcher{
ofportPhys: "eth0",
ofportPatch: "patch-breth0_ov",
gatewayIPv4: v4localnetGatewayIP,
gatewayIPv6: v6localnetGatewayIP,
gwBridgeMAC: gwMACParsed,
serviceInfo: make(map[k8stypes.NamespacedName]*serviceConfig),
ofm: &openflowManager{
flowCache: map[string][]string{},
flowCache: map[string][]string{},
defaultBridge: &bridgeConfiguration{macAddress: gwMACParsed},
},
}
return &fNPW
Expand Down Expand Up @@ -238,6 +243,9 @@ var _ = Describe("Node Operations", func() {
app.Flags = config.Flags
fExec = ovntest.NewFakeExec()
fakeOvnNode = NewFakeOVNNode(fExec)
fakeOvnNode.fakeExec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovs-vsctl --timeout=15 --no-heading --data=bare --format=csv --columns name list interface",
})

iptV4, iptV6 = util.SetFakeIPTablesHelpers()
_, nodeNet, err := net.ParseCIDR("10.1.1.0/24")
Expand Down Expand Up @@ -1167,19 +1175,22 @@ var _ = Describe("Node Operations", func() {
}
expectedNodePortFlows := []string{
"cookie=0x453ae29bcbbc08bd, priority=110, in_port=eth0, tcp, tp_dst=31111, actions=output:patch-breth0_ov",
"cookie=0x453ae29bcbbc08bd, priority=110, in_port=patch-breth0_ov, tcp, tp_src=31111, actions=output:eth0",
fmt.Sprintf("cookie=0x453ae29bcbbc08bd, priority=110, in_port=patch-breth0_ov, dl_src=%s, tcp, tp_src=31111, actions=output:eth0",
gwMAC),
}
expectedLBIngressFlows := []string{
"cookie=0x10c6b89e483ea111, priority=110, in_port=eth0, arp, arp_op=1, arp_tpa=5.5.5.5, actions=output:LOCAL",
"cookie=0x10c6b89e483ea111, priority=110, in_port=eth0, icmp, nw_dst=5.5.5.5, icmp_type=3, icmp_code=4, actions=output:patch-breth0_ov",
"cookie=0x10c6b89e483ea111, priority=110, in_port=eth0, tcp, nw_dst=5.5.5.5, tp_dst=8080, actions=output:patch-breth0_ov",
"cookie=0x10c6b89e483ea111, priority=110, in_port=patch-breth0_ov, tcp, nw_src=5.5.5.5, tp_src=8080, actions=output:eth0",
fmt.Sprintf("cookie=0x10c6b89e483ea111, priority=110, in_port=patch-breth0_ov, dl_src=%s, tcp, nw_src=5.5.5.5, tp_src=8080, actions=output:eth0",
gwMAC),
}
expectedLBExternalIPFlows := []string{
"cookie=0x71765945a31dc2f1, priority=110, in_port=eth0, arp, arp_op=1, arp_tpa=1.1.1.1, actions=output:LOCAL",
"cookie=0x71765945a31dc2f1, priority=110, in_port=eth0, icmp, nw_dst=1.1.1.1, icmp_type=3, icmp_code=4, actions=output:patch-breth0_ov",
"cookie=0x71765945a31dc2f1, priority=110, in_port=eth0, tcp, nw_dst=1.1.1.1, tp_dst=8080, actions=output:patch-breth0_ov",
"cookie=0x71765945a31dc2f1, priority=110, in_port=patch-breth0_ov, tcp, nw_src=1.1.1.1, tp_src=8080, actions=output:eth0",
fmt.Sprintf("cookie=0x71765945a31dc2f1, priority=110, in_port=patch-breth0_ov, dl_src=%s, tcp, nw_src=1.1.1.1, tp_src=8080, actions=output:eth0",
gwMAC),
}

f4 := iptV4.(*util.FakeIPTables)
Expand Down Expand Up @@ -2130,7 +2141,8 @@ var _ = Describe("Node Operations", func() {
expectedFlows := []string{
// default
"cookie=0x453ae29bcbbc08bd, priority=110, in_port=eth0, tcp, tp_dst=31111, actions=output:patch-breth0_ov",
"cookie=0x453ae29bcbbc08bd, priority=110, in_port=patch-breth0_ov, tcp, tp_src=31111, actions=output:eth0",
fmt.Sprintf("cookie=0x453ae29bcbbc08bd, priority=110, in_port=patch-breth0_ov, dl_src=%s, tcp, tp_src=31111, actions=output:eth0",
gwMAC),
}

f4 := iptV4.(*util.FakeIPTables)
Expand Down Expand Up @@ -2420,7 +2432,8 @@ var _ = Describe("Node Operations", func() {
expectedFlows := []string{
// default
"cookie=0x453ae29bcbbc08bd, priority=110, in_port=eth0, tcp, tp_dst=31111, actions=output:patch-breth0_ov",
"cookie=0x453ae29bcbbc08bd, priority=110, in_port=patch-breth0_ov, tcp, tp_src=31111, actions=output:eth0",
fmt.Sprintf("cookie=0x453ae29bcbbc08bd, priority=110, in_port=patch-breth0_ov, dl_src=%s, tcp, tp_src=31111, actions=output:eth0",
gwMAC),
}

f4 := iptV4.(*util.FakeIPTables)
Expand Down
26 changes: 22 additions & 4 deletions go-controller/pkg/node/gateway_shared_intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type nodePortWatcher struct {
ofportPhys string
ofportPatch string
gwBridge string
gwBridgeMAC net.HardwareAddr
// Map of service name to programmed iptables/OF rules
serviceInfo map[ktypes.NamespacedName]*serviceConfig
serviceInfoLock sync.Mutex
Expand Down Expand Up @@ -134,6 +135,10 @@ func (npw *nodePortWatcher) updateServiceFlowCache(service *kapi.Service, add, h
// if LGW mode and no uplink gateway bridge, ingress traffic enters host from node physical interface instead of the breth0. Skip adding these service flows to br-ex.
return nil
}

// CAUTION: when adding new flows where the in_port is ofPortPatch and the out_port is ofPortPhys, ensure
// that dl_src is included in match criteria!

npw.gatewayIPLock.Lock()
defer npw.gatewayIPLock.Unlock()
var cookie, key string
Expand Down Expand Up @@ -208,9 +213,9 @@ func (npw *nodePortWatcher) updateServiceFlowCache(service *kapi.Service, add, h
"actions=%s",
cookie, npw.ofportPhys, flowProtocol, svcPort.NodePort, actions),
// table=0, matches on return traffic from service nodePort and sends it out to primary node interface (br-ex)
fmt.Sprintf("cookie=%s, priority=110, in_port=%s, %s, tp_src=%d, "+
fmt.Sprintf("cookie=%s, priority=110, in_port=%s, dl_src=%s, %s, tp_src=%d, "+
"actions=output:%s",
cookie, npw.ofportPatch, flowProtocol, svcPort.NodePort, npw.ofportPhys)})
cookie, npw.ofportPatch, npw.ofm.defaultBridge.macAddress, flowProtocol, svcPort.NodePort, npw.ofportPhys)})
}
}
}
Expand Down Expand Up @@ -259,6 +264,10 @@ func (npw *nodePortWatcher) createLbAndExternalSvcFlows(service *kapi.Service, s
if net.ParseIP(externalIPOrLBIngressIP) == nil {
return fmt.Errorf("failed to parse %s IP: %q", ipType, externalIPOrLBIngressIP)
}

// CAUTION: when adding new flows where the in_port is ofPortPatch and the out_port is ofPortPhys, ensure
// that dl_src is included in match criteria!

flowProtocol := protocol
nwDst := "nw_dst"
nwSrc := "nw_src"
Expand Down Expand Up @@ -326,9 +335,9 @@ func (npw *nodePortWatcher) createLbAndExternalSvcFlows(service *kapi.Service, s
"actions=%s",
cookie, npw.ofportPhys, flowProtocol, nwDst, externalIPOrLBIngressIP, svcPort.Port, actions),
// table=0, matches on return traffic from service externalIP or LB ingress and sends it out to primary node interface (br-ex)
fmt.Sprintf("cookie=%s, priority=110, in_port=%s, %s, %s=%s, tp_src=%d, "+
fmt.Sprintf("cookie=%s, priority=110, in_port=%s, dl_src=%s, %s, %s=%s, tp_src=%d, "+
"actions=output:%s",
cookie, npw.ofportPatch, flowProtocol, nwSrc, externalIPOrLBIngressIP, svcPort.Port, npw.ofportPhys))
cookie, npw.ofportPatch, npw.gwBridgeMAC, flowProtocol, nwSrc, externalIPOrLBIngressIP, svcPort.Port, npw.ofportPhys))
}
npw.ofm.updateFlowCacheEntry(key, externalIPFlows)

Expand Down Expand Up @@ -1083,6 +1092,9 @@ func (ofm *openflowManager) updateBridgeFlowCache(subnets []*net.IPNet, extraIPs
ofm.defaultBridge.Lock()
defer ofm.defaultBridge.Unlock()

// CAUTION: when adding new flows where the in_port is ofPortPatch and the out_port is ofPortPhys, ensure
// that dl_src is included in match criteria!

dftFlows, err := flowsForDefaultBridge(ofm.defaultBridge, extraIPs)
if err != nil {
return err
Expand All @@ -1109,6 +1121,9 @@ func (ofm *openflowManager) updateBridgeFlowCache(subnets []*net.IPNet, extraIPs
}

func flowsForDefaultBridge(bridge *bridgeConfiguration, extraIPs []net.IP) ([]string, error) {
// CAUTION: when adding new flows where the in_port is ofPortPatch and the out_port is ofPortPhys, ensure
// that dl_src is included in match criteria!

ofPortPhys := bridge.ofPortPhys
bridgeMacAddress := bridge.macAddress.String()
ofPortPatch := bridge.ofPortPatch
Expand Down Expand Up @@ -1366,6 +1381,8 @@ func flowsForDefaultBridge(bridge *bridgeConfiguration, extraIPs []net.IP) ([]st
}

func commonFlows(subnets []*net.IPNet, bridge *bridgeConfiguration) ([]string, error) {
// CAUTION: when adding new flows where the in_port is ofPortPatch and the out_port is ofPortPhys, ensure
// that dl_src is included in match criteria!
ofPortPhys := bridge.ofPortPhys
bridgeMacAddress := bridge.macAddress.String()
ofPortPatch := bridge.ofPortPatch
Expand Down Expand Up @@ -1869,6 +1886,7 @@ func newNodePortWatcher(gwBridge *bridgeConfiguration, ofm *openflowManager,
ofportPhys: ofportPhys,
ofportPatch: ofportPatch,
gwBridge: gwBridge.bridgeName,
gwBridgeMAC: gwBridge.macAddress,
serviceInfo: make(map[ktypes.NamespacedName]*serviceConfig),
nodeIPManager: nodeIPManager,
ofm: ofm,
Expand Down
94 changes: 94 additions & 0 deletions go-controller/pkg/node/openflow_manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package node

import (
"fmt"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"net"
"os"
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -147,3 +153,91 @@ func checkPorts(patchIntf, ofPortPatch, physIntf, ofPortPhys string) error {
}
return nil
}

// bootstrapOVSFlows handles ensuring basic, required flows are in place. This is done before OpenFlow manager has
// been created/started, and only done when there is just a NORMAL flow programmed and OVN/OVS is already setup
func bootstrapOVSFlows() error {
// see if patch port exists already
var portsOutput string
var stderr string
var err error
if portsOutput, stderr, err = util.RunOVSVsctl("--no-heading", "--data=bare", "--format=csv", "--columns",
"name", "list", "interface"); err != nil {
// bridge exists, but could not list ports
return fmt.Errorf("failed to list ports on existing bridge br-int: %s, %w", stderr, err)
}

var bridge string
var patchPort string
// patch-br-int-to-<bridge name>_<node>
r := regexp.MustCompile("^patch-(.*)_.*?-to-br-int$")
for _, line := range strings.Split(portsOutput, "\n") {
matches := r.FindStringSubmatch(line)
if len(matches) == 2 {
patchPort = matches[0]
bridge = matches[1]
break
}
}

if len(bridge) == 0 {
// bridge exists but no patch port was found
return nil
}

// get the current flows and if there is more than just default flow, we dont need to bootstrap as we already
// have flows
flows, err := util.GetOFFlows(bridge)
if err != nil {
return err
}
if len(flows) > 1 {
// more than 1 flow, assume the OVS has retained previous flows from previous running OVNK instance
return nil
}

// only have 1 flow, need to install required flows
klog.Infof("Default NORMAL flow installed on OVS bridge: %s, will bootstrap with required port security flows", bridge)

// Get ofport of patchPort
ofportPatch, stderr, err := util.GetOVSOfPort("get", "Interface", patchPort, "ofport")
if err != nil {
return fmt.Errorf("failed while waiting on patch port %q to be created by ovn-controller and "+
"while getting ofport. stderr: %q, error: %v", patchPort, stderr, err)
}

var bridgeMACAddress net.HardwareAddr
if config.OvnKubeNode.Mode == types.NodeModeDPU {
hostRep, err := util.GetDPUHostInterface(bridge)
if err != nil {
return err
}
bridgeMACAddress, err = util.GetSriovnetOps().GetRepresentorPeerMacAddress(hostRep)
if err != nil {
return err
}
} else {
bridgeMACAddress, err = util.GetOVSPortMACAddress(bridge)
if err != nil {
return errors.Wrapf(err, "failed to get MAC address for ovs port %s", bridge)
}
}

var dftFlows []string
// table 0, check packets coming from OVN have the correct mac address. Low priority flows that are a catch all
// for non-IP packets that would normally be forwarded with NORMAL action (table 0, priority 0 flow).
dftFlows = append(dftFlows,
fmt.Sprintf("cookie=%s, priority=10, table=0, in_port=%s, dl_src=%s, actions=output:NORMAL",
defaultOpenFlowCookie, ofportPatch, bridgeMACAddress))
dftFlows = append(dftFlows,
fmt.Sprintf("cookie=%s, priority=9, table=0, in_port=%s, actions=drop",
defaultOpenFlowCookie, ofportPatch))
dftFlows = append(dftFlows, "priority=0, table=0, actions=output:NORMAL")

_, stderr, err = util.ReplaceOFFlows(bridge, dftFlows)
if err != nil {
return fmt.Errorf("failed to add flows, error: %v, stderr, %s, flows: %s", err, stderr, dftFlows)
}

return nil
}
20 changes: 19 additions & 1 deletion go-controller/pkg/util/ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,25 @@ func ReplaceOFFlows(bridgeName string, flows []string) (string, string, error) {
return strings.Trim(stdout.String(), "\" \n"), stderr.String(), err
}

// Get OpenFlow Port names or numbers for a given bridge
// GetOFFlows gets all the flows from a bridge
func GetOFFlows(bridgeName string) ([]string, error) {
stdout, stderr, err := RunOVSOfctl("dump-flows", bridgeName)
if err != nil {
return nil, fmt.Errorf("failed to get flows on bridge %q:, stderr: %q, error: %v",
bridgeName, stderr, err)
}

var flows []string
for _, line := range strings.Split(stdout, "\n") {
if strings.Contains(line, "cookie=") {
flows = append(flows, strings.TrimSpace(line))
}
}

return flows, nil
}

// GetOpenFlowPorts names or numbers for a given bridge
func GetOpenFlowPorts(bridgeName string, namedPorts bool) ([]string, error) {
stdout, stderr, err := RunOVSOfctl("show", bridgeName)
if err != nil {
Expand Down

0 comments on commit b629b17

Please sign in to comment.