Skip to content

Commit

Permalink
Merge pull request #7897 from pasanw/upstream-vxlan-changes
Browse files Browse the repository at this point in the history
VXLAN and VXLAN FV Refactors
  • Loading branch information
fasaxc committed Aug 7, 2023
2 parents 500f481 + 378327b commit 60f8a9e
Show file tree
Hide file tree
Showing 47 changed files with 1,688 additions and 1,361 deletions.
179 changes: 123 additions & 56 deletions felix/calc/l3_route_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type L3RouteResolver struct {
nodeNameToNodeInfo map[string]l3rrNodeInfo
blockToRoutes map[string]set.Set[nodenameRoute]
nodeRoutes nodeRoutes
allPools map[string]model.IPPool
allPools map[string]l3rrPoolInfo
workloadIDToCIDRs map[model.WorkloadEndpointKey][]cnet.IPNet
useNodeResourceUpdates bool
routeSource string
Expand All @@ -90,6 +90,14 @@ type l3rrNodeInfo struct {
Addresses []ip.Addr
}

type l3rrPoolInfo struct {
CIDR ip.CIDR
PoolType proto.IPPoolType
NATOutgoing bool
CrossSubnet bool
AWSSubnetID string
}

var (
emptyV4Addr ip.V4Addr
emptyV6Addr ip.V6Addr
Expand Down Expand Up @@ -173,7 +181,7 @@ func NewL3RouteResolver(hostname string, callbacks PipelineCallbacks, useNodeRes

nodeNameToNodeInfo: map[string]l3rrNodeInfo{},
blockToRoutes: map[string]set.Set[nodenameRoute]{},
allPools: map[string]model.IPPool{},
allPools: map[string]l3rrPoolInfo{},
workloadIDToCIDRs: map[model.WorkloadEndpointKey][]cnet.IPNet{},
useNodeResourceUpdates: useNodeResourceUpdates,
routeSource: routeSource,
Expand Down Expand Up @@ -267,7 +275,7 @@ func (c *L3RouteResolver) OnBlockUpdate(update api.Update) (_ bool) {
// We don't allow multiple blocks with the same CIDR, so no need to check
// for duplicates here. Look at the routes contributed by this block and determine if we
// need to send any updates.
newRoutes := c.routesFromBlock(update.Value.(*model.AllocationBlock))
newRoutes := c.routesFromBlock(update)
logrus.WithField("numRoutes", len(newRoutes)).Debug("IPAM block update")
cachedRoutes, ok := c.blockToRoutes[key]
if !ok {
Expand Down Expand Up @@ -592,9 +600,9 @@ func (c *L3RouteResolver) visitAllRoutes(trie *ip.CIDRTrie, v func(route nodenam
if len(ri.Refs) > 0 {
// From a Ref.
nnr.nodeName = ri.Refs[0].NodeName
} else if ri.Block.NodeName != "" {
} else if len(ri.Blocks) > 0 {
// From IPAM.
nnr.nodeName = ri.Block.NodeName
nnr.nodeName = ri.Blocks[0].NodeName
} else {
// No host associated with route.
return true
Expand All @@ -613,33 +621,43 @@ func (c *L3RouteResolver) OnPoolUpdate(update api.Update) (_ bool) {
k := update.Key.(model.IPPoolKey)
poolKey := k.String()
oldPool, oldPoolExists := c.allPools[poolKey]
oldPoolType := proto.IPPoolType_NONE
var poolCIDR ip.CIDR
if oldPoolExists {
// Need explicit oldPoolExists check so that we don't pass a zero-struct to poolTypeForPool.
oldPoolType = c.poolTypeForPool(&oldPool)
poolCIDR = ip.CIDRFromCalicoNet(oldPool.CIDR)
}
var newPool *model.IPPool
var newPool *l3rrPoolInfo
if update.Value != nil {
newPool = update.Value.(*model.IPPool)
newPool = c.getPoolInfo(update)
}
newPoolType := c.poolTypeForPool(newPool)
logCxt := logrus.WithFields(logrus.Fields{"oldType": oldPoolType, "newType": newPoolType})
if newPool != nil && newPoolType != proto.IPPoolType_NONE {
logCxt.Info("Pool is active")
if newPool != nil && newPool.PoolType != proto.IPPoolType_NONE {
logrus.WithFields(logrus.Fields{
"oldType": oldPool.PoolType,
"newType": newPool.PoolType,
"newPool": *newPool,
}).Info("Pool is active")
c.allPools[poolKey] = *newPool
poolCIDR = ip.CIDRFromCalicoNet(newPool.CIDR)
crossSubnet := newPool.IPIPMode == encap.CrossSubnet || newPool.VXLANMode == encap.CrossSubnet
c.trie.UpdatePool(poolCIDR, newPoolType, newPool.Masquerade, crossSubnet)
} else {
c.trie.UpdatePool(newPool.CIDR, newPool.PoolType, newPool.NATOutgoing, newPool.CrossSubnet)
} else if oldPoolExists {
delete(c.allPools, poolKey)
c.trie.RemovePool(poolCIDR)
c.trie.RemovePool(oldPool.CIDR)
}

return
}

func (c *L3RouteResolver) getPoolInfo(update api.Update) *l3rrPoolInfo {
var v1Pool *model.IPPool
switch v := update.Value.(type) {
case *model.IPPool:
v1Pool = v
default:
logrus.Panic("Encountered unexpected value type when handling Pool update")
}

return &l3rrPoolInfo{
CIDR: ip.CIDRFromCalicoNet(v1Pool.CIDR),
PoolType: c.poolTypeForPool(v1Pool),
NATOutgoing: v1Pool.Masquerade,
CrossSubnet: v1Pool.IPIPMode == encap.CrossSubnet || v1Pool.VXLANMode == encap.CrossSubnet,
}
}

func (c *L3RouteResolver) poolTypeForPool(pool *model.IPPool) proto.IPPoolType {
if pool == nil {
return proto.IPPoolType_NONE
Expand All @@ -655,7 +673,15 @@ func (c *L3RouteResolver) poolTypeForPool(pool *model.IPPool) proto.IPPoolType {

// routesFromBlock returns a list of routes which should exist based on the provided
// allocation block.
func (c *L3RouteResolver) routesFromBlock(b *model.AllocationBlock) map[string]nodenameRoute {
func (c *L3RouteResolver) routesFromBlock(update api.Update) map[string]nodenameRoute {
var b *model.AllocationBlock
switch k := update.Key.(type) {
case model.BlockKey:
b = update.Value.(*model.AllocationBlock)
default:
logrus.WithField("key", k).Panic("Unexpected key type for Block update")
}

routes := make(map[string]nodenameRoute)
for _, alloc := range b.NonAffineAllocations() {
if alloc.Host == "" {
Expand Down Expand Up @@ -724,20 +750,24 @@ func (c *L3RouteResolver) flush() {
poolAllowsCrossSubnet := false
for _, entry := range buf {
ri := entry.Data.(RouteInfo)
if ri.Pool.Type != proto.IPPoolType_NONE {
logCxt.WithField("type", ri.Pool.Type).Debug("Found containing IP pool.")
rt.IpPoolType = ri.Pool.Type
}
if ri.Pool.NATOutgoing {
logCxt.Debug("NAT outgoing enabled on this CIDR.")
rt.NatOutgoing = true
}
if ri.Pool.CrossSubnet {
logCxt.Debug("Cross-subnet enabled on this CIDR.")
poolAllowsCrossSubnet = true
if len(ri.Pools) > 0 {
// We only expect one Pool entry for any given CIDR. This constraint is upheld by the datastore.
if ri.Pools[0].Type != proto.IPPoolType_NONE {
logCxt.WithField("type", ri.Pools[0].Type).Debug("Found containing IP pool.")
rt.IpPoolType = ri.Pools[0].Type
}
if ri.Pools[0].NATOutgoing {
logCxt.Debug("NAT outgoing enabled on this CIDR.")
rt.NatOutgoing = true
}
if ri.Pools[0].CrossSubnet {
logCxt.Debug("Cross-subnet enabled on this CIDR.")
poolAllowsCrossSubnet = true
}
}
if ri.Block.NodeName != "" {
rt.DstNodeName = ri.Block.NodeName
if len(ri.Blocks) > 0 {
// We only expect one Block entry for any given CIDR. This constraint is upheld by the datastore.
rt.DstNodeName = ri.Blocks[0].NodeName
if rt.DstNodeName == c.myNodeName {
logCxt.Debug("Local workload route.")
rt.Type = proto.RouteType_LOCAL_WORKLOAD
Expand Down Expand Up @@ -924,9 +954,17 @@ func (r *RouteTrie) UpdatePool(cidr ip.CIDR, poolType proto.IPPoolType, natOutgo
"crossSubnet": crossSubnet,
}).Debug("IP pool update")
changed := r.updateCIDR(cidr, func(ri *RouteInfo) {
ri.Pool.Type = poolType
ri.Pool.NATOutgoing = natOutgoing
ri.Pool.CrossSubnet = crossSubnet
newPool := Pool{
Type: poolType,
NATOutgoing: natOutgoing,
CrossSubnet: crossSubnet,
}

if len(ri.Pools) == 0 {
ri.Pools = append(ri.Pools, newPool)
} else {
ri.Pools[0] = newPool
}
})
if !changed {
return
Expand All @@ -951,17 +989,33 @@ func (r *RouteTrie) MarkCIDRDirty(cidr ip.CIDR) {
}

func (r *RouteTrie) RemovePool(cidr ip.CIDR) {
r.UpdatePool(cidr, proto.IPPoolType_NONE, false, false)
changed := r.updateCIDR(cidr, func(ri *RouteInfo) {
// The datastore constraints guarantee that we only see one Pool for a CIDR.
ri.Pools = nil
})
if !changed {
return
}
r.markChildrenDirty(cidr)
}

func (r *RouteTrie) UpdateBlockRoute(cidr ip.CIDR, nodeName string) {
r.updateCIDR(cidr, func(ri *RouteInfo) {
ri.Block.NodeName = nodeName
block := Block{NodeName: nodeName}

if len(ri.Blocks) == 0 {
ri.Blocks = append(ri.Blocks, block)
} else {
ri.Blocks[0] = block
}
})
}

func (r *RouteTrie) RemoveBlockRoute(cidr ip.CIDR) {
r.UpdateBlockRoute(cidr, "")
r.updateCIDR(cidr, func(ri *RouteInfo) {
// The datastore constraints guarantee that we only see one Block for a CIDR.
ri.Blocks = nil
})
}

func (r *RouteTrie) AddHost(cidr ip.CIDR, nodeName string) {
Expand Down Expand Up @@ -1109,17 +1163,13 @@ func (r *RouteTrie) trieForCIDR(cidr ip.CIDR) *ip.CIDRTrie {
}

type RouteInfo struct {
// Pool contains information extracted from the IP pool that has this CIDR.
Pool struct {
Type proto.IPPoolType // Only set if this CIDR represents an IP pool
NATOutgoing bool
CrossSubnet bool
}
// Pools contains information extracted from the local and remote IP pools that have this CIDR.
// Since the datastore guarantees that pools have unique CIDRs within the cluster, we only expect one entry.
Pools []Pool

// Block contains route information extracted from IPAM blocks.
Block struct {
NodeName string // Set for each route that comes from an IPAM block.
}
// Blocks contains route information extracted from local and remote IPAM blocks.
//Since the datastore guarantees that blocks have unique CIDRs within the cluster, we only expect one entry.
Blocks []Block

// Host contains information extracted from the node/host config updates.
Host struct {
Expand All @@ -1142,6 +1192,16 @@ const (
RefTypeVXLAN
)

type Pool struct {
Type proto.IPPoolType // Only set if this CIDR represents an IP pool
NATOutgoing bool
CrossSubnet bool
}

type Block struct {
NodeName string // Set for each route that comes from an IPAM block.
}

type Ref struct {
// Count of Refs that have this CIDR. Normally, for WEPs this will be 0 or 1 but Felix has to be tolerant
// to bad data (two Refs with the same CIDR) so we do ref counting. For tunnel IPs, multiple tunnels may share the
Expand All @@ -1160,10 +1220,9 @@ type Ref struct {
// this CIDR was previously sent. If IsValidRoute() returns false but WasSent is true then we need to withdraw
// the route.
func (r RouteInfo) IsValidRoute() bool {
return r.Pool.Type != proto.IPPoolType_NONE ||
r.Block.NodeName != "" ||
return (len(r.Pools) > 0) ||
len(r.Blocks) > 0 ||
len(r.Host.NodeNames) > 0 ||
r.Pool.NATOutgoing ||
len(r.Refs) > 0
}

Expand All @@ -1175,6 +1234,14 @@ func (r RouteInfo) Copy() RouteInfo {
cp.Refs = make([]Ref, len(r.Refs))
copy(cp.Refs, r.Refs)
}
if len(r.Pools) != 0 {
cp.Pools = make([]Pool, len(r.Pools))
copy(cp.Pools, r.Pools)
}
if len(r.Blocks) != 0 {
cp.Blocks = make([]Block, len(r.Blocks))
copy(cp.Blocks, r.Blocks)
}
return cp
}

Expand Down
27 changes: 14 additions & 13 deletions felix/dataplane/linux/vxlan_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (m *vxlanManager) OnUpdate(protoBufMsg interface{}) {
// In case the route changes type to one we no longer care about...
m.deleteRoute(msg.Dst)

// Process remote IPAM blocks.
if msg.Type == proto.RouteType_REMOTE_WORKLOAD && msg.IpPoolType == proto.IPPoolType_VXLAN {
m.logCtx.WithField("msg", msg).Debug("VXLAN data plane received route update")
m.routesByDest[msg.Dst] = msg
Expand Down Expand Up @@ -420,7 +421,7 @@ func (m *vxlanManager) CompleteDeferredWork() error {
// known VTEPs.
var l2routes []routetable.L2Target
for _, u := range m.vtepsByNode {
mac, err := m.parseMacForIPVersion(u)
mac, err := parseMacForIPVersion(u, m.ipVersion)
if err != nil {
// Don't block programming of other VTEPs if somehow we receive one with a bad mac.
m.logCtx.WithError(err).Warn("Failed to parse VTEP mac address")
Expand Down Expand Up @@ -604,17 +605,6 @@ func (m *vxlanManager) getParentInterface(localVTEP *proto.VXLANTunnelEndpointUp
return nil, fmt.Errorf("Unable to find parent interface with address %s", parentDeviceIP)
}

func (m *vxlanManager) parseMacForIPVersion(vtep *proto.VXLANTunnelEndpointUpdate) (net.HardwareAddr, error) {
switch m.ipVersion {
case 4:
return net.ParseMAC(vtep.Mac)
case 6:
return net.ParseMAC(vtep.MacV6)
default:
return nil, fmt.Errorf("Invalid IP version")
}
}

// configureVXLANDevice ensures the VXLAN tunnel device is up and configured correctly.
func (m *vxlanManager) configureVXLANDevice(mtu int, localVTEP *proto.VXLANTunnelEndpointUpdate, xsumBroken bool) error {
logCtx := m.logCtx.WithFields(logrus.Fields{"device": m.vxlanDevice})
Expand All @@ -623,7 +613,7 @@ func (m *vxlanManager) configureVXLANDevice(mtu int, localVTEP *proto.VXLANTunne
if err != nil {
return err
}
mac, err := m.parseMacForIPVersion(localVTEP)
mac, err := parseMacForIPVersion(localVTEP, m.ipVersion)
if err != nil {
return err
}
Expand Down Expand Up @@ -797,3 +787,14 @@ func vxlanLinksIncompat(l1, l2 netlink.Link) string {

return ""
}

func parseMacForIPVersion(vtep *proto.VXLANTunnelEndpointUpdate, ipVersion uint8) (net.HardwareAddr, error) {
switch ipVersion {
case 4:
return net.ParseMAC(vtep.Mac)
case 6:
return net.ParseMAC(vtep.MacV6)
default:
return nil, fmt.Errorf("Invalid IP version")
}
}
Loading

0 comments on commit 60f8a9e

Please sign in to comment.