Skip to content

Commit

Permalink
Enhance bundle to support both group and flow modifications (antrea-i…
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyingd committed Apr 13, 2020
1 parent 78108bb commit e070043
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 69 deletions.
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Expand Up @@ -55,7 +55,7 @@ $GOPATH/bin/openapi-gen \
MOCKGEN_TARGETS=(
"pkg/agent/cniserver/ipam IPAMDriver"
"pkg/agent/interfacestore InterfaceStore"
"pkg/agent/openflow Client,FlowOperations"
"pkg/agent/openflow Client,OFEntryOperations"
"pkg/agent/route Interface"
"pkg/ovs/openflow Bridge,Table,Flow,Action,FlowBuilder"
"pkg/ovs/ovsconfig OVSBridgeClient"
Expand Down
28 changes: 14 additions & 14 deletions pkg/agent/openflow/client.go
Expand Up @@ -139,7 +139,7 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
klog.V(2).Infof("Flows with cache key %s are already installed", flowCacheKey)
return nil
}
err := c.flowOperations.AddAll(flows)
err := c.ofEntryOperations.AddAll(flows)
if err != nil {
return err
}
Expand All @@ -165,7 +165,7 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro
for _, flow := range fCache {
delFlows = append(delFlows, flow)
}
if err := c.flowOperations.DeleteAll(delFlows); err != nil {
if err := c.ofEntryOperations.DeleteAll(delFlows); err != nil {
return err
}
cache.Delete(flowCacheKey)
Expand Down Expand Up @@ -235,7 +235,7 @@ func (c *client) UninstallPodFlows(containerID string) error {

func (c *client) InstallClusterServiceCIDRFlows(serviceNet *net.IPNet, gatewayMAC net.HardwareAddr, gatewayOFPort uint32) error {
flow := c.serviceCIDRDNATFlow(serviceNet, gatewayMAC, gatewayOFPort, cookie.Service)
if err := c.flowOperations.Add(flow); err != nil {
if err := c.ofEntryOperations.Add(flow); err != nil {
return err
}
c.clusterServiceCIDRFlows = []binding.Flow{flow}
Expand All @@ -261,7 +261,7 @@ func (c *client) InstallGatewayFlows(gatewayAddr net.IP, gatewayMAC net.Hardware
flows = append(flows, c.reEntranceBypassCTFlow(gatewayOFPort, gatewayOFPort, cookie.Default))
}

if err := c.flowOperations.AddAll(flows); err != nil {
if err := c.ofEntryOperations.AddAll(flows); err != nil {
return err
}
c.gatewayFlows = flows
Expand All @@ -270,32 +270,32 @@ func (c *client) InstallGatewayFlows(gatewayAddr net.IP, gatewayMAC net.Hardware

func (c *client) InstallDefaultTunnelFlows(tunnelOFPort uint32) error {
flow := c.tunnelClassifierFlow(tunnelOFPort, cookie.Default)
if err := c.flowOperations.Add(flow); err != nil {
if err := c.ofEntryOperations.Add(flow); err != nil {
return err
}
c.defaultTunnelFlows = []binding.Flow{flow}
return nil
}

func (c *client) initialize() error {
if err := c.flowOperations.AddAll(c.defaultFlows()); err != nil {
if err := c.ofEntryOperations.AddAll(c.defaultFlows()); err != nil {
return fmt.Errorf("failed to install default flows: %v", err)
}
if err := c.flowOperations.Add(c.arpNormalFlow(cookie.Default)); err != nil {
if err := c.ofEntryOperations.Add(c.arpNormalFlow(cookie.Default)); err != nil {
return fmt.Errorf("failed to install arp normal flow: %v", err)
}
if err := c.flowOperations.Add(c.l2ForwardOutputFlow(cookie.Default)); err != nil {
if err := c.ofEntryOperations.Add(c.l2ForwardOutputFlow(cookie.Default)); err != nil {
return fmt.Errorf("failed to install L2 forward output flows: %v", err)
}
if err := c.flowOperations.AddAll(c.connectionTrackFlows(cookie.Default)); err != nil {
if err := c.ofEntryOperations.AddAll(c.connectionTrackFlows(cookie.Default)); err != nil {
return fmt.Errorf("failed to install connection track flows: %v", err)
}
if err := c.flowOperations.AddAll(c.establishedConnectionFlows(cookie.Default)); err != nil {
if err := c.ofEntryOperations.AddAll(c.establishedConnectionFlows(cookie.Default)); err != nil {
return fmt.Errorf("failed to install flows to skip established connections: %v", err)
}

if c.encapMode.SupportsNoEncap() {
if err := c.flowOperations.Add(c.l2ForwardOutputReentInPortFlow(c.gatewayPort, cookie.Default)); err != nil {
if err := c.ofEntryOperations.Add(c.l2ForwardOutputReentInPortFlow(c.gatewayPort, cookie.Default)); err != nil {
return fmt.Errorf("failed to install L2 forward same in-port and out-port flow: %v", err)
}
}
Expand Down Expand Up @@ -347,7 +347,7 @@ func (c *client) ReplayFlows() {
for _, flow := range flows {
flow.Reset()
}
if err := c.flowOperations.AddAll(flows); err != nil {
if err := c.ofEntryOperations.AddAll(flows); err != nil {
klog.Errorf("Error when replaying fixed flows: %v", err)
}

Expand All @@ -366,7 +366,7 @@ func (c *client) ReplayFlows() {
cachedFlows = append(cachedFlows, flow)
}

if err := c.flowOperations.AddAll(cachedFlows); err != nil {
if err := c.ofEntryOperations.AddAll(cachedFlows); err != nil {
klog.Errorf("Error when replaying cached flows: %v", err)
}
return true
Expand Down Expand Up @@ -400,7 +400,7 @@ func (c *client) setupPolicyOnlyFlows() error {
// Replies any ARP request with the same global virtual MAC.
c.arpResponderStaticFlow(cookie.Default),
}
if err := c.flowOperations.AddAll(flows); err != nil {
if err := c.ofEntryOperations.AddAll(flows); err != nil {
return fmt.Errorf("failed to setup policy-only flows: %w", err)
}
return nil
Expand Down
16 changes: 8 additions & 8 deletions pkg/agent/openflow/client_test.go
Expand Up @@ -83,12 +83,12 @@ func TestIdempotentFlowInstallation(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockFlowOperations(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.nodeConfig = &config.NodeConfig{}
client.flowOperations = m
client.ofEntryOperations = m

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
// Installing the flows should succeed, and all the flows should be added into the cache.
Expand All @@ -111,12 +111,12 @@ func TestIdempotentFlowInstallation(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockFlowOperations(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.nodeConfig = &config.NodeConfig{}
client.flowOperations = m
client.ofEntryOperations = m

errorCall := m.EXPECT().AddAll(gomock.Any()).Return(errors.New("Bundle error")).Times(1)
m.EXPECT().AddAll(gomock.Any()).Return(nil).After(errorCall)
Expand Down Expand Up @@ -152,12 +152,12 @@ func TestFlowInstallationFailed(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockFlowOperations(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.nodeConfig = &config.NodeConfig{}
client.flowOperations = m
client.ofEntryOperations = m

// We generate an error for AddAll call.
m.EXPECT().AddAll(gomock.Any()).Return(errors.New("Bundle error"))
Expand Down Expand Up @@ -186,12 +186,12 @@ func TestConcurrentFlowInstallation(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockFlowOperations(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.nodeConfig = &config.NodeConfig{}
client.flowOperations = m
client.ofEntryOperations = m

var concurrentCalls atomic.Value // set to true if we observe concurrent calls
timeoutCh := make(chan struct{})
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/openflow/network_policy.go
Expand Up @@ -730,7 +730,7 @@ func (c *client) InstallPolicyRuleFlows(ruleID uint32, rule *types.PolicyRule) e
actionFlows = append(actionFlows, flow)
}
}
if err := c.flowOperations.AddAll(actionFlows); err != nil {
if err := c.ofEntryOperations.AddAll(actionFlows); err != nil {
return nil
}
// Add the action flows after the Openflow entries are installed on the OVS bridge successfully.
Expand Down Expand Up @@ -909,7 +909,7 @@ func (c *client) UninstallPolicyRuleFlows(ruleID uint32) error {
}

// Delete action flows from the OVS bridge.
if err := c.flowOperations.DeleteAll(conj.actionFlows); err != nil {
if err := c.ofEntryOperations.DeleteAll(conj.actionFlows); err != nil {
return err
}

Expand Down Expand Up @@ -955,7 +955,7 @@ func (c *client) replayPolicyFlows() {
for _, ctx := range c.globalConjMatchFlowCache {
addMatchFlows(ctx)
}
if err := c.flowOperations.AddAll(flows); err != nil {
if err := c.ofEntryOperations.AddAll(flows); err != nil {
klog.Errorf("Error when replaying flows: %v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/openflow/network_policy_test.go
Expand Up @@ -415,9 +415,9 @@ func prepareClient(ctrl *gomock.Controller) *client {
bridge: bridge,
}
c.cookieAllocator = cookie.NewAllocator(0)
m := oftest.NewMockFlowOperations(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
m.EXPECT().AddAll(gomock.Any()).Return(nil).AnyTimes()
m.EXPECT().DeleteAll(gomock.Any()).Return(nil).AnyTimes()
c.flowOperations = m
c.ofEntryOperations = m
return c
}
18 changes: 14 additions & 4 deletions pkg/agent/openflow/pipeline.go
Expand Up @@ -96,12 +96,14 @@ var (
ReentranceMAC, _ = net.ParseMAC("de:ad:be:ef:de:ad")
)

type FlowOperations interface {
type OFEntryOperations interface {
Add(flow binding.Flow) error
Modify(flow binding.Flow) error
Delete(flow binding.Flow) error
AddAll(flows []binding.Flow) error
DeleteAll(flows []binding.Flow) error
AddOFEntries(ofEntries []binding.OFEntry) error
DeleteOFEntries(ofEntries []binding.OFEntry) error
}

type flowCache map[string]binding.Flow
Expand All @@ -119,9 +121,9 @@ type client struct {
// "fixed" flows installed by the agent after initialization and which do not change during
// the lifetime of the client.
gatewayFlows, clusterServiceCIDRFlows, defaultTunnelFlows []binding.Flow
// flowOperations is a wrapper interface for flow Add / Modify / Delete operations. It
// ofEntryOperations is a wrapper interface for OpenFlow entry Add / Modify / Delete operations. It
// enables convenient mocking in unit tests.
flowOperations FlowOperations
ofEntryOperations OFEntryOperations
// policyCache is a map from PolicyRule ID to policyRuleConjunction. It's guaranteed that one policyRuleConjunction
// is processed by at most one goroutine at any given time.
policyCache sync.Map
Expand Down Expand Up @@ -156,6 +158,14 @@ func (c *client) DeleteAll(flows []binding.Flow) error {
return c.bridge.AddFlowsInBundle(nil, nil, flows)
}

func (c *client) AddOFEntries(ofEntries []binding.OFEntry) error {
return c.bridge.AddOFEntriesInBundle(ofEntries, nil, nil)
}

func (c *client) DeleteOFEntries(ofEntries []binding.OFEntry) error {
return c.bridge.AddOFEntriesInBundle(nil, nil, ofEntries)
}

// defaultFlows generates the default flows of all tables.
func (c *client) defaultFlows() (flows []binding.Flow) {
for _, table := range c.pipeline {
Expand Down Expand Up @@ -650,6 +660,6 @@ func NewClient(bridgeName string) Client {
policyCache: sync.Map{},
globalConjMatchFlowCache: map[string]*conjMatchFlowContext{},
}
c.flowOperations = c
c.ofEntryOperations = c
return c
}

0 comments on commit e070043

Please sign in to comment.