Skip to content

Commit

Permalink
program.ipfix: redesign RSS framework
Browse files Browse the repository at this point in the history
Hardware RSS queues were implicitely supported by starting multiple
instances of probe_rss, since intel_mp did not require a controlling
process for RSS.  To prepare for merging of the mellanox driver, the
probe is rewritten to start all RSS instances from a single master
process.
  • Loading branch information
alexandergall committed Jul 16, 2018
1 parent 49a24f0 commit 5b9e56e
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 233 deletions.
174 changes: 140 additions & 34 deletions src/program/ipfix/lib.lua
Expand Up @@ -16,7 +16,10 @@ local basic = require("apps.basic.basic_apps")
local arp = require("apps.ipv4.arp")
local ipfix = require("apps.ipfix.ipfix")
local template = require("apps.ipfix.template")
local rss = require("apps.rss.rss")
local ifmib = require("lib.ipc.shmem.iftable_mib")
local Transmitter = require("apps.interlink.transmitter")


local ifmib_dir = '/ifmib'

Expand Down Expand Up @@ -238,26 +241,70 @@ function configure_graph (arg, in_graph)
return config, graph
end

function parse_jit_option_fn (jit)
return function (arg)
if arg:match("^v") then
local file = arg:match("^v=(.*)")
if file == '' then file = nil end
jit.v = file
elseif arg:match("^p") then
local opts, file = arg:match("^p=([^,]*),?(.*)")
if file == '' then file = nil end
jit.p = { opts, file }
elseif arg:match("^dump") then
local opts, file = arg:match("^dump=([^,]*),?(.*)")
if file == '' then file = nil end
jit.dump = { opts, file }
elseif arg:match("^opt") then
local opt = arg:match("^opt=(.*)")
table.insert(jit.opts, opt)
elseif arg:match("^tprof") then
jit.traceprof = true
end
end
end

local function set_jit_options (jit)
if not jit then return end
if jit.v then
require("jit.v").start(jit.v)
end
if jit.p and #jit.p > 0 then
require("jit.p").start(unpack(jit.p))
end
if jit.traceprof then
require("lib.traceprof.traceprof").start()
end
if jit.dump and #jit.dump > 0 then
require("jit.dump").on(unpack(jit.dump))
end
if jit.opts and #jit.opts > 0 then
require("jit.opt").start(unpack(jit.opts))
end
end

local function clear_jit_options (jit)
if not jit then return end
if jit.dump then
require("jit.dump").off()
end
if jit.traceprof then
require("lib.traceprof.traceprof").stop()
end
if jit.p then
require("jit.p").stop()
end
if jit.v then
require("jit.v").stop()
end
end

-- Run an instance of the ipfix probe
function run (arg, duration, busywait, cpu, jit)
local config = configure_graph(arg)

if jit then
if jit.v then
require("jit.v").start(jit.v)
end
if jit.p and #jit.p > 0 then
require("jit.p").start(unpack(jit.p))
end
if jit.traceprof then
require("lib.traceprof.traceprof").start()
end
if jit.dump and #jit.dump > 0 then
require("jit.dump").on(unpack(jit.dump))
end
if jit.opts and #jit.opts > 0 then
require("jit.opt").start(unpack(jit.opts))
end
end
if cpu then numa.bind_to_cpu(cpu) end
set_jit_options(jit)

local done
if not duration and config.input_type == "pcap" then
Expand All @@ -267,28 +314,16 @@ function run (arg, duration, busywait, cpu, jit)
end

local t1 = now()
if cpu then numa.bind_to_cpu(cpu) end

engine.busywait = busywait
if busywait ~= nil then
engine.busywait = busywait
end
engine.main({ duration = duration, done = done, measure_latency = false })

if jit then
if jit.dump then
require("jit.dump").off()
end
if jit.traceprof then
require("lib.traceprof.traceprof").stop()
end
if jit.p then
require("jit.p").stop()
end
if jit.v then
require("jit.v").stop()
end
end
clear_jit_options(jit)

