Skip to content

Commit

Permalink
Initial nftables prototype implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport committed May 2, 2024
1 parent bcd6c93 commit 92b2660
Show file tree
Hide file tree
Showing 46 changed files with 6,909 additions and 1,387 deletions.
1 change: 1 addition & 0 deletions felix/dataplane/common/ipsets_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type IPSetsDataplane interface {
QueueResync()
ApplyUpdates()
ApplyDeletions() (reschedule bool)
SetFilter(neededIPSets set.Set[string])
}

// Except for domain IP sets, IPSetsManager simply passes through IP set updates from the datastore
Expand Down
5 changes: 3 additions & 2 deletions felix/dataplane/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func StartDataplaneDriver(configParams *config.Config,
healthAggregator *health.HealthAggregator,
configChangedRestartCallback func(),
fatalErrorCallback func(error),
k8sClientSet *kubernetes.Clientset) (DataplaneDriver, *exec.Cmd) {

k8sClientSet *kubernetes.Clientset,
) (DataplaneDriver, *exec.Cmd) {
if !configParams.IsLeader() {
// Return an inactive dataplane, since we're not the leader.
log.Info("Not the leader, using an inactive dataplane")
Expand Down Expand Up @@ -207,6 +207,7 @@ func StartDataplaneDriver(configParams *config.Config,
NetlinkTimeout: configParams.NetlinkTimeoutSecs,
},
RulesConfig: rules.Config{
NFTables: true, // TODO
WorkloadIfacePrefixes: configParams.InterfacePrefixes(),

IPSetConfigV4: ipsets.NewIPVersionConfig(
Expand Down
61 changes: 27 additions & 34 deletions felix/dataplane/linux/bpf_ep_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"time"

"github.com/projectcalico/calico/felix/ethtool"
"github.com/projectcalico/calico/felix/generictables"
"github.com/projectcalico/calico/libcalico-go/lib/health"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -80,7 +81,6 @@ import (
"github.com/projectcalico/calico/felix/idalloc"
"github.com/projectcalico/calico/felix/ifacemonitor"
"github.com/projectcalico/calico/felix/ip"
"github.com/projectcalico/calico/felix/iptables"
"github.com/projectcalico/calico/felix/proto"
"github.com/projectcalico/calico/felix/routetable"
)
Expand Down Expand Up @@ -321,7 +321,7 @@ type bpfEndpointManager struct {
// UT-able BPF dataplane interface.
dp bpfDataplane

//ifaceToIpMap map[string]net.IP
// ifaceToIpMap map[string]net.IP
opReporter logutils.OpRecorder

// XDP
Expand Down Expand Up @@ -374,7 +374,7 @@ type bpfEndpointManagerDataplane struct {

// IP of the tunnel / overlay device
tunnelIP net.IP
iptablesFilterTable IptablesTable
iptablesFilterTable Table
ipSetIDAlloc *idalloc.IDAllocator
}

Expand All @@ -384,7 +384,7 @@ type serviceKey struct {
}

type bpfAllowChainRenderer interface {
WorkloadInterfaceAllowChains(endpoints map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint) []*iptables.Chain
WorkloadInterfaceAllowChains(endpoints map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint) []*generictables.Chain
}

type ManagerWithHEPUpdate interface {
Expand Down Expand Up @@ -415,8 +415,8 @@ func NewTestEpMgr(
VXLANPort: 4789,
VXLANVNI: 4096,
}),
iptables.NewNoopTable(),
iptables.NewNoopTable(),
generictables.NewNoopTable(),
generictables.NewNoopTable(),
nil,
logutils.NewSummarizer("test"),
new(environment.FakeFeatureDetector),
Expand All @@ -433,8 +433,8 @@ func newBPFEndpointManager(
ipSetIDAllocV4 *idalloc.IDAllocator,
ipSetIDAllocV6 *idalloc.IDAllocator,
iptablesRuleRenderer bpfAllowChainRenderer,
iptablesFilterTableV4 IptablesTable,
iptablesFilterTableV6 IptablesTable,
iptablesFilterTableV4 Table,
iptablesFilterTableV6 Table,
livenessCallback func(),
opReporter logutils.OpRecorder,
featureDetector environment.FeatureDetectorIface,
Expand Down Expand Up @@ -581,7 +581,7 @@ func newBPFEndpointManager(
m.dirtyServices = set.New[serviceKey]()
m.natExcludedCIDRs = ip.NewCIDRTrie()

var excludeCIDRsMatch = 1
excludeCIDRsMatch := 1

for _, c := range config.BPFExcludeCIDRsFromNAT {
cidr, err := ip.CIDRFromString(c)
Expand Down Expand Up @@ -651,11 +651,10 @@ func newBPFEndpointManager(
func newBPFEndpointManagerDataplane(
ipFamily proto.IPVersion,
ipMaps *bpfmap.IPMaps,
iptablesFilterTable IptablesTable,
iptablesFilterTable Table,
ipSetIDAlloc *idalloc.IDAllocator,
epMgr *bpfEndpointManager,
) *bpfEndpointManagerDataplane {

return &bpfEndpointManagerDataplane{
ipFamily: ipFamily,
ifaceToIpMap: map[string]net.IP{},
Expand All @@ -670,7 +669,7 @@ var _ hasLoadPolicyProgram = (*bpfEndpointManager)(nil)

func (m *bpfEndpointManager) repinJumpMaps() error {
oldBase := path.Join(bpfdefs.GlobalPinDir, "old_jumps")
err := os.Mkdir(oldBase, 0700)
err := os.Mkdir(oldBase, 0o700)
if err != nil && !os.IsExist(err) {
return fmt.Errorf("cannot create %s: %w", oldBase, err)
}
Expand Down Expand Up @@ -1391,7 +1390,7 @@ func (m *bpfEndpointManager) syncIfaceProperties() error {
if m.bpfDisableGROForIfaces != nil {
expr := m.bpfDisableGROForIfaces.String()
if len(expr) > 0 {
var config = map[string]bool{
config := map[string]bool{
ethtool.EthtoolRxGRO: false,
}

Expand Down Expand Up @@ -1423,7 +1422,6 @@ func (m *bpfEndpointManager) syncIfaceProperties() error {

return maps.IterNone
})

if err != nil {
return fmt.Errorf("iterating over counters map failed")
}
Expand Down Expand Up @@ -1602,7 +1600,6 @@ func (m *bpfEndpointManager) reportHealth(ready bool, detail string) {
}

func (m *bpfEndpointManager) doApplyPolicyToDataIface(iface string) (bpfInterfaceState, error) {

var (
err error
up bool
Expand Down Expand Up @@ -2115,7 +2112,7 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState,
attachPreamble = v4Readiness != ifaceIsReady
}

//Attach preamble TC program
// Attach preamble TC program
if attachPreamble {
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2259,8 +2256,8 @@ func isLinkNotFoundError(err error) bool {
var calicoRouterIP = net.IPv4(169, 254, 1, 1).To4()

func (d *bpfEndpointManagerDataplane) wepTCAttachPoint(ap *tc.AttachPoint, policyIdx, filterIdx int,
polDirection PolDirection) *tc.AttachPoint {

polDirection PolDirection,
) *tc.AttachPoint {
ap = d.configureTCAttachPoint(polDirection, ap, false)
ifaceName := ap.IfaceName()
if d.ipFamily == proto.IPVersion_IPV6 {
Expand All @@ -2284,8 +2281,8 @@ func (d *bpfEndpointManagerDataplane) wepTCAttachPoint(ap *tc.AttachPoint, polic
}

func (d *bpfEndpointManagerDataplane) wepApplyPolicyToDirection(readiness ifaceReadiness, state *bpfInterfaceState,
endpoint *proto.WorkloadEndpoint, polDirection PolDirection, ap *tc.AttachPoint) (*tc.AttachPoint, error) {

endpoint *proto.WorkloadEndpoint, polDirection PolDirection, ap *tc.AttachPoint,
) (*tc.AttachPoint, error) {
var policyIdx, filterIdx int

if d.hostIP == nil {
Expand Down Expand Up @@ -2340,8 +2337,8 @@ func (m *bpfEndpointManager) loadFilterProgram(ap attachPoint) {
}

func (d *bpfEndpointManagerDataplane) wepApplyPolicy(ap *tc.AttachPoint,
endpoint *proto.WorkloadEndpoint, polDirection PolDirection) error {

endpoint *proto.WorkloadEndpoint, polDirection PolDirection,
) error {
var profileIDs []string
var tier *proto.TierInfo
if endpoint != nil {
Expand Down Expand Up @@ -2386,7 +2383,6 @@ func (d *bpfEndpointManagerDataplane) wepApplyPolicy(ap *tc.AttachPoint,
}

func (m *bpfEndpointManager) addHostPolicy(rules *polprog.Rules, hostEndpoint *proto.HostEndpoint, polDirection PolDirection) {

// When there is applicable pre-DNAT policy that does not explicitly Allow or Deny traffic,
// we continue on to subsequent tiers and normal or AoF policy.
if len(hostEndpoint.PreDnatTiers) == 1 {
Expand Down Expand Up @@ -2414,7 +2410,6 @@ func (d *bpfEndpointManagerDataplane) applyPolicyToWeps(
endpoint *proto.WorkloadEndpoint,
ap *tc.AttachPoint,
) (*tc.AttachPoint, *tc.AttachPoint, error) {

ingressAttachPoint := *ap
egressAttachPoint := *ap

Expand Down Expand Up @@ -2443,7 +2438,6 @@ func (d *bpfEndpointManagerDataplane) applyPolicyToDataIface(
ap *tc.AttachPoint,
apxdp *xdp.AttachPoint,
) (*tc.AttachPoint, *tc.AttachPoint, *xdp.AttachPoint, error) {

ingressAttachPoint := *ap
egressAttachPoint := *ap
xdpAttachPoint := *apxdp
Expand Down Expand Up @@ -2477,7 +2471,6 @@ func (d *bpfEndpointManagerDataplane) attachDataIfaceProgram(
state *bpfInterfaceState,
ap *tc.AttachPoint,
) (*tc.AttachPoint, error) {

if d.hostIP == nil {
// Do not bother and wait
return nil, fmt.Errorf("unknown host IP")
Expand Down Expand Up @@ -2725,8 +2718,10 @@ func (d *bpfEndpointManagerDataplane) configureTCAttachPoint(policyDirection Pol
return ap
}

const EndTierDrop = true
const NoEndTierDrop = false
const (
EndTierDrop = true
NoEndTierDrop = false
)

func (m *bpfEndpointManager) extractTiers(tier *proto.TierInfo, direction PolDirection, endTierDrop bool) (rTiers []polprog.Tier) {
dir := direction.RuleDir()
Expand Down Expand Up @@ -3353,7 +3348,7 @@ func (m *bpfEndpointManager) writePolicyDebugInfo(insns []asm.Insns, ifaceName s
if !m.bpfPolicyDebugEnabled {
return nil
}
if err := os.MkdirAll(bpf.RuntimePolDir, 0600); err != nil {
if err := os.MkdirAll(bpf.RuntimePolDir, 0o600); err != nil {
return err
}

Expand Down Expand Up @@ -3387,15 +3382,14 @@ func (m *bpfEndpointManager) writePolicyDebugInfo(insns []asm.Insns, ifaceName s
return err
}

if err := os.WriteFile(filename, buffer.Bytes(), 0600); err != nil {
if err := os.WriteFile(filename, buffer.Bytes(), 0o600); err != nil {
return err
}
log.Debugf("Policy iface %s hook %s written to %s", ifaceName, h, filename)
return nil
}

func (m *bpfEndpointManager) updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint, ipFamily proto.IPVersion) error {

progName := policyProgramName(ap.IfaceName(), polDir, proto.IPVersion(ipFamily))

var opts []polprog.Option
Expand Down Expand Up @@ -3438,7 +3432,6 @@ func (m *bpfEndpointManager) loadTCLogFilter(ap *tc.AttachPoint) (fileDescriptor

fd, err := bpf.LoadBPFProgramFromInsns(logFilter, "calico_log_filter",
"Apache-2.0", uint32(unix.BPF_PROG_TYPE_SCHED_CLS))

if err != nil {
return nil, 0, fmt.Errorf("failed to load BPF log filter program: %w", err)
}
Expand Down Expand Up @@ -3908,8 +3901,8 @@ func (m *bpfEndpointManager) updatePolicyCache(name string, owner string, inboun
}

func (m *bpfEndpointManager) addRuleInfo(rule *proto.Rule, idx int,
owner string, direction PolDirection, polName string) polprog.RuleMatchID {

owner string, direction PolDirection, polName string,
) polprog.RuleMatchID {
matchID := m.dp.ruleMatchID(direction.RuleDir(), rule.Action, owner, polName, idx)
m.dirtyRules.Discard(matchID)

Expand Down
13 changes: 5 additions & 8 deletions felix/dataplane/linux/bpf_ep_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (m *mockDataplane) ensureProgramLoaded(ap attachPoint, ipFamily proto.IPVer
m.mutex.Lock()
defer m.mutex.Unlock()

//var res tc.AttachResult // we don't care about the values
// var res tc.AttachResult // we don't care about the values

if apxdp, ok := ap.(*xdp.AttachPoint); ok {
apxdp.HookLayoutV4 = hook.Layout{
Expand Down Expand Up @@ -262,7 +262,6 @@ func (f mockFD) FD() uint32 {
}

var _ = Describe("BPF Endpoint Manager", func() {

var (
bpfEpMgr *bpfEndpointManager
dp *mockDataplane
Expand All @@ -281,8 +280,8 @@ var _ = Describe("BPF Endpoint Manager", func() {
commonMaps *bpfmap.CommonMaps
rrConfigNormal rules.Config
ruleRenderer rules.RuleRenderer
filterTableV4 IptablesTable
filterTableV6 IptablesTable
filterTableV4 Table
filterTableV6 Table
ifStateMap *mock.Map
countersMap *mock.Map
jumpMap *mock.Map
Expand Down Expand Up @@ -491,7 +490,7 @@ var _ = Describe("BPF Endpoint Manager", func() {
hostEp := proto.HostEndpoint{
Name: "uthost-eth0",
PreDnatTiers: []*proto.TierInfo{
&proto.TierInfo{
{
Name: "default",
IngressPolicies: []string{"mypolicy"},
},
Expand All @@ -501,7 +500,7 @@ var _ = Describe("BPF Endpoint Manager", func() {
hostEpNorm := proto.HostEndpoint{
Name: "uthost-eth0",
Tiers: []*proto.TierInfo{
&proto.TierInfo{
{
Name: "default",
IngressPolicies: []string{"mypolicy"},
EgressPolicies: []string{"mypolicy"},
Expand Down Expand Up @@ -607,7 +606,6 @@ var _ = Describe("BPF Endpoint Manager", func() {
"eth0": {Ingress: 12345},
}
}

})

It("should detach from eth0 when eth0 up before first CompleteDeferredWork()", func() {
Expand Down Expand Up @@ -851,7 +849,6 @@ var _ = Describe("BPF Endpoint Manager", func() {
binary.LittleEndian.PutUint64(k, ingDenyRuleMatchId)
_, err = rcMap.Get(k)
Expect(err).To(HaveOccurred())

})

It("should cleanup the bpf map after restart", func() {
Expand Down
Loading

0 comments on commit 92b2660

Please sign in to comment.