Skip to content

Commit

Permalink
BPF: fix that no IP set filter was set at start-of-day. (#8101)
Browse files Browse the repository at this point in the history
In BPF mode, we don't want to program linux IP sets
unless we're using "untracked" policy, which is partially
implemented in iptables.  The filter to prevent IP set
programming was only set after a policy churn.

- Defer calculation of the filter until CompleteDeferredWork.
- Make sure the "dirty" flag is set at start of day so that
  the programming triggers before the first dataplane apply().
- Fix up tests to deal with lack of IP sets in BPF mode.

Includes cleanup of IP set reading functions.
  • Loading branch information
fasaxc committed Oct 20, 2023
1 parent 4929a58 commit 1e5c503
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 142 deletions.
5 changes: 1 addition & 4 deletions felix/dataplane/linux/int_dataplane.go
Expand Up @@ -604,10 +604,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
tc.CleanUpProgramsAndPins()
} else {
// In BPF mode we still use iptables for raw egress policy.
dp.RegisterManager(newRawEgressPolicyManager(rawTableV4, ruleRenderer, 4,
func(neededIPSets set.Set[string]) {
ipSetsV4.SetFilter(neededIPSets)
}))
dp.RegisterManager(newRawEgressPolicyManager(rawTableV4, ruleRenderer, 4, ipSetsV4.SetFilter))
}

interfaceRegexes := make([]string, len(config.RulesConfig.WorkloadIfacePrefixes))
Expand Down
112 changes: 64 additions & 48 deletions felix/dataplane/linux/policy_mgr.go
@@ -1,4 +1,4 @@
// Copyright (c) 2016-2021 Tigera, Inc. All rights reserved.
// Copyright (c) 2016-2023 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,14 +29,15 @@ import (
// policyManager simply renders policy/profile updates into iptables.Chain objects and sends
// them to the dataplane layer.
type policyManager struct {
rawTable IptablesTable
mangleTable IptablesTable
filterTable IptablesTable
ruleRenderer policyRenderer
ipVersion uint8
rawEgressOnly bool
neededIPSets map[proto.PolicyID]set.Set[string]
ipSetsCallback func(neededIPSets set.Set[string])
rawTable IptablesTable
mangleTable IptablesTable
filterTable IptablesTable
ruleRenderer policyRenderer
ipVersion uint8
rawEgressOnly bool
ipSetFilterDirty bool // Only used in "raw only" mode.
neededIPSets map[proto.PolicyID]set.Set[string]
ipSetsCallback func(neededIPSets set.Set[string])
}