local t2 = now()
local stats = link.stats(engine.app_table.ipfix.input.input)
local stats = link.stats(engine.app_table['ipfix'..config.instance].input.input)
print("IPFIX probe stats:")
local comma = lib.comma_value
print(string.format("bytes: %s packets: %s bps: %s Mpps: %s",
Expand All @@ -298,3 +333,74 @@ function run (arg, duration, busywait, cpu, jit)
comma(stats.rxpackets / ((t2 - t1) * 1000000))))

end

-- Run an instance of the RSS app. The output links can either be
-- interlinks or regular links with an instance of an ipfix probe
-- attached.
function run_rss(config, inputs, outputs, duration, busywait, cpu, jit)
if cpu then numa.bind_to_cpu(cpu) end
set_jit_options(jit)

local graph = app_graph.new()
app_graph.app(graph, "rss", rss.rss, config)

-- An input describes a physical interface
local tags, in_app_specs = {}, {}
for n, input in ipairs(inputs) do
local suffix = #inputs > 1 and n or ''
local input_name = "input"..suffix
local in_link, in_app = in_apps.pci(input.device.."/"..input.rxq)
input.config.pciaddr = input.device
table.insert(in_app_specs,
{ pciaddr = input.device,
name = input_name,
ifname = input.name or
(input.device:gsub("[:%.]", "_")),
ifalias = input.description })
app_graph.app(graph, input_name, unpack(in_app))
local link_name = "input"..suffix
if input.tag then
local tag = input.tag
assert(not(tags[tag]), "Tag not unique: "..tag)
link_name = "vlan"..tag
end
app_graph.link(graph, input_name.."."..in_link.output
.." -> rss."..link_name)
end

-- An output describes either an interlink or a complete ipfix app
for _, output in ipairs(outputs) do
if output.type == 'interlink' then
-- Keys
-- link_name name of the link
app_graph.app(graph, output.link_name, Transmitter)
app_graph.link(graph, "rss."..output.link_name.." -> "
..output.link_name..".input")
else
-- Keys
-- link_name name of the link
-- args probe configuration
-- instance # of embedded instance
output.args.instance = output.instance
local config = configure_graph(output.args, graph)
app_graph.link(graph, "rss."..output.link_name
.." -> ipfix"..output.instance..".input")
end
end

engine.configure(graph)
for _, spec in ipairs(in_app_specs) do
create_ifmib(engine.app_table[spec.name].stats,
spec.ifname, spec.ifalias)
end
require("jit").flush()

local engine_opts = { no_report = true, measure_latency = false }
if duration ~= 0 then engine_opts.duration = duration end
if busywait ~= nil then
engine.busywait = busywait
end
engine.main(engine_opts)

clear_jit_options(jit)
end
21 changes: 1 addition & 20 deletions src/program/ipfix/probe/probe.lua
Expand Up @@ -148,26 +148,7 @@ function run (args)
["cpu"] = function (arg)
cpu = tonumber(arg)
end,
j = function (arg)
if arg:match("^v") then
local file = arg:match("^v=(.*)")
if file == '' then file = nil end
jit.v = file
elseif arg:match("^p") then
local opts, file = arg:match("^p=([^,]*),?(.*)")
if file == '' then file = nil end
jit.p = { opts, file }
elseif arg:match("^dump") then
local opts, file = arg:match("^dump=([^,]*),?(.*)")
if file == '' then file = nil end
jit.dump = { opts, file }
elseif arg:match("^opt") then
local opt = arg:match("^opt=(.*)")
table.insert(jit.opts, opt)
elseif arg:match("^tprof") then
jit.traceprof = true
end
end
j = require("program.ipfix.lib").parse_jit_option_fn(jit)
}

args = lib.dogetopt(args, opt, "hD:i:o:p:m:a:c:j:b", long_opts)
Expand Down

0 comments on commit 5b9e56e

Please sign in to comment.