Skip to content

Commit

Permalink
apps.xdp: add filter option (offload non-matching packets to kernel)
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Feb 7, 2020
1 parent 48ff71c commit c158ac8
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 18 deletions.
6 changes: 6 additions & 0 deletions src/apps/xdp/README.md
Expand Up @@ -39,6 +39,12 @@ Due to a combination of how Snabb uses packet buffers and a limitation of

*Required*. The name of the interface as shown in `ip link`.

— Key **filter**

*Optional*. A `pcap-filter(7)` expression. If given, packets that do not match
the filter will we passed on to the host networking stack. Must be the same for
all instances of the XDP app on a given interface!

— Key **queue**

*Optional*. Queue to bind to (zero based). The default is queue 0.
Expand Down
91 changes: 73 additions & 18 deletions src/apps/xdp/xdp.lua
Expand Up @@ -5,6 +5,7 @@ module(...,package.seeall)
local S = require("syscall")
local ffi = require("ffi")
local bpf = require("apps.xdp.bpf")
local pf = require("apps.xdp.pf_ebpf_codegen")
local lib = require("core.lib")
local bits = lib.bits
local band, bor, rshift, tobit = bit.band, bit.bor, bit.rshift, bit.tobit
Expand Down Expand Up @@ -349,6 +350,7 @@ end
XDP = {
config = {
ifname = {required=true}, -- interface name
filter = {}, -- interface pcap-filter(7) (optional)
queue = {default=0} -- interface queue (zero based)
},
-- Class variables:
Expand All @@ -364,7 +366,7 @@ driver = XDP
function XDP:new (conf)
assert(snabb_xdp_enabled, "Snabb XDP mode must be enabled.")
-- Ensure interface is initialized for XDP usage.
local lockfd, mapfd = self:open_interface(conf.ifname)
local lockfd, mapfd = self:open_interface(conf.ifname, conf.filter)
-- Create XDP socket (xsk) for queue.
local xsk = self:create_xsk(conf.ifname, lockfd, conf.queue)
-- Attach the socket to queue in the BPF map.
Expand All @@ -374,7 +376,7 @@ function XDP:new (conf)
return setmetatable(xsk, {__index=XDP})
end

function XDP:open_interface (ifname)
function XDP:open_interface (ifname, filter)
-- Open an interface-dependent file we know should exist to use as a
-- Snabb-wide lock. The contents of the file are really irrelevant here.
-- However, we depend on the file not being locked by other applications in
Expand All @@ -394,7 +396,7 @@ function XDP:open_interface (ifname)
S.mkdir("/sys/fs/bpf/snabb/"..ifname, "rwxu, rgrp, xgrp, roth, xoth")
-- Create xskmap and XDP program to run on the NIC.
mapfd = self:create_xskmap()
progfd = self:xdp_prog(mapfd)
progfd = self:xdp_prog(mapfd, filter)
self:set_link_xdp(ifname, progfd)
-- Pin xskmap so it can be accessed by other Snabb processes to attach to
-- the interface. Also pin the XDP program, just 'cause.
Expand Down Expand Up @@ -434,13 +436,34 @@ function XDP:create_xskmap ()
error("Failed to create BPF map: "..tostring(err))
end

function XDP:xdp_prog (xskmap)
function XDP:xdp_prog (xskmap, filter)
-- Assemble and load XDP BPF program.
-- If we have a filter argument, compile a filter that passes non-matching
-- packets on to the kernel networking stack (XDP_PASS). Append to it our
-- regular XSK forwarding code (XDP:xdp_forward) so packets that pass
-- the filter are forwarded to attached XDP sockets.
local flt = (filter and pf.compile(filter)) or {}
for _, ins in ipairs(self:xdp_forward(xskmap)) do
-- Append forwarding logic to filter.
table.insert(flt, ins)
end
local asm = bpf.asm(flt)
local prog, err, log = S.bpf_prog_load(
'xdp', asm, ffi.sizeof(asm) / ffi.sizeof(bpf.ins), "Apache 2.0"
)
if prog then
return prog
else
error(tostring(err).."\n"..log)
end
end

function XDP:xdp_forward (xskmap)
local c, f, m, a, s, j, fn =
bpf.c, bpf.f, bpf.m, bpf.a, bpf.s, bpf.j, bpf.fn
-- The program below looks up the incoming packet's queue index in xskmap to
-- find the corresponding XDP socket (xsk) to deliver the packet to.
local insns = bpf.asm{
return {
-- r3 = XDP_ABORTED
{ op=bor(c.ALU, a.MOV, s.K), dst=3, imm=0 },
-- r2 = ((struct xdp_md *)ctx)->rx_queue_index
Expand All @@ -453,17 +476,9 @@ function XDP:xdp_prog (xskmap)
-- EXIT:
{ op=bor(c.JMP, j.EXIT) }
}
local prog, err, log = S.bpf_prog_load(
'xdp', insns, ffi.sizeof(insns) / ffi.sizeof(bpf.ins), "Apache 2.0"
)
if prog then
return prog
else
error(tostring(err).."\n"..log)
end
end

function XDP:set_link_xdp(ifname, prog)
function XDP:set_link_xdp (ifname, prog)
-- Open a NETLINK socket, and transmit command that attaches XDP program
-- prog to link by ifname.
local netlink = assert(S.socket('netlink', 'raw', 'route'))
Expand Down Expand Up @@ -748,6 +763,8 @@ function selftest ()
selftest_duplex(xdpdeva, xdpmaca, xdpdevb, xdpmacb, nqueues)
print("test: rxtx_match")
selftest_rxtx_match(xdpdeva, xdpmaca, xdpdevb, xdpmacb)
print("test: rxtx_match_filter")
selftest_rxtx_match_filter(xdpdeva, xdpmaca, xdpdevb, xdpmacb)
if nqueues > 1 then
print("test: share_interface")
selftest_share_interface(xdpdeva, xdpmaca, xdpdevb, xdpmacb, nqueues)
Expand All @@ -766,6 +783,7 @@ local function random_v4_packets (conf)
for _=1,100 do
local ip = ipv4:new{src=lib.random_bytes(4),
dst=lib.random_bytes(4)}
if conf.protocol then ip:protocol(conf.protocol) end
ip:total_length(size - eth:sizeof())
local payload_length = ip:total_length() - ip:sizeof()
local p = packet.allocate()
Expand Down Expand Up @@ -796,10 +814,10 @@ function selftest_rxtx (xdpdeva, xdpmaca, xdpdevb, xdpmacb, nqueues)
ifname = xdpdeva,
queue = queue
})
config.app(c, queue_b, XDP, {
ifname = xdpdevb,
queue = queue
})
config.app(c, queue_b, XDP, {
ifname = xdpdevb,
queue = queue
})
config.link(c, "source.output"..queue.." -> "..queue_a..".input")
config.link(c, queue_b..".output -> sink.input"..queue)
end
Expand Down Expand Up @@ -901,6 +919,43 @@ function selftest_rxtx_match (xdpdeva, xdpmaca, xdpdevb, xdpmacb)
assert(#engine.app_table.match:errors() == 0, "Match errors.")
end

function selftest_rxtx_match_filter (xdpdeva, xdpmaca, xdpdevb, xdpmacb)
local c = config.new()
local synth = require("apps.test.synth")
local npackets = require("apps.test.npackets")
local match = require("apps.test.match")
config.app(c, "source", synth.Synth, {
packets = random_v4_packets{
sizes = {60,64,67,128,133,192,256,384,512,777,1024,1500,1501},
src = xdpmaca,
dst = xdpmacb,
protocol = 42
}})
config.app(c, "npackets", npackets.Npackets, {npackets=1000})
config.app(c, "match", match.Match)
config.app(c, xdpdeva, XDP, {ifname=xdpdeva})
config.app(c, xdpdevb, XDP, {ifname=xdpdevb, filter="ip proto 42"})
config.link(c, "source.output -> "..xdpdeva..".input")
config.link(c, xdpdevb..".output -> match.rx")
config.link(c, "source.copy -> npackets.input")
config.link(c, "npackets.output -> match.comparator")
-- Test redirect
engine.configure(c)
engine.main{ duration=.1 }
engine.report_links()
engine.report_apps()
assert(#engine.app_table.match:errors() == 0, "Match errors.")
-- Test pass
engine.configure(config.new())
config.app(c, xdpdevb, XDP, {ifname=xdpdevb, filter="ip6 proto 77"})
engine.configure(c)
engine.main{ duration=.1 }
engine.report_links()
assert(#engine.app_table.match:errors() == 1000, "Matched packets.")
assert(link.stats(engine.app_table[xdpdevb].output.output).rxpackets == 0,
"Too many packets received on "..xdpdevb)
end

function selftest_share_interface_worker (xdpdev, queue)
snabb_enable_xdp()
local c = config.new()
Expand Down

0 comments on commit c158ac8

Please sign in to comment.