type policyRenderer interface {
Expand All @@ -57,38 +58,25 @@ func newPolicyManager(rawTable, mangleTable, filterTable IptablesTable, ruleRend
func newRawEgressPolicyManager(rawTable IptablesTable, ruleRenderer policyRenderer, ipVersion uint8,
ipSetsCallback func(neededIPSets set.Set[string])) *policyManager {
return &policyManager{
rawTable: rawTable,
mangleTable: iptables.NewNoopTable(),
filterTable: iptables.NewNoopTable(),
ruleRenderer: ruleRenderer,
ipVersion: ipVersion,
rawEgressOnly: true,
neededIPSets: make(map[proto.PolicyID]set.Set[string]),
ipSetsCallback: ipSetsCallback,
rawTable: rawTable,
mangleTable: iptables.NewNoopTable(),
filterTable: iptables.NewNoopTable(),
ruleRenderer: ruleRenderer,
ipVersion: ipVersion,
rawEgressOnly: true,
// Make sure we set the filter at start-of-day, even if there are no policies.
ipSetFilterDirty: true,
neededIPSets: make(map[proto.PolicyID]set.Set[string]),
ipSetsCallback: ipSetsCallback,
}
}

func (m *policyManager) mergeNeededIPSets(id *proto.PolicyID, neededIPSets set.Set[string]) {
if neededIPSets != nil {
m.neededIPSets[*id] = neededIPSets
} else {
delete(m.neededIPSets, *id)
}
merged := set.New[string]()
for _, ipSets := range m.neededIPSets {
ipSets.Iter(func(item string) error {
merged.Add(item)
return nil
})
}
m.ipSetsCallback(merged)
}

func (m *policyManager) OnUpdate(msg interface{}) {
switch msg := msg.(type) {
case *proto.ActivePolicyUpdate:
if m.rawEgressOnly && !msg.Policy.Untracked {
log.WithField("id", msg.Id).Debug("Ignore non-untracked policy")
log.WithField("id", msg.Id).Debug("Clean up non-untracked policy.")
m.cleanUpPolicy(msg.Id)
return
}
log.WithField("id", msg.Id).Debug("Updating policy chains")
Expand All @@ -103,7 +91,7 @@ func (m *policyManager) OnUpdate(msg interface{}) {
}
}
chains = filteredChains
m.mergeNeededIPSets(msg.Id, neededIPSets)
m.updateNeededIPSets(msg.Id, neededIPSets)
}
// We can't easily tell whether the policy is in use in a particular table, and, if the policy
// type gets changed it may move between tables. Hence, we put the policy into all tables.
Expand All @@ -113,18 +101,7 @@ func (m *policyManager) OnUpdate(msg interface{}) {
m.filterTable.UpdateChains(chains)
case *proto.ActivePolicyRemove:
log.WithField("id", msg.Id).Debug("Removing policy chains")
if m.rawEgressOnly {
m.mergeNeededIPSets(msg.Id, nil)
}
inName := rules.PolicyChainName(rules.PolicyInboundPfx, msg.Id)
outName := rules.PolicyChainName(rules.PolicyOutboundPfx, msg.Id)
// As above, we need to clean up in all the tables.
m.filterTable.RemoveChainByName(inName)
m.filterTable.RemoveChainByName(outName)
m.mangleTable.RemoveChainByName(inName)
m.mangleTable.RemoveChainByName(outName)
m.rawTable.RemoveChainByName(inName)
m.rawTable.RemoveChainByName(outName)
m.cleanUpPolicy(msg.Id)
case *proto.ActiveProfileUpdate:
if m.rawEgressOnly {
log.WithField("id", msg.Id).Debug("Ignore non-untracked profile")
Expand All @@ -144,7 +121,46 @@ func (m *policyManager) OnUpdate(msg interface{}) {
}
}

func (m *policyManager) cleanUpPolicy(id *proto.PolicyID) {
if m.rawEgressOnly {
m.updateNeededIPSets(id, nil)
}
inName := rules.PolicyChainName(rules.PolicyInboundPfx, id)
outName := rules.PolicyChainName(rules.PolicyOutboundPfx, id)
// As above, we need to clean up in all the tables.
m.filterTable.RemoveChainByName(inName)
m.filterTable.RemoveChainByName(outName)
m.mangleTable.RemoveChainByName(inName)
m.mangleTable.RemoveChainByName(outName)
m.rawTable.RemoveChainByName(inName)
m.rawTable.RemoveChainByName(outName)
}

func (m *policyManager) updateNeededIPSets(id *proto.PolicyID, neededIPSets set.Set[string]) {
if neededIPSets != nil {
m.neededIPSets[*id] = neededIPSets
} else {
delete(m.neededIPSets, *id)
}
m.ipSetFilterDirty = true
}

func (m *policyManager) CompleteDeferredWork() error {
// Nothing to do, we don't defer any work.
if !m.rawEgressOnly {
return nil
}
if !m.ipSetFilterDirty {
return nil
}
m.ipSetFilterDirty = false

merged := set.New[string]()
for _, ipSets := range m.neededIPSets {
ipSets.Iter(func(item string) error {
merged.Add(item)
return nil
})
}
m.ipSetsCallback(merged)
return nil
}
31 changes: 27 additions & 4 deletions felix/dataplane/linux/policy_mgr_test.go
Expand Up @@ -250,12 +250,15 @@ var _ = Describe("Policy manager", func() {

var _ = Describe("Raw egress policy manager", func() {
var (
policyMgr *policyManager
rawTable *mockTable
neededIPSets set.Set[string]
policyMgr *policyManager
rawTable *mockTable
neededIPSets set.Set[string]
numCallbackCalls int
)

BeforeEach(func() {
neededIPSets = nil
numCallbackCalls = 0
rawTable = newMockTable("raw")
ruleRenderer := rules.NewRenderer(rules.Config{
IPSetConfigV4: ipsets.NewIPVersionConfig(ipsets.IPFamilyV4, "cali", nil, nil),
Expand All @@ -267,7 +270,27 @@ var _ = Describe("Raw egress policy manager", func() {
IptablesMarkEndpoint: 0xff00,
IptablesMarkNonCaliEndpoint: 0x0100,
})
policyMgr = newRawEgressPolicyManager(rawTable, ruleRenderer, 4, func(ipSets set.Set[string]) { neededIPSets = ipSets })
policyMgr = newRawEgressPolicyManager(
rawTable,
ruleRenderer,
4,
func(ipSets set.Set[string]) {
neededIPSets = ipSets
numCallbackCalls++
})
})

It("correctly reports no IP sets at start of day", func() {
err := policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
Expect(neededIPSets).ToNot(BeNil())
Expect(neededIPSets.Len()).To(BeZero())
Expect(numCallbackCalls).To(Equal(1))

By("Not repeating the callback.")
err = policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
Expect(numCallbackCalls).To(Equal(1))
})

It("correctly reports needed IP sets", func() {
Expand Down
50 changes: 50 additions & 0 deletions felix/fv/containers/containers.go
Expand Up @@ -717,6 +717,56 @@ func (c *Container) AttachTCPDump(iface string) *tcpdump.TCPDump {
return tcpdump.AttachUnavailable(c.GetID(), iface)
}

// IPSetSize returns the size of the given (netfilter) IP set (or 0 is it is not present).
func (c *Container) IPSetSize(ipSetName string) int {
// If we later optimize this to use 'ipset list <name>' note that the
// <name> variant fails with non-zero RC if the ipset doesn't exist.
return c.IPSetSizes()[ipSetName]
}

func (c *Container) IPSetSizeFn(ipSetName string) func() int {
return func() int {
return c.IPSetSize(ipSetName)
}
}

func (c *Container) IPSetSizes() map[string]int {
args := []string{"ipset", "list"}
ipsetsOutput, err := c.ExecOutput(args...)
Expect(err).NotTo(HaveOccurred())
numMembers := map[string]int{}
currentName := ""
membersSeen := false
log.WithField("ipsets", ipsetsOutput).Info("IP sets state")
for _, line := range strings.Split(ipsetsOutput, "\n") {
log.WithField("line", line).Debug("Parsing line")
if strings.HasPrefix(line, "Name:") {
currentName = strings.Split(line, " ")[1]
membersSeen = false
} else if strings.HasPrefix(line, "Members:") {
membersSeen = true
} else if membersSeen && len(strings.TrimSpace(line)) > 0 {
log.Debugf("IP set %s has member %s", currentName, line)
numMembers[currentName]++
}
}
return numMembers
}

func (c *Container) IPSetNames() []string {
out, err := c.ExecOutput("ipset", "list", "-name")
Expect(err).NotTo(HaveOccurred())
out = strings.Trim(out, "\n")
if out == "" {
return nil
}
return strings.Split(out, "\n")
}

func (c *Container) NumIPSets() int {
return len(c.IPSetNames())
}

// NumTCBPFProgs Returns the number of TC BPF programs attached to the given interface. Only direct-action
// programs are listed (i.e. the type that we use).
func (c *Container) NumTCBPFProgs(ifaceName string) int {
Expand Down
11 changes: 11 additions & 0 deletions felix/fv/donottrack_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/projectcalico/calico/felix/fv/connectivity"
"github.com/projectcalico/calico/felix/fv/utils"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -165,6 +166,11 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ do-not-track policy tests;
host0Selector := fmt.Sprintf("name == 'eth0-%s'", tc.Felixes[0].Name)
host1Selector := fmt.Sprintf("name == 'eth0-%s'", tc.Felixes[1].Name)

if BPFMode() {
By("Having no Linux IP sets")
Consistently(tc.Felixes[0].IPSetNames, "2s", "1s").Should(BeEmpty())
}

By("Having connectivity after installing bidirectional policies")
host0Pol := api.NewGlobalNetworkPolicy()
host0Pol.Name = "host-0-pol"
Expand Down Expand Up @@ -216,6 +222,11 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ do-not-track policy tests;

expectFullConnectivity()

if BPFMode() {
By("Having a Linux IP set for the egress policy")
Expect(tc.Felixes[0].IPSetNames()).To(ConsistOf(utils.IPSetNameForSelector(4, host1Selector)))
}

By("Having only failsafe connectivity after replacing host-0's egress rules with Deny")
// Since there's no conntrack, removing rules in one direction is enough to prevent
// connectivity in either direction.
Expand Down
54 changes: 16 additions & 38 deletions felix/fv/ipip_test.go
Expand Up @@ -370,9 +370,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ IPIP topology before adding
// Removing the BGP config triggers a Felix restart and Felix has a 2s timer during
// a config restart to ensure that it doesn't tight loop. Wait for the ipset to be
// updated as a signal that Felix has restarted.
Eventually(func() int {
return getNumIPSetMembers(f.Container, "cali40all-hosts-net")
}, "5s", "200ms").Should(BeZero())
Eventually(f.IPSetSizeFn("cali40all-hosts-net"), "5s", "200ms").Should(BeZero())
}
}
})
Expand Down Expand Up @@ -420,15 +418,18 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ IPIP topology before adding
externalClient.Stop()
})

It("should have all-hosts-net ipset configured with the external hosts and workloads connect", func() {
It("should allow IPIP to external client iff it is in ExternalNodesCIDRList", func() {

By("testing that ext client ipip does not work if not part of ExternalNodesCIDRList")

// Make sure that only the internal nodes are present in the ipset
for _, f := range tc.Felixes {
Eventually(func() int {
return getNumIPSetMembers(f.Container, "cali40all-hosts-net")
}, "5s", "200ms").Should(Equal(2))
if BPFMode() {
Eventually(f.BPFRoutes, "10s").Should(ContainSubstring(f.IP))
Consistently(f.BPFRoutes).ShouldNot(ContainSubstring(externalClient.IP))
} else {
// Make sure that only the internal nodes are present in the ipset
Eventually(f.IPSetSizeFn("cali40all-hosts-net"), "5s", "200ms").Should(Equal(2))
}
}

cc.ExpectNone(externalClient, w[0])
Expand Down Expand Up @@ -461,46 +462,23 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ IPIP topology before adding

// Wait for the config to take
for _, f := range tc.Felixes {
Eventually(func() int {
return getNumIPSetMembers(f.Container, "cali40all-hosts-net")
}, "15s", "200ms").Should(Equal(3))
if BPFMode() {
Eventually(f.BPFRoutes, "10s").Should(ContainSubstring(externalClient.IP))
Expect(f.IPSetSize("cali40all-hosts-net")).To(BeZero(),
"BPF mode shouldn't program IP sets")
} else {
Eventually(f.IPSetSizeFn("cali40all-hosts-net"), "15s", "200ms").Should(Equal(3))
}
}

By("testing that the ext client can connect via ipip")

cc.ResetExpectations()
cc.ExpectSome(externalClient, w[0])
cc.CheckConnectivity()
})
})
})

func getNumIPSetMembers(c *containers.Container, ipSetName string) int {
return getIPSetCounts(c)[ipSetName]
}

func getIPSetCounts(c *containers.Container) map[string]int {
ipsetsOutput, err := c.ExecOutput("ipset", "list")
Expect(err).NotTo(HaveOccurred())
numMembers := map[string]int{}
currentName := ""
membersSeen := false
log.WithField("ipsets", ipsetsOutput).Info("IP sets state")
for _, line := range strings.Split(ipsetsOutput, "\n") {
log.WithField("line", line).Debug("Parsing line")
if strings.HasPrefix(line, "Name:") {
currentName = strings.Split(line, " ")[1]
membersSeen = false
} else if strings.HasPrefix(line, "Members:") {
membersSeen = true
} else if membersSeen && len(strings.TrimSpace(line)) > 0 {
log.Debugf("IP set %s has member %s", currentName, line)
numMembers[currentName]++
}
}
return numMembers
}

type createK8sServiceWithoutKubeProxyArgs struct {
infra infrastructure.DatastoreInfra
felix *infrastructure.Felix
Expand Down

0 comments on commit 1e5c503

Please sign in to comment